FIX #18309: fixed task deserialization in Airflow metadata ingestion (#18310)

* fixed task deserialization in Airflow metadata ingestion

* fixed formatting

---------

Co-authored-by: Katarzyna Kałek <kkalek@olx.pl>
This commit is contained in:
Katarzyna Kałek 2024-10-17 23:51:55 +02:00 committed by GitHub
parent 47c993be92
commit c9995eecb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,7 +15,7 @@ import traceback
from collections import Counter
from datetime import datetime
from enum import Enum
from typing import Iterable, List, Optional, cast
from typing import Dict, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.models.dag import DagModel
@ -142,6 +142,16 @@ class AirflowSource(PipelineServiceSource):
return self._session
@staticmethod
def _extract_serialized_task(task: Dict) -> Dict:
    """
    Return the task payload regardless of how the task was serialized.

    Given the serialization changes introduced in Airflow 2.10, a
    serialized task may arrive wrapped in an envelope whose only keys
    are ``__var`` (the payload) and ``__type``. Unwrapping it here
    ensures compatibility with all versions.

    Args:
        task: a single entry of the serialized DAG ``tasks`` list.

    Returns:
        The value stored under ``__var`` when ``task`` is exactly such
        an envelope; otherwise ``task`` itself, unchanged.
    """
    # A non-dict entry cannot be an envelope. Hand it back untouched
    # instead of failing on ``.keys()`` so one malformed entry does not
    # raise an AttributeError while the task list is being mapped.
    if not isinstance(task, dict):
        return task

    # Only unwrap when the keys are exactly the envelope keys; a dict that
    # merely contains them among other fields is treated as a plain task.
    if task.keys() == {"__var", "__type"}:
        return task["__var"]
    return task
def get_pipeline_status(self, dag_id: str) -> List[DagRun]:
"""
Return the DagRuns of given dag
@ -328,7 +338,9 @@ class AirflowSource(PipelineServiceSource):
max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None),
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
tasks=list(
map(self._extract_serialized_task, data.get("tasks", []))
),
schedule_interval=get_schedule_interval(data),
owner=self.fetch_dag_owners(data),
)