Fix #7904: Filter out old tasks which doesn't have dag runs while publishing status (#7905)

* Fix #7904: Filter out old tasks which doesn't have dag runs while publishing status

* Fix #7904: Filter out old tasks which doesn't have dag runs while publishing status
This commit is contained in:
Sriharsha Chintalapani 2022-10-03 21:34:55 -07:00 committed by GitHub
parent 566602373f
commit 8efc0370e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -207,37 +207,40 @@ class AirflowSource(PipelineServiceSource):
dag_run_list = self.get_pipeline_status(pipeline_details.dag_id)
for dag_run in dag_run_list:
tasks = self.get_task_instances(
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback
)
if (
dag_run.run_id
): # Airflow dags can have old task which are turned off/commented out in code
tasks = self.get_task_instances(
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback
)
task_statuses = [
TaskStatus(
name=task.task_id,
task_statuses = [
TaskStatus(
name=task.task_id,
executionStatus=STATUS_MAP.get(
task.state, StatusType.Pending.value
),
startTime=datetime_to_ts(task.start_date),
endTime=datetime_to_ts(
task.end_date
), # Might be None for running tasks
) # Log link might not be present in all Airflow versions
for task in tasks
]
pipeline_status = PipelineStatus(
taskStatus=task_statuses,
executionStatus=STATUS_MAP.get(
task.state, StatusType.Pending.value
dag_run.state, StatusType.Pending.value
),
startTime=datetime_to_ts(task.start_date),
endTime=datetime_to_ts(
task.end_date
), # Might be None for running tasks
) # Log link might not be present in all Airflow versions
for task in tasks
]
pipeline_status = PipelineStatus(
taskStatus=task_statuses,
executionStatus=STATUS_MAP.get(
dag_run.state, StatusType.Pending.value
),
timestamp=dag_run.execution_date.timestamp(),
)
yield OMetaPipelineStatus(
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)
timestamp=dag_run.execution_date.timestamp(),
)
yield OMetaPipelineStatus(
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
@ -264,7 +267,6 @@ class AirflowSource(PipelineServiceSource):
json_data_column,
SerializedDagModel.fileloc,
).all():
yield OMSerializedDagDetails(
dag_id=serialized_dag[0],
data=serialized_dag[1],