diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index 8224e14a056..46638156abe 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -344,7 +344,7 @@ public class PipelineRepository extends EntityRepository { // Update the task descriptions for (Task updated : updatedTasks) { Task stored = origTasks.stream().filter(c -> taskMatch.test(c, updated)).findAny().orElse(null); - if (stored == null) { // New task added + if (stored == null || updated == null) { // New task added continue; } @@ -359,7 +359,9 @@ public class PipelineRepository extends EntityRepository { return; } // Don't record a change if descriptions are the same - if (!origTask.getDescription().equals(updatedTask.getDescription())) { + if (origTask != null + && ((origTask.getDescription() != null && !origTask.getDescription().equals(updatedTask.getDescription())) + || updatedTask.getDescription() != null)) { recordChange( "tasks." + origTask.getName() + ".description", origTask.getDescription(), updatedTask.getDescription()); } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index 9e15b61053e..2223f874af0 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -147,7 +147,7 @@ public class PipelineResourceTest extends EntityResourceTest { if (expected == null && actual == null) { return; } - if (fieldName.contains("tasks")) { + if (fieldName.contains("tasks") && !fieldName.contains(".")) { List expectedTasks = (List) expected; List actualTasks = JsonUtils.readObjects(actual.toString(), Task.class); assertEquals(expectedTasks, actualTasks); @@ -244,7 +244,7 @@ public class PipelineResourceTest extends EntityResourceTest { } @Test - void put_PipelineTasksUpdate_200(TestInfo test) throws IOException { + void put_PipelineTasksUpdate_200(TestInfo test) throws IOException, URISyntaxException { CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null); Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders()); @@ -252,8 +252,87 @@ public class PipelineResourceTest extends EntityResourceTest { ChangeDescription change = getChangeDescription(pipeline.getVersion()); change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("newDescription")); change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(TASKS)); - updateAndCheckEntity( - request.withDescription("newDescription").withTasks(TASKS), OK, adminAuthHeaders(), MINOR_UPDATE, change); + pipeline = + updateAndCheckEntity( + request.withDescription("newDescription").withTasks(TASKS), OK, adminAuthHeaders(), MINOR_UPDATE, change); + + // Add a task without description + change = getChangeDescription(pipeline.getVersion()); + List tasks = new ArrayList<>(); + Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl(new URI("http://localhost:0")); + tasks.add(taskEmptyDesc); + change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(tasks)); + updateAndCheckEntity(request.withTasks(tasks), OK, adminAuthHeaders(), MINOR_UPDATE, change); + } + + @Test + void patch_PipelineTasksUpdate_200_ok(TestInfo test) throws IOException, URISyntaxException { + CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE); + Pipeline pipeline = createAndCheckEntity(request, adminAuthHeaders()); + + String origJson = JsonUtils.pojoToJson(pipeline); + // Add a task without description + ChangeDescription change = getChangeDescription(pipeline.getVersion()); + List tasks = new ArrayList<>(); + Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl(new URI("http://localhost:0")); + tasks.add(taskEmptyDesc); + change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(tasks)); + change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("newDescription")); + pipeline.setDescription("newDescription"); + pipeline.setTasks(tasks); + pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); + + // add a description to an existing task + origJson = JsonUtils.pojoToJson(pipeline); + change = getChangeDescription(pipeline.getVersion()); + List newTasks = new ArrayList<>(); + Task taskWithDesc = taskEmptyDesc.withDescription("taskDescription"); + newTasks.add(taskWithDesc); + change + .getFieldsAdded() + .add(new FieldChange().withName("tasks.taskEmpty.description").withNewValue("taskDescription")); + pipeline.setTasks(newTasks); + pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); + + // update the descriptions of pipeline and task + origJson = JsonUtils.pojoToJson(pipeline); + change = getChangeDescription(pipeline.getVersion()); + newTasks = new ArrayList<>(); + taskWithDesc = taskEmptyDesc.withDescription("newTaskDescription"); + newTasks.add(taskWithDesc); + change + .getFieldsUpdated() + .add( + new FieldChange() + .withName("tasks.taskEmpty.description") + .withOldValue("taskDescription") + .withNewValue("newTaskDescription")); + change + .getFieldsUpdated() + .add(new FieldChange().withName("description").withOldValue("newDescription").withNewValue("newDescription2")); + pipeline.setTasks(newTasks); + pipeline.setDescription("newDescription2"); + pipeline = patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); + + // delete task and pipeline description by setting them to null + origJson = JsonUtils.pojoToJson(pipeline); + change = getChangeDescription(pipeline.getVersion()); + newTasks = new ArrayList<>(); + Task taskWithoutDesc = taskEmptyDesc.withDescription(null); + newTasks.add(taskWithoutDesc); + change + .getFieldsDeleted() + .add( + new FieldChange() + .withName("tasks.taskEmpty.description") + .withOldValue("newTaskDescription") + .withNewValue(null)); + change + .getFieldsDeleted() + .add(new FieldChange().withName("description").withOldValue("newDescription2").withNewValue(null)); + pipeline.setTasks(newTasks); + pipeline.setDescription(null); + patchEntityAndCheck(pipeline, origJson, adminAuthHeaders(), MINOR_UPDATE, change); } @Test