From b08f96360150da9dfbf8e379722612c7af468952 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 22 Apr 2022 01:49:24 -0700 Subject: [PATCH] Fix #4287: Remove task updates from activity feed; Fix #4291: Test connection send better error msg (#4352) --- .../catalog/airflow/AirflowRESTClient.java | 40 ++++++++++--------- .../catalog/jdbi3/PipelineRepository.java | 14 ++++--- .../pipelines/PipelineResourceTest.java | 8 +++- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index d050a9b4527..7755464dc87 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -80,6 +80,7 @@ public class AirflowRESTClient { } public String deploy(IngestionPipeline ingestionPipeline) { + HttpResponse response; try { String token = authenticate(); String authToken = String.format(AUTH_TOKEN, token); @@ -93,17 +94,18 @@ public class AirflowRESTClient { .header(AUTH_HEADER, authToken) .POST(HttpRequest.BodyPublishers.ofString(pipelinePayload)) .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + response = client.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() == 200) { return response.body(); } - throw new AirflowException( - String.format( - "%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", - ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body())); } catch (Exception e) { throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage()); } + + throw new AirflowException( + String.format( + "%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", + ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body())); } public String deletePipeline(String pipelineName) { @@ -129,6 +131,7 @@ public class AirflowRESTClient { } public String runPipeline(String pipelineName) { + HttpResponse response; try { String token = authenticate(); String authToken = String.format(AUTH_TOKEN, token); @@ -142,19 +145,20 @@ public class AirflowRESTClient { .header(AUTH_HEADER, authToken) .POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString())) .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + response = client.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() == 200) { return response.body(); } - - throw IngestionPipelineDeploymentException.byMessage( - pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode())); } catch (Exception e) { throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage()); } + + throw IngestionPipelineDeploymentException.byMessage( + pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode())); } public IngestionPipeline getStatus(IngestionPipeline ingestionPipeline) { + HttpResponse response; try { String token = authenticate(); String authToken = String.format(AUTH_TOKEN, token); @@ -167,7 +171,7 @@ public class AirflowRESTClient { .header(AUTH_HEADER, authToken) .POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString())) .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + response = client.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() == 200) { List statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class); ingestionPipeline.setPipelineStatuses(statuses); @@ -176,17 +180,17 @@ public class AirflowRESTClient { } else if (response.statusCode() == 404) { ingestionPipeline.setDeployed(false); } - - throw AirflowException.byMessage( - ingestionPipeline.getName(), - "Failed to fetch ingestion pipeline runs", - Response.Status.fromStatusCode(response.statusCode())); } catch (Exception e) { throw AirflowException.byMessage(ingestionPipeline.getName(), e.getMessage()); } + throw AirflowException.byMessage( + ingestionPipeline.getName(), + "Failed to fetch ingestion pipeline runs", + Response.Status.fromStatusCode(response.statusCode())); } public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + HttpResponse response; try { String token = authenticate(); String authToken = String.format(AUTH_TOKEN, token); @@ -199,14 +203,14 @@ public class AirflowRESTClient { .header(AUTH_HEADER, authToken) .POST(HttpRequest.BodyPublishers.ofString(connectionPayload)) .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + response = client.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() == 200) { return response; } - throw AirflowException.byMessage( - "Failed to test connection.", String.valueOf(Response.Status.fromStatusCode(response.statusCode()))); + } catch (Exception e) { throw AirflowException.byMessage("Failed to test connection.", e.getMessage()); } + throw AirflowException.byMessage("Failed to test connection.", response.toString()); } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index 5fac3af1869..121973a37b3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -366,7 +366,6 @@ public class PipelineRepository extends EntityRepository { recordChange("pipelineUrl", origPipeline.getPipelineUrl(), updatedPipeline.getPipelineUrl()); recordChange("concurrency", origPipeline.getConcurrency(), updatedPipeline.getConcurrency()); recordChange("pipelineLocation", origPipeline.getPipelineLocation(), updatedPipeline.getPipelineLocation()); - recordChange("startDate", origPipeline.getStartDate(), updatedPipeline.getStartDate()); } private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) throws JsonProcessingException { @@ -384,19 +383,22 @@ public class PipelineRepository extends EntityRepository { List updatedTasks = listOrEmpty(updatedPipeline.getTasks()); List origTasks = listOrEmpty(origPipeline.getTasks()); - List added = new ArrayList<>(); - List deleted = new ArrayList<>(); - recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch); - + boolean newTasks = false; // Update the task descriptions for (Task updated : updatedTasks) { Task stored = origTasks.stream().filter(c -> taskMatch.test(c, updated)).findAny().orElse(null); if (stored == null || updated == null) { // New task added + newTasks = true; continue; } - updateTaskDescription(stored, updated); } + + if (newTasks) { + List added = new ArrayList<>(); + List deleted = new ArrayList<>(); + recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch); + } } private void updateTaskDescription(Task origTask, Task updatedTask) throws JsonProcessingException { diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index 0365a9d601a..a802351f1a6 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS; import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE; +import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE; import static org.openmetadata.catalog.util.TestUtils.assertListNotNull; import static org.openmetadata.catalog.util.TestUtils.assertListNull; import static org.openmetadata.catalog.util.TestUtils.assertResponseContains; @@ -241,6 +242,7 @@ public class PipelineResourceTest extends EntityResourceTest updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList()); - updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); + pipeline = updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); + change = getChangeDescription(pipeline.getVersion()); + // create a request with same tasks we shouldn't see any change + updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, NO_CHANGE, change); } @Test