diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index b7864ddb71..69943df50d 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,12 +32,11 @@ class AirflowGenerator: upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if upstream_task.subdag is None: + upstream_subdag = getattr(upstream_task, "subdag", None) + if upstream_subdag is None: continue # else, link the leaf tasks of the upstream subdag as upstream tasks - upstream_subdag = upstream_task.subdag - for upstream_subdag_task_id in upstream_subdag.task_dict: upstream_subdag_task = upstream_subdag.task_dict[ upstream_subdag_task_id @@ -113,7 +112,7 @@ class AirflowGenerator: [ DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn)) for task_id in task.upstream_task_ids - if dag.task_dict[task_id].subdag is None + if getattr(dag.task_dict[task_id], "subdag", None) is None ] + upstream_subdag_task_urns + upstream_subdag_triggers