diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
index 6ec9700d81..fc417b7b03 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
@@ -4,7 +4,7 @@ import logging
 import re
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any, Counter, Dict, Iterable, List, Optional, Union
+from typing import Any, Counter, Dict, Iterable, List, MutableMapping, Optional, Union
 
 import cachetools
 import pydantic
@@ -218,7 +218,7 @@ class BigQueryUsageConfig(BaseUsageConfig):
     extra_client_options: dict = {}
     env: str = builder.DEFAULT_ENV
 
-    query_log_delay: pydantic.PositiveInt = 100
+    query_log_delay: Optional[pydantic.PositiveInt] = None
 
 
 @dataclass
@@ -300,11 +300,13 @@ class BigQueryUsageSource(Source):
     def _join_events_by_job_id(
         self, events: Iterable[Union[ReadEvent, QueryEvent]]
     ) -> Iterable[ReadEvent]:
-        # We only store the most recently used query events, which are used when
-        # resolving job information within the read events.
-        query_jobs: cachetools.LRUCache[str, QueryEvent] = cachetools.LRUCache(
-            maxsize=2 * self.config.query_log_delay
-        )
+        # If cache eviction is enabled, we only store the most recently used query events,
+        # which are used when resolving job information within the read events.
+        query_jobs: MutableMapping[str, QueryEvent]
+        if self.config.query_log_delay:
+            query_jobs = cachetools.LRUCache(maxsize=5 * self.config.query_log_delay)
+        else:
+            query_jobs = {}
 
         def event_processor(
             events: Iterable[Union[ReadEvent, QueryEvent]]
@@ -318,7 +320,8 @@ class BigQueryUsageSource(Source):
         # TRICKY: To account for the possibility that the query event arrives after
         # the read event in the audit logs, we wait for at least `query_log_delay`
         # additional events to be processed before attempting to resolve BigQuery
-        # job information from the logs.
+        # job information from the logs. If `query_log_delay` is None, it is treated
+        # as an unlimited delay, which prioritizes correctness at the expense of memory usage.
         original_read_events = event_processor(events)
         delayed_read_events = delayed_iter(
             original_read_events, self.config.query_log_delay
diff --git a/metadata-ingestion/src/datahub/utilities/delayed_iter.py b/metadata-ingestion/src/datahub/utilities/delayed_iter.py
index 72db989c66..689be91d7f 100644
--- a/metadata-ingestion/src/datahub/utilities/delayed_iter.py
+++ b/metadata-ingestion/src/datahub/utilities/delayed_iter.py
@@ -1,18 +1,19 @@
 import collections
-from typing import Deque, Iterable, TypeVar
+from typing import Deque, Iterable, Optional, TypeVar
 
 T = TypeVar("T")
 
 
-def delayed_iter(iterable: Iterable[T], delay: int) -> Iterable[T]:
+def delayed_iter(iterable: Iterable[T], delay: Optional[int]) -> Iterable[T]:
     """Waits to yield the i'th element until after the (i+n)'th element has been
-    materialized by the source iterator.
+    materialized by the source iterator. If delay is None, waits until the full
+    iterable has been materialized before yielding.
""" cache: Deque[T] = collections.deque([], maxlen=delay) for item in iterable: - if len(cache) >= delay: + if delay is not None and len(cache) >= delay: yield cache.popleft() cache.append(item) diff --git a/metadata-ingestion/tests/unit/test_utilities.py b/metadata-ingestion/tests/unit/test_utilities.py index 003a6a0d26..b881853f91 100644 --- a/metadata-ingestion/tests/unit/test_utilities.py +++ b/metadata-ingestion/tests/unit/test_utilities.py @@ -4,12 +4,12 @@ from datahub.utilities.delayed_iter import delayed_iter def test_delayed_iter(): events = [] - def maker(): - for i in range(4): + def maker(n): + for i in range(n): events.append(("add", i)) yield i - for i in delayed_iter(maker(), 2): + for i in delayed_iter(maker(4), 2): events.append(("remove", i)) assert events == [ @@ -22,3 +22,14 @@ def test_delayed_iter(): ("remove", 2), ("remove", 3), ] + + events.clear() + for i in delayed_iter(maker(2), None): + events.append(("remove", i)) + + assert events == [ + ("add", 0), + ("add", 1), + ("remove", 0), + ("remove", 1), + ]