feat(ingestion): test GMS connections before ingestion (#2946)

This commit is contained in:
Harshal Sheth 2021-07-25 21:17:51 -07:00 committed by GitHub
parent 49093ea1ce
commit f08cf11a4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 0 deletions

View File

@ -9,6 +9,7 @@ from typing import Any, List, Optional, Union
import requests
from requests.exceptions import HTTPError, RequestException
from datahub import __package_name__
from datahub.configuration.common import OperationalError
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
@ -81,6 +82,15 @@ class DatahubRestEmitter:
if token:
self._session.headers.update({"Authorization": f"Bearer {token}"})
def test_connection(self) -> None:
response = self._session.get(f"{self._gms_server}/config")
response.raise_for_status()
config: dict = response.json()
if config.get("noCode") != "true":
raise ValueError(
f"This version of {__package_name__} requires GMS v0.8.0 or higher"
)
def emit(self, item: Union[MetadataChangeEvent, UsageAggregation]) -> None:
if isinstance(item, UsageAggregation):
return self.emit_usage(item)

View File

@ -30,6 +30,7 @@ class DatahubRestSink(Sink):
self.config = config
self.report = SinkReport()
self.emitter = DatahubRestEmitter(self.config.server, self.config.token)
self.emitter.test_connection()
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "DatahubRestSink":