From 7fa47c31d8a71c8144cd3f581e6d154c7759f620 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 23 Jun 2022 13:01:27 +0200 Subject: [PATCH] Fix #5565 - Handle missing column in usage fqn match (#5575) * add try * Add some exception handling * fix date and split --- .../ingestion/bulksink/metadata_usage.py | 89 ++++++++++++------- .../source/database/snowflake_usage.py | 2 +- 2 files changed, 60 insertions(+), 31 deletions(-) diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 0a5b45cd058..49684758fef 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -10,9 +10,12 @@ # limitations under the License. import json import os +import traceback from datetime import datetime from typing import List, Optional +from pydantic import ValidationError + from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( @@ -30,6 +33,7 @@ from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger from metadata.utils.sql_lineage import ( get_column_fqn, @@ -117,19 +121,20 @@ class MetadataUsageBulkSink(BulkSink): Method to publish SQL Queries, Table Usage & Lineage """ for _, value_dict in self.table_usage_map.items(): - self.metadata.ingest_table_queries_data( - table=value_dict["table_entity"], - table_queries=value_dict["sql_queries"], - ) - self.ingest_sql_queries_lineage( - queries=value_dict["sql_queries"], - database_name=value_dict["database"], - schema_name=value_dict["database_schema"], - ) - table_usage_request = UsageRequest( - date=self.today, count=value_dict["usage_count"] - ) + table_usage_request = None try: + table_usage_request = UsageRequest( + date=self.today, count=value_dict["usage_count"] + ) + self.metadata.ingest_table_queries_data( + table=value_dict["table_entity"], + table_queries=value_dict["sql_queries"], + ) + self.ingest_sql_queries_lineage( + queries=value_dict["sql_queries"], + database_name=value_dict["database"], + schema_name=value_dict["database_schema"], + ) self.metadata.publish_table_usage( value_dict["table_entity"], table_usage_request ) @@ -141,6 +146,10 @@ class MetadataUsageBulkSink(BulkSink): self.status.records_written( "Table: {}".format(value_dict["table_entity"].name.__root__) ) + except ValidationError as err: + logger.error( + f"Cannot construct UsageRequest from {value_dict['table_entity']} - {err}" + ) except Exception as err: self.status.failures.append(table_usage_request) logger.error( @@ -160,13 +169,20 @@ class MetadataUsageBulkSink(BulkSink): self.service_name = table_usage.serviceName - table_entities = get_table_entities_from_query( - metadata=self.metadata, - service_name=self.service_name, - database_name=table_usage.databaseName, - database_schema=table_usage.databaseSchema, - table_name=table_usage.table, - ) + table_entities = None + try: + table_entities = get_table_entities_from_query( + metadata=self.metadata, + service_name=self.service_name, + database_name=table_usage.databaseName, + database_schema=table_usage.databaseSchema, + table_name=table_usage.table, + ) + except Exception as err: + logger.error( + f"Cannot get table entities from query table {table_usage.table} - {err}" + ) + logger.debug(traceback.format_exc()) if not table_entities: logger.warning( @@ -176,14 +192,16 @@ class MetadataUsageBulkSink(BulkSink): for table_entity in table_entities: if table_entity is not None: - self.__populate_table_usage_map( - table_usage=table_usage, table_entity=table_entity - ) - table_join_request = self.__get_table_joins( - table_entity=table_entity, table_usage=table_usage - ) - logger.debug("table join request {}".format(table_join_request)) + table_join_request = None try: + self.__populate_table_usage_map( + table_usage=table_usage, table_entity=table_entity + ) + table_join_request = self.__get_table_joins( + table_entity=table_entity, table_usage=table_usage + ) + logger.debug("table join request {}".format(table_join_request)) + if ( table_join_request is not None and len(table_join_request.columnJoins) > 0 @@ -198,6 +216,11 @@ class MetadataUsageBulkSink(BulkSink): table_usage.table, err ) ) + except Exception as err: + logger.error( + f"Error getting usage and join information for {table_entity.name.__root__} - {err}" + ) + logger.debug(traceback.format_exc()) else: logger.warning( "Table does not exist, skipping usage publish {}, {}".format( @@ -205,6 +228,7 @@ class MetadataUsageBulkSink(BulkSink): ) ) self.status.warnings.append(f"Table: {table_usage.table}") + self.__publish_usage_records() try: self.metadata.compute_percentile(Table, self.today) @@ -251,11 +275,16 @@ class MetadataUsageBulkSink(BulkSink): column_joins_dict[column_join.tableColumn.column] = joined_with for key, value in column_joins_dict.items(): - key_name = get_column_fqn(table_entity=table_entity, column=key).split(".")[ - -1 - ] + key_name = get_column_fqn(table_entity=table_entity, column=key) + if not key_name: + logger.warning( + f"Could not find column {key} in table {table_entity.fullyQualifiedName.__root__}" + ) + continue table_joins.columnJoins.append( - ColumnJoins(columnName=key_name, joinedWith=list(value.values())) + ColumnJoins( + columnName=fqn.split(key_name)[-1], joinedWith=list(value.values()) + ) ) return table_joins diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py index 8fea625f27b..e8e5a95aa07 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py @@ -59,7 +59,7 @@ class SnowflakeUsageSource(UsageSource): super().__init__(config, metadata_config) # Snowflake does not allow retrieval of data older than 7 days - duration = min(self.source_config.queryLogDuration, 7) + duration = min(self.source_config.queryLogDuration, 6) self.start, self.end = get_start_and_end(duration) self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(