[WIP] Fix #4610 - Pipeline Tasks removal (#4615)

Co-authored-by: Sriharsha Chintalapani <harsha@getcollate.io>
This commit is contained in:
Pere Miquel Brull 2022-05-08 06:10:35 +02:00 committed by GitHub
parent 5700f6d68e
commit 04421901c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 162 additions and 7 deletions

View File

@ -395,7 +395,9 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
updateTaskDescription(stored, updated); updateTaskDescription(stored, updated);
} }
if (newTasks) { boolean removedTasks = updatedTasks.size() < origTasks.size();
if (newTasks || removedTasks) {
List<Task> added = new ArrayList<>(); List<Task> added = new ArrayList<>();
List<Task> deleted = new ArrayList<>(); List<Task> deleted = new ArrayList<>();
recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch); recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch);

View File

@ -259,6 +259,13 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
change = getChangeDescription(pipeline.getVersion()); change = getChangeDescription(pipeline.getVersion());
// create a request with same tasks we shouldn't see any change // create a request with same tasks we shouldn't see any change
updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, NO_CHANGE, change); updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, NO_CHANGE, change);
// create new request with few tasks removed
updatedTasks.remove(taskEmptyDesc);
change = getChangeDescription(pipeline.getVersion());
change.getFieldsDeleted().add(new FieldChange().withName("tasks").withOldValue(List.of(taskEmptyDesc)));
updateAndCheckEntity(request.withTasks(updatedTasks), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
pipeline = getPipeline(pipeline.getId(), "tasks", ADMIN_AUTH_HEADERS);
validateTasks(pipeline.getTasks(), updatedTasks);
} }
@Test @Test
@ -484,11 +491,24 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
ADMIN_AUTH_HEADERS, ADMIN_AUTH_HEADERS,
MINOR_UPDATE, MINOR_UPDATE,
change); change);
// TODO update this once task removal is figured out
// remove a task assertEquals(3, pipeline.getTasks().size());
// TASKS.remove(0);
// change = getChangeDescription(pipeline.getVersion()).withFieldsUpdated(singletonList("tasks")); List<Task> new_tasks = new ArrayList<>();
// updateAndCheckEntity(request.withTasks(TASKS), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change); for (int i = 1; i < 3; i++) { // remove task0
Task task =
new Task()
.withName("task" + i)
.withDescription("description")
.withDisplayName("displayName")
.withTaskUrl(new URI("http://localhost:0"));
new_tasks.add(task);
}
request.setTasks(new_tasks);
change = getChangeDescription(pipeline.getVersion());
change.getFieldsUpdated().add(new FieldChange().withNewValue(new_tasks).withOldValue(TASKS));
pipeline = updateEntity(request, OK, ADMIN_AUTH_HEADERS);
assertEquals(2, pipeline.getTasks().size());
} }
@Override @Override

View File

@ -11,6 +11,10 @@
""" """
Test airflow lineage backend Test airflow lineage backend
These tests should be run with Airflow 2.1.4
Other airflow versions require a different way to
mock the DAG and Task runs.
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta
@ -301,3 +305,126 @@ class AirflowLineageTest(TestCase):
self.assertIn("group1.task1", {task.name for task in pipeline.tasks}) self.assertIn("group1.task1", {task.name for task in pipeline.tasks})
self.assertIn("group1.task2", {task.name for task in pipeline.tasks}) self.assertIn("group1.task2", {task.name for task in pipeline.tasks})
self.assertIn("end", {task.name for task in pipeline.tasks}) self.assertIn("end", {task.name for task in pipeline.tasks})
def test_clean_tasks(self):
"""
Check that we can safely remove tasks from a Pipeline
"""
with DAG(
"clean_test",
description="A lineage test DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
) as dag:
t1 = BashOperator( # Using BashOperator as a random example
task_id="task1",
bash_command="date",
)
t2 = BashOperator( # Using BashOperator as a random example
task_id="task2",
bash_command="sleep 5",
)
t1 >> t2
self.backend.send_lineage(
operator=dag.get_task("task1"),
context={
"dag": dag,
"task": dag.get_task("task1"),
"task_instance": TaskInstance(
task=dag.get_task("task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("task2"),
context={
"dag": dag,
"task": dag.get_task("task2"),
"task_instance": TaskInstance(
task=dag.get_task("task2"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
pipeline = self.metadata.get_by_name(
entity=Pipeline, fqdn="local_airflow_3.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})
self.assertIn("task2", {task.name for task in pipeline.tasks})
with DAG(
"clean_test",
description="A lineage test DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
) as dag:
t1 = BashOperator(
task_id="task1",
bash_command="date",
)
renamed_task = BashOperator(
task_id="new_task2",
bash_command="sleep 5",
)
t1 >> renamed_task
self.backend.send_lineage(
operator=dag.get_task("task1"),
context={
"dag": dag,
"task": dag.get_task("task1"),
"task_instance": TaskInstance(
task=dag.get_task("task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("new_task2"),
context={
"dag": dag,
"task": dag.get_task("new_task2"),
"task_instance": TaskInstance(
task=dag.get_task("new_task2"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline, fqdn="local_airflow_3.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})
self.assertIn("new_task2", {task.name for task in pipeline.tasks})
self.assertNotIn("task2", {task.name for task in pipeline.tasks})
self.metadata.delete(
entity=Pipeline,
entity_id=pipeline.id,
recursive=True,
hard_delete=True,
)

View File

@ -340,10 +340,16 @@ class OMetaPipelineTest(TestCase):
pipeline = self.metadata.create_or_update(data=create_pipeline) pipeline = self.metadata.create_or_update(data=create_pipeline)
updated_pipeline = self.metadata.clean_pipeline_tasks( self.metadata.clean_pipeline_tasks(
pipeline=pipeline, task_ids=["task3", "task4"] pipeline=pipeline, task_ids=["task3", "task4"]
) )
updated_pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqdn="test-service-pipeline.pipeline-test",
fields=["tasks"],
)
assert len(updated_pipeline.tasks) == 2 assert len(updated_pipeline.tasks) == 2
assert {task.name for task in updated_pipeline.tasks} == {"task3", "task4"} assert {task.name for task in updated_pipeline.tasks} == {"task3", "task4"}