diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
index c68b4752a55..b80a7a11b96 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
@@ -56,6 +56,7 @@ from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
 from metadata.ingestion.source.pipeline.airflow.models import (
     AirflowDag,
     AirflowDagDetails,
+    AirflowTask,
 )
 from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
 from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
@@ -164,7 +165,9 @@ class AirflowSource(PipelineServiceSource):
             for elem in dag_run_dict
         ]

-    def get_task_instances(self, dag_id: str, run_id: str) -> List[OMTaskInstance]:
+    def get_task_instances(
+        self, dag_id: str, run_id: str, serialized_tasks: List[AirflowTask]
+    ) -> List[OMTaskInstance]:
         """
         We are building our own scoped TaskInstance
         class to only focus on core properties required
@@ -174,6 +177,7 @@ class AirflowSource(PipelineServiceSource):
         sources we support.
         """
         task_instance_list = None
+        serialized_tasks_ids = {task.task_id for task in serialized_tasks}

         try:
             task_instance_list = (
@@ -210,6 +214,7 @@ class AirflowSource(PipelineServiceSource):
                 end_date=elem.get("end_date"),
             )
             for elem in task_instance_dict
+            if elem.get("task_id") in serialized_tasks_ids
         ]

     def yield_pipeline_status(
@@ -223,7 +228,9 @@ class AirflowSource(PipelineServiceSource):
                 dag_run.run_id and self.context.task_names
             ):  # Airflow dags can have old task which are turned off/commented out in code
                 tasks = self.get_task_instances(
-                    dag_id=dag_run.dag_id, run_id=dag_run.run_id
+                    dag_id=dag_run.dag_id,
+                    run_id=dag_run.run_id,
+                    serialized_tasks=pipeline_details.tasks,
                 )

                 task_statuses = [
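The hunks above thread the tasks serialized in the DAG into get_task_instances and drop any historical TaskInstance whose task_id no longer exists in the current DAG definition. The following is a minimal, self-contained sketch of that set-based filter for illustration only; StubTask, StubTaskInstance and filter_task_instances are hypothetical stand-ins for AirflowTask, OMTaskInstance and the real method, not the OpenMetadata models.

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StubTask:
    """Stand-in for a task serialized in the DAG definition (AirflowTask)."""

    task_id: str


@dataclass
class StubTaskInstance:
    """Stand-in for a scoped task run (OMTaskInstance)."""

    task_id: str
    state: Optional[str] = None


def filter_task_instances(
    task_instance_dict: List[dict], serialized_tasks: List[StubTask]
) -> List[StubTaskInstance]:
    """Keep only runs whose task_id is still present in the serialized DAG."""
    serialized_tasks_ids = {task.task_id for task in serialized_tasks}
    return [
        StubTaskInstance(task_id=elem.get("task_id"), state=elem.get("state"))
        for elem in task_instance_dict
        if elem.get("task_id") in serialized_tasks_ids
    ]


if __name__ == "__main__":
    # "old_task" was removed from the DAG code, so its historical run is dropped.
    runs = [
        {"task_id": "extract", "state": "success"},
        {"task_id": "old_task", "state": "failed"},
    ]
    current = [StubTask(task_id="extract"), StubTask(task_id="load")]
    print(filter_task_instances(runs, current))
    # -> [StubTaskInstance(task_id='extract', state='success')]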