diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 67ec370df2..fbde2b8b14 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -9,6 +9,7 @@ from typing import Any, List, Optional, Union import requests from requests.exceptions import HTTPError, RequestException +from datahub import __package_name__ from datahub.configuration.common import OperationalError from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation @@ -81,6 +82,15 @@ class DatahubRestEmitter: if token: self._session.headers.update({"Authorization": f"Bearer {token}"}) + def test_connection(self) -> None: + response = self._session.get(f"{self._gms_server}/config") + response.raise_for_status() + config: dict = response.json() + if config.get("noCode") != "true": + raise ValueError( + f"This version of {__package_name__} requires GMS v0.8.0 or higher" + ) + def emit(self, item: Union[MetadataChangeEvent, UsageAggregation]) -> None: if isinstance(item, UsageAggregation): return self.emit_usage(item) diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 44af771532..f1ff603eb3 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -30,6 +30,7 @@ class DatahubRestSink(Sink): self.config = config self.report = SinkReport() self.emitter = DatahubRestEmitter(self.config.server, self.config.token) + self.emitter.test_connection() @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "DatahubRestSink":