From f11172f94c47d76384e3b1cc5129bf3affc60aba Mon Sep 17 00:00:00 2001 From: Vishal Shah Date: Fri, 29 Jul 2022 16:28:20 -0400 Subject: [PATCH] fix(ingest): airflow: update subdag check for compatibility with older Airflow versions (#5523) * use getattr to default None if no subdag * add None check * add other None check * Apply suggestions from code review- double quotes Co-authored-by: Harshal Sheth * minor tweak to fix lint Co-authored-by: Harshal Sheth --- .../src/datahub_provider/client/airflow_generator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index b7864ddb71..69943df50d 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,12 +32,11 @@ class AirflowGenerator: upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if upstream_task.subdag is None: + upstream_subdag = getattr(upstream_task, "subdag", None) + if upstream_subdag is None: continue # else, link the leaf tasks of the upstream subdag as upstream tasks - upstream_subdag = upstream_task.subdag - for upstream_subdag_task_id in upstream_subdag.task_dict: upstream_subdag_task = upstream_subdag.task_dict[ upstream_subdag_task_id @@ -113,7 +112,7 @@ class AirflowGenerator: [ DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn)) for task_id in task.upstream_task_ids - if dag.task_dict[task_id].subdag is None + if getattr(dag.task_dict[task_id], "subdag", None) is None ] + upstream_subdag_task_urns + upstream_subdag_triggers