Fix #2317: Override Pipeline tasks from client

This commit is contained in:
Pere Miquel Brull 2022-02-06 19:17:08 +01:00 committed by GitHub
parent e6343a79d7
commit c2ad7f6373
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 18 deletions

View File

@ -26,8 +26,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Pipeline;
@ -328,19 +326,20 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
private void updateTasks(Pipeline origPipeline, Pipeline updatedPipeline) throws JsonProcessingException {
// The Airflow lineage backend gets executed per task in a DAG. This means we will not get a full picture of the
// pipeline in each call. Hence, we may create a pipeline and add a single task when one task finishes; in the
// next task run we may have to update the pipeline. To take care of this, we will merge the tasks.
// While the Airflow lineage only gets executed for one Task at a time, we will consider the
// client Task information as the source of truth. This means that at each update, we will
// expect to receive all the tasks known until that point.
// The lineage backend will take care of controlling new & deleted tasks, while passing to the
// API the full list of Tasks to consider for a given Pipeline. Having a single point of control
// of the Tasks and their status simplifies the logic on how to add/delete tasks.
// The API will only take care of marking tasks as added/updated/deleted based on the original
// and incoming changes.
List<Task> updatedTasks = Optional.ofNullable(updatedPipeline.getTasks()).orElse(Collections.emptyList());
List<Task> origTasks = Optional.ofNullable(origPipeline.getTasks()).orElse(Collections.emptyList());
// Merge the tasks
updatedTasks =
new ArrayList<>(
Stream.concat(origTasks.stream(), updatedTasks.stream())
.collect(Collectors.groupingBy(Task::getName, Collectors.reducing(null, (t1, t2) -> t2)))
.values());
List<Task> added = new ArrayList<>();
List<Task> deleted = new ArrayList<>();
recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch);

View File

@ -27,11 +27,14 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.WebTarget;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException;
@ -245,7 +248,32 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
Task taskEmptyDesc = new Task().withName("taskEmpty").withTaskUrl(new URI("http://localhost:0"));
tasks.add(taskEmptyDesc);
change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(tasks));
updateAndCheckEntity(request.withTasks(tasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
// Create new request with all the Tasks
List<Task> updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList());
updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
}
@Test
void put_PipelineTasksOverride_200(TestInfo test) throws IOException, URISyntaxException {
  // Tasks supplied in a PUT are expected to replace the tasks currently held by the Pipeline.
  // Whether tasks end up added or deleted, the resulting version bump is asserted to be MINOR.
  CreatePipeline request = createRequest(test).withService(AIRFLOW_REFERENCE);
  Pipeline pipeline = createAndCheckEntity(request, ADMIN_AUTH_HEADERS);

  // The single task that the PUT request carries.
  Task replacementTask =
      new Task()
          .withName("newTask")
          .withDescription("description")
          .withDisplayName("displayName")
          .withTaskUrl(new URI("http://localhost:0"));
  List<Task> replacementTasks = Collections.singletonList(replacementTask);

  // Expected change description: the replacement task is reported as added and TASKS as deleted
  // (presumably TASKS is what createRequest seeds the pipeline with - confirm against the helper).
  FieldChange tasksAdded = new FieldChange().withName("tasks").withNewValue(replacementTasks);
  FieldChange tasksDeleted = new FieldChange().withName("tasks").withOldValue(TASKS);
  ChangeDescription change = getChangeDescription(pipeline.getVersion());
  change.getFieldsAdded().add(tasksAdded);
  change.getFieldsDeleted().add(tasksDeleted);

  updateAndCheckEntity(request.withTasks(replacementTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
}
@Test
@ -261,8 +289,11 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
tasks.add(taskEmptyDesc);
change.getFieldsAdded().add(new FieldChange().withName("tasks").withNewValue(tasks));
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("newDescription"));
// Create new request with all the Tasks
List<Task> updatedTasks = Stream.concat(TASKS.stream(), tasks.stream()).collect(Collectors.toList());
pipeline.setTasks(updatedTasks);
pipeline.setDescription("newDescription");
pipeline.setTasks(tasks);
pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
// add a description to an existing task
@ -274,7 +305,9 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
change
.getFieldsAdded()
.add(new FieldChange().withName("tasks.taskEmpty.description").withNewValue("taskDescription"));
pipeline.setTasks(newTasks);
List<Task> updatedNewTasks = Stream.concat(TASKS.stream(), newTasks.stream()).collect(Collectors.toList());
pipeline.setTasks(updatedNewTasks);
pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
// update the descriptions of pipeline and task
@ -293,7 +326,9 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
change
.getFieldsUpdated()
.add(new FieldChange().withName("description").withOldValue("newDescription").withNewValue("newDescription2"));
pipeline.setTasks(newTasks);
updatedNewTasks = Stream.concat(TASKS.stream(), newTasks.stream()).collect(Collectors.toList());
pipeline.setTasks(updatedNewTasks);
pipeline.setDescription("newDescription2");
pipeline = patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
@ -313,7 +348,9 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
change
.getFieldsDeleted()
.add(new FieldChange().withName("description").withOldValue("newDescription2").withNewValue(null));
pipeline.setTasks(newTasks);
updatedNewTasks = Stream.concat(TASKS.stream(), newTasks.stream()).collect(Collectors.toList());
pipeline.setTasks(updatedNewTasks);
pipeline.setDescription(null);
patchEntityAndCheck(pipeline, origJson, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
}

View File

@ -122,7 +122,7 @@ test = {
"isort",
"pre-commit",
"pylint",
"pytest",
"pytest==6.2.5",
"pytest-cov",
"faker",
"coverage",