diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 04f13c0365e..60fd263e40a 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -12,6 +12,7 @@ import json import logging from datetime import datetime +from typing import List from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.database import Database @@ -79,56 +80,69 @@ class MetadataUsageBulkSink(BulkSink): table_usage_map = {} for record in usage_records: table_usage = TableUsageCount(**json.loads(record)) + table_entities = [] if "." in table_usage.table: ( table_usage.database_schema, table_usage.table, ) = table_usage.table.split(".")[-2:] - self.service_name = table_usage.service_name - table_entity = self.__get_table_entity( - table_usage.database, table_usage.database_schema, table_usage.table - ) - if table_entity is not None: - if not table_usage_map.get(table_entity.id.__root__): - table_usage_map[table_entity.id.__root__] = { - "table_entity": table_entity, - "usage_count": table_usage.count, - "sql_queries": table_usage.sql_queries, - "usage_date": table_usage.date, - "database": table_usage.database, - } - else: - table_usage_map[table_entity.id.__root__][ - "usage_count" - ] += table_usage.count - table_usage_map[table_entity.id.__root__]["sql_queries"].extend( - table_usage.sql_queries - ) - table_join_request = self.__get_table_joins(table_usage) - logger.debug("table join request {}".format(table_join_request)) - try: - if ( - table_join_request is not None - and len(table_join_request.columnJoins) > 0 - ): - self.metadata.publish_frequently_joined_with( - table_entity, table_join_request - ) - except APIError as err: - self.status.failures.append(table_join_request) - logger.error( - "Failed to update query join for {}, {}".format( - table_usage.table, err - ) - ) - - else: - 
logger.warning( - "Table does not exist, skipping usage publish {}, {}".format( - table_usage.table, table_usage.database - ) + table_entities = self.__get_table_entity( + table_usage.database, table_usage.database_schema, table_usage.table ) - self.status.warnings.append(f"Table: {table_usage.table}") + else: + es_result = self.metadata.search_entities_using_es( + service_name=self.service_name, + table_obj={ + "database": table_usage.database, + "database_schema": None, + "name": table_usage.table, + }, + search_index="table_search_index", + ) + table_entities = es_result + self.service_name = table_usage.service_name + for table_entity in table_entities: + if table_entity is not None: + if not table_usage_map.get(table_entity.id.__root__): + table_usage_map[table_entity.id.__root__] = { + "table_entity": table_entity, + "usage_count": table_usage.count, + "sql_queries": table_usage.sql_queries, + "usage_date": table_usage.date, + "database": table_usage.database, + } + else: + table_usage_map[table_entity.id.__root__][ + "usage_count" + ] += table_usage.count + table_usage_map[table_entity.id.__root__]["sql_queries"].extend( + table_usage.sql_queries + ) + table_join_request = self.__get_table_joins(table_usage) + logger.debug("table join request {}".format(table_join_request)) + try: + if ( + table_join_request is not None + and len(table_join_request.columnJoins) > 0 + ): + self.metadata.publish_frequently_joined_with( + table_entity, table_join_request + ) + except APIError as err: + self.status.failures.append(table_join_request) + logger.error( + "Failed to update query join for {}, {}".format( + table_usage.table, err + ) + ) + + else: + logger.warning( + "Table does not exist, skipping usage publish {}, {}".format( + table_usage.table, table_usage.database + ) + ) + self.status.warnings.append(f"Table: {table_usage.table}") for table_id, value_dict in table_usage_map.items(): self.metadata.ingest_table_queries_data( @@ -212,24 +226,36 @@ class 
MetadataUsageBulkSink(BulkSink): def __get_column_fqdn( self, database: str, database_schema: str, table_column: TableColumn ): - table_entity = self.__get_table_entity( + table_entities = self.__get_table_entity( database, database_schema, table_column.table ) - if table_entity is None: + if table_entities is None or table_entities == []: return None - for tbl_column in table_entity.columns: - if table_column.column.lower() == tbl_column.name.__root__.lower(): - return tbl_column.fullyQualifiedName.__root__.__root__ + for table_entity in table_entities: + for tbl_column in table_entity.columns: + if table_column.column.lower() == tbl_column.name.__root__.lower(): + return tbl_column.fullyQualifiedName.__root__.__root__ def __get_table_entity( self, database_name: str, database_schema: str, table_name: str - ) -> Table: + ) -> List[Table]: table_fqn = get_fqdn( Table, self.service_name, database_name, database_schema, table_name ) table_fqn = _get_formmated_table_name(table_fqn) table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn) - return table_entity + if table_entity: + return [table_entity] + es_result = self.metadata.search_entities_using_es( + service_name=self.service_name, + table_obj={ + "database": database_name, + "database_schema": database_schema, + "name": table_name, + }, + search_index="table_search_index", + ) + return es_result def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index 844d565fedf..e327114a7c7 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -144,6 +144,8 @@ class OMetaLineageMixin(Generic[T]): def _separate_fqn(self, database, fqn): database_schema, table = fqn.split(".")[-2:] + if not database_schema: + database_schema = None return {"database": database, "database_schema": database_schema, 
"name": table} def _create_lineage_by_table_name( @@ -153,6 +155,8 @@ class OMetaLineageMixin(Generic[T]): This method is to create a lineage between two tables """ try: + from_table = str(from_table).replace("<default>", "") + to_table = str(to_table).replace("<default>", "") from_fqdn = get_fqdn( AddLineageRequest, service_name, @@ -185,7 +189,7 @@ class OMetaLineageMixin(Generic[T]): ) else: multiple_to_fqns = [to_entity] - if not from_entity or not to_entity: + if not multiple_to_fqns or not multiple_from_fqns: return None for from_entity in multiple_from_fqns: for to_entity in multiple_to_fqns: @@ -201,6 +205,7 @@ class OMetaLineageMixin(Generic[T]): ), ) ) + created_lineage = self.add_lineage(lineage) logger.info(f"Successfully added Lineage {created_lineage}")