fix(ingest/airflow): reorder imports to avoid circular dependencies (#6719)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Stijn De Haes 2022-12-30 19:12:25 +01:00 committed by GitHub
parent 092d4c808d
commit b796db1caf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 3 deletions

View File

@ -6,6 +6,9 @@ try:
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
except ModuleNotFoundError:
# Operator isn't a real class, but rather a type alias defined
# as the union of BaseOperator and MappedOperator.
# Since older versions of Airflow don't have MappedOperator, we can just use BaseOperator.
Operator = BaseOperator # type: ignore
MappedOperator = None # type: ignore

View File

@ -5,7 +5,6 @@ import datahub.emitter.mce_builder as builder
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub.configuration.common import ConfigModel
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_provider._airflow_shims import Operator
from datahub_provider.client.airflow_generator import AirflowGenerator
from datahub_provider.entities import _Entity
@ -14,6 +13,7 @@ if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from datahub_provider._airflow_shims import Operator
from datahub_provider.hooks.datahub import DatahubGenericHook

View File

@ -3,7 +3,6 @@ from datahub_provider._airflow_compat import AIRFLOW_PATCHED
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union, cast
from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
@ -13,7 +12,6 @@ from datahub.api.entities.dataprocess.dataprocess_instance import (
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub_provider._airflow_shims import ExternalTaskSensor, Operator
assert AIRFLOW_PATCHED
@ -23,6 +21,7 @@ if TYPE_CHECKING:
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub_provider._airflow_shims import Operator
def _task_downstream_task_ids(operator: "Operator") -> Set[str]:
@ -36,6 +35,8 @@ class AirflowGenerator:
def _get_dependencies(
task: "Operator", dag: "DAG", flow_urn: DataFlowUrn
) -> List[DataJobUrn]:
from datahub_provider._airflow_shims import ExternalTaskSensor
# resolve URNs for upstream nodes in subdags upstream of the current task.
upstream_subdag_task_urns: List[DataJobUrn] = []
@ -182,6 +183,8 @@ class AirflowGenerator:
@staticmethod
def _get_description(task: "Operator") -> Optional[str]:
from airflow.models.baseoperator import BaseOperator
if not isinstance(task, BaseOperator):
# TODO: Get docs for mapped operators.
return None