diff --git a/metadata-ingestion/sink_docs/datahub.md b/metadata-ingestion/sink_docs/datahub.md index ed130b2540..984f185bb4 100644 --- a/metadata-ingestion/sink_docs/datahub.md +++ b/metadata-ingestion/sink_docs/datahub.md @@ -10,7 +10,7 @@ To install this plugin, run `pip install 'acryl-datahub[datahub-rest]'`. ### Capabilities -Pushes metadata to DataHub using the GMA rest API. The advantage of the rest-based interface +Pushes metadata to DataHub using the GMS REST API. The advantage of the REST-based interface is that any errors can immediately be reported. ### Quickstart recipe @@ -35,6 +35,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | Field | Required | Default | Description | | -------- | -------- | ------- | ---------------------------- | | `server` | ✅ | | URL of DataHub GMS endpoint. | +| `timeout_sec` | | 2 | Per-HTTP request timeout. | ## DataHub Kafka diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index bcbd5a76b9..10b183b5f4 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -39,11 +39,24 @@ def _make_curl_command( class DatahubRestEmitter: + DEFAULT_CONNECT_TIMEOUT_SEC = 10 # 10 seconds should be plenty to connect + DEFAULT_READ_TIMEOUT_SEC = ( + 2 # Any ingest call taking longer than 2 seconds should be abandoned + ) + _gms_server: str _token: Optional[str] _session: requests.Session + _connect_timeout_sec: float = DEFAULT_CONNECT_TIMEOUT_SEC + _read_timeout_sec: float = DEFAULT_READ_TIMEOUT_SEC - def __init__(self, gms_server: str, token: Optional[str] = None): + def __init__( + self, + gms_server: str, + token: Optional[str] = None, + connect_timeout_sec: Optional[float] = None, + read_timeout_sec: Optional[float] = None, + ): if ":9002" in gms_server: logger.warning( "the rest emitter should connect to GMS (usually port 8080) instead of frontend" @@ -61,6 +74,17 @@ class DatahubRestEmitter: if token: self._session.headers.update({"Authorization": f"Bearer {token}"}) + if connect_timeout_sec: + self._connect_timeout_sec = connect_timeout_sec + + if read_timeout_sec: + self._read_timeout_sec = read_timeout_sec + + if self._connect_timeout_sec < 1 or self._read_timeout_sec < 1: + logger.warning( + f"Setting timeout values lower than 1 second is not recommended. Your configuration is connect_timeout:{self._connect_timeout_sec}s, read_timeout:{self._read_timeout_sec}s" + ) + def test_connection(self) -> None: response = self._session.get(f"{self._gms_server}/config") response.raise_for_status() @@ -139,7 +163,11 @@ class DatahubRestEmitter: curl_command, ) try: - response = self._session.post(url, data=payload) + response = self._session.post( + url, + data=payload, + timeout=(self._connect_timeout_sec, self._read_timeout_sec), + ) response.raise_for_status() except HTTPError as e: diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 25ef58d925..c4798bdaf3 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -21,6 +21,7 @@ class DatahubRestSinkConfig(ConfigModel): server: str = "http://localhost:8080" token: Optional[str] + timeout_sec: Optional[int] @dataclass @@ -33,7 +34,12 @@ class DatahubRestSink(Sink): super().__init__(ctx) self.config = config self.report = SinkReport() - self.emitter = DatahubRestEmitter(self.config.server, self.config.token) + self.emitter = DatahubRestEmitter( + self.config.server, + self.config.token, + connect_timeout_sec=self.config.timeout_sec, # reuse timeout_sec for connect timeout + read_timeout_sec=self.config.timeout_sec, + ) self.emitter.test_connection() @classmethod diff --git a/metadata-ingestion/src/datahub_provider/hooks/datahub.py b/metadata-ingestion/src/datahub_provider/hooks/datahub.py index e463e3abcb..09f7472abd 100644 --- a/metadata-ingestion/src/datahub_provider/hooks/datahub.py +++ b/metadata-ingestion/src/datahub_provider/hooks/datahub.py @@ -60,12 +60,13 @@ class DatahubRestHook(BaseHook): }, } - def _get_config(self) -> Tuple[str, Optional[str]]: + def _get_config(self) -> Tuple[str, Optional[str], Optional[int]]: conn = self.get_connection(self.datahub_rest_conn_id) host = conn.host if host is None: raise AirflowException("host parameter is required") - return (host, conn.password) + timeout_sec = conn.extra_dejson.get("timeout_sec") + return (host, conn.password, timeout_sec) def make_emitter(self) -> "DatahubRestEmitter": import datahub.emitter.rest_emitter diff --git a/metadata-ingestion/tests/unit/test_airflow.py b/metadata-ingestion/tests/unit/test_airflow.py index 7839bfdb3d..ef25f1844d 100644 --- a/metadata-ingestion/tests/unit/test_airflow.py +++ b/metadata-ingestion/tests/unit/test_airflow.py @@ -37,6 +37,13 @@ datahub_rest_connection_config = Connection( host="http://test_host:8080/", extra=None, ) +datahub_rest_connection_config_with_timeout = Connection( + conn_id="datahub_rest_test", + conn_type="datahub_rest", + host="http://test_host:8080/", + extra=json.dumps({"timeout_sec": 5}), +) + datahub_kafka_connection_config = Connection( conn_id="datahub_kafka_test", conn_type="datahub_kafka", @@ -97,7 +104,20 @@ def test_datahub_rest_hook(mock_emitter): hook = DatahubRestHook(config.conn_id) hook.emit_mces([lineage_mce]) - mock_emitter.assert_called_once_with(config.host, None) + mock_emitter.assert_called_once_with(config.host, None, None) + instance = mock_emitter.return_value + instance.emit_mce.assert_called_with(lineage_mce) + + +@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True) +def test_datahub_rest_hook_with_timeout(mock_emitter): + with patch_airflow_connection( + datahub_rest_connection_config_with_timeout + ) as config: + hook = DatahubRestHook(config.conn_id) + hook.emit_mces([lineage_mce]) + + mock_emitter.assert_called_once_with(config.host, None, 5) instance = mock_emitter.return_value instance.emit_mce.assert_called_with(lineage_mce) diff --git a/metadata-ingestion/tests/unit/test_rest_emitter.py b/metadata-ingestion/tests/unit/test_rest_emitter.py new file mode 100644 index 0000000000..b385963b2e --- /dev/null +++ b/metadata-ingestion/tests/unit/test_rest_emitter.py @@ -0,0 +1,17 @@ +from datahub.emitter.rest_emitter import DatahubRestEmitter + +MOCK_GMS_ENDPOINT = "http://fakegmshost:8080" + + +def test_datahub_rest_emitter_construction(): + emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT) + assert emitter._connect_timeout_sec == emitter.DEFAULT_CONNECT_TIMEOUT_SEC + assert emitter._read_timeout_sec == emitter.DEFAULT_READ_TIMEOUT_SEC + + +def test_datahub_rest_emitter_timeout_construction(): + emitter = DatahubRestEmitter( + MOCK_GMS_ENDPOINT, connect_timeout_sec=2, read_timeout_sec=4 + ) + assert emitter._connect_timeout_sec == 2 + assert emitter._read_timeout_sec == 4