feat(ingest/datahub): report progress on db ingestion (#12117)

This commit is contained in:
Harshal Sheth 2024-12-17 03:58:47 -05:00 committed by GitHub
parent 8f9659fadf
commit d2359e259a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,5 +1,5 @@
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from functools import partial from functools import partial
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional
@ -26,6 +26,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase, StatefulIngestionSourceBase,
) )
from datahub.metadata.schema_classes import ChangeTypeClass from datahub.metadata.schema_classes import ChangeTypeClass
from datahub.utilities.progress_timer import ProgressTimer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -105,11 +106,17 @@ class DataHubSource(StatefulIngestionSourceBase):
self, from_createdon: datetime, reader: DataHubDatabaseReader self, from_createdon: datetime, reader: DataHubDatabaseReader
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
logger.info(f"Fetching database aspects starting from {from_createdon}") logger.info(f"Fetching database aspects starting from {from_createdon}")
progress = ProgressTimer(report_every=timedelta(seconds=60))
mcps = reader.get_aspects(from_createdon, self.report.stop_time) mcps = reader.get_aspects(from_createdon, self.report.stop_time)
for i, (mcp, createdon) in enumerate(mcps): for i, (mcp, createdon) in enumerate(mcps):
if not self.urn_pattern.allowed(str(mcp.entityUrn)): if not self.urn_pattern.allowed(str(mcp.entityUrn)):
continue continue
if progress.should_report():
logger.info(
f"Ingested {i} database aspects so far, currently at {createdon}"
)
yield mcp.as_workunit() yield mcp.as_workunit()
self.report.num_database_aspects_ingested += 1 self.report.num_database_aspects_ingested += 1