Usage status for stage and bulk sink added (#3605)

* Usage status for stage and bulk sink added

* Record list added in bulk sink usage status
This commit is contained in:
codingwithabhi 2022-03-23 16:59:42 +05:30 committed by GitHub
parent d24b5e9c50
commit 49c089e4a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 3 deletions

View File

@@ -20,12 +20,12 @@ from .status import Status
@dataclass
class BulkSinkStatus(Status):
records = 0
records: List[Any] = field(default_factory=list)
warnings: List[Any] = field(default_factory=list)
failures: List[Any] = field(default_factory=list)
def records_written(self, records: int):
self.records += records
def records_written(self, record: Any) -> None:
self.records.append(record)
def warning(self, info: Any) -> None:
self.warnings.append(info)

View File

@@ -87,6 +87,14 @@ class MetadataUsageBulkSink(BulkSink):
)
try:
self.metadata.publish_table_usage(table_entity, table_usage_request)
logger.info(
f"Successfully table usage published for {record.table.fullyQualifiedName}"
)
self.status.records_written(
f"Table: {record.table.fullyQualifiedName}"
)
except Exception as err:
self.status.failures.append(table_usage_request)
logger.error(
@@ -94,6 +102,7 @@ class MetadataUsageBulkSink(BulkSink):
table_usage.table, err
)
)
self.status.failures.append(f"Table: {table_usage.table}")
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
@@ -118,6 +127,8 @@ class MetadataUsageBulkSink(BulkSink):
table_usage.table, table_usage.database
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
try:
self.metadata.compute_percentile(Table, self.today)
self.metadata.compute_percentile(Database, self.today)

View File

@@ -114,7 +114,12 @@ class TableUsageStage(Stage[QueryParserData]):
)
except Exception as exc:
logger.error("Error in staging record {}".format(exc))
self.status.failures(
f"Table: {table}", "Error in staging record {}".format(exc)
)
self.table_usage[table] = table_usage_count
logger.info(f"Successfully record staged for {table}")
self.status.records_status(f"Table: {table}")
def get_status(self):
return self.status