From 7c387d6d54cb4359fb38dc58fb7d672f91baf612 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Thu, 9 Jun 2022 22:24:40 +0530 Subject: [PATCH] Optimized Snowflake fetch tags (#5376) * optimized snowflake fetch tags * fixed same named entities issue Co-authored-by: Onkar Ravgan Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com> --- .../ingestion/source/database/bigquery.py | 4 +- .../ingestion/source/database/snowflake.py | 154 ++++++++++++------ .../source/database/sql_column_handler.py | 10 +- ingestion/src/metadata/utils/sql_queries.py | 4 +- 4 files changed, 113 insertions(+), 59 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py index a21b13f1032..a80efaddb86 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py @@ -136,7 +136,9 @@ class BigquerySource(CommonDbSourceService): logger.error(err) return super().prepare() - def fetch_column_tags(self, column: dict, col_obj: Column) -> None: + def fetch_column_tags( + self, column: dict, col_obj: Column, schema: str, table: str + ) -> None: try: if ( self.source_config.includeTags diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake.py b/ingestion/src/metadata/ingestion/source/database/snowflake.py index a18c4decf82..2ac517a6044 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake.py @@ -23,7 +23,7 @@ from metadata.generated.schema.api.tags.createTag import CreateTagRequest from metadata.generated.schema.api.tags.createTagCategory import ( CreateTagCategoryRequest, ) -from metadata.generated.schema.entity.data.table import Table, TableData +from metadata.generated.schema.entity.data.table import Column, Table, TableData from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeConnection, ) @@ -61,6 +61,51 @@ class SnowflakeSource(CommonDbSourceService): def __init__(self, config, metadata_config): super().__init__(config, metadata_config) + def get_all_table_tags(self): + results = self.connection.execute(FETCH_SNOWFLAKE_ALL_TAGS) + self.all_table_tags = {} + self.all_column_tags = {} + for result in results: + row = list(result) + tag_dict = { + "tag_id": row[2], + "tag_name": row[3], + "tag_value": row[4], + "tag_domain": row[10], + } + tag = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=tag_dict["tag_name"], + description="SNOWFLAKE COLUMN TAG NAME", + categoryType="Descriptive", + ), + category_details=CreateTagRequest( + name=tag_dict["tag_value"], description="SNOWFLAKE COLUMN TAG VALUE" + ), + ) + yield tag + if tag_dict["tag_domain"] == "TABLE": + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + schema_name=row[6], + table_name=row[8], + database_name=row[5], + ) + if self.all_table_tags.get(table_fqn): + self.all_table_tags[table_fqn].append(tag_dict) + else: + self.all_table_tags[table_fqn] = [tag_dict] + else: + column_fqn = ( + f"{self.config.serviceName}.{row[5]}.{row[6]}.{row[8]}.{row[12]}" + ) + if self.all_column_tags.get(column_fqn): + self.all_column_tags[column_fqn].append(tag_dict) + else: + self.all_column_tags[column_fqn] = [tag_dict] + def get_databases(self) -> Iterable[Inspector]: if self.config.serviceConnection.__root__.config.database: yield from super().get_databases() @@ -81,39 +126,62 @@ class SnowflakeSource(CommonDbSourceService): self.engine = get_connection(self.service_connection) yield inspect(self.engine) - def fetch_tags(self, schema, table_name: str, column_name: str = ""): + def add_tags_to_table(self, schema: str, table_name: str, table_entity): + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + schema_name=schema, + table_name=table_name, + database_name=self.config.serviceConnection.__root__.config.database, + ) try: - result = self.connection.execute( - FETCH_SNOWFLAKE_ALL_TAGS.format(table_name) - ) + tag_category_list = self.all_table_tags.get(table_fqn) + if tag_category_list: + table_entity.tags = [] + for tag in tag_category_list: + table_entity.tags.append( + TagLabel( + tagFQN=fqn.build( + self.metadata, + entity_type=Tag, + tag_category_name=tag["tag_name"], + tag_name=tag["tag_value"], + ), + labelType="Automated", + state="Suggested", + source="Tag", + ) + ) except Exception as err: - logger.warning("Trying tags for tables with quotes") - result = self.connection.execute( - FETCH_SNOWFLAKE_ALL_TAGS.format(f'"{table_name}"') - ) - tags = [] - for res in result: - row = list(res) - tag_category = row[2] - primary_tag = row[3] - if row[4] == "COLUMN" or column_name and row[9] != column_name: - continue - tags.append( - OMetaTagAndCategory( - category_name=CreateTagCategoryRequest( - name=tag_category, - description="SNOWFLAKE TAG NAME", - categoryType="Descriptive", - ), - category_details=CreateTagRequest( - name=primary_tag, description="SNOWFLAKE TAG VALUE" - ), - ) - ) - logger.info( - f"Tag Category {tag_category}, Primary Tag {primary_tag} Ingested" - ) - return tags + logger.debug(traceback.format_exc()) + logger.info(err) + + def fetch_column_tags( + self, column: dict, col_obj: Column, schema: str, table: str + ) -> None: + column_fqn = f"{self.config.serviceName}.{self.config.serviceConnection.__root__.config.database}.{schema}.{table}.{col_obj.name.__root__}" + try: + tag_category_list = self.all_column_tags.get(column_fqn) + if tag_category_list: + col_obj.tags = [] + for tag in tag_category_list: + col_obj.tags.append( + TagLabel( + tagFQN=fqn.build( + self.metadata, + entity_type=Tag, + tag_category_name=tag["tag_name"], + tag_name=tag["tag_value"], + ), + labelType="Automated", + state="Suggested", + source="Tag", + ) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.info(err) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -126,6 +194,7 @@ class SnowflakeSource(CommonDbSourceService): return cls(config, metadata_config) def next_record(self) -> Iterable[Entity]: + yield from self.get_all_table_tags() or [] for inspector in self.get_databases(): for schema in inspector.get_schema_names(): try: @@ -148,25 +217,6 @@ class SnowflakeSource(CommonDbSourceService): logger.debug(traceback.format_exc()) logger.info(err) - def add_tags_to_table(self, schema: str, table_name: str, table_entity): - tag_category_list = self.fetch_tags(schema=schema, table_name=table_name) - table_entity.tags = [] - for tags in tag_category_list: - yield tags - table_entity.tags.append( - TagLabel( - tagFQN=fqn.build( - self.metadata, - entity_type=Tag, - tag_category_name=tags.category_name.name.__root__, - tag_name=tags.category_details.name.__root__, - ), - labelType="Automated", - state="Suggested", - source="Tag", - ) - ) - def fetch_tables( self, inspector: Inspector, @@ -201,7 +251,7 @@ class SnowflakeSource(CommonDbSourceService): columns=table_columns, viewDefinition=view_definition, ) - yield from self.add_tags_to_table( + self.add_tags_to_table( schema=schema, table_name=table_name, table_entity=table_entity ) database = self.get_database_entity() diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index 3b83ddae751..dd9a2a11825 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -31,9 +31,11 @@ logger = ingestion_logger() class SqlColumnHandler: - def fetch_column_tags(self, column: dict, col_obj: Column) -> None: + def fetch_column_tags( + self, column: dict, col_obj: Column, schema: str, table: str + ) -> None: if self.source_config.includeTags: - logger.info("Fetching tags not implemeneted for this connector") + logger.info("Fetching tags not implemented for this connector") self.source_config.includeTags = False def _get_display_datatype( @@ -192,7 +194,9 @@ class SqlColumnHandler: logger.debug(traceback.format_exc()) logger.error(f"{err} : {column}") continue - self.fetch_column_tags(column=column, col_obj=om_column) + self.fetch_column_tags( + column=column, col_obj=om_column, schema=schema, table=table + ) table_columns.append(om_column) return table_columns diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 0e0149007ca..40abe358d36 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -306,9 +306,7 @@ CLICKHOUSE_SQL_USAGE_STATEMENT = """ """ -FETCH_SNOWFLAKE_ALL_TAGS = ( - "select * from table(information_schema.TAG_REFERENCES_ALL_COLUMNS('{}', 'table'));" -) +FETCH_SNOWFLAKE_ALL_TAGS = "select * from snowflake.account_usage.tag_references;" FETCH_SNOWFLAKE_METADATA = """ select TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables where TABLE_SCHEMA = '{}'