diff --git a/ingestion/src/metadata/ingestion/api/bulk_sink.py b/ingestion/src/metadata/ingestion/api/bulk_sink.py index 9a2fbef47c2..5cbcd022812 100644 --- a/ingestion/src/metadata/ingestion/api/bulk_sink.py +++ b/ingestion/src/metadata/ingestion/api/bulk_sink.py @@ -20,12 +20,12 @@ from .status import Status @dataclass class BulkSinkStatus(Status): - records = 0 + records: List[Any] = field(default_factory=list) warnings: List[Any] = field(default_factory=list) failures: List[Any] = field(default_factory=list) - def records_written(self, records: int): - self.records += records + def records_written(self, record: Any) -> None: + self.records.append(record) def warning(self, info: Any) -> None: self.warnings.append(info) diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 1f0c590fdb6..eb3d46c84ae 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -87,6 +87,14 @@ class MetadataUsageBulkSink(BulkSink): ) try: self.metadata.publish_table_usage(table_entity, table_usage_request) + + logger.info( + f"Successfully table usage published for {record.table.fullyQualifiedName}" + ) + self.status.records_written( + f"Table: {record.table.fullyQualifiedName}" + ) + except Exception as err: self.status.failures.append(table_usage_request) logger.error( @@ -94,6 +102,7 @@ class MetadataUsageBulkSink(BulkSink): table_usage.table, err ) ) + self.status.failures.append(f"Table: {table_usage.table}") table_join_request = self.__get_table_joins(table_usage) logger.debug("table join request {}".format(table_join_request)) try: @@ -118,6 +127,8 @@ class MetadataUsageBulkSink(BulkSink): table_usage.table, table_usage.database ) ) + self.status.warnings.append(f"Table: {table_usage.table}") + try: self.metadata.compute_percentile(Table, self.today) self.metadata.compute_percentile(Database, self.today) diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 171c6c57762..63023e1f6fe 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -114,7 +114,12 @@ class TableUsageStage(Stage[QueryParserData]): ) except Exception as exc: logger.error("Error in staging record {}".format(exc)) + self.status.failures( + f"Table: {table}", "Error in staging record {}".format(exc) + ) self.table_usage[table] = table_usage_count + logger.info(f"Successfully record staged for {table}") + self.status.records_status(f"Table: {table}") def get_status(self): return self.status