From 14bca4c2f6cdf57f8fe4c2ecb71bdd41fd596451 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 28 Jul 2021 14:23:06 -0700
Subject: [PATCH] fix(ingestion): isolate dependency requirements of airflow hooks (#2977)

---
 .../src/datahub_provider/hooks/datahub.py     | 31 ++++++++++++-------
 metadata-ingestion/tests/unit/test_airflow.py |  4 +--
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/metadata-ingestion/src/datahub_provider/hooks/datahub.py b/metadata-ingestion/src/datahub_provider/hooks/datahub.py
index 44887098c5..e463e3abcb 100644
--- a/metadata-ingestion/src/datahub_provider/hooks/datahub.py
+++ b/metadata-ingestion/src/datahub_provider/hooks/datahub.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

 from airflow.exceptions import AirflowException

@@ -11,11 +11,14 @@ except ModuleNotFoundError:
     AIRFLOW_1 = True


-from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-from datahub.emitter.rest_emitter import DatahubRestEmitter
-from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

+if TYPE_CHECKING:
+    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
+    from datahub.emitter.rest_emitter import DatahubRestEmitter
+    from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
+
+
 _default_hook_args = []
 if AIRFLOW_1:
     _default_hook_args = [None]
@@ -64,8 +67,10 @@ class DatahubRestHook(BaseHook):
             raise AirflowException("host parameter is required")
         return (host, conn.password)

-    def make_emitter(self) -> DatahubRestEmitter:
-        return DatahubRestEmitter(*self._get_config())
+    def make_emitter(self) -> "DatahubRestEmitter":
+        import datahub.emitter.rest_emitter
+
+        return datahub.emitter.rest_emitter.DatahubRestEmitter(*self._get_config())

     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
@@ -110,7 +115,9 @@ class DatahubKafkaHook(BaseHook):
             },
         }

-    def _get_config(self) -> KafkaSinkConfig:
+    def _get_config(self) -> "KafkaSinkConfig":
+        import datahub.ingestion.sink.datahub_kafka
+
         conn = self.get_connection(self.datahub_kafka_conn_id)
         obj = conn.extra_dejson
         obj.setdefault("connection", {})
@@ -120,12 +127,14 @@ class DatahubKafkaHook(BaseHook):
                 "Kafka broker specified twice (present in host and extra)"
             )
             obj["connection"]["bootstrap"] = conn.host
-        config = KafkaSinkConfig.parse_obj(obj)
+        config = datahub.ingestion.sink.datahub_kafka.KafkaSinkConfig.parse_obj(obj)
         return config

-    def make_emitter(self) -> DatahubKafkaEmitter:
+    def make_emitter(self) -> "DatahubKafkaEmitter":
+        import datahub.emitter.kafka_emitter
+
         sink_config = self._get_config()
-        return DatahubKafkaEmitter(sink_config)
+        return datahub.emitter.kafka_emitter.DatahubKafkaEmitter(sink_config)

     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
@@ -176,7 +185,7 @@ class DatahubGenericHook(BaseHook):
                 f"DataHub cannot handle conn_type {conn.conn_type} in {conn}"
             )

-    def make_emitter(self) -> Union[DatahubRestEmitter, DatahubKafkaEmitter]:
+    def make_emitter(self) -> Union["DatahubRestEmitter", "DatahubKafkaEmitter"]:
         return self.get_underlying_hook().make_emitter()

     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
diff --git a/metadata-ingestion/tests/unit/test_airflow.py b/metadata-ingestion/tests/unit/test_airflow.py
index 71fddc266d..7839bfdb3d 100644
--- a/metadata-ingestion/tests/unit/test_airflow.py
+++ b/metadata-ingestion/tests/unit/test_airflow.py
@@ -91,7 +91,7 @@ def patch_airflow_connection(conn: Connection) -> Iterator[Connection]:
     yield conn


-@mock.patch("datahub_provider.hooks.datahub.DatahubRestEmitter", autospec=True)
+@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True)
 def test_datahub_rest_hook(mock_emitter):
     with patch_airflow_connection(datahub_rest_connection_config) as config:
         hook = DatahubRestHook(config.conn_id)
@@ -102,7 +102,7 @@ def test_datahub_rest_hook(mock_emitter):
         instance.emit_mce.assert_called_with(lineage_mce)


-@mock.patch("datahub_provider.hooks.datahub.DatahubKafkaEmitter", autospec=True)
+@mock.patch("datahub.emitter.kafka_emitter.DatahubKafkaEmitter", autospec=True)
 def test_datahub_kafka_hook(mock_emitter):
     with patch_airflow_connection(datahub_kafka_connection_config) as config:
         hook = DatahubKafkaHook(config.conn_id)
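Note on the pattern: this patch applies the standard Python idiom of a
TYPE_CHECKING-guarded import (used for annotations only) combined with a
function-local import (deferred until the hook is actually invoked), so that
importing datahub_provider.hooks.datahub no longer requires the REST and Kafka
emitter dependencies to be installed. A minimal sketch of the idiom, using a
hypothetical heavy_dep module rather than the names in this patch:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Evaluated only by static type checkers (mypy, pyright);
        # never executed at runtime, so heavy_dep stays optional.
        from heavy_dep import Client  # hypothetical module

    def make_client() -> "Client":
        # Deferred import: heavy_dep must be installed only when this
        # function is called, not when the enclosing module is imported.
        import heavy_dep

        return heavy_dep.Client()

The quoted return annotation ("Client") keeps the signature checkable while
avoiding a runtime reference to a name that exists only under TYPE_CHECKING.
This is also why the tests now patch the emitters at their source modules
(e.g. datahub.emitter.rest_emitter.DatahubRestEmitter) instead of at
datahub_provider.hooks.datahub, which no longer re-exports them at runtime.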