From 04421901c04afa71d3a3226adf74b7e709ef7611 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Sun, 8 May 2022 06:10:35 +0200 Subject: [PATCH] [WIP] Fix #4610 - Pipeline Tasks removal (#4615) Co-authored-by: Sriharsha Chintalapani --- .../catalog/jdbi3/PipelineRepository.java | 4 +- .../pipelines/PipelineResourceTest.java | 30 ++++- .../lineage/airflow/test_airflow_lineage.py | 127 ++++++++++++++++++ .../ometa/test_ometa_pipeline_api.py | 8 +- 4 files changed, 162 insertions(+), 7 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java index f37a4ff4b81..32fbfbeb7f9 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineRepository.java @@ -395,7 +395,9 @@ public class PipelineRepository extends EntityRepository { updateTaskDescription(stored, updated); } - if (newTasks) { + boolean removedTasks = updatedTasks.size() < origTasks.size(); + + if (newTasks || removedTasks) { List added = new ArrayList<>(); List deleted = new ArrayList<>(); recordListChange("tasks", origTasks, updatedTasks, added, deleted, taskMatch); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java index 26ccf0db288..16258cd4b79 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/pipelines/PipelineResourceTest.java @@ -259,6 +259,13 @@ public class PipelineResourceTest extends EntityResourceTest new_tasks = new ArrayList<>(); + for (int i = 1; i < 3; i++) { // remove task0 + Task task = + new Task() + .withName("task" + i) + .withDescription("description") + .withDisplayName("displayName") + .withTaskUrl(new URI("http://localhost:0")); + new_tasks.add(task); + } + request.setTasks(new_tasks); + change = getChangeDescription(pipeline.getVersion()); + change.getFieldsUpdated().add(new FieldChange().withNewValue(new_tasks).withOldValue(TASKS)); + pipeline = updateEntity(request, OK, ADMIN_AUTH_HEADERS); + assertEquals(2, pipeline.getTasks().size()); } @Override diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py index 10a8bef960c..6decfe3f15a 100644 --- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py +++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py @@ -11,6 +11,10 @@ """ Test airflow lineage backend + +These tests should be run with Airflow 2.1.4 +Other airflow versions require a different way to +mock the DAG and Task runs. """ from datetime import datetime, timedelta @@ -301,3 +305,126 @@ class AirflowLineageTest(TestCase): self.assertIn("group1.task1", {task.name for task in pipeline.tasks}) self.assertIn("group1.task2", {task.name for task in pipeline.tasks}) self.assertIn("end", {task.name for task in pipeline.tasks}) + + def test_clean_tasks(self): + """ + Check that we can safely remove tasks from a Pipeline + """ + + with DAG( + "clean_test", + description="A lineage test DAG", + schedule_interval=timedelta(days=1), + start_date=datetime(2021, 1, 1), + ) as dag: + t1 = BashOperator( # Using BashOperator as a random example + task_id="task1", + bash_command="date", + ) + + t2 = BashOperator( # Using BashOperator as a random example + task_id="task2", + bash_command="sleep 5", + ) + + t1 >> t2 + + self.backend.send_lineage( + operator=dag.get_task("task1"), + context={ + "dag": dag, + "task": dag.get_task("task1"), + "task_instance": TaskInstance( + task=dag.get_task("task1"), + execution_date=datetime.strptime( + "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" + ), + state="running", + ), + }, + ) + + self.backend.send_lineage( + operator=dag.get_task("task2"), + context={ + "dag": dag, + "task": dag.get_task("task2"), + "task_instance": TaskInstance( + task=dag.get_task("task2"), + execution_date=datetime.strptime( + "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" + ), + state="running", + ), + }, + ) + + pipeline = self.metadata.get_by_name( + entity=Pipeline, fqdn="local_airflow_3.clean_test", fields=["tasks"] + ) + self.assertIsNotNone(pipeline) + self.assertIn("task1", {task.name for task in pipeline.tasks}) + self.assertIn("task2", {task.name for task in pipeline.tasks}) + + with DAG( + "clean_test", + description="A lineage test DAG", + schedule_interval=timedelta(days=1), + start_date=datetime(2021, 1, 1), + ) as dag: + t1 = BashOperator( + task_id="task1", + bash_command="date", + ) + + renamed_task = BashOperator( + task_id="new_task2", + bash_command="sleep 5", + ) + + t1 >> renamed_task + + self.backend.send_lineage( + operator=dag.get_task("task1"), + context={ + "dag": dag, + "task": dag.get_task("task1"), + "task_instance": TaskInstance( + task=dag.get_task("task1"), + execution_date=datetime.strptime( + "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" + ), + state="running", + ), + }, + ) + + self.backend.send_lineage( + operator=dag.get_task("new_task2"), + context={ + "dag": dag, + "task": dag.get_task("new_task2"), + "task_instance": TaskInstance( + task=dag.get_task("new_task2"), + execution_date=datetime.strptime( + "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" + ), + state="running", + ), + }, + ) + + pipeline: Pipeline = self.metadata.get_by_name( + entity=Pipeline, fqdn="local_airflow_3.clean_test", fields=["tasks"] + ) + self.assertIsNotNone(pipeline) + self.assertIn("task1", {task.name for task in pipeline.tasks}) + self.assertIn("new_task2", {task.name for task in pipeline.tasks}) + self.assertNotIn("task2", {task.name for task in pipeline.tasks}) + + self.metadata.delete( + entity=Pipeline, + entity_id=pipeline.id, + recursive=True, + hard_delete=True, + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py index 07662e7ea35..2e37515d072 100644 --- a/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py @@ -340,10 +340,16 @@ class OMetaPipelineTest(TestCase): pipeline = self.metadata.create_or_update(data=create_pipeline) - updated_pipeline = self.metadata.clean_pipeline_tasks( + self.metadata.clean_pipeline_tasks( pipeline=pipeline, task_ids=["task3", "task4"] ) + updated_pipeline = self.metadata.get_by_name( + entity=Pipeline, + fqdn="test-service-pipeline.pipeline-test", + fields=["tasks"], + ) + assert len(updated_pipeline.tasks) == 2 assert {task.name for task in updated_pipeline.tasks} == {"task3", "task4"}