From b0e1a136cf5807551a98eebd3c66df5f4ef6b538 Mon Sep 17 00:00:00 2001 From: Ferjani Nasraoui Date: Thu, 3 Jul 2025 10:51:36 +0200 Subject: [PATCH] Fixes #21106: Support owner extraction from serialized Airflow DAGs (#22071) * fix(airflow): correctly extract owners from serialized Airflow DAGs Airflow serialization format wraps tasks under `__var` and `__type`. Previously, the OpenMetadata Airflow connector failed to extract task owners properly in this format. This patch: - Flattens `__var` when parsing task owners - Falls back to `default_args["owner"]` if no task-level owner is explicitly present - Ensures the correct DAG owner is picked as the most common task owner - Handles compatibility with older Airflow versions Fixes: #21106 * test(airflow): add tests for owner extraction from serialized Airflow DAGs Adds new test cases to validate owner extraction logic: - Owners from serialized task format (`__var`) - Fallback to `default_args['owner']` if task owners are missing - Resolution of most common owner - Compatibility with unstructured or missing owners * remove test version specific comment * simplify comments and warnings * fix return statement * fixing formatting * adding handling of default args * fixing and adding more tests --- .../source/pipeline/airflow/metadata.py | 43 +++++++++++++---- .../unit/topology/pipeline/test_airflow.py | 48 +++++++++++++++++++ 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 82f0ae7b6b6..781602428de 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -414,18 +414,41 @@ class AirflowSource(PipelineServiceSource): - `owners`: Applied at the tasks. In Airflow's source code, DAG ownership is then a list joined with the owners of all the tasks. 
- We will pick the owner from the tasks that appears in most tasks. + We will pick the owner from the tasks that appears in most tasks, + or fall back to the default_args owner if available. """ try: - if self.source_config.includeOwners: - task_owners = [ - task.get("owner") - for task in data.get("tasks", []) - if task.get("owner") is not None - ] - if task_owners: - most_common_owner, _ = Counter(task_owners).most_common(1)[0] - return most_common_owner + if not self.source_config.includeOwners: + return None + + tasks = data.get("tasks", []) + task_owners = [] + + # Handle default_args.owner (wrapped or not) + default_args = data.get("default_args", {}) + if isinstance(default_args, dict) and "__var" in default_args: + default_args = default_args["__var"] + default_owner = default_args.get("owner") + + for task in tasks: + # Flatten serialized task + task_data = ( + task.get("__var") + if isinstance(task, dict) and "__var" in task + else task + ) + + owner = task_data.get("owner") or default_owner + + if owner: + task_owners.append(owner) + + if task_owners: + most_common_owner, _ = Counter(task_owners).most_common(1)[0] + return most_common_owner + + return default_owner + except Exception as exc: self.status.warning( data.get("dag_id"), f"Could not extract owner information due to {exc}" diff --git a/ingestion/tests/unit/topology/pipeline/test_airflow.py b/ingestion/tests/unit/topology/pipeline/test_airflow.py index 0d721aa77b4..964ad463716 100644 --- a/ingestion/tests/unit/topology/pipeline/test_airflow.py +++ b/ingestion/tests/unit/topology/pipeline/test_airflow.py @@ -272,3 +272,51 @@ class TestAirflow(TestCase): } } self.assertEqual(get_schedule_interval(pipeline_data), "*/2 * * * *") + + def test_get_dag_owners_with_serialized_tasks(self): + # Case 1: All tasks have no explicit owner → fallback to default_args + data = { + "default_args": {"__var": {"owner": "default_owner"}}, + "tasks": [ + {"__var": {"task_id": "t1"}, "__type": "EmptyOperator"}, + 
{"__var": {"task_id": "t2"}, "__type": "EmptyOperator"}, + ], + } + self.assertEqual("default_owner", self.airflow.fetch_dag_owners(data)) + + # Case 2: One task explicitly overrides the owner → tie between two owners + data = { + "default_args": {"__var": {"owner": "default_owner"}}, + "tasks": [ + { + "__var": {"task_id": "t1"}, + "__type": "EmptyOperator", + }, # uses default_owner + { + "__var": {"task_id": "t2", "owner": "overridden_owner"}, + "__type": "EmptyOperator", + }, + ], + } + result = self.airflow.fetch_dag_owners(data) + self.assertIn(result, {"default_owner", "overridden_owner"}) + + # Case 3: One owner is majority -> must return that owner + data = { + "default_args": {"__var": {"owner": "default_owner"}}, + "tasks": [ + { + "__var": {"task_id": "t1", "owner": "overridden_owner"}, + "__type": "EmptyOperator", + }, + { + "__var": {"task_id": "t2", "owner": "overridden_owner"}, + "__type": "EmptyOperator", + }, + { + "__var": {"task_id": "t3", "owner": "another_owner"}, + "__type": "EmptyOperator", + }, + ], + } + self.assertEqual("overridden_owner", self.airflow.fetch_dag_owners(data))