Fix #4287: Remove task updates from activity feed; Fix #4291: Test connection send better error msg (#4352)

This commit is contained in:
Sriharsha Chintalapani 2022-04-22 01:49:24 -07:00 committed by GitHub
parent fa0d386bd6
commit b08f963601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 26 deletions

View File

@ -80,6 +80,7 @@ public class AirflowRESTClient {
}
public String deploy(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
@ -93,17 +94,18 @@ public class AirflowRESTClient {
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(pipelinePayload))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return response.body();
}
throw new AirflowException(
String.format(
"%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s",
ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body()));
} catch (Exception e) {
throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
}
throw new AirflowException(
String.format(
"%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s",
ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body()));
}
public String deletePipeline(String pipelineName) {
@ -129,6 +131,7 @@ public class AirflowRESTClient {
}
public String runPipeline(String pipelineName) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
@ -142,19 +145,20 @@ public class AirflowRESTClient {
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return response.body();
}
throw IngestionPipelineDeploymentException.byMessage(
pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode()));
} catch (Exception e) {
throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage());
}
throw IngestionPipelineDeploymentException.byMessage(
pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode()));
}
public IngestionPipeline getStatus(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
@ -167,7 +171,7 @@ public class AirflowRESTClient {
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class);
ingestionPipeline.setPipelineStatuses(statuses);
@ -176,17 +180,17 @@ public class AirflowRESTClient {
} else if (response.statusCode() == 404) {
ingestionPipeline.setDeployed(false);
}
throw AirflowException.byMessage(
ingestionPipeline.getName(),
"Failed to fetch ingestion pipeline runs",
Response.Status.fromStatusCode(response.statusCode()));
} catch (Exception e) {
throw AirflowException.byMessage(ingestionPipeline.getName(), e.getMessage());
}
throw AirflowException.byMessage(
ingestionPipeline.getName(),
"Failed to fetch ingestion pipeline runs",
Response.Status.fromStatusCode(response.statusCode()));
}
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
@ -199,14 +203,14 @@ public class AirflowRESTClient {
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(connectionPayload))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return response;
}
throw AirflowException.byMessage(
"Failed to test connection.", String.valueOf(Response.Status.fromStatusCode(response.statusCode())));
} catch (Exception e) {
throw AirflowException.byMessage("Failed to test connection.", e.getMessage());
}
throw AirflowException.byMessage("Failed to test connection.", response.toString());
}
}

View File

@ -366,7 +366,6 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
recordChange("pipelineUrl", origPipeline.getPipelineUrl(), updatedPipeline.getPipelineUrl());
recordChange("concurrency", origPipeline.getConcurrency(), updatedPipeline.getConcurrency());
recordChange("pipelineLocation", origPipeline.getPipelineLocation(), updatedPipeline.getPipelineLocation());
recordChange("startDate", origPipeline.getStartDate(), updatedPipeline.getStartDate());
}
private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) throws JsonProcessingException {
@ -384,19 +383,22 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
List<Task> updatedTasks = listOrEmpty(updatedPipeline.getTasks());
List<Task> origTasks = listOrEmpty(origPipeline.getTasks());
List<Task> added = new ArrayList<>();
List<Task> deleted = new ArrayList<>();
recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch);
boolean newTasks = false;
// Update the task descriptions
for (Task updated : updatedTasks) {
Task stored = origTasks.stream().filter(c -> taskMatch.test(c, updated)).findAny().orElse(null);
if (stored == null || updated == null) { // New task added
newTasks = true;
continue;
}
updateTaskDescription(stored, updated);
}
if (newTasks) {
List<Task> added = new ArrayList<>();
List<Task> deleted = new ArrayList<>();
recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch);
}
}
private void updateTaskDescription(Task origTask, Task updatedTask) throws JsonProcessingException {

View File

@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE;
import static org.openmetadata.catalog.util.TestUtils.assertListNotNull;
import static org.openmetadata.catalog.util.TestUtils.assertListNull;
import static org.openmetadata.catalog.util.TestUtils.assertResponseContains;
@ -241,6 +242,7 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
ChangeDescription change = getChangeDescription(pipeline.getVersion());
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("newDescription"));
change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(TASKS));
pipeline =
updateAndCheckEntity(
request.withDescription("newDescription").withTasks(TASKS), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
@ -251,10 +253,12 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl(new URI("http://localhost:0"));
tasks.add(taskEmptyDesc);
change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(tasks));
// Create new request with all the Tasks
List<Task> updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList());
updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
pipeline = updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
change = getChangeDescription(pipeline.getVersion());
// create a request with same tasks we shouldn't see any change
updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, NO_CHANGE, change);
}
@Test