Optimized Snowflake fetch tags (#5376)

* optimized snowflake fetch tags

* fixed same named entities issue

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
This commit is contained in:
Onkar Ravgan 2022-06-09 22:24:40 +05:30 committed by GitHub
parent 00b02ed2f8
commit 7c387d6d54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 59 deletions

View File

@ -136,7 +136,9 @@ class BigquerySource(CommonDbSourceService):
logger.error(err)
return super().prepare()
def fetch_column_tags(self, column: dict, col_obj: Column) -> None:
def fetch_column_tags(
self, column: dict, col_obj: Column, schema: str, table: str
) -> None:
try:
if (
self.source_config.includeTags

View File

@ -23,7 +23,7 @@ from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.data.table import Column, Table, TableData
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
@ -61,6 +61,51 @@ class SnowflakeSource(CommonDbSourceService):
def __init__(self, config, metadata_config):
super().__init__(config, metadata_config)
def get_all_table_tags(self):
results = self.connection.execute(FETCH_SNOWFLAKE_ALL_TAGS)
self.all_table_tags = {}
self.all_column_tags = {}
for result in results:
row = list(result)
tag_dict = {
"tag_id": row[2],
"tag_name": row[3],
"tag_value": row[4],
"tag_domain": row[10],
}
tag = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=tag_dict["tag_name"],
description="SNOWFLAKE COLUMN TAG NAME",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=tag_dict["tag_value"], description="SNOWFLAKE COLUMN TAG VALUE"
),
)
yield tag
if tag_dict["tag_domain"] == "TABLE":
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
schema_name=row[6],
table_name=row[8],
database_name=row[5],
)
if self.all_table_tags.get(table_fqn):
self.all_table_tags[table_fqn].append(tag_dict)
else:
self.all_table_tags[table_fqn] = [tag_dict]
else:
column_fqn = (
f"{self.config.serviceName}.{row[5]}.{row[6]}.{row[8]}.{row[12]}"
)
if self.all_column_tags.get(column_fqn):
self.all_column_tags[column_fqn].append(tag_dict)
else:
self.all_column_tags[column_fqn] = [tag_dict]
def get_databases(self) -> Iterable[Inspector]:
if self.config.serviceConnection.__root__.config.database:
yield from super().get_databases()
@ -81,39 +126,62 @@ class SnowflakeSource(CommonDbSourceService):
self.engine = get_connection(self.service_connection)
yield inspect(self.engine)
def fetch_tags(self, schema, table_name: str, column_name: str = ""):
def add_tags_to_table(self, schema: str, table_name: str, table_entity):
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
schema_name=schema,
table_name=table_name,
database_name=self.config.serviceConnection.__root__.config.database,
)
try:
result = self.connection.execute(
FETCH_SNOWFLAKE_ALL_TAGS.format(table_name)
)
tag_category_list = self.all_table_tags.get(table_fqn)
if tag_category_list:
table_entity.tags = []
for tag in tag_category_list:
table_entity.tags.append(
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name=tag["tag_name"],
tag_name=tag["tag_value"],
),
labelType="Automated",
state="Suggested",
source="Tag",
)
)
except Exception as err:
logger.warning("Trying tags for tables with quotes")
result = self.connection.execute(
FETCH_SNOWFLAKE_ALL_TAGS.format(f'"{table_name}"')
)
tags = []
for res in result:
row = list(res)
tag_category = row[2]
primary_tag = row[3]
if row[4] == "COLUMN" or column_name and row[9] != column_name:
continue
tags.append(
OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=tag_category,
description="SNOWFLAKE TAG NAME",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=primary_tag, description="SNOWFLAKE TAG VALUE"
),
)
)
logger.info(
f"Tag Category {tag_category}, Primary Tag {primary_tag} Ingested"
)
return tags
logger.debug(traceback.format_exc())
logger.info(err)
def fetch_column_tags(
self, column: dict, col_obj: Column, schema: str, table: str
) -> None:
column_fqn = f"{self.config.serviceName}.{self.config.serviceConnection.__root__.config.database}.{schema}.{table}.{col_obj.name.__root__}"
try:
tag_category_list = self.all_column_tags.get(column_fqn)
if tag_category_list:
col_obj.tags = []
for tag in tag_category_list:
col_obj.tags.append(
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name=tag["tag_name"],
tag_name=tag["tag_value"],
),
labelType="Automated",
state="Suggested",
source="Tag",
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.info(err)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -126,6 +194,7 @@ class SnowflakeSource(CommonDbSourceService):
return cls(config, metadata_config)
def next_record(self) -> Iterable[Entity]:
yield from self.get_all_table_tags() or []
for inspector in self.get_databases():
for schema in inspector.get_schema_names():
try:
@ -148,25 +217,6 @@ class SnowflakeSource(CommonDbSourceService):
logger.debug(traceback.format_exc())
logger.info(err)
def add_tags_to_table(self, schema: str, table_name: str, table_entity):
tag_category_list = self.fetch_tags(schema=schema, table_name=table_name)
table_entity.tags = []
for tags in tag_category_list:
yield tags
table_entity.tags.append(
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name=tags.category_name.name.__root__,
tag_name=tags.category_details.name.__root__,
),
labelType="Automated",
state="Suggested",
source="Tag",
)
)
def fetch_tables(
self,
inspector: Inspector,
@ -201,7 +251,7 @@ class SnowflakeSource(CommonDbSourceService):
columns=table_columns,
viewDefinition=view_definition,
)
yield from self.add_tags_to_table(
self.add_tags_to_table(
schema=schema, table_name=table_name, table_entity=table_entity
)
database = self.get_database_entity()

View File

@ -31,9 +31,11 @@ logger = ingestion_logger()
class SqlColumnHandler:
def fetch_column_tags(self, column: dict, col_obj: Column) -> None:
def fetch_column_tags(
self, column: dict, col_obj: Column, schema: str, table: str
) -> None:
if self.source_config.includeTags:
logger.info("Fetching tags not implemeneted for this connector")
logger.info("Fetching tags not implemented for this connector")
self.source_config.includeTags = False
def _get_display_datatype(
@ -192,7 +194,9 @@ class SqlColumnHandler:
logger.debug(traceback.format_exc())
logger.error(f"{err} : {column}")
continue
self.fetch_column_tags(column=column, col_obj=om_column)
self.fetch_column_tags(
column=column, col_obj=om_column, schema=schema, table=table
)
table_columns.append(om_column)
return table_columns

View File

@ -306,9 +306,7 @@ CLICKHOUSE_SQL_USAGE_STATEMENT = """
"""
FETCH_SNOWFLAKE_ALL_TAGS = (
"select * from table(information_schema.TAG_REFERENCES_ALL_COLUMNS('{}', 'table'));"
)
FETCH_SNOWFLAKE_ALL_TAGS = "select * from snowflake.account_usage.tag_references;"
FETCH_SNOWFLAKE_METADATA = """
select TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables where TABLE_SCHEMA = '{}'