fix(ingest): add default configurable timeout for rest emitter (#3055)

This commit is contained in:
Dexter Lee 2021-08-08 22:30:55 -07:00 committed by GitHub
parent 8a23d0eac9
commit 8316316daa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 80 additions and 7 deletions

View File

@ -10,7 +10,7 @@ To install this plugin, run `pip install 'acryl-datahub[datahub-rest]'`.
### Capabilities
Pushes metadata to DataHub using the GMA rest API. The advantage of the rest-based interface
Pushes metadata to DataHub using the GMS REST API. The advantage of the REST-based interface
is that any errors can immediately be reported.
### Quickstart recipe
@ -35,6 +35,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| -------- | -------- | ------- | ---------------------------- |
| `server` | ✅ | | URL of DataHub GMS endpoint. |
| `timeout_sec` | | 2 | Per-HTTP request timeout, in seconds. |
## DataHub Kafka

View File

@ -39,11 +39,24 @@ def _make_curl_command(
class DatahubRestEmitter:
DEFAULT_CONNECT_TIMEOUT_SEC = 10 # 10 seconds should be plenty to connect
DEFAULT_READ_TIMEOUT_SEC = (
2 # Any ingest call taking longer than 2 seconds should be abandoned
)
_gms_server: str
_token: Optional[str]
_session: requests.Session
_connect_timeout_sec: float = DEFAULT_CONNECT_TIMEOUT_SEC
_read_timeout_sec: float = DEFAULT_READ_TIMEOUT_SEC
def __init__(self, gms_server: str, token: Optional[str] = None):
def __init__(
self,
gms_server: str,
token: Optional[str] = None,
connect_timeout_sec: Optional[float] = None,
read_timeout_sec: Optional[float] = None,
):
if ":9002" in gms_server:
logger.warning(
"the rest emitter should connect to GMS (usually port 8080) instead of frontend"
@ -61,6 +74,17 @@ class DatahubRestEmitter:
if token:
self._session.headers.update({"Authorization": f"Bearer {token}"})
if connect_timeout_sec:
self._connect_timeout_sec = connect_timeout_sec
if read_timeout_sec:
self._read_timeout_sec = read_timeout_sec
if self._connect_timeout_sec < 1 or self._read_timeout_sec < 1:
logger.warning(
f"Setting timeout values lower than 1 second is not recommended. Your configuration is connect_timeout:{self._connect_timeout_sec}s, read_timeout:{self._read_timeout_sec}s"
)
def test_connection(self) -> None:
response = self._session.get(f"{self._gms_server}/config")
response.raise_for_status()
@ -139,7 +163,11 @@ class DatahubRestEmitter:
curl_command,
)
try:
response = self._session.post(url, data=payload)
response = self._session.post(
url,
data=payload,
timeout=(self._connect_timeout_sec, self._read_timeout_sec),
)
response.raise_for_status()
except HTTPError as e:

View File

@ -21,6 +21,7 @@ class DatahubRestSinkConfig(ConfigModel):
server: str = "http://localhost:8080"
token: Optional[str]
timeout_sec: Optional[int]
@dataclass
@ -33,7 +34,12 @@ class DatahubRestSink(Sink):
super().__init__(ctx)
self.config = config
self.report = SinkReport()
self.emitter = DatahubRestEmitter(self.config.server, self.config.token)
self.emitter = DatahubRestEmitter(
self.config.server,
self.config.token,
connect_timeout_sec=self.config.timeout_sec, # reuse timeout_sec for connect timeout
read_timeout_sec=self.config.timeout_sec,
)
self.emitter.test_connection()
@classmethod

View File

@ -60,12 +60,13 @@ class DatahubRestHook(BaseHook):
},
}
def _get_config(self) -> Tuple[str, Optional[str]]:
def _get_config(self) -> Tuple[str, Optional[str], Optional[int]]:
conn = self.get_connection(self.datahub_rest_conn_id)
host = conn.host
if host is None:
raise AirflowException("host parameter is required")
return (host, conn.password)
timeout_sec = conn.extra_dejson.get("timeout_sec")
return (host, conn.password, timeout_sec)
def make_emitter(self) -> "DatahubRestEmitter":
import datahub.emitter.rest_emitter

View File

@ -37,6 +37,13 @@ datahub_rest_connection_config = Connection(
host="http://test_host:8080/",
extra=None,
)
datahub_rest_connection_config_with_timeout = Connection(
conn_id="datahub_rest_test",
conn_type="datahub_rest",
host="http://test_host:8080/",
extra=json.dumps({"timeout_sec": 5}),
)
datahub_kafka_connection_config = Connection(
conn_id="datahub_kafka_test",
conn_type="datahub_kafka",
@ -97,7 +104,20 @@ def test_datahub_rest_hook(mock_emitter):
hook = DatahubRestHook(config.conn_id)
hook.emit_mces([lineage_mce])
mock_emitter.assert_called_once_with(config.host, None)
mock_emitter.assert_called_once_with(config.host, None, None)
instance = mock_emitter.return_value
instance.emit_mce.assert_called_with(lineage_mce)
@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True)
def test_datahub_rest_hook_with_timeout(emitter_cls):
    """The hook should forward `timeout_sec` from the connection extras to the emitter."""
    with patch_airflow_connection(
        datahub_rest_connection_config_with_timeout
    ) as conn_config:
        DatahubRestHook(conn_config.conn_id).emit_mces([lineage_mce])

        # Emitter was constructed as (host, token=None, timeout_sec=5) and
        # received the MCE we emitted.
        emitter_cls.assert_called_once_with(conn_config.host, None, 5)
        emitter_cls.return_value.emit_mce.assert_called_with(lineage_mce)

View File

@ -0,0 +1,17 @@
from datahub.emitter.rest_emitter import DatahubRestEmitter
# Fake GMS URL; emitter construction performs no network I/O, so no server is needed.
MOCK_GMS_ENDPOINT = "http://fakegmshost:8080"
def test_datahub_rest_emitter_construction():
    """Default construction should leave the class-level timeout defaults in effect."""
    emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT)

    assert emitter._connect_timeout_sec == DatahubRestEmitter.DEFAULT_CONNECT_TIMEOUT_SEC
    assert emitter._read_timeout_sec == DatahubRestEmitter.DEFAULT_READ_TIMEOUT_SEC
def test_datahub_rest_emitter_timeout_construction():
    """Explicit timeout kwargs should override the class-level defaults."""
    emitter = DatahubRestEmitter(
        MOCK_GMS_ENDPOINT, connect_timeout_sec=2, read_timeout_sec=4
    )

    # Overrides are stored as instance attributes, shadowing the class defaults.
    assert (emitter._connect_timeout_sec, emitter._read_timeout_sec) == (2, 4)