From b796db1cafacd2d0f2dc7c78df89be211027698a Mon Sep 17 00:00:00 2001 From: Stijn De Haes Date: Fri, 30 Dec 2022 19:12:25 +0100 Subject: [PATCH] fix(ingest/airflow): reorder imports to avoid cyclical dependencies (#6719) Co-authored-by: Harshal Sheth --- metadata-ingestion/src/datahub_provider/_airflow_shims.py | 3 +++ metadata-ingestion/src/datahub_provider/_lineage_core.py | 2 +- .../src/datahub_provider/client/airflow_generator.py | 7 +++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/_airflow_shims.py b/metadata-ingestion/src/datahub_provider/_airflow_shims.py index 6ee06fd8f5..24917c982d 100644 --- a/metadata-ingestion/src/datahub_provider/_airflow_shims.py +++ b/metadata-ingestion/src/datahub_provider/_airflow_shims.py @@ -6,6 +6,9 @@ try: from airflow.models.mappedoperator import MappedOperator from airflow.models.operator import Operator except ModuleNotFoundError: + # Operator isn't a real class, but rather a type alias defined + # as the union of BaseOperator and MappedOperator. + # Since older versions of Airflow don't have MappedOperator, we can just use BaseOperator. Operator = BaseOperator # type: ignore MappedOperator = None # type: ignore diff --git a/metadata-ingestion/src/datahub_provider/_lineage_core.py b/metadata-ingestion/src/datahub_provider/_lineage_core.py index 4fca2313af..9ec44d7ffc 100644 --- a/metadata-ingestion/src/datahub_provider/_lineage_core.py +++ b/metadata-ingestion/src/datahub_provider/_lineage_core.py @@ -5,7 +5,6 @@ import datahub.emitter.mce_builder as builder from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult from datahub.configuration.common import ConfigModel from datahub.utilities.urns.dataset_urn import DatasetUrn -from datahub_provider._airflow_shims import Operator from datahub_provider.client.airflow_generator import AirflowGenerator from datahub_provider.entities import _Entity @@ -14,6 +13,7 @@ if TYPE_CHECKING: from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance + from datahub_provider._airflow_shims import Operator from datahub_provider.hooks.datahub import DatahubGenericHook diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index 449386468f..67db9b2c97 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -3,7 +3,6 @@ from datahub_provider._airflow_compat import AIRFLOW_PATCHED from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union, cast from airflow.configuration import conf -from airflow.models.baseoperator import BaseOperator from datahub.api.entities.datajob import DataFlow, DataJob from datahub.api.entities.dataprocess.dataprocess_instance import ( @@ -13,7 +12,6 @@ from datahub.api.entities.dataprocess.dataprocess_instance import ( from datahub.metadata.schema_classes import DataProcessTypeClass from datahub.utilities.urns.data_flow_urn import DataFlowUrn from datahub.utilities.urns.data_job_urn import DataJobUrn -from datahub_provider._airflow_shims import ExternalTaskSensor, Operator assert AIRFLOW_PATCHED @@ -23,6 +21,7 @@ if TYPE_CHECKING: from datahub.emitter.kafka_emitter import DatahubKafkaEmitter from datahub.emitter.rest_emitter import DatahubRestEmitter + from datahub_provider._airflow_shims import Operator def _task_downstream_task_ids(operator: "Operator") -> Set[str]: @@ -36,6 +35,8 @@ class AirflowGenerator: def _get_dependencies( task: "Operator", dag: "DAG", flow_urn: DataFlowUrn ) -> List[DataJobUrn]: + from datahub_provider._airflow_shims import ExternalTaskSensor + # resolve URNs for upstream nodes in subdags upstream of the current task. upstream_subdag_task_urns: List[DataJobUrn] = [] @@ -182,6 +183,8 @@ class AirflowGenerator: @staticmethod def _get_description(task: "Operator") -> Optional[str]: + from airflow.models.baseoperator import BaseOperator + if not isinstance(task, BaseOperator): # TODO: Get docs for mapped operators. return None