fix(ingest): airflow: update subdag check for compatibility with older Airflow versions (#5523)

* use getattr to default None if no subdag

* add None check

* add other None check

* Apply suggestions from code review- double quotes

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>

* minor tweak to fix lint

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Vishal Shah 2022-07-29 16:28:20 -04:00 committed by GitHub
parent 55cb34e5d4
commit f11172f94c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -32,12 +32,11 @@ class AirflowGenerator:
upstream_task = dag.task_dict[upstream_task_id]
# if upstream task is not a subdag, then skip it
if upstream_task.subdag is None:
upstream_subdag = getattr(upstream_task, "subdag", None)
if upstream_subdag is None:
continue
# else, link the leaf tasks of the upstream subdag as upstream tasks
upstream_subdag = upstream_task.subdag
for upstream_subdag_task_id in upstream_subdag.task_dict:
upstream_subdag_task = upstream_subdag.task_dict[
upstream_subdag_task_id
@ -113,7 +112,7 @@ class AirflowGenerator:
[
DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn))
for task_id in task.upstream_task_ids
if dag.task_dict[task_id].subdag is None
if getattr(dag.task_dict[task_id], "subdag", None) is None
]
+ upstream_subdag_task_urns
+ upstream_subdag_triggers