From d82770a91e29a86bcaf9af5017075d8bac0c95b0 Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Fri, 5 Jan 2024 18:14:26 +0530
Subject: [PATCH] Fix #14421: Airflow - Handle pipeline status when dag killed
 (#14562)

---
 .../src/metadata/workflow/application.py |  4 ++++
 .../api/routes/deploy.py                 |  2 +-
 .../workflows/ingestion/common.py        | 23 ++++++++++++++++---
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/ingestion/src/metadata/workflow/application.py b/ingestion/src/metadata/workflow/application.py
index 9884d3cf2ec..fd43d0a51bb 100644
--- a/ingestion/src/metadata/workflow/application.py
+++ b/ingestion/src/metadata/workflow/application.py
@@ -57,6 +57,10 @@ class AppRunner(Step, ABC):
 
         super().__init__()
 
+    @property
+    def name(self) -> str:
+        return "AppRunner"
+
     @abstractmethod
     def run(self) -> None:
         """App logic to execute"""
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py
index 60138de7822..26352b4876c 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py
@@ -56,7 +56,7 @@ def get_fn(blueprint: Blueprint) -> Callable:
             if json_request is None:
                 return ApiResponse.error(
                     status=ApiResponse.STATUS_BAD_REQUEST,
-                    error=f"Did not receive any JSON request to deploy",
+                    error="Did not receive any JSON request to deploy",
                 )
 
             ingestion_pipeline = parse_ingestion_pipeline_config_gracefully(
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
index 764284c7ebc..c5841899be8 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
@@ -38,6 +38,7 @@ from metadata.ingestion.models.encoders import show_secrets_encoder
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.workflow.workflow_output_handler import print_status
 
+# pylint: disable=ungrouped-imports
 try:
     from airflow.operators.python import PythonOperator
 except ModuleNotFoundError:
@@ -141,7 +142,8 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
             type=service_type,
             serviceName=ingestion_pipeline.service.name,
             sourceConfig=ingestion_pipeline.sourceConfig,
-            serviceConnection=None,  # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
+            # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
+            serviceConnection=None,
         )
 
         if entity_class is None:
@@ -303,6 +305,19 @@ def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, _):
     )
 
 
+class CustomPythonOperator(PythonOperator):
+    def on_kill(self) -> None:
+        """
+        Override this method to clean up subprocesses when a task instance
+        gets killed. Any use of the threading, subprocess or multiprocessing
+        module within an operator needs to be cleaned up, or it will leave
+        ghost processes behind.
+        """
+        workflow_config = self.op_kwargs.get("workflow_config")
+        if workflow_config:
+            send_failed_status_callback(workflow_config, None)
+
+
 def build_dag(
     task_name: str,
     ingestion_pipeline: IngestionPipeline,
@@ -318,14 +333,16 @@
         # generating it inside the Workflow itself.
         workflow_config.pipelineRunId = str(uuid.uuid4())
 
-        PythonOperator(
+        CustomPythonOperator(
            task_id=task_name,
             python_callable=workflow_fn,
             op_kwargs={"workflow_config": workflow_config},
             # There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
             retries=ingestion_pipeline.airflowConfig.retries or 0,
             # each DAG will call its own OpenMetadataWorkflowConfig
-            on_failure_callback=partial(send_failed_status_callback, workflow_config),
+            on_failure_callback=partial(
+                send_failed_status_callback, workflow_config, None
+            ),
             # Add tag and ownership to easily identify DAGs generated by OM
             owner=ingestion_pipeline.owner.name
             if ingestion_pipeline.owner
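
Note (not part of the patch above): a minimal, self-contained sketch of the pattern this change introduces, subclassing PythonOperator and overriding on_kill so a task that is externally terminated (e.g. "Mark Failed" in the UI or a SIGTERM from the scheduler) still reports a failed status, since on_failure_callback is not guaranteed to fire on that path. The operator name, DAG id, and the print() stand-in for send_failed_status_callback below are illustrative assumptions, not code from this repository; it assumes Airflow 2.4+.

# Illustrative sketch only; names below are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


class StatusReportingOperator(PythonOperator):
    def on_kill(self) -> None:
        # Airflow calls on_kill when a running task instance is killed
        # externally; on_failure_callback may never run in that path,
        # so the failure is reported from here instead.
        workflow_config = self.op_kwargs.get("workflow_config")
        if workflow_config:
            # Stand-in for send_failed_status_callback(workflow_config, None)
            print(f"reporting failed status for {workflow_config}")


with DAG("on_kill_demo", start_date=datetime(2024, 1, 1), schedule=None):
    StatusReportingOperator(
        task_id="long_running_task",
        python_callable=lambda workflow_config: None,
        op_kwargs={"workflow_config": "demo-run-config"},
    )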