Fix #5565 - Handle missing column in usage fqn match (#5575)

* add try

* Add some exception handling

* fix date and split
This commit is contained in:
Pere Miquel Brull 2022-06-23 13:01:27 +02:00 committed by GitHub
parent 4d4704863b
commit 7fa47c31d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 31 deletions

View File

@ -10,9 +10,12 @@
# limitations under the License.
import json
import os
import traceback
from datetime import datetime
from typing import List, Optional
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
@ -30,6 +33,7 @@ from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import (
get_column_fqn,
@ -117,19 +121,20 @@ class MetadataUsageBulkSink(BulkSink):
Method to publish SQL Queries, Table Usage & Lineage
"""
for _, value_dict in self.table_usage_map.items():
self.metadata.ingest_table_queries_data(
table=value_dict["table_entity"],
table_queries=value_dict["sql_queries"],
)
self.ingest_sql_queries_lineage(
queries=value_dict["sql_queries"],
database_name=value_dict["database"],
schema_name=value_dict["database_schema"],
)
table_usage_request = UsageRequest(
date=self.today, count=value_dict["usage_count"]
)
table_usage_request = None
try:
table_usage_request = UsageRequest(
date=self.today, count=value_dict["usage_count"]
)
self.metadata.ingest_table_queries_data(
table=value_dict["table_entity"],
table_queries=value_dict["sql_queries"],
)
self.ingest_sql_queries_lineage(
queries=value_dict["sql_queries"],
database_name=value_dict["database"],
schema_name=value_dict["database_schema"],
)
self.metadata.publish_table_usage(
value_dict["table_entity"], table_usage_request
)
@ -141,6 +146,10 @@ class MetadataUsageBulkSink(BulkSink):
self.status.records_written(
"Table: {}".format(value_dict["table_entity"].name.__root__)
)
except ValidationError as err:
logger.error(
f"Cannot construct UsageRequest from {value_dict['table_entity']} - {err}"
)
except Exception as err:
self.status.failures.append(table_usage_request)
logger.error(
@ -160,13 +169,20 @@ class MetadataUsageBulkSink(BulkSink):
self.service_name = table_usage.serviceName
table_entities = get_table_entities_from_query(
metadata=self.metadata,
service_name=self.service_name,
database_name=table_usage.databaseName,
database_schema=table_usage.databaseSchema,
table_name=table_usage.table,
)
table_entities = None
try:
table_entities = get_table_entities_from_query(
metadata=self.metadata,
service_name=self.service_name,
database_name=table_usage.databaseName,
database_schema=table_usage.databaseSchema,
table_name=table_usage.table,
)
except Exception as err:
logger.error(
f"Cannot get table entities from query table {table_usage.table} - {err}"
)
logger.debug(traceback.format_exc())
if not table_entities:
logger.warning(
@ -176,14 +192,16 @@ class MetadataUsageBulkSink(BulkSink):
for table_entity in table_entities:
if table_entity is not None:
self.__populate_table_usage_map(
table_usage=table_usage, table_entity=table_entity
)
table_join_request = self.__get_table_joins(
table_entity=table_entity, table_usage=table_usage
)
logger.debug("table join request {}".format(table_join_request))
table_join_request = None
try:
self.__populate_table_usage_map(
table_usage=table_usage, table_entity=table_entity
)
table_join_request = self.__get_table_joins(
table_entity=table_entity, table_usage=table_usage
)
logger.debug("table join request {}".format(table_join_request))
if (
table_join_request is not None
and len(table_join_request.columnJoins) > 0
@ -198,6 +216,11 @@ class MetadataUsageBulkSink(BulkSink):
table_usage.table, err
)
)
except Exception as err:
logger.error(
f"Error getting usage and join information for {table_entity.name.__root__} - {err}"
)
logger.debug(traceback.format_exc())
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
@ -205,6 +228,7 @@ class MetadataUsageBulkSink(BulkSink):
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
self.__publish_usage_records()
try:
self.metadata.compute_percentile(Table, self.today)
@ -251,11 +275,16 @@ class MetadataUsageBulkSink(BulkSink):
column_joins_dict[column_join.tableColumn.column] = joined_with
for key, value in column_joins_dict.items():
key_name = get_column_fqn(table_entity=table_entity, column=key).split(".")[
-1
]
key_name = get_column_fqn(table_entity=table_entity, column=key)
if not key_name:
logger.warning(
f"Could not find column {key} in table {table_entity.fullyQualifiedName.__root__}"
)
continue
table_joins.columnJoins.append(
ColumnJoins(columnName=key_name, joinedWith=list(value.values()))
ColumnJoins(
columnName=fqn.split(key_name)[-1], joinedWith=list(value.values())
)
)
return table_joins

View File

@ -59,7 +59,7 @@ class SnowflakeUsageSource(UsageSource):
super().__init__(config, metadata_config)
# Snowflake does not allow retrieval of data older than 7 days, so cap the duration below that window
duration = min(self.source_config.queryLogDuration, 7)
duration = min(self.source_config.queryLogDuration, 6)
self.start, self.end = get_start_and_end(duration)
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(