diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index 922eb9a635b..e58a56dbaf1 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -26,8 +26,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Pipeline; @@ -328,19 +326,20 @@ public class PipelineRepository extends EntityRepository<Pipeline> { } private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) throws JsonProcessingException { - // Airflow lineage backend gets executed per task in a DAG. This means we will not a get full picture of the - // pipeline in each call. Hence, we may create a pipeline and add a single task when one task finishes in a - // pipeline in the next task run we may have to update. To take care of this we will merge the tasks + // While the Airflow lineage only gets executed for one Task at a time, we will consider the + // client Task information as the source of truth. This means that at each update, we will + // expect to receive all the tasks known until that point. + + // The lineage backend will take care of controlling new & deleted tasks, while passing to the + // API the full list of Tasks to consider for a given Pipeline. Having a single point of control + // of the Tasks and their status simplifies the logic of how to add/delete tasks. + + // The API will only take care of marking tasks as added/updated/deleted based on the original + // and incoming changes. 
+ List<Task> updatedTasks = Optional.ofNullable(updatedPipeline.getTasks()).orElse(Collections.emptyList()); List<Task> origTasks = Optional.ofNullable(origPipeline.getTasks()).orElse(Collections.emptyList()); - // Merge the tasks - updatedTasks = - new ArrayList<>( - Stream.concat(origTasks.stream(), updatedTasks.stream()) - .collect(Collectors.groupingBy(Task::getName, Collectors.reducing(null, (t1, t2) -> t2))) - .values()); - List<Task> added = new ArrayList<>(); List<Task> deleted = new ArrayList<>(); recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index f0e1ea62cb9..81f57bb28b9 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -27,11 +27,14 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.ws.rs.client.WebTarget; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; @@ -245,7 +248,32 @@ public class PipelineResourceTest extends EntityResourceTest updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList()); + updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); + } + + @Test + void put_PipelineTasksOverride_200(TestInfo test) throws IOException, URISyntaxException { + // A PUT operation with a new Task should override the current tasks in the 
Pipeline + // This change will always be minor, both with deletes/adds + CreatePipeline request = createRequest(test).withService(AIRFLOW_REFERENCE); + Pipeline pipeline = createAndCheckEntity(request, ADMIN_AUTH_HEADERS); + + List<Task> newTask = + Collections.singletonList( + new Task() + .withName("newTask") + .withDescription("description") + .withDisplayName("displayName") + .withTaskUrl(new URI("http://localhost:0"))); + + ChangeDescription change = getChangeDescription(pipeline.getVersion()); + change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(newTask)); + change.getFieldsDeleted().add(new FieldChange().withName("tasks").withOldValue(TASKS)); + + updateAndCheckEntity(request.withTasks(newTask), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); } @Test @@ -261,8 +289,11 @@ public class PipelineResourceTest extends EntityResourceTest updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList()); + pipeline.setTasks(updatedTasks); pipeline.setDescription("newDescription"); - pipeline.setTasks(tasks); pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); // add a description to an existing task @@ -274,7 +305,9 @@ public class PipelineResourceTest extends EntityResourceTest updatedNewTasks = Stream.concat(TASKS.stream(), newTasks.stream()).collect(Collectors.toList()); + pipeline.setTasks(updatedNewTasks); pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); // update the descriptions of pipeline and task @@ -293,7 +326,9 @@ public class PipelineResourceTest extends EntityResourceTest