Fixes #21106: Support owner extraction from serialized Airflow DAGs (#22071)

* fix(airflow): correctly extract owners from serialized Airflow DAGs

Airflow serialization format wraps tasks under `__var` and `__type`.
Previously, the OpenMetadata Airflow connector failed to extract task owners properly in this format.

This patch:
- Flattens `__var` when parsing task owners
- Fallbacks to `default_args["owner"]` if no task-level owner is explicitly present
- Ensures correct DAG owner is picked as the most common task owner
- Handles compatibility with older Airflow versions

Fixes: #21106

* test(airflow): add tests for owner extraction from serialized Airflow DAGs

Adds new test cases to validate owner extraction logic:
- Owners from serialized task format (`__var`)
- Fallback to `default_args['owner']` if task owners are missing
- Resolution of most common owner
- Compatibility with unstructured or missing owners

* remove test version specific comment

* simplify comments and warnings

* fix return statement

* fixing formatting

* adding handling of default args

* fixing and adding more tests
This commit is contained in:
Ferjani Nasraoui 2025-07-03 10:51:36 +02:00 committed by GitHub
parent 93cef8d868
commit b0e1a136cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 81 additions and 10 deletions

View File

@ -414,18 +414,41 @@ class AirflowSource(PipelineServiceSource):
- `owners`: Applied at the tasks. In Airflow's source code, DAG ownership is then a
list joined with the owners of all the tasks.
We will pick the owner from the tasks that appears in most tasks.
We will pick the owner from the tasks that appears in most tasks,
or fall back to the default_args owner if available.
"""
try:
if self.source_config.includeOwners:
task_owners = [
task.get("owner")
for task in data.get("tasks", [])
if task.get("owner") is not None
]
if task_owners:
most_common_owner, _ = Counter(task_owners).most_common(1)[0]
return most_common_owner
if not self.source_config.includeOwners:
return None
tasks = data.get("tasks", [])
task_owners = []
# Handle default_args.owner (wrapped or not)
default_args = data.get("default_args", {})
if isinstance(default_args, dict) and "__var" in default_args:
default_args = default_args["__var"]
default_owner = default_args.get("owner")
for task in tasks:
# Flatten serialized task
task_data = (
task.get("__var")
if isinstance(task, dict) and "__var" in task
else task
)
owner = task_data.get("owner") or default_owner
if owner:
task_owners.append(owner)
if task_owners:
most_common_owner, _ = Counter(task_owners).most_common(1)[0]
return most_common_owner
return default_owner
except Exception as exc:
self.status.warning(
data.get("dag_id"), f"Could not extract owner information due to {exc}"

View File

@ -272,3 +272,51 @@ class TestAirflow(TestCase):
}
}
self.assertEqual(get_schedule_interval(pipeline_data), "*/2 * * * *")
def test_get_dag_owners_with_serialized_tasks(self):
# Case 1: All tasks have no explicit owner → fallback to default_args
data = {
"default_args": {"__var": {"owner": "default_owner"}},
"tasks": [
{"__var": {"task_id": "t1"}, "__type": "EmptyOperator"},
{"__var": {"task_id": "t2"}, "__type": "EmptyOperator"},
],
}
self.assertEqual("default_owner", self.airflow.fetch_dag_owners(data))
# Case 2: One task explicitly overrides the owner → tie between two owners
data = {
"default_args": {"__var": {"owner": "default_owner"}},
"tasks": [
{
"__var": {"task_id": "t1"},
"__type": "EmptyOperator",
}, # uses default_owner
{
"__var": {"task_id": "t2", "owner": "overridden_owner"},
"__type": "EmptyOperator",
},
],
}
result = self.airflow.fetch_dag_owners(data)
self.assertIn(result, {"default_owner", "overridden_owner"})
# Case 3: One owner is majority -> must return that owner
data = {
"default_args": {"__var": {"owner": "default_owner"}},
"tasks": [
{
"__var": {"task_id": "t1", "owner": "overridden_owner"},
"__type": "EmptyOperator",
},
{
"__var": {"task_id": "t2", "owner": "overridden_owner"},
"__type": "EmptyOperator",
},
{
"__var": {"task_id": "t3", "owner": "another_owner"},
"__type": "EmptyOperator",
},
],
}
self.assertEqual("overridden_owner", self.airflow.fetch_dag_owners(data))