diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
index 3d20858099c..9e3bd5d7541 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
@@ -15,7 +15,7 @@ import traceback
 from collections import Counter
 from datetime import datetime
 from enum import Enum
-from typing import Iterable, List, Optional, cast
+from typing import Dict, Iterable, List, Optional, cast
 
 from airflow.models import BaseOperator, DagRun, TaskInstance
 from airflow.models.dag import DagModel
@@ -142,6 +142,16 @@ class AirflowSource(PipelineServiceSource):
 
         return self._session
 
+    @staticmethod
+    def _extract_serialized_task(task: Dict) -> Dict:
+        """
+        Return task["__var"] when the task is the {"__var", "__type"} wrapper
+        produced by the Airflow 2.10 serialization changes; else return it as is.
+        """
+        if task.keys() == {"__var", "__type"}:
+            return task["__var"]
+        return task
+
     def get_pipeline_status(self, dag_id: str) -> List[DagRun]:
         """
         Return the DagRuns of given dag
@@ -328,7 +338,9 @@ class AirflowSource(PipelineServiceSource):
                     max_active_runs=data.get("max_active_runs", None),
                     description=data.get("_description", None),
                     start_date=data.get("start_date", None),
-                    tasks=data.get("tasks", []),
+                    tasks=list(
+                        map(self._extract_serialized_task, data.get("tasks", []))
+                    ),
                     schedule_interval=get_schedule_interval(data),
                     owner=self.fetch_dag_owners(data),
                 )