feat(ingest): optional custom headers REST emitter (#3290)

Co-authored-by: Adriaan Slechten <adriaan.slechten@klarna.com>
This commit is contained in:
adriaanslechten 2021-09-23 21:53:39 +02:00 committed by GitHub
parent ab270a98ee
commit 185f7e2f5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 2 deletions

View File

@ -36,6 +36,8 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| -------- | -------- | ------- | ---------------------------- |
| `server` | ✅ | | URL of DataHub GMS endpoint. |
| `timeout_sec` | | 30 | Per-HTTP request timeout. |
| `token` | | | Bearer token used for authentication. |
| `extra_headers` | | | Extra headers which will be added to the request. |
## DataHub Kafka

View File

@ -3,7 +3,7 @@ import json
import logging
import shlex
from json.decoder import JSONDecodeError
from typing import List, Optional, Union
from typing import Dict, List, Optional, Union
import requests
from requests.exceptions import HTTPError, RequestException
@ -56,6 +56,7 @@ class DatahubRestEmitter:
token: Optional[str] = None,
connect_timeout_sec: Optional[float] = None,
read_timeout_sec: Optional[float] = None,
extra_headers: Optional[Dict[str, str]] = None,
):
if ":9002" in gms_server:
logger.warning(
@ -74,6 +75,9 @@ class DatahubRestEmitter:
if token:
self._session.headers.update({"Authorization": f"Bearer {token}"})
if extra_headers:
self._session.headers.update(extra_headers)
if connect_timeout_sec:
self._connect_timeout_sec = connect_timeout_sec

View File

@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Optional, Union
from typing import Dict, Optional, Union
from datahub.configuration.common import ConfigModel, OperationalError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -22,6 +22,7 @@ class DatahubRestSinkConfig(ConfigModel):
server: str = "http://localhost:8080"
token: Optional[str]
timeout_sec: Optional[int]
extra_headers: Optional[Dict[str, str]]
@dataclass
@ -39,6 +40,7 @@ class DatahubRestSink(Sink):
self.config.token,
connect_timeout_sec=self.config.timeout_sec, # reuse timeout_sec for connect timeout
read_timeout_sec=self.config.timeout_sec,
extra_headers=self.config.extra_headers,
)
self.emitter.test_connection()

View File

@ -15,3 +15,11 @@ def test_datahub_rest_emitter_timeout_construction():
)
assert emitter._connect_timeout_sec == 2
assert emitter._read_timeout_sec == 4
def test_datahub_rest_emitter_extra_params():
emitter = DatahubRestEmitter(
MOCK_GMS_ENDPOINT, extra_headers={"key1": "value1", "key2": "value2"}
)
assert emitter._session.headers.get("key1") == "value1"
assert emitter._session.headers.get("key2") == "value2"