Fixes 14528: skip statuses for tasks that are no longer present in Airflow DAG (#14529)

* 🐛 Fix attribute name

* import AirflowTask model

* make py_format

* make py_format_check
This commit is contained in:
mgorsk1 2023-12-30 22:26:34 +01:00 committed by GitHub
parent 62d0331b53
commit 5ae83b284a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -56,6 +56,7 @@ from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
from metadata.ingestion.source.pipeline.airflow.models import (
AirflowDag,
AirflowDagDetails,
AirflowTask,
)
from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
@ -164,7 +165,9 @@ class AirflowSource(PipelineServiceSource):
for elem in dag_run_dict
]
def get_task_instances(self, dag_id: str, run_id: str) -> List[OMTaskInstance]:
def get_task_instances(
self, dag_id: str, run_id: str, serialized_tasks: List[AirflowTask]
) -> List[OMTaskInstance]:
"""
We are building our own scoped TaskInstance
class to only focus on core properties required
@ -174,6 +177,7 @@ class AirflowSource(PipelineServiceSource):
sources we support.
"""
task_instance_list = None
serialized_tasks_ids = {task.task_id for task in serialized_tasks}
try:
task_instance_list = (
@ -210,6 +214,7 @@ class AirflowSource(PipelineServiceSource):
end_date=elem.get("end_date"),
)
for elem in task_instance_dict
if elem.get("task_id") in serialized_tasks_ids
]
def yield_pipeline_status(
@ -223,7 +228,9 @@ class AirflowSource(PipelineServiceSource):
dag_run.run_id and self.context.task_names
): # Airflow dags can have old task which are turned off/commented out in code
tasks = self.get_task_instances(
dag_id=dag_run.dag_id, run_id=dag_run.run_id
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
serialized_tasks=pipeline_details.tasks,
)
task_statuses = [