diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 2e56d5866e..7c3a42c3e0 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -428,6 +428,7 @@ class Pipeline:
     def _time_to_print(self) -> bool:
         self.num_intermediate_workunits += 1
         current_time = int(time.time())
+        # TODO: Replace with ProgressTimer.
         if current_time - self.last_time_printed > _REPORT_PRINT_INTERVAL_SECONDS:
             # we print
             self.num_intermediate_workunits = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 91d55ad879..08c9beaa73 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -2,7 +2,7 @@ import functools
 import logging
 import pathlib
 import tempfile
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from typing import Collection, Dict, Iterable, List, Optional, TypedDict
 
 from google.cloud.bigquery import Client
@@ -49,6 +49,7 @@ from datahub.utilities.file_backed_collections import (
     FileBackedDict,
     FileBackedList,
 )
+from datahub.utilities.progress_timer import ProgressTimer
 from datahub.utilities.time import datetime_to_ts_millis
 
 logger = logging.getLogger(__name__)
@@ -270,27 +271,25 @@ class BigQueryQueriesExtractor(Closeable):
         # Preprocessing stage that deduplicates the queries using query hash per usage bucket
         # Note: FileBackedDict is an ordered dictionary, so the order of execution of
         # queries is inherently maintained
-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
-        queries_deduped = self.deduplicate_queries(queries)
+        queries_deduped: FileBackedDict[
+            Dict[int, ObservedQuery]
+        ] = self.deduplicate_queries(queries)
         self.report.num_unique_queries = len(queries_deduped)
         logger.info(f"Found {self.report.num_unique_queries} unique queries")
 
         with self.report.audit_log_load_timer, queries_deduped:
-            last_log_time = datetime.now()
-            last_report_time = datetime.now()
+            log_timer = ProgressTimer(timedelta(minutes=1))
+            report_timer = ProgressTimer(timedelta(minutes=5))
+
             for i, (_, query_instances) in enumerate(queries_deduped.items()):
                 for query in query_instances.values():
-                    now = datetime.now()
-                    if (now - last_log_time).total_seconds() >= 60:
+                    if log_timer.should_report():
                         logger.info(
                             f"Added {i} deduplicated query log entries to SQL aggregator"
                         )
-                        last_log_time = now
 
-                    if (now - last_report_time).total_seconds() >= 300:
-                        if self.report.sql_aggregator:
-                            logger.info(self.report.sql_aggregator.as_string())
-                        last_report_time = now
+                    if report_timer.should_report() and self.report.sql_aggregator:
+                        logger.info(self.report.sql_aggregator.as_string())
 
                     self.aggregator.add(query)
diff --git a/metadata-ingestion/src/datahub/utilities/progress_timer.py b/metadata-ingestion/src/datahub/utilities/progress_timer.py
new file mode 100644
index 0000000000..eac62cddb5
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/progress_timer.py
@@ -0,0 +1,34 @@
+from datetime import datetime, timedelta, timezone
+
+
+class ProgressTimer:
+    def __init__(self, report_every: timedelta, report_0: bool = False):
+        """A helper for reporting progress at a given time interval.
+
+        Should be used for long-running processes that iterate over a large number of items,
+        where each individual iteration is fast.
+
+        Args:
+            report_every: The time interval between progress reports.
+            report_0: Whether to report progress on the first iteration.
+        """
+
+        self._report_every = report_every
+
+        if report_0:
+            # Use the earliest possible time to force reporting on the first iteration.
+            self._last_report_time = datetime.min.replace(tzinfo=timezone.utc)
+        else:
+            self._last_report_time = self._now()
+
+    def _now(self) -> datetime:
+        return datetime.now(timezone.utc)
+
+    def should_report(self) -> bool:
+        current_time = self._now()
+
+        should_report = (self._last_report_time + self._report_every) <= current_time
+        if should_report:
+            self._last_report_time = current_time
+
+        return should_report
diff --git a/metadata-ingestion/tests/unit/utilities/test_progress_timer.py b/metadata-ingestion/tests/unit/utilities/test_progress_timer.py
new file mode 100644
index 0000000000..139bad371b
--- /dev/null
+++ b/metadata-ingestion/tests/unit/utilities/test_progress_timer.py
@@ -0,0 +1,53 @@
+from datetime import timedelta
+from time import sleep
+
+from datahub.utilities.progress_timer import ProgressTimer
+
+
+def test_progress_timer_basic():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=100))
+
+    # First call should not report since report_0=False by default
+    assert not timer.should_report()
+
+    # A call before the interval has elapsed should not report
+    sleep(0.05)  # 50ms
+    assert not timer.should_report()
+
+    # A call after the interval has elapsed should report
+    sleep(0.1)  # Additional 100ms
+    assert timer.should_report()
+
+    # The next immediate call should not report
+    assert not timer.should_report()
+
+
+def test_progress_timer_with_report_0():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=100), report_0=True)
+
+    # First call should report since report_0=True
+    assert timer.should_report()
+
+    # The next immediate call should not report
+    assert not timer.should_report()
+
+    # A call after the interval has elapsed should report
+    sleep(0.1)  # 100ms
+    assert timer.should_report()
+
+
+def test_progress_timer_multiple_intervals():
+    timer = ProgressTimer(report_every=timedelta(milliseconds=50))
+
+    # First call should not report
+    assert not timer.should_report()
+
+    # Check behavior across multiple intervals
+    sleep(0.06)  # 60ms - should report
+    assert timer.should_report()
+
+    sleep(0.02)  # 20ms - should not report
+    assert not timer.should_report()
+
+    sleep(0.05)  # 50ms - should report
+    assert timer.should_report()
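
Usage sketch (not part of the patch): a minimal example of how the new ProgressTimer is meant to be wired into a hot loop, based only on the API introduced above. The process_records function, records iterable, and handle callback are hypothetical placeholders, not names from this codebase.

    import logging
    from datetime import timedelta

    from datahub.utilities.progress_timer import ProgressTimer

    logger = logging.getLogger(__name__)

    def process_records(records, handle):
        # Log at most once every 30 seconds; report_0=True also logs on the
        # very first iteration (hypothetical interval chosen for illustration).
        timer = ProgressTimer(report_every=timedelta(seconds=30), report_0=True)
        for i, record in enumerate(records):
            if timer.should_report():
                logger.info(f"Processed {i} records so far")
            handle(record)  # hypothetical per-item work

Because should_report() advances its internal timestamp only when it returns True, it is cheap and safe to call on every iteration no matter how fast the loop spins, which is exactly how queries_extractor.py uses the two timers above.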