diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 39b6fd3bfe5..cef70b4a6f5 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -94,10 +94,9 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_DATABASE_COMMENTS, SNOWFLAKE_GET_DATABASES, SNOWFLAKE_GET_EXTERNAL_LOCATIONS, - SNOWFLAKE_GET_FUNCTIONS, SNOWFLAKE_GET_ORGANIZATION_NAME, SNOWFLAKE_GET_SCHEMA_COMMENTS, - SNOWFLAKE_GET_STORED_PROCEDURES, + SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS, SNOWFLAKE_GET_STREAM, SNOWFLAKE_LIFE_CYCLE_QUERY, SNOWFLAKE_SESSION_TAG_QUERY, @@ -558,18 +557,17 @@ class SnowflakeSource( def _get_table_names_and_types( self, schema_name: str, table_type: TableType = TableType.Regular ) -> List[TableNameAndType]: - table_type_to_params_map = { - TableType.Regular: {}, - TableType.External: {"external_tables": True}, - TableType.Transient: {"include_transient_tables": True}, - TableType.Dynamic: {"dynamic_tables": True}, - } snowflake_tables = self.inspector.get_table_names( schema=schema_name, incremental=self.incremental, account_usage=self.service_connection.accountUsageSchema, - **table_type_to_params_map[table_type], + include_views=self.source_config.includeViews, + **( + {"include_transient_tables": True} + if self.service_connection.includeTransientTables + else {} + ), ) self.context.get_global().deleted_tables.extend( @@ -587,7 +585,7 @@ class SnowflakeSource( ) return [ - TableNameAndType(name=table.name, type_=table_type) + TableNameAndType(name=table.name, type_=table.type_) for table in snowflake_tables.get_not_deleted() ] @@ -631,21 +629,6 @@ class SnowflakeSource( """ table_list = self._get_table_names_and_types(schema_name) - table_list.extend( - self._get_table_names_and_types(schema_name, table_type=TableType.External) - ) - - table_list.extend( - self._get_table_names_and_types(schema_name, table_type=TableType.Dynamic) - ) - - if self.service_connection.includeTransientTables: - table_list.extend( - self._get_table_names_and_types( - schema_name, table_type=TableType.Transient - ) - ) - if self.service_connection.includeStreams: table_list.extend(self._get_stream_names_and_types(schema_name)) @@ -742,39 +725,6 @@ class SnowflakeSource( logger.error(f"Unable to get procedure source url: {exc}") return None - def _get_view_names_and_types( - self, schema_name: str, materialized_views: bool = False - ) -> List[TableNameAndType]: - table_type = ( - TableType.MaterializedView if materialized_views else TableType.View - ) - - snowflake_views = self.inspector.get_view_names( - schema=schema_name, - incremental=self.incremental, - account_usage=self.service_connection.accountUsageSchema, - materialized_views=materialized_views, - ) - - self.context.get_global().deleted_tables.extend( - [ - fqn.build( - metadata=self.metadata, - entity_type=Table, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=schema_name, - table_name=view.name, - ) - for view in snowflake_views.get_deleted() - ] - ) - - return [ - TableNameAndType(name=view.name, type_=table_type) - for view in snowflake_views.get_not_deleted() - ] - def query_view_names_and_types( self, schema_name: str ) -> Iterable[TableNameAndType]: @@ -786,12 +736,7 @@ class SnowflakeSource( This is useful for sources where we need fine-grained logic on how to handle table types, e.g., material views,... """ - views = self._get_view_names_and_types(schema_name) - views.extend( - self._get_view_names_and_types(schema_name, materialized_views=True) - ) - - return views + return [] def _get_stored_procedures_internal( self, query: str @@ -823,9 +768,8 @@ class SnowflakeSource( """List Snowflake stored procedures""" if self.source_config.includeStoredProcedures: yield from self._get_stored_procedures_internal( - SNOWFLAKE_GET_STORED_PROCEDURES + SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS ) - yield from self._get_stored_procedures_internal(SNOWFLAKE_GET_FUNCTIONS) def describe_procedure_definition( self, stored_procedure: SnowflakeStoredProcedure diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index c7a9b6b6e48..c7ad5999e3f 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -21,6 +21,7 @@ from sqlalchemy import text from sqlalchemy.orm import Session from metadata.generated.schema.entity.data.storedProcedure import Language +from metadata.generated.schema.entity.data.table import TableType from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_QUERY_LOG_QUERY, ) @@ -92,6 +93,7 @@ class SnowflakeTable(BaseModel): name: str deleted: Optional[datetime] = None + type_: Optional[TableType] = None class SnowflakeTableList(BaseModel): diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index 80c45168f0e..fdedcdc0593 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -14,6 +14,33 @@ SQL Queries used during ingestion import textwrap +SNOWFLAKE_GET_TABLE_NAMES = """ + select TABLE_NAME, NULL, TABLE_TYPE from information_schema.tables + where TABLE_SCHEMA = '{schema}' + AND COALESCE(IS_TRANSIENT, 'NO') != '{is_transient}' + AND {include_views} +""" + +SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES = """ +select TABLE_NAME, DELETED, TABLE_TYPE +from ( + select + TABLE_NAME, + DELETED, + TABLE_TYPE, + ROW_NUMBER() over ( + partition by TABLE_NAME order by LAST_DDL desc + ) as ROW_NUMBER + from {account_usage}.tables + where TABLE_CATALOG = '{database}' + and TABLE_SCHEMA = '{schema}' + and COALESCE(IS_TRANSIENT, 'NO') != '{is_transient}' + and DATE_PART(epoch_millisecond, LAST_DDL) >= '{date}' + and {include_views} +) +where ROW_NUMBER = 1 +""" + SNOWFLAKE_SQL_STATEMENT = textwrap.dedent( """ SELECT @@ -317,7 +344,7 @@ and table_catalog = '{database_name}' """ ) -SNOWFLAKE_GET_STORED_PROCEDURES = textwrap.dedent( +SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS = textwrap.dedent( """ SELECT PROCEDURE_NAME AS name, @@ -331,11 +358,9 @@ FROM {account_usage}.PROCEDURES WHERE PROCEDURE_CATALOG = '{database_name}' AND PROCEDURE_SCHEMA = '{schema_name}' AND DELETED IS NULL - """ -) -SNOWFLAKE_GET_FUNCTIONS = textwrap.dedent( - """ +UNION ALL + SELECT FUNCTION_NAME AS name, FUNCTION_OWNER AS owner, diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index 2ea285e1071..ceedcfadd99 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -24,6 +24,7 @@ from sqlalchemy.engine import reflection from sqlalchemy.sql import text from sqlalchemy.types import FLOAT +from metadata.generated.schema.entity.data.table import TableType from metadata.ingestion.source.database.incremental_metadata_extraction import ( IncrementalConfig, ) @@ -33,25 +34,19 @@ from metadata.ingestion.source.database.snowflake.models import ( ) from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_COMMENTS, - SNOWFLAKE_GET_DYNAMIC_TABLE_NAMES, - SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES, SNOWFLAKE_GET_MVIEW_NAMES, SNOWFLAKE_GET_SCHEMA_COLUMNS, SNOWFLAKE_GET_STREAM_DEFINITION, SNOWFLAKE_GET_STREAM_NAMES, SNOWFLAKE_GET_TABLE_DDL, - SNOWFLAKE_GET_TRANSIENT_NAMES, + SNOWFLAKE_GET_TABLE_NAMES, SNOWFLAKE_GET_VIEW_DDL, SNOWFLAKE_GET_VIEW_DEFINITION, SNOWFLAKE_GET_VIEW_NAMES, - SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES, - SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES, - SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES, SNOWFLAKE_INCREMENTAL_GET_MVIEW_NAMES, SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES, - SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES, + SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES, SNOWFLAKE_INCREMENTAL_GET_VIEW_NAMES, - SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES, ) from metadata.utils import fqn from metadata.utils.sqlalchemy_utils import ( @@ -67,16 +62,10 @@ QueryMap = Dict[str, Query] TABLE_QUERY_MAPS = { "full": { - "default": SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES, - "transient_tables": SNOWFLAKE_GET_TRANSIENT_NAMES, - "external_tables": SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES, - "dynamic_tables": SNOWFLAKE_GET_DYNAMIC_TABLE_NAMES, + "default": SNOWFLAKE_GET_TABLE_NAMES, }, "incremental": { - "default": SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES, - "transient_tables": SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES, - "external_tables": SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES, - "dynamic_tables": SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES, + "default": SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES, }, } @@ -194,9 +183,15 @@ def _get_query_parameters( schema: str, incremental: Optional[IncrementalConfig], account_usage: Optional[str] = None, + include_transient_tables: Optional[bool] = False, + include_views: Optional[bool] = False, ): """Returns the proper query parameters depending if the extraction is Incremental or Full""" - parameters = {"schema": fqn.unquote_name(schema)} + parameters = { + "schema": fqn.unquote_name(schema), + "is_transient": "YES" if include_transient_tables else "NO", + "include_views": "TRUE" if include_views else "TABLE_TYPE != 'VIEW'", + } if incremental and incremental.enabled: database, _ = self._current_database_schema(connection) # pylint: disable=W0212 @@ -217,30 +212,43 @@ def get_table_names(self, connection, schema: str, **kw): queries = _get_query_map(incremental, TABLE_QUERY_MAPS) parameters = _get_query_parameters( - self, connection, schema, incremental, account_usage + self, + connection, + schema, + incremental, + account_usage, + include_transient_tables=kw.get("include_transient_tables", False), + include_views=kw.get("include_views", False), ) query = queries["default"] - if kw.get("include_transient_tables"): - query = queries["transient_tables"] - - if kw.get("external_tables"): - query = queries["external_tables"] - - if kw.get("dynamic_tables"): - query = queries["dynamic_tables"] - cursor = connection.execute(query.format(**parameters)) result = SnowflakeTableList( tables=[ - SnowflakeTable(name=self.normalize_name(row[0]), deleted=row[1]) + SnowflakeTable( + name=self.normalize_name(row[0]), + deleted=row[1], + type_=_get_table_type(row[2] if row[2] else "BASE TABLE"), + ) for row in cursor ] ) return result +def _get_table_type(table_type: str) -> TableType: + table_type_map = { + "BASE TABLE": TableType.Regular, + "VIEW": TableType.View, + "MATERIALIZED VIEW": TableType.MaterializedView, + "EXTERNAL TABLE": TableType.External, + "TRANSIENT TABLE": TableType.Transient, + "DYNAMIC TABLE": TableType.Dynamic, + } + return table_type_map.get(table_type, TableType.Regular) + + def get_view_names(self, connection, schema, **kw): incremental = kw.get("incremental") account_usage = kw.get("account_usage")