fix(ingestion): airflow - import emitters indirectly to avoid unneeded dependency (#4668)

This commit is contained in:
Tamas Nemeth 2022-04-14 19:22:16 +02:00 committed by GitHub
parent 8f6718104b
commit 61dc6e8723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,8 +7,6 @@ from datahub.api.entities.dataprocess.dataprocess_instance import (
DataProcessInstance,
InstanceRunResult,
)
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
@ -18,6 +16,9 @@ if TYPE_CHECKING:
from airflow import DAG
from airflow.models import BaseOperator, DagRun, TaskInstance
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.rest_emitter import DatahubRestEmitter
class AirflowGenerator:
@staticmethod
@ -253,7 +254,7 @@ class AirflowGenerator:
@staticmethod
def run_dataflow(
emitter: DatahubRestEmitter,
emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
cluster: str,
dag_run: "DagRun",
start_timestamp_millis: Optional[int] = None,
@ -300,7 +301,7 @@ class AirflowGenerator:
@staticmethod
def complete_dataflow(
emitter: DatahubRestEmitter,
emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
cluster: str,
dag_run: "DagRun",
end_timestamp_millis: Optional[int] = None,
@ -345,7 +346,7 @@ class AirflowGenerator:
@staticmethod
def run_datajob(
emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
cluster: str,
ti: "TaskInstance",
dag: "DAG",
@ -416,7 +417,7 @@ class AirflowGenerator:
@staticmethod
def complete_datajob(
emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
cluster: str,
ti: "TaskInstance",
dag: "DAG",