diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 82f0ae7b6b6..781602428de 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -414,18 +414,41 @@ class AirflowSource(PipelineServiceSource): - `owners`: Applied at the tasks. In Airflow's source code, DAG ownership is then a list joined with the owners of all the tasks. - We will pick the owner from the tasks that appears in most tasks. + We will pick the owner from the tasks that appears in most tasks, + or fall back to the default_args owner if available. """ try: - if self.source_config.includeOwners: - task_owners = [ - task.get("owner") - for task in data.get("tasks", []) - if task.get("owner") is not None - ] - if task_owners: - most_common_owner, _ = Counter(task_owners).most_common(1)[0] - return most_common_owner + if not self.source_config.includeOwners: + return None + + tasks = data.get("tasks", []) + task_owners = [] + + # Handle default_args.owner (wrapped or not) + default_args = data.get("default_args", {}) + if isinstance(default_args, dict) and "__var" in default_args: + default_args = default_args["__var"] + default_owner = default_args.get("owner") + + for task in tasks: + # Flatten serialized task + task_data = ( + task.get("__var") + if isinstance(task, dict) and "__var" in task + else task + ) + + owner = task_data.get("owner") or default_owner + + if owner: + task_owners.append(owner) + + if task_owners: + most_common_owner, _ = Counter(task_owners).most_common(1)[0] + return most_common_owner + + return default_owner + except Exception as exc: self.status.warning( data.get("dag_id"), f"Could not extract owner information due to {exc}" diff --git a/ingestion/tests/unit/topology/pipeline/test_airflow.py b/ingestion/tests/unit/topology/pipeline/test_airflow.py index 
0d721aa77b4..964ad463716 100644
--- a/ingestion/tests/unit/topology/pipeline/test_airflow.py
+++ b/ingestion/tests/unit/topology/pipeline/test_airflow.py
@@ -272,3 +272,75 @@ class TestAirflow(TestCase):
             }
         }
         self.assertEqual(get_schedule_interval(pipeline_data), "*/2 * * * *")
+
+    def test_get_dag_owners_with_serialized_tasks(self):
+        """
+        Check the owner picked by `fetch_dag_owners` for serialized
+        (`__var`-wrapped) and plain DAG payloads, including the fallback
+        to `default_args.owner` for tasks that do not set their own.
+        """
+        with self.subTest("serialized tasks without owner use default_args"):
+            data = {
+                "default_args": {"__var": {"owner": "default_owner"}},
+                "tasks": [
+                    {"__var": {"task_id": "t1"}, "__type": "EmptyOperator"},
+                    {"__var": {"task_id": "t2"}, "__type": "EmptyOperator"},
+                ],
+            }
+            self.assertEqual("default_owner", self.airflow.fetch_dag_owners(data))
+
+        with self.subTest("tie between default and overridden owner"):
+            data = {
+                "default_args": {"__var": {"owner": "default_owner"}},
+                "tasks": [
+                    # No explicit owner: counted as default_owner
+                    {"__var": {"task_id": "t1"}, "__type": "EmptyOperator"},
+                    {
+                        "__var": {"task_id": "t2", "owner": "overridden_owner"},
+                        "__type": "EmptyOperator",
+                    },
+                ],
+            }
+            # Both owners appear once; either one is an acceptable answer
+            self.assertIn(
+                self.airflow.fetch_dag_owners(data),
+                {"default_owner", "overridden_owner"},
+            )
+
+        with self.subTest("majority owner wins"):
+            data = {
+                "default_args": {"__var": {"owner": "default_owner"}},
+                "tasks": [
+                    {
+                        "__var": {"task_id": "t1", "owner": "overridden_owner"},
+                        "__type": "EmptyOperator",
+                    },
+                    {
+                        "__var": {"task_id": "t2", "owner": "overridden_owner"},
+                        "__type": "EmptyOperator",
+                    },
+                    {
+                        "__var": {"task_id": "t3", "owner": "another_owner"},
+                        "__type": "EmptyOperator",
+                    },
+                ],
+            }
+            self.assertEqual("overridden_owner", self.airflow.fetch_dag_owners(data))
+
+        with self.subTest("plain payload without __var wrappers"):
+            data = {
+                "default_args": {"owner": "default_owner"},
+                "tasks": [
+                    {"task_id": "t1"},
+                    {"task_id": "t2"},
+                    {"task_id": "t3", "owner": "overridden_owner"},
+                ],
+            }
+            self.assertEqual("default_owner", self.airflow.fetch_dag_owners(data))
+
+        with self.subTest("no tasks falls back to default_args owner"):
+            data = {
+                "default_args": {"__var": {"owner": "default_owner"}},
+                "tasks": [],
+            }
+            self.assertEqual("default_owner", self.airflow.fetch_dag_owners(data))