Added Elasticsearch in usage (#4226)

* Added Elasticsearch in usage

* Optimized conditions
Mayur Singal 2022-04-19 19:57:42 +05:30 committed by GitHub
parent 2860258b27
commit cc6683beed
2 changed files with 83 additions and 52 deletions
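
At a high level, this change makes the usage bulk sink resolve tables to a list of entities: __get_table_entity first tries the fully qualified name through metadata.get_by_name and, when that misses, falls back to metadata.search_entities_using_es against the table_search_index; write_records then iterates over every match. The sketch below is not the sink's exact code — the Table import path and the simplified FQN string are assumptions — it only illustrates the lookup order introduced by this commit.

from typing import List, Optional

# Assumed import path for the generated Table entity model.
from metadata.generated.schema.entity.data.table import Table


def lookup_table_entities(
    metadata,  # an OpenMetadata client exposing the calls used in the diff
    service_name: str,
    database: str,
    database_schema: Optional[str],
    table: str,
) -> List[Table]:
    """FQN-first lookup with an Elasticsearch fallback, as introduced here."""
    # Simplified FQN build; the sink goes through get_fqdn / _get_formmated_table_name.
    fqn = ".".join(part for part in (service_name, database, database_schema, table) if part)
    entity = metadata.get_by_name(Table, fqdn=fqn)
    if entity:
        # A direct hit is wrapped in a list so callers can always iterate.
        return [entity]
    # No FQN match: query the table search index instead.
    result = metadata.search_entities_using_es(
        service_name=service_name,
        table_obj={
            "database": database,
            "database_schema": database_schema,
            "name": table,
        },
        search_index="table_search_index",
    )
    return result or []  # the search helper may return None, per the checks in the diff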


@ -12,6 +12,7 @@
import json
import logging
from datetime import datetime
from typing import List
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.database import Database
@ -79,56 +80,69 @@ class MetadataUsageBulkSink(BulkSink):
table_usage_map = {}
for record in usage_records:
table_usage = TableUsageCount(**json.loads(record))
table_entities = []
if "." in table_usage.table:
(
table_usage.database_schema,
table_usage.table,
) = table_usage.table.split(".")[-2:]
self.service_name = table_usage.service_name
table_entity = self.__get_table_entity(
table_usage.database, table_usage.database_schema, table_usage.table
)
if table_entity is not None:
if not table_usage_map.get(table_entity.id.__root__):
table_usage_map[table_entity.id.__root__] = {
"table_entity": table_entity,
"usage_count": table_usage.count,
"sql_queries": table_usage.sql_queries,
"usage_date": table_usage.date,
"database": table_usage.database,
}
else:
table_usage_map[table_entity.id.__root__][
"usage_count"
] += table_usage.count
table_usage_map[table_entity.id.__root__]["sql_queries"].extend(
table_usage.sql_queries
)
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
if (
table_join_request is not None
and len(table_join_request.columnJoins) > 0
):
self.metadata.publish_frequently_joined_with(
table_entity, table_join_request
)
except APIError as err:
self.status.failures.append(table_join_request)
logger.error(
"Failed to update query join for {}, {}".format(
table_usage.table, err
)
)
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
table_usage.table, table_usage.database
)
table_entities = self.__get_table_entity(
table_usage.database, table_usage.database_schema, table_usage.table
)
self.status.warnings.append(f"Table: {table_usage.table}")
else:
es_result = self.metadata.search_entities_using_es(
service_name=self.service_name,
table_obj={
"database": table_usage.database,
"database_schema": None,
"name": table_usage.table,
},
search_index="table_search_index",
)
table_entities = es_result
self.service_name = table_usage.service_name
for table_entity in table_entities:
if table_entity is not None:
if not table_usage_map.get(table_entity.id.__root__):
table_usage_map[table_entity.id.__root__] = {
"table_entity": table_entity,
"usage_count": table_usage.count,
"sql_queries": table_usage.sql_queries,
"usage_date": table_usage.date,
"database": table_usage.database,
}
else:
table_usage_map[table_entity.id.__root__][
"usage_count"
] += table_usage.count
table_usage_map[table_entity.id.__root__]["sql_queries"].extend(
table_usage.sql_queries
)
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
if (
table_join_request is not None
and len(table_join_request.columnJoins) > 0
):
self.metadata.publish_frequently_joined_with(
table_entity, table_join_request
)
except APIError as err:
self.status.failures.append(table_join_request)
logger.error(
"Failed to update query join for {}, {}".format(
table_usage.table, err
)
)
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
table_usage.table, table_usage.database
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
for table_id, value_dict in table_usage_map.items():
self.metadata.ingest_table_queries_data(
@ -212,24 +226,36 @@ class MetadataUsageBulkSink(BulkSink):
def __get_column_fqdn(
self, database: str, database_schema: str, table_column: TableColumn
):
table_entity = self.__get_table_entity(
table_entities = self.__get_table_entity(
database, database_schema, table_column.table
)
if table_entity is None:
if table_entities is None or table_entities == []:
return None
for tbl_column in table_entity.columns:
if table_column.column.lower() == tbl_column.name.__root__.lower():
return tbl_column.fullyQualifiedName.__root__.__root__
for table_entity in table_entities:
for tbl_column in table_entity.columns:
if table_column.column.lower() == tbl_column.name.__root__.lower():
return tbl_column.fullyQualifiedName.__root__.__root__
def __get_table_entity(
self, database_name: str, database_schema: str, table_name: str
) -> Table:
) -> List[Table]:
table_fqn = get_fqdn(
Table, self.service_name, database_name, database_schema, table_name
)
table_fqn = _get_formmated_table_name(table_fqn)
table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn)
return table_entity
if table_entity:
return [table_entity]
es_result = self.metadata.search_entities_using_es(
service_name=self.service_name,
table_obj={
"database": database_name,
"database_schema": database_schema,
"name": table_name,
},
search_index="table_search_index",
)
return es_result
def get_status(self):
return self.status
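
Because the lookup can now return several entities (for example when the Elasticsearch query matches the same table name across schemas), call sites aggregate usage per entity id instead of assuming a single table. The minimal sketch below mirrors the table_usage_map handling in write_records above; the attribute shapes come from the diff, while the helper name and the defensive list copy are illustrative additions.

def accumulate_usage(table_usage_map: dict, table_entities: list, table_usage) -> None:
    """Fold one TableUsageCount record into the per-entity usage map."""
    for table_entity in table_entities:
        if table_entity is None:
            continue
        key = table_entity.id.__root__
        if key not in table_usage_map:
            table_usage_map[key] = {
                "table_entity": table_entity,
                "usage_count": table_usage.count,
                # copy so later .extend calls do not mutate the original record
                "sql_queries": list(table_usage.sql_queries),
                "usage_date": table_usage.date,
                "database": table_usage.database,
            }
        else:
            table_usage_map[key]["usage_count"] += table_usage.count
            table_usage_map[key]["sql_queries"].extend(table_usage.sql_queries)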


@ -144,6 +144,8 @@ class OMetaLineageMixin(Generic[T]):
def _separate_fqn(self, database, fqn):
database_schema, table = fqn.split(".")[-2:]
if not database_schema:
database_schema = None
return {"database": database, "database_schema": database_schema, "name": table}
def _create_lineage_by_table_name(
@ -153,6 +155,8 @@ class OMetaLineageMixin(Generic[T]):
This method is to create a lineage between two tables
"""
try:
from_table = str(from_table).replace("<default>", "")
to_table = str(to_table).replace("<default>", "")
from_fqdn = get_fqdn(
AddLineageRequest,
service_name,
@ -185,7 +189,7 @@ class OMetaLineageMixin(Generic[T]):
)
else:
multiple_to_fqns = [to_entity]
if not from_entity or not to_entity:
if not multiple_to_fqns or not multiple_from_fqns:
return None
for from_entity in multiple_from_fqns:
for to_entity in multiple_to_fqns:
@ -201,6 +205,7 @@ class OMetaLineageMixin(Generic[T]):
),
)
)
created_lineage = self.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")