fix(ingest): default to unlimited query log delay in bigquery-usage (#2881)

This commit is contained in:
Harshal Sheth 2021-07-14 20:05:31 -07:00 committed by GitHub
parent dbf3a8fd83
commit fe6bfc9685
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 15 deletions

View File

@ -4,7 +4,7 @@ import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Counter, Dict, Iterable, List, Optional, Union
from typing import Any, Counter, Dict, Iterable, List, MutableMapping, Optional, Union
import cachetools
import pydantic
@ -218,7 +218,7 @@ class BigQueryUsageConfig(BaseUsageConfig):
extra_client_options: dict = {}
env: str = builder.DEFAULT_ENV
query_log_delay: pydantic.PositiveInt = 100
query_log_delay: Optional[pydantic.PositiveInt] = None
@dataclass
@ -300,11 +300,13 @@ class BigQueryUsageSource(Source):
def _join_events_by_job_id(
self, events: Iterable[Union[ReadEvent, QueryEvent]]
) -> Iterable[ReadEvent]:
# We only store the most recently used query events, which are used when
# resolving job information within the read events.
query_jobs: cachetools.LRUCache[str, QueryEvent] = cachetools.LRUCache(
maxsize=2 * self.config.query_log_delay
)
# If cache eviction is enabled, we only store the most recently used query events,
# which are used when resolving job information within the read events.
query_jobs: MutableMapping[str, QueryEvent]
if self.config.query_log_delay:
query_jobs = cachetools.LRUCache(maxsize=5 * self.config.query_log_delay)
else:
query_jobs = {}
def event_processor(
events: Iterable[Union[ReadEvent, QueryEvent]]
@ -318,7 +320,8 @@ class BigQueryUsageSource(Source):
# TRICKY: To account for the possibility that the query event arrives after
# the read event in the audit logs, we wait for at least `query_log_delay`
# additional events to be processed before attempting to resolve BigQuery
# job information from the logs.
# job information from the logs. If `query_log_delay` is None, it gets treated
# as an unlimited delay, which prioritizes correctness at the expense of memory usage.
original_read_events = event_processor(events)
delayed_read_events = delayed_iter(
original_read_events, self.config.query_log_delay

View File

@ -1,18 +1,19 @@
import collections
from typing import Deque, Iterable, TypeVar
from typing import Deque, Iterable, Optional, TypeVar
T = TypeVar("T")
def delayed_iter(iterable: Iterable[T], delay: int) -> Iterable[T]:
def delayed_iter(iterable: Iterable[T], delay: Optional[int]) -> Iterable[T]:
"""Waits to yield the i'th element until after the (i+n)'th element has been
materialized by the source iterator.
materialized by the source iterator. If delay is None, wait until the full
iterable has been materialized before yielding.
"""
cache: Deque[T] = collections.deque([], maxlen=delay)
for item in iterable:
if len(cache) >= delay:
if delay is not None and len(cache) >= delay:
yield cache.popleft()
cache.append(item)

View File

@ -4,12 +4,12 @@ from datahub.utilities.delayed_iter import delayed_iter
def test_delayed_iter():
events = []
def maker():
for i in range(4):
def maker(n):
for i in range(n):
events.append(("add", i))
yield i
for i in delayed_iter(maker(), 2):
for i in delayed_iter(maker(4), 2):
events.append(("remove", i))
assert events == [
@ -22,3 +22,14 @@ def test_delayed_iter():
("remove", 2),
("remove", 3),
]
events.clear()
for i in delayed_iter(maker(2), None):
events.append(("remove", i))
assert events == [
("add", 0),
("add", 1),
("remove", 0),
("remove", 1),
]