Fix #14421: Airflow - Handle pipeline status when dag killed (#14562)

Mayur Singal 2024-01-05 18:14:26 +05:30 committed by GitHub
parent e416810be2
commit d82770a91e
3 changed files with 25 additions and 4 deletions


@@ -57,6 +57,10 @@ class AppRunner(Step, ABC):
        super().__init__()

    @property
    def name(self) -> str:
        return "AppRunner"

    @abstractmethod
    def run(self) -> None:
        """App logic to execute"""


@@ -56,7 +56,7 @@ def get_fn(blueprint: Blueprint) -> Callable:
        if json_request is None:
            return ApiResponse.error(
                status=ApiResponse.STATUS_BAD_REQUEST,
                error=f"Did not receive any JSON request to deploy",
                error="Did not receive any JSON request to deploy",
            )

        ingestion_pipeline = parse_ingestion_pipeline_config_gracefully(


@@ -38,6 +38,7 @@ from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.workflow_output_handler import print_status
# pylint: disable=ungrouped-imports
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
@@ -141,7 +142,8 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
            type=service_type,
            serviceName=ingestion_pipeline.service.name,
            sourceConfig=ingestion_pipeline.sourceConfig,
            serviceConnection=None,  # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
            # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
            serviceConnection=None,
        )

    if entity_class is None:
@@ -303,6 +305,19 @@ def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, _):
    )


class CustomPythonOperator(PythonOperator):
    def on_kill(self) -> None:
        """
        Override this method to clean up subprocesses when a task instance
        gets killed. Any use of the threading, subprocess or multiprocessing
        module within an operator needs to be cleaned up, or it will leave
        ghost processes behind.
        """
        workflow_config = self.op_kwargs.get("workflow_config")
        if workflow_config:
            send_failed_status_callback(workflow_config, None)


def build_dag(
    task_name: str,
    ingestion_pipeline: IngestionPipeline,
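
A unit-test style sketch of the new kill handling: Airflow invokes on_kill() on a running operator when the task instance is externally terminated (for example when the DAG run is killed), and the override above should then forward a failed status for the pipeline run. The module path, DAG definition, and mock scaffolding below are assumptions made for illustration, not part of this commit:

from datetime import datetime
from unittest import mock

from airflow import DAG

# assumed import path for the module changed in this commit
from openmetadata_managed_apis.workflows.ingestion import common


def test_on_kill_reports_failed_status():
    with DAG(dag_id="kill_hook_demo", start_date=datetime(2024, 1, 1)):
        task = common.CustomPythonOperator(
            task_id="demo",
            python_callable=lambda **_: None,
            op_kwargs={"workflow_config": mock.Mock()},  # stand-in for OpenMetadataWorkflowConfig
        )

    # Simulate what the task runner does on an external kill
    with mock.patch.object(common, "send_failed_status_callback") as callback:
        task.on_kill()
        callback.assert_called_once_with(task.op_kwargs["workflow_config"], None)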
@@ -318,14 +333,16 @@ def build_dag(
        # generating it inside the Workflow itself.
        workflow_config.pipelineRunId = str(uuid.uuid4())

        PythonOperator(
        CustomPythonOperator(
            task_id=task_name,
            python_callable=workflow_fn,
            op_kwargs={"workflow_config": workflow_config},
            # There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
            retries=ingestion_pipeline.airflowConfig.retries or 0,
            # each DAG will call its own OpenMetadataWorkflowConfig
            on_failure_callback=partial(send_failed_status_callback, workflow_config),
            on_failure_callback=partial(
                send_failed_status_callback, workflow_config, None
            ),
            # Add tag and ownership to easily identify DAGs generated by OM
            owner=ingestion_pipeline.owner.name
            if ingestion_pipeline.owner
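
On the on_failure_callback change: functools.partial binds positional arguments left to right, so the callback now carries both the workflow config and an explicit None placeholder for the unused second parameter, mirroring the direct send_failed_status_callback(workflow_config, None) call added in on_kill. A toy illustration of that binding (the report function below is hypothetical):

from functools import partial


def report(config, _placeholder, *extra):
    """Hypothetical callback; the second parameter is unused, like the `_` above."""
    print(f"failed status for {config}; extra call-time args: {extra}")


callback = partial(report, {"pipeline": "demo"}, None)
callback()                   # -> report({"pipeline": "demo"}, None)
callback({"try_number": 1})  # any call-time argument lands after the bound ones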