Mirror of https://github.com/datahub-project/datahub.git
fix(ingest/bigquery): increase logging in bigquery-queries extractor (#11774)
commit 42bb07a35e
parent 3b415cde69
@@ -428,6 +428,7 @@ class Pipeline:
     def _time_to_print(self) -> bool:
         self.num_intermediate_workunits += 1
         current_time = int(time.time())
+        # TODO: Replace with ProgressTimer.
         if current_time - self.last_time_printed > _REPORT_PRINT_INTERVAL_SECONDS:
             # we print
             self.num_intermediate_workunits = 0
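The TODO added above points at the new ProgressTimer helper introduced later in this commit. Purely as a hedged sketch of what that future refactor might look like (not part of this commit; the surrounding Pipeline state and the interval value are assumptions):

# Hypothetical sketch only: _time_to_print rewritten around ProgressTimer.
# _REPORT_PRINT_INTERVAL_SECONDS and the Pipeline fields are assumed from
# the existing code; this is not what the commit actually changes.
from datetime import timedelta

from datahub.utilities.progress_timer import ProgressTimer

_REPORT_PRINT_INTERVAL_SECONDS = 60  # assumed value for illustration


class Pipeline:
    def __init__(self) -> None:
        self.num_intermediate_workunits = 0
        self._print_timer = ProgressTimer(
            report_every=timedelta(seconds=_REPORT_PRINT_INTERVAL_SECONDS)
        )

    def _time_to_print(self) -> bool:
        self.num_intermediate_workunits += 1
        if self._print_timer.should_report():
            # Reset the counter each time we decide to print, as the
            # original code does.
            self.num_intermediate_workunits = 0
            return True
        return False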
@@ -2,7 +2,7 @@ import functools
 import logging
 import pathlib
 import tempfile
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from typing import Collection, Dict, Iterable, List, Optional, TypedDict
 
 from google.cloud.bigquery import Client
@@ -49,6 +49,7 @@ from datahub.utilities.file_backed_collections import (
     FileBackedDict,
     FileBackedList,
 )
+from datahub.utilities.progress_timer import ProgressTimer
 from datahub.utilities.time import datetime_to_ts_millis
 
 logger = logging.getLogger(__name__)
@@ -270,27 +271,25 @@ class BigQueryQueriesExtractor(Closeable):
         # Preprocessing stage that deduplicates the queries using query hash per usage bucket
         # Note: FileBackedDict is an ordered dictionary, so the order of execution of
         # queries is inherently maintained
-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
-        queries_deduped = self.deduplicate_queries(queries)
+        queries_deduped: FileBackedDict[
+            Dict[int, ObservedQuery]
+        ] = self.deduplicate_queries(queries)
         self.report.num_unique_queries = len(queries_deduped)
         logger.info(f"Found {self.report.num_unique_queries} unique queries")
 
         with self.report.audit_log_load_timer, queries_deduped:
-            last_log_time = datetime.now()
-            last_report_time = datetime.now()
+            log_timer = ProgressTimer(timedelta(minutes=1))
+            report_timer = ProgressTimer(timedelta(minutes=5))
+
             for i, (_, query_instances) in enumerate(queries_deduped.items()):
                 for query in query_instances.values():
-                    now = datetime.now()
-                    if (now - last_log_time).total_seconds() >= 60:
+                    if log_timer.should_report():
                         logger.info(
                             f"Added {i} deduplicated query log entries to SQL aggregator"
                         )
-                        last_log_time = now
 
-                    if (now - last_report_time).total_seconds() >= 300:
-                        if self.report.sql_aggregator:
-                            logger.info(self.report.sql_aggregator.as_string())
-                        last_report_time = now
+                    if report_timer.should_report() and self.report.sql_aggregator:
+                        logger.info(self.report.sql_aggregator.as_string())
 
                     self.aggregator.add(query)
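The net effect of this hunk: the hand-rolled bookkeeping (two datetime.now() timestamps with manual total_seconds() comparisons against 60 and 300) is replaced by two ProgressTimer instances, one firing per minute for progress logs and one every five minutes for aggregator reports. A self-contained sketch of the same pattern; process_items and its argument are illustrative names, not DataHub APIs:

# Standalone sketch of the two-timer logging pattern used in the hunk above.
from datetime import timedelta
from typing import Iterable

from datahub.utilities.progress_timer import ProgressTimer


def process_items(items: Iterable[str]) -> None:
    log_timer = ProgressTimer(timedelta(minutes=1))  # coarse progress logging
    report_timer = ProgressTimer(timedelta(minutes=5))  # detailed status dumps

    for i, item in enumerate(items):
        if log_timer.should_report():
            print(f"Processed {i} items so far")
        if report_timer.should_report():
            print("A detailed status report would be emitted here")
        # ... do the actual per-item work here ...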
metadata-ingestion/src/datahub/utilities/progress_timer.py (new file, 34 lines)
@@ -0,0 +1,34 @@
+from datetime import datetime, timedelta, timezone
+
+
+class ProgressTimer:
+    def __init__(self, report_every: timedelta, report_0: bool = False):
+        """A helper for reporting progress at a given time interval.
+
+        Should be used for long-running processes that iterate over a large number of items,
+        but each iteration is fast.
+
+        Args:
+            report_every: The time interval between progress reports.
+            report_0: Whether to report progress on the first iteration.
+        """
+
+        self._report_every = report_every
+
+        if report_0:
+            # Use the earliest possible time to force reporting on the first iteration.
+            self._last_report_time = datetime.min.replace(tzinfo=timezone.utc)
+        else:
+            self._last_report_time = self._now()
+
+    def _now(self) -> datetime:
+        return datetime.now(timezone.utc)
+
+    def should_report(self) -> bool:
+        current_time = self._now()
+
+        should_report = (self._last_report_time + self._report_every) <= current_time
+        if should_report:
+            self._last_report_time = current_time
+
+        return should_report
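For reference, a minimal usage sketch of the new helper (the interval and loop are illustrative, not from the commit). With report_0=True the first should_report() call fires immediately, because _last_report_time is seeded with the earliest representable UTC datetime:

# Minimal usage sketch; values here are illustrative.
from datetime import timedelta

from datahub.utilities.progress_timer import ProgressTimer

timer = ProgressTimer(report_every=timedelta(seconds=30), report_0=True)

for batch in range(1000):
    if timer.should_report():
        # Fires on the first iteration, then at most once per 30 seconds.
        print(f"Working... currently on batch {batch}")
    # ... process the batch ...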
New test file for ProgressTimer:

@@ -0,0 +1,53 @@
+from datetime import timedelta
+from time import sleep
+
+from datahub.utilities.progress_timer import ProgressTimer
+
+
+def test_progress_timer_basic():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=100))
+
+    # First call should not report since report_0=False by default
+    assert not timer.should_report()
+
+    # Call before interval elapsed should not report
+    sleep(0.05)  # 50ms
+    assert not timer.should_report()
+
+    # Call after interval elapsed should report
+    sleep(0.1)  # Additional 100ms
+    assert timer.should_report()
+
+    # Next immediate call should not report
+    assert not timer.should_report()
+
+
+def test_progress_timer_with_report_0():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=100), report_0=True)
+
+    # First call should report since report_0=True
+    assert timer.should_report()
+
+    # Next immediate call should not report
+    assert not timer.should_report()
+
+    # Call after interval elapsed should report
+    sleep(0.1)  # 100ms
+    assert timer.should_report()
+
+
+def test_progress_timer_multiple_intervals():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=50))
+
+    # First call should not report
+    assert not timer.should_report()
+
+    # Check multiple intervals
+    sleep(0.06)  # 60ms - should report
+    assert timer.should_report()
+
+    sleep(0.02)  # 20ms - should not report
+    assert not timer.should_report()
+
+    sleep(0.05)  # 50ms - should report
+    assert timer.should_report()