diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index 63cea45f75..cb72441344 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -1,5 +1,5 @@ import logging -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from functools import partial from typing import Dict, Iterable, List, Optional @@ -26,6 +26,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) from datahub.metadata.schema_classes import ChangeTypeClass +from datahub.utilities.progress_timer import ProgressTimer logger = logging.getLogger(__name__) @@ -105,11 +106,17 @@ class DataHubSource(StatefulIngestionSourceBase): self, from_createdon: datetime, reader: DataHubDatabaseReader ) -> Iterable[MetadataWorkUnit]: logger.info(f"Fetching database aspects starting from {from_createdon}") + progress = ProgressTimer(report_every=timedelta(seconds=60)) mcps = reader.get_aspects(from_createdon, self.report.stop_time) for i, (mcp, createdon) in enumerate(mcps): if not self.urn_pattern.allowed(str(mcp.entityUrn)): continue + if progress.should_report(): + logger.info( + f"Ingested {i} database aspects so far, currently at {createdon}" + ) + yield mcp.as_workunit() self.report.num_database_aspects_ingested += 1