From ed0d65efd5fbcba0222a0d2c68cba40aa768a2c9 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Wed, 11 May 2022 17:35:35 +0530 Subject: [PATCH] Optimize and fix snowflake + usage (#4862) Optimize and fix snowflake + usage (#4862) --- .../ingestion/ometa/mixins/es_mixin.py | 2 +- .../metadata/ingestion/source/snowflake.py | 36 ++++++++++++++----- .../ingestion/source/snowflake_usage.py | 13 ++++--- ingestion/src/metadata/utils/sql_queries.py | 2 +- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 1f37e606c48..84407194b74 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -38,7 +38,7 @@ T = TypeVar("T", bound=BaseModel) # pylint: disable=invalid-name class ESMixin(Generic[T]): client: REST - es_url: str = "/search/query?q=service:{} {}&from={}&size={}&index={}" + es_url: str = "/search/query?q=service:{} AND {}&from={}&size={}&index={}" def search_entities_using_es( self, service_name, table_obj, search_index, from_count: int = 0, size: int = 10 diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py index 5ea11b564bb..4f0e3454a98 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/snowflake.py @@ -42,7 +42,7 @@ from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.source.sql_source import SQLSource from metadata.utils.column_type_parser import create_sqlalchemy_type from metadata.utils.connections import get_connection -from metadata.utils.filters import filter_by_table +from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.fqdn_generator import get_fqdn from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import ( @@ -69,6 +69,11 @@ class SnowflakeSource(SQLSource): results = self.connection.execute(query) for res in results: row = list(res) + if filter_by_database( + self.source_config.databaseFilterPattern, database_name=row[1] + ): + self.status.filter(row[1], "Database pattern not allowed") + continue use_db_query = f"USE DATABASE {row[1]}" self.connection.execute(use_db_query) logger.info(f"Ingesting from database: {row[1]}") @@ -77,7 +82,6 @@ class SnowflakeSource(SQLSource): yield inspect(self.engine) def fetch_tags(self, schema, table_name: str, column_name: str = ""): - self.connection.execute(f"USE {self.service_connection.database}.{schema}") try: result = self.connection.execute( FETCH_SNOWFLAKE_ALL_TAGS.format(table_name) @@ -143,7 +147,19 @@ class SnowflakeSource(SQLSource): def next_record(self) -> Iterable[Entity]: for inspector in self.get_databases(): - yield from self.fetch_tables(inspector=inspector, schema="") + for schema in inspector.get_schema_names(): + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name=schema + ): + self.status.filter( + f"{self.config.serviceName}.{self.service_connection.database}.{schema}", + "{} pattern not allowed".format("Schema"), + ) + continue + self.connection.execute( + f"USE {self.service_connection.database}.{schema}" + ) + yield from self.fetch_tables(inspector=inspector, schema=schema) def add_tags_to_table(self, schema: str, table_name: str, table_entity): tag_category_list = self.fetch_tags(schema=schema, table_name=table_name) @@ -168,17 +184,19 @@ class SnowflakeSource(SQLSource): inspector: Inspector, schema: str, ) -> Iterable[Union[OMetaDatabaseAndTable, OMetaTagAndCategory]]: - entities = inspector.get_table_names() - for db, schema, table_name, entity_type, comment in entities: + entities = inspector.get_table_names(schema) + for table_name, entity_type, comment in entities: try: if filter_by_table( self.source_config.tableFilterPattern, table_name=table_name ): self.status.filter( - f"{self.config.serviceName}.{db}.{schema}.{table_name}", + f"{self.config.serviceName}.{self.service_connection.database}.{schema}.{table_name}", "{} pattern not allowed".format(entity_type), ) continue + if entity_type == "VIEW" and not self.source_config.includeViews: + continue table_columns = self._get_columns(schema, table_name, inspector) view_definition = inspector.get_view_definition(table_name, schema) view_definition = ( @@ -199,7 +217,7 @@ class SnowflakeSource(SQLSource): table_data = self.fetch_sample_data(schema, table_name) table_entity.sampleData = table_data if self.source_config.enableDataProfiler: - profile = self.run_profiler(table=table_name, schema=schema) + profile = self.run_profiler(table=table_entity, schema=schema) table_entity.tableProfile = [profile] if profile else None database = self._get_database(self.service_connection.database) table_schema_and_db = OMetaDatabaseAndTable( @@ -214,8 +232,8 @@ class SnowflakeSource(SQLSource): logger.error(err) -def get_table_names(self, connection, schema=None, **kw): - result = connection.execute(FETCH_SNOWFLAKE_METADATA) +def get_table_names(self, connection, schema, **kw): + result = connection.execute(FETCH_SNOWFLAKE_METADATA.format(schema)) return result.fetchall() diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index a3ec5a43319..632eb72f7fa 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -21,6 +21,9 @@ from sqlalchemy import inspect from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) @@ -55,7 +58,7 @@ class SnowflakeUsageSource(UsageSource): SERVICE_TYPE = DatabaseServiceType.Snowflake.value DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" - def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) end = end + timedelta(days=1) @@ -91,7 +94,9 @@ class SnowflakeUsageSource(UsageSource): logger.info(f"Ingesting from database: {row[1]}") self.config.serviceConnection.__root__.config.database = row[1] self.engine = get_connection(self.connection) - yield inspect(self.engine) + rows = self.engine.execute(self.sql_stmt) + for row in rows: + yield row def next_record(self) -> Iterable[TableQuery]: """ @@ -112,8 +117,8 @@ class SnowflakeUsageSource(UsageSource): sql=row["query_text"], service_name=self.config.serviceName, ) - if not row["database_name"] and self.service_connection.database: - TableQuery.database = self.service_connection.database + if not row["database_name"] and self.connection.database: + TableQuery.database = self.connection.database logger.debug(f"Parsed Query: {row['query_text']}") if row["schema_name"] is not None: self.report.scanned(f"{row['database_name']}.{row['schema_name']}") diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index d2abdc29e50..0e0149007ca 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -311,5 +311,5 @@ FETCH_SNOWFLAKE_ALL_TAGS = ( ) FETCH_SNOWFLAKE_METADATA = """ -select TABLE_CATALOG,TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables +select TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables where TABLE_SCHEMA = '{}' """