FIX #18309: fixed task deserialization in Airflow metadata ingestion (#18310)

* fixed task deserialization in Airflow metadata ingestion

* fixed formatting

---------

Co-authored-by: Katarzyna Kałek <kkalek@olx.pl>
This commit is contained in:
Katarzyna Kałek 2024-10-17 23:51:55 +02:00 committed by Ayush Shah
parent 1bc2cc1d71
commit 010761ed5c

View File

@ -15,7 +15,7 @@ import traceback
from collections import Counter from collections import Counter
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Iterable, List, Optional, cast from typing import Dict, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun, TaskInstance from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.models.dag import DagModel from airflow.models.dag import DagModel
@ -142,6 +142,16 @@ class AirflowSource(PipelineServiceSource):
return self._session return self._session
@staticmethod
def _extract_serialized_task(task: Dict) -> Dict:
    """
    Normalize a serialized task entry across Airflow versions.

    Per the serialization changes introduced in Airflow 2.10, a task
    may arrive wrapped in an envelope whose only keys are ``__var``
    and ``__type``. In that case the value stored under ``__var`` is
    returned; any other dictionary is handed back untouched.

    Args:
        task: a single entry from the serialized DAG "tasks" list.

    Returns:
        The unwrapped task dictionary, or ``task`` itself when it is
        not an envelope.
    """
    envelope_keys = {"__type", "__var"}
    # Only an exact key match counts as an envelope; a dict with extra
    # or missing keys is treated as an already-plain task.
    if set(task.keys()) != envelope_keys:
        return task
    return task["__var"]
def get_pipeline_status(self, dag_id: str) -> List[DagRun]: def get_pipeline_status(self, dag_id: str) -> List[DagRun]:
""" """
Return the DagRuns of given dag Return the DagRuns of given dag
@ -328,7 +338,9 @@ class AirflowSource(PipelineServiceSource):
max_active_runs=data.get("max_active_runs", None), max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None), description=data.get("_description", None),
start_date=data.get("start_date", None), start_date=data.get("start_date", None),
tasks=data.get("tasks", []), tasks=list(
map(self._extract_serialized_task, data.get("tasks", []))
),
schedule_interval=get_schedule_interval(data), schedule_interval=get_schedule_interval(data),
owner=self.fetch_dag_owners(data), owner=self.fetch_dag_owners(data),
) )