diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index f891b9d8133..d50772a49ad 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -207,37 +207,40 @@ class AirflowSource(PipelineServiceSource): dag_run_list = self.get_pipeline_status(pipeline_details.dag_id) for dag_run in dag_run_list: - tasks = self.get_task_instances( - dag_id=dag_run.dag_id, - run_id=dag_run.run_id, - execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback - ) + if ( + dag_run.run_id + ): # Airflow dags can have old task which are turned off/commented out in code + tasks = self.get_task_instances( + dag_id=dag_run.dag_id, + run_id=dag_run.run_id, + execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback + ) - task_statuses = [ - TaskStatus( - name=task.task_id, + task_statuses = [ + TaskStatus( + name=task.task_id, + executionStatus=STATUS_MAP.get( + task.state, StatusType.Pending.value + ), + startTime=datetime_to_ts(task.start_date), + endTime=datetime_to_ts( + task.end_date + ), # Might be None for running tasks + ) # Log link might not be present in all Airflow versions + for task in tasks + ] + + pipeline_status = PipelineStatus( + taskStatus=task_statuses, executionStatus=STATUS_MAP.get( - task.state, StatusType.Pending.value + dag_run.state, StatusType.Pending.value ), - startTime=datetime_to_ts(task.start_date), - endTime=datetime_to_ts( - task.end_date - ), # Might be None for running tasks - ) # Log link might not be present in all Airflow versions - for task in tasks - ] - - pipeline_status = PipelineStatus( - taskStatus=task_statuses, - executionStatus=STATUS_MAP.get( - dag_run.state, StatusType.Pending.value - ), - timestamp=dag_run.execution_date.timestamp(), - ) - yield OMetaPipelineStatus( - pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, - pipeline_status=pipeline_status, - ) + timestamp=dag_run.execution_date.timestamp(), + ) + yield OMetaPipelineStatus( + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( @@ -264,7 +267,6 @@ class AirflowSource(PipelineServiceSource): json_data_column, SerializedDagModel.fileloc, ).all(): - yield OMSerializedDagDetails( dag_id=serialized_dag[0], data=serialized_dag[1],