fix(ingestion): isolate dependency requirements of airflow hooks (#2977)

Harshal Sheth 2021-07-28 14:23:06 -07:00 committed by GitHub
parent c32ba3c5ef
commit 14bca4c2f6
2 changed files with 22 additions and 13 deletions
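
The change applies one pattern throughout: imports of the datahub emitter packages move out of module scope, either behind typing.TYPE_CHECKING (visible to type checkers and usable in annotations, never executed at runtime) or into the function bodies that actually need them, so the hooks module imports cleanly even when those packages are absent. A minimal sketch of the pattern, with a hypothetical heavy_dep module standing in for the real emitters:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers, never at runtime, so importing
    # this module does not require heavy_dep to be installed.
    from heavy_dep import HeavyClient  # hypothetical dependency


def make_client(endpoint: str) -> "HeavyClient":
    # Deferred import: heavy_dep is required only when this is called.
    import heavy_dep

    return heavy_dep.HeavyClient(endpoint)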


@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from airflow.exceptions import AirflowException
@@ -11,11 +11,14 @@ except ModuleNotFoundError:
     AIRFLOW_1 = True
 
-from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-from datahub.emitter.rest_emitter import DatahubRestEmitter
-from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 
+if TYPE_CHECKING:
+    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
+    from datahub.emitter.rest_emitter import DatahubRestEmitter
+    from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
+
 _default_hook_args = []
 if AIRFLOW_1:
     _default_hook_args = [None]
@@ -64,8 +67,10 @@ class DatahubRestHook(BaseHook):
             raise AirflowException("host parameter is required")
         return (host, conn.password)
 
-    def make_emitter(self) -> DatahubRestEmitter:
-        return DatahubRestEmitter(*self._get_config())
+    def make_emitter(self) -> "DatahubRestEmitter":
+        import datahub.emitter.rest_emitter
+
+        return datahub.emitter.rest_emitter.DatahubRestEmitter(*self._get_config())
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
@@ -110,7 +115,9 @@ class DatahubKafkaHook(BaseHook):
             },
         }
 
-    def _get_config(self) -> KafkaSinkConfig:
+    def _get_config(self) -> "KafkaSinkConfig":
+        import datahub.ingestion.sink.datahub_kafka
+
         conn = self.get_connection(self.datahub_kafka_conn_id)
         obj = conn.extra_dejson
         obj.setdefault("connection", {})
@@ -120,12 +127,14 @@ class DatahubKafkaHook(BaseHook):
                 "Kafka broker specified twice (present in host and extra)"
             )
         obj["connection"]["bootstrap"] = conn.host
-        config = KafkaSinkConfig.parse_obj(obj)
+        config = datahub.ingestion.sink.datahub_kafka.KafkaSinkConfig.parse_obj(obj)
         return config
 
-    def make_emitter(self) -> DatahubKafkaEmitter:
+    def make_emitter(self) -> "DatahubKafkaEmitter":
+        import datahub.emitter.kafka_emitter
+
         sink_config = self._get_config()
-        return DatahubKafkaEmitter(sink_config)
+        return datahub.emitter.kafka_emitter.DatahubKafkaEmitter(sink_config)
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
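
For context on the parse_obj call above (a sketch of the general mechanism with an assumed field layout, not the real KafkaSinkConfig schema): _get_config assembles a plain dict from the Airflow connection's host and extra JSON, then lets the pydantic model validate and type it.

import pydantic


class KafkaConnection(pydantic.BaseModel):  # assumed field layout
    bootstrap: str = "localhost:9092"


class SinkConfig(pydantic.BaseModel):
    connection: KafkaConnection = KafkaConnection()


# Mirrors the _get_config flow: build a dict, validate it into a model.
obj = {"connection": {"bootstrap": "broker:9092"}}
config = SinkConfig.parse_obj(obj)  # pydantic v1-style API
assert config.connection.bootstrap == "broker:9092"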
@@ -176,7 +185,7 @@ class DatahubGenericHook(BaseHook):
             f"DataHub cannot handle conn_type {conn.conn_type} in {conn}"
         )
 
-    def make_emitter(self) -> Union[DatahubRestEmitter, DatahubKafkaEmitter]:
+    def make_emitter(self) -> Union["DatahubRestEmitter", "DatahubKafkaEmitter"]:
         return self.get_underlying_hook().make_emitter()
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
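
The net effect for this file (a self-contained sketch of the behavior, not code from this commit): a module written in this style imports successfully even when the optional dependency is missing, and the ModuleNotFoundError surfaces only on the first make_emitter()-style call.

def make_emitter():
    import definitely_not_installed  # hypothetical missing dependency

    return definitely_not_installed.Emitter()


print("module imports and loads fine")  # no ImportError at import time

try:
    make_emitter()
except ModuleNotFoundError as e:
    print(f"dependency error deferred to call time: {e}")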


@@ -91,7 +91,7 @@ def patch_airflow_connection(conn: Connection) -> Iterator[Connection]:
     yield conn
 
 
-@mock.patch("datahub_provider.hooks.datahub.DatahubRestEmitter", autospec=True)
+@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True)
 def test_datahub_rest_hook(mock_emitter):
     with patch_airflow_connection(datahub_rest_connection_config) as config:
         hook = DatahubRestHook(config.conn_id)
@@ -102,7 +102,7 @@ def test_datahub_rest_hook(mock_emitter):
         instance.emit_mce.assert_called_with(lineage_mce)
 
 
-@mock.patch("datahub_provider.hooks.datahub.DatahubKafkaEmitter", autospec=True)
+@mock.patch("datahub.emitter.kafka_emitter.DatahubKafkaEmitter", autospec=True)
 def test_datahub_kafka_hook(mock_emitter):
     with patch_airflow_connection(datahub_kafka_connection_config) as config:
         hook = DatahubKafkaHook(config.conn_id)
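
The test changes follow directly from the hook changes: mock.patch must replace a name in the module where it is looked up. The hooks module no longer binds DatahubRestEmitter or DatahubKafkaEmitter at import time, so patching datahub_provider.hooks.datahub.DatahubRestEmitter would miss; the hooks now resolve the attribute off the defining module at call time, so the patches target datahub.emitter.rest_emitter and datahub.emitter.kafka_emitter instead. The same rule in a self-contained sketch, using the stdlib json module in place of the emitters:

from unittest import mock


def serialize(obj):
    # Same shape as the hooks above: import and resolve at call time.
    import json

    return json.dumps(obj)


# serialize() looks up json.dumps on the json module when called, so the
# patch must target the defining module ("json.dumps"), not a local alias.
with mock.patch("json.dumps", autospec=True) as mock_dumps:
    serialize({"a": 1})
    mock_dumps.assert_called_once_with({"a": 1})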