From c9995eecb63f6e10f6f8a9b2de00c00022e18f5a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Katarzyna=20Ka=C5=82ek?= <145936017+trina242@users.noreply.github.com>
Date: Thu, 17 Oct 2024 23:51:55 +0200
Subject: [PATCH] FIX #18309: fixed task deserialization in Airflow metadata
 ingestion (#18310)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fixed task deserialization in Airflow metadata ingestion

* fixed formatting

---------

Co-authored-by: Katarzyna Kałek
---
 .../source/pipeline/airflow/metadata.py          | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
index 3d20858099c..9e3bd5d7541 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
@@ -15,7 +15,7 @@ import traceback
 from collections import Counter
 from datetime import datetime
 from enum import Enum
-from typing import Iterable, List, Optional, cast
+from typing import Dict, Iterable, List, Optional, cast
 
 from airflow.models import BaseOperator, DagRun, TaskInstance
 from airflow.models.dag import DagModel
@@ -142,6 +142,16 @@ class AirflowSource(PipelineServiceSource):
 
         return self._session
 
+    @staticmethod
+    def _extract_serialized_task(task: Dict) -> Dict:
+        """
+        Given the serialization changes introduced in Airflow 2.10,
+        ensure compatibility with all versions.
+        """
+        if task.keys() == {"__var", "__type"}:
+            return task["__var"]
+        return task
+
     def get_pipeline_status(self, dag_id: str) -> List[DagRun]:
         """
         Return the DagRuns of given dag
@@ -328,7 +338,9 @@ class AirflowSource(PipelineServiceSource):
             max_active_runs=data.get("max_active_runs", None),
             description=data.get("_description", None),
             start_date=data.get("start_date", None),
-            tasks=data.get("tasks", []),
+            tasks=list(
+                map(self._extract_serialized_task, data.get("tasks", []))
+            ),
             schedule_interval=get_schedule_interval(data),
             owner=self.fetch_dag_owners(data),
         )