From fcab544f1792dfb6c3b5261da1f8bdbcd7b28fd5 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 4 Jun 2024 09:06:48 -0700 Subject: [PATCH] feat(ingest): measure sink bottlenecking (#10628) --- .../datahub/ingestion/sink/datahub_rest.py | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 007b7487cb..8572b2378a 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -1,9 +1,9 @@ import concurrent.futures import contextlib +import dataclasses import functools import logging import uuid -from dataclasses import dataclass from enum import auto from typing import Optional, Union @@ -29,6 +29,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeProposal, ) from datahub.utilities.advanced_thread_executor import PartitionExecutor +from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.server_config_util import set_gms_config logger = logging.getLogger(__name__) @@ -44,15 +45,17 @@ class DatahubRestSinkConfig(DatahubClientConfig): # These only apply in async mode. max_threads: int = 15 - max_pending_requests: int = 500 + max_pending_requests: int = 2000 -@dataclass +@dataclasses.dataclass class DataHubRestSinkReport(SinkReport): - max_threads: int = -1 - gms_version: str = "" + max_threads: Optional[int] = None + gms_version: Optional[str] = None pending_requests: int = 0 + main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer) + def compute_stats(self) -> None: super().compute_stats() @@ -105,7 +108,7 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]): self.report.gms_version = ( gms_config.get("versions", {}) .get("acryldata/datahub", {}) - .get("version", "") + .get("version", None) ) self.report.max_threads = self.config.max_threads logger.debug("Setting env variables to override config") @@ -189,25 +192,28 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]): ], write_callback: WriteCallback, ) -> None: - record = record_envelope.record - if self.config.mode == SyncOrAsync.ASYNC: - partition_key = _get_partition_key(record_envelope) - self.executor.submit( - partition_key, - self._emit_wrapper, - record, - done_callback=functools.partial( - self._write_done_callback, record_envelope, write_callback - ), - ) - self.report.pending_requests += 1 - else: - # execute synchronously - try: - self._emit_wrapper(record) - write_callback.on_success(record_envelope, success_metadata={}) - except Exception as e: - write_callback.on_failure(record_envelope, e, failure_metadata={}) + # Because the default is async mode and most sources are slower than the sink, this + # should only have a high value if the sink is actually a bottleneck. + with self.report.main_thread_blocking_timer: + record = record_envelope.record + if self.config.mode == SyncOrAsync.ASYNC: + partition_key = _get_partition_key(record_envelope) + self.executor.submit( + partition_key, + self._emit_wrapper, + record, + done_callback=functools.partial( + self._write_done_callback, record_envelope, write_callback + ), + ) + self.report.pending_requests += 1 + else: + # execute synchronously + try: + self._emit_wrapper(record) + write_callback.on_success(record_envelope, success_metadata={}) + except Exception as e: + write_callback.on_failure(record_envelope, e, failure_metadata={}) def emit_async( self,