Minor: Airflow skip task status if task deleted (#14373)

This commit is contained in:
Mayur Singal 2023-12-13 18:34:14 +05:30 committed by GitHub
parent 35d4c64e69
commit 4f65287dae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -202,7 +202,7 @@ class AirflowSource(PipelineServiceSource):
for dag_run in dag_run_list:
if (
dag_run.run_id
dag_run.run_id and self.context.task_names
): # Airflow dags can have old task which are turned off/commented out in code
tasks = self.get_task_instances(
dag_id=dag_run.dag_id, run_id=dag_run.run_id
@ -220,6 +220,7 @@ class AirflowSource(PipelineServiceSource):
), # Might be None for running tasks
) # Log link might not be present in all Airflow versions
for task in tasks
if task.task_id in self.context.task_names
]
pipeline_status = PipelineStatus(
@ -381,7 +382,11 @@ class AirflowSource(PipelineServiceSource):
)
yield Either(right=pipeline_request)
self.register_record(pipeline_request=pipeline_request)
self.context.task_names = {
task.name for task in pipeline_request.tasks or []
}
except TypeError as err:
self.context.task_names = set()
yield Either(
left=StackTraceError(
name=pipeline_details.dag_id,
@ -393,6 +398,7 @@ class AirflowSource(PipelineServiceSource):
)
)
except ValidationError as err:
self.context.task_names = set()
yield Either(
left=StackTraceError(
name=pipeline_details.dag_id,
@ -402,6 +408,7 @@ class AirflowSource(PipelineServiceSource):
)
except Exception as err:
self.context.task_names = set()
yield Either(
left=StackTraceError(
name=pipeline_details.dag_id,