From 06ab82170baf5a7171798294b477804233bce4f1 Mon Sep 17 00:00:00 2001 From: Mohit Tilala <63147650+mohittilala@users.noreply.github.com> Date: Tue, 1 Apr 2025 13:02:37 +0530 Subject: [PATCH] Fixes #19534: Snowflake stream ingestion support (#20278) --- .../ingestion/ometa/mixins/es_mixin.py | 5 + .../source/database/common_db_source.py | 1 + .../source/database/doris/metadata.py | 9 +- .../source/database/snowflake/connection.py | 6 + .../source/database/snowflake/constants.py | 48 ++++++++ .../source/database/snowflake/metadata.py | 116 +++++++++++++++++- .../source/database/snowflake/models.py | 6 +- .../source/database/snowflake/queries.py | 25 ++++ .../source/database/snowflake/utils.py | 92 ++++++++++++-- .../source/database/sql_column_handler.py | 28 ++++- .../unit/topology/database/test_bigquery.py | 2 +- .../unit/topology/database/test_cockroach.py | 12 +- .../tests/unit/topology/database/test_hive.py | 2 +- .../unit/topology/database/test_mssql.py | 4 +- .../unit/topology/database/test_postgres.py | 11 +- .../connectors/database/snowflake/index.md | 4 +- .../connectors/database/snowflake/yaml.md | 33 +++-- .../database/snowflakeConnection.md | 3 +- .../testConnections/database/snowflake.json | 6 + .../json/schema/entity/data/table.json | 6 +- .../database/snowflakeConnection.json | 8 +- .../locales/en-US/Database/Snowflake.md | 9 +- .../ui/src/generated/entity/data/table.ts | 1 + .../database/snowflakeConnection.ts | 4 + 24 files changed, 392 insertions(+), 49 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/snowflake/constants.py diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 8a9349f2e4f..e21539a8f83 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -446,6 +446,11 @@ class ESMixin(Generic[T]): "tableType": TableType.Dynamic.value } }, + { + "term": { + "tableType": TableType.Stream.value + } + }, ] } } diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index a5da80460cb..201f05e4df4 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -509,6 +509,7 @@ class CommonDbSourceService( foreign_columns, ) = self.get_columns_and_constraints( schema_name=schema_name, + table_type=table_type, table_name=table_name, db_name=self.context.get().database, inspector=self.inspector, diff --git a/ingestion/src/metadata/ingestion/source/database/doris/metadata.py b/ingestion/src/metadata/ingestion/source/database/doris/metadata.py index b90a2d84abb..9822bc1d8c4 100644 --- a/ingestion/src/metadata/ingestion/source/database/doris/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/doris/metadata.py @@ -224,8 +224,13 @@ class DorisSource(CommonDbSourceService): return table_columns, primary_columns - def get_columns_and_constraints( - self, schema_name: str, table_name: str, db_name: str, inspector: Inspector + def get_columns_and_constraints( # pylint: disable=too-many-locals + self, + schema_name: str, + table_name: str, + db_name: str, + inspector: Inspector, + table_type: str = None, ) -> Tuple[ Optional[List[Column]], Optional[List[TableConstraint]], Optional[List[Dict]] ]: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py 
b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index 6c98a201b31..00d5e49ef85 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -47,6 +47,7 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_DATABASES, SNOWFLAKE_TEST_FETCH_TAG, SNOWFLAKE_TEST_GET_QUERIES, + SNOWFLAKE_TEST_GET_STREAMS, SNOWFLAKE_TEST_GET_TABLES, SNOWFLAKE_TEST_GET_VIEWS, ) @@ -192,6 +193,11 @@ def test_connection( statement=SNOWFLAKE_TEST_GET_VIEWS, engine_wrapper=engine_wrapper, ), + "GetStreams": partial( + test_table_query, + statement=SNOWFLAKE_TEST_GET_STREAMS, + engine_wrapper=engine_wrapper, + ), "GetQueries": partial( test_query, statement=SNOWFLAKE_TEST_GET_QUERIES.format( diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/constants.py b/ingestion/src/metadata/ingestion/source/database/snowflake/constants.py new file mode 100644 index 00000000000..0b2391fb847 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/constants.py @@ -0,0 +1,48 @@ +# Copyright 2025 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Snowflake constants +""" + +from sqlalchemy.sql.sqltypes import BOOLEANTYPE, VARCHAR + +DEFAULT_STREAM_COLUMNS = [ + { + "name": "METADATA$ACTION", + "type": VARCHAR(length=16777216), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "TEXT(16777216)", + "comment": None, + "primary_key": False, + }, + { + "name": "METADATA$ISUPDATE", + "type": BOOLEANTYPE, + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "BOOLEAN", + "comment": None, + "primary_key": False, + }, + { + "name": "METADATA$ROW_ID", + "type": VARCHAR(length=16777216), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "TEXT(16777216)", + "comment": None, + "primary_key": False, + }, +] diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 97544026649..76b15764ebd 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -69,6 +69,9 @@ from metadata.ingestion.source.database.incremental_metadata_extraction import ( IncrementalConfig, ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource +from metadata.ingestion.source.database.snowflake.constants import ( + DEFAULT_STREAM_COLUMNS, +) from metadata.ingestion.source.database.snowflake.models import ( STORED_PROC_LANGUAGE_MAP, SnowflakeStoredProcedure, @@ -86,6 +89,7 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_ORGANIZATION_NAME, SNOWFLAKE_GET_SCHEMA_COMMENTS, SNOWFLAKE_GET_STORED_PROCEDURES, + SNOWFLAKE_GET_STREAM, SNOWFLAKE_LIFE_CYCLE_QUERY, SNOWFLAKE_SESSION_TAG_QUERY, ) @@ -96,6 +100,9 @@ from 
metadata.ingestion.source.database.snowflake.utils import ( get_pk_constraint, get_schema_columns, get_schema_foreign_keys, + get_stream_definition, + get_stream_names, + get_stream_names_reflection, get_table_comment, get_table_ddl, get_table_names, @@ -123,6 +130,7 @@ logger = ingestion_logger() SnowflakeDialect._json_deserializer = json.loads # pylint: disable=protected-access SnowflakeDialect.get_table_names = get_table_names SnowflakeDialect.get_view_names = get_view_names +SnowflakeDialect.get_stream_names = get_stream_names SnowflakeDialect.get_all_table_comments = get_all_table_comments SnowflakeDialect.normalize_name = normalize_names SnowflakeDialect.get_table_comment = get_table_comment @@ -133,6 +141,7 @@ SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access ) Inspector.get_table_names = get_table_names_reflection Inspector.get_view_names = get_view_names_reflection +Inspector.get_stream_names = get_stream_names_reflection SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-access _current_database_schema ) @@ -141,6 +150,7 @@ SnowflakeDialect.get_foreign_keys = get_foreign_keys SnowflakeDialect.get_columns = get_columns Inspector.get_all_table_ddls = get_all_table_ddls Inspector.get_table_ddl = get_table_ddl +Inspector.get_stream_definition = get_stream_definition SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys @@ -304,9 +314,11 @@ class SnowflakeSource( if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn - if self.source_config.useFqnForFiltering - else new_database, + ( + database_fqn + if self.source_config.useFqnForFiltering + else new_database + ), ): self.status.filter(database_fqn, "Database Filtered Out") continue @@ -495,6 +507,33 @@ class SnowflakeSource( for table in snowflake_tables.get_not_deleted() ] + def _get_stream_names_and_types(self, schema_name: str) -> List[TableNameAndType]: + table_type = TableType.Stream + + snowflake_streams = self.inspector.get_stream_names( + schema=schema_name, + incremental=self.incremental, + ) + + self.context.get_global().deleted_tables.extend( + [ + fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=schema_name, + table_name=stream.name, + ) + for stream in snowflake_streams.get_deleted() + ] + ) + + return [ + TableNameAndType(name=stream.name, type_=table_type) + for stream in snowflake_streams.get_not_deleted() + ] + def query_table_names_and_types( self, schema_name: str ) -> Iterable[TableNameAndType]: @@ -523,6 +562,9 @@ class SnowflakeSource( ) ) + if self.service_connection.includeStreams: + table_list.extend(self._get_stream_names_and_types(schema_name)) + return table_list def _get_org_name(self) -> Optional[str]: @@ -750,3 +792,71 @@ class SnowflakeSource( ) else: yield from super().mark_tables_as_deleted() + + def _get_columns_internal( + self, + schema_name: str, + table_name: str, + db_name: str, + inspector: Inspector, + table_type: TableType = None, + ): + """ + Get columns of table/view/stream + """ + if table_type == TableType.Stream: + cursor = self.connection.execute( + SNOWFLAKE_GET_STREAM.format(stream_name=table_name, schema=schema_name) + ) + try: + result = cursor.fetchone() + if result: + table_name = result[6].split(".")[-1] + except Exception: + pass + + columns = inspector.get_columns( + table_name, schema_name, table_type=table_type, db_name=db_name + ) + + if table_type == TableType.Stream: 
+ columns = [*columns, *DEFAULT_STREAM_COLUMNS] + + return columns + + def get_schema_definition( + self, + table_type: TableType, + table_name: str, + schema_name: str, + inspector: Inspector, + ) -> Optional[str]: + """ + Get the DDL statement, View Definition or Stream Definition for a table + """ + try: + schema_definition = None + if table_type in (TableType.View, TableType.MaterializedView): + schema_definition = inspector.get_view_definition( + table_name, schema_name + ) + elif table_type == TableType.Stream: + schema_definition = inspector.get_stream_definition( + self.connection, table_name, schema_name + ) + elif hasattr(inspector, "get_table_ddl") and self.source_config.includeDDL: + schema_definition = inspector.get_table_ddl( + self.connection, table_name, schema_name + ) + schema_definition = ( + str(schema_definition).strip() + if schema_definition is not None + else None + ) + return schema_definition + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.debug(f"Failed to fetch schema definition for {table_name}: {exc}") + + return None diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index 594adb1149f..75603fba28b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -85,8 +85,8 @@ class SnowflakeStoredProcedure(BaseModel): class SnowflakeTable(BaseModel): - """Models the items returned from the Table and View Queries used to get the entities to process. - :name: Holds the table/view name. + """Models the items returned from the Table, View and Stream Queries used to get the entities to process. + :name: Holds the table/view/stream name. :deleted: Holds either a datetime if the table was deleted or None. 
""" @@ -95,7 +95,7 @@ class SnowflakeTable(BaseModel): class SnowflakeTableList(BaseModel): - """Understands how to return the deleted and not deleted tables/views from a given list.""" + """Understands how to return the deleted and not deleted tables/views/streams from a given list.""" tables: List[SnowflakeTable] diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index 6db2079949a..1544cdedb0b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -143,6 +143,18 @@ from ( where ROW_NUMBER = 1 """ +SNOWFLAKE_GET_STREAM_NAMES = """ +SHOW STREAMS IN SCHEMA "{schema}" +""" + +SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES = """ +SHOW STREAMS IN SCHEMA "{schema}" +""" + +SNOWFLAKE_GET_STREAM = """ +SHOW STREAMS LIKE '{stream_name}' IN SCHEMA "{schema}" +""" + SNOWFLAKE_GET_TRANSIENT_NAMES = """ select TABLE_NAME, NULL from information_schema.tables where TABLE_SCHEMA = '{schema}' @@ -250,6 +262,10 @@ SNOWFLAKE_TEST_GET_VIEWS = """ SELECT TABLE_NAME FROM "{database_name}".information_schema.views LIMIT 1 """ +SNOWFLAKE_TEST_GET_STREAMS = """ +SHOW STREAMS IN DATABASE "{database_name}" +""" + SNOWFLAKE_GET_DATABASES = "SHOW DATABASES" @@ -389,6 +405,15 @@ ORDER BY PROCEDURE_START_TIME DESC SNOWFLAKE_GET_TABLE_DDL = """ SELECT GET_DDL('TABLE','{table_name}') AS \"text\" """ + +SNOWFLAKE_GET_VIEW_DEFINITION = """ +SELECT GET_DDL('VIEW','{view_name}') AS \"text\" +""" + +SNOWFLAKE_GET_STREAM_DEFINITION = """ +SELECT GET_DDL('STREAM','{stream_name}') AS \"text\" +""" + SNOWFLAKE_QUERY_LOG_QUERY = """ SELECT QUERY_ID, diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index a40cef61a2b..fe9938d1f4f 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -10,7 +10,7 @@ # limitations under the License. """ -Module to define overriden dialect methods +Module to define overridden dialect methods """ import operator from functools import reduce @@ -37,13 +37,17 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES, SNOWFLAKE_GET_MVIEW_NAMES, SNOWFLAKE_GET_SCHEMA_COLUMNS, + SNOWFLAKE_GET_STREAM_DEFINITION, + SNOWFLAKE_GET_STREAM_NAMES, SNOWFLAKE_GET_TABLE_DDL, SNOWFLAKE_GET_TRANSIENT_NAMES, + SNOWFLAKE_GET_VIEW_DEFINITION, SNOWFLAKE_GET_VIEW_NAMES, SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES, SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES, SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES, SNOWFLAKE_INCREMENTAL_GET_MVIEW_NAMES, + SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES, SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES, SNOWFLAKE_INCREMENTAL_GET_VIEW_NAMES, SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES, @@ -85,6 +89,15 @@ VIEW_QUERY_MAPS = { }, } +STREAM_QUERY_MAPS = { + "full": { + "default": SNOWFLAKE_GET_STREAM_NAMES, + }, + "incremental": { + "default": SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES, + }, +} + def _denormalize_quote_join(*idents): ip = dialect.identifier_preparer @@ -150,6 +163,20 @@ def get_view_names_reflection(self, schema=None, **kw): ) +def get_stream_names_reflection(self, schema=None, **kw): + """Return all stream names in `schema`. + + :param schema: Optional, retrieve names from a non-default schema. 
+ For special quoting, use :class:`.quoted_name`. + + """ + + with self._operation_context() as conn: # pylint: disable=protected-access + return self.dialect.get_stream_names( + conn, schema, info_cache=self.info_cache, **kw + ) + + def _get_query_map( incremental: Optional[IncrementalConfig], query_maps: Dict[str, QueryMap] ): @@ -162,7 +189,7 @@ def _get_query_map( def _get_query_parameters( self, connection, schema: str, incremental: Optional[IncrementalConfig] ): - """Returns the proper query parameters depending if the extraciton is Incremental or Full""" + """Returns the proper query parameters depending if the extraction is Incremental or Full""" parameters = {"schema": fqn.unquote_name(schema)} if incremental and incremental.enabled: @@ -225,6 +252,24 @@ def get_view_names(self, connection, schema, **kw): return result +def get_stream_names(self, connection, schema, **kw): + incremental = kw.get("incremental") + + queries = _get_query_map(incremental, STREAM_QUERY_MAPS) + parameters = _get_query_parameters(self, connection, schema, incremental) + + query = queries["default"] + + cursor = connection.execute(query.format(**parameters)) + result = SnowflakeTableList( + tables=[ + SnowflakeTable(name=self.normalize_name(row[1]), deleted=None) + for row in cursor + ] + ) + return result + + @reflection.cache def get_view_definition( # pylint: disable=unused-argument self, connection, view_name, schema=None, **kw @@ -233,12 +278,10 @@ def get_view_definition( # pylint: disable=unused-argument Gets the view definition """ schema = schema or self.default_schema_name - if schema: - cursor = connection.execute( - f"SELECT GET_DDL('VIEW','{schema}.{view_name}') AS \"text\"" - ) - else: - cursor = connection.execute(f"SELECT GET_DDL('VIEW','{view_name}') AS \"text\"") + view_name = f"{schema}.{view_name}" if schema else view_name + cursor = connection.execute( + SNOWFLAKE_GET_VIEW_DEFINITION.format(view_name=view_name) + ) n2i = self.__class__._map_name_to_idx(cursor) # pylint: disable=protected-access try: ret = cursor.fetchone() @@ -249,6 +292,27 @@ def get_view_definition( # pylint: disable=unused-argument return None +@reflection.cache +def get_stream_definition( # pylint: disable=unused-argument + self, connection, stream_name, schema=None, **kw +): + """ + Gets the stream definition + """ + schema = schema or self.default_schema_name + stream_name = f"{schema}.{stream_name}" if schema else stream_name + cursor = connection.execute( + SNOWFLAKE_GET_STREAM_DEFINITION.format(stream_name=stream_name) + ) + try: + result = cursor.fetchone() + if result: + return result[0] + except Exception: + pass + return None + + @reflection.cache def get_table_comment( self, connection, table_name, schema=None, **kw @@ -346,11 +410,13 @@ def get_schema_columns(self, connection, schema, **kw): ), "comment": comment, "primary_key": ( - column_name - in schema_primary_keys[table_name]["constrained_columns"] - ) - if current_table_pks - else False, + ( + column_name + in schema_primary_keys[table_name]["constrained_columns"] + ) + if current_table_pks + else False + ), } ) if is_identity == "YES": diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index 9f2b8689840..dc1a38182d3 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -24,6 +24,7 @@ from metadata.generated.schema.entity.data.table 
import ( ConstraintType, DataType, TableConstraint, + TableType, ) from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils.execution_time_tracker import calculate_execution_time @@ -202,9 +203,30 @@ class SqlColumnHandlerMixin: ] return Column(**parsed_string) + def _get_columns_internal( + self, + schema_name: str, + table_name: str, + db_name: str, + inspector: Inspector, + table_type: TableType = None, + ): + """ + Get columns list + """ + + return inspector.get_columns( + table_name, schema_name, table_type=table_type, db_name=db_name + ) + @calculate_execution_time() def get_columns_and_constraints( # pylint: disable=too-many-locals - self, schema_name: str, table_name: str, db_name: str, inspector: Inspector + self, + schema_name: str, + table_name: str, + db_name: str, + inspector: Inspector, + table_type: TableType = None, ) -> Tuple[ Optional[List[Column]], Optional[List[TableConstraint]], Optional[List[Dict]] ]: @@ -246,7 +268,9 @@ class SqlColumnHandlerMixin: table_columns = [] - columns = inspector.get_columns(table_name, schema_name, db_name=db_name) + columns = self._get_columns_internal( + schema_name, table_name, db_name, inspector, table_type + ) def process_column(column: dict): ( diff --git a/ingestion/tests/unit/topology/database/test_bigquery.py b/ingestion/tests/unit/topology/database/test_bigquery.py index 2bce2514a04..96975645885 100644 --- a/ingestion/tests/unit/topology/database/test_bigquery.py +++ b/ingestion/tests/unit/topology/database/test_bigquery.py @@ -676,7 +676,7 @@ class BigqueryUnitTest(TestCase): ] # pylint: disable=cell-var-from-loop ) self.bq_source.inspector.get_columns = ( - lambda table_name, schema, db_name: MOCK_COLUMN_DATA[ + lambda table_name, schema, table_type, db_name: MOCK_COLUMN_DATA[ i ] # pylint: disable=cell-var-from-loop ) diff --git a/ingestion/tests/unit/topology/database/test_cockroach.py b/ingestion/tests/unit/topology/database/test_cockroach.py index b6a9531ab4a..df1abe248c5 100644 --- a/ingestion/tests/unit/topology/database/test_cockroach.py +++ b/ingestion/tests/unit/topology/database/test_cockroach.py @@ -1,6 +1,7 @@ """ Test Cockroach using the topology """ + import types from unittest import TestCase from unittest.mock import patch @@ -9,7 +10,12 @@ from sqlalchemy.types import VARCHAR from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Column, Constraint, DataType +from metadata.generated.schema.entity.data.table import ( + Column, + Constraint, + DataType, + TableType, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseService, @@ -242,14 +248,14 @@ class cockroachUnitTest(TestCase): def test_datatype(self): inspector = types.SimpleNamespace() inspector.get_columns = ( - lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE ) inspector.get_pk_constraint = lambda table_name, schema_name: [] inspector.get_unique_constraints = lambda table_name, schema_name: [] inspector.get_foreign_keys = lambda table_name, schema_name: [] result, _, _ = self.cockroach_source.get_columns_and_constraints( - "public", "user", "cockroach", inspector + "public", "user", "cockroach", inspector, TableType.Regular ) for i, _ in enumerate(EXPECTED_COLUMN_VALUE): self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i]) diff 
--git a/ingestion/tests/unit/topology/database/test_hive.py b/ingestion/tests/unit/topology/database/test_hive.py index 50312890107..f41c845be02 100644 --- a/ingestion/tests/unit/topology/database/test_hive.py +++ b/ingestion/tests/unit/topology/database/test_hive.py @@ -369,7 +369,7 @@ class HiveUnitTest(TestCase): def test_yield_table(self): self.hive.inspector.get_columns = ( - lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE ) assert EXPECTED_TABLE == [ either.right diff --git a/ingestion/tests/unit/topology/database/test_mssql.py b/ingestion/tests/unit/topology/database/test_mssql.py index 0ac7516bec7..ba76f5cb96e 100644 --- a/ingestion/tests/unit/topology/database/test_mssql.py +++ b/ingestion/tests/unit/topology/database/test_mssql.py @@ -308,7 +308,9 @@ class MssqlUnitTest(TestCase): self.mssql._inspector_map[self.thread_id] = types.SimpleNamespace() self.mssql._inspector_map[ self.thread_id - ].get_columns = lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + ].get_columns = ( + lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE + ) self.mssql._inspector_map[ self.thread_id ].get_pk_constraint = lambda table_name, schema_name: [] diff --git a/ingestion/tests/unit/topology/database/test_postgres.py b/ingestion/tests/unit/topology/database/test_postgres.py index e96855a6fc4..9478ddc28f2 100644 --- a/ingestion/tests/unit/topology/database/test_postgres.py +++ b/ingestion/tests/unit/topology/database/test_postgres.py @@ -21,7 +21,12 @@ from sqlalchemy.types import VARCHAR from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Column, Constraint, DataType +from metadata.generated.schema.entity.data.table import ( + Column, + Constraint, + DataType, + TableType, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseService, @@ -306,13 +311,13 @@ class PostgresUnitTest(TestCase): def test_datatype(self): inspector = types.SimpleNamespace() inspector.get_columns = ( - lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE ) inspector.get_pk_constraint = lambda table_name, schema_name: [] inspector.get_unique_constraints = lambda table_name, schema_name: [] inspector.get_foreign_keys = lambda table_name, schema_name: [] result, _, _ = self.postgres_source.get_columns_and_constraints( - "public", "user", "postgres", inspector + "public", "user", "postgres", inspector, TableType.Regular ) for i, _ in enumerate(EXPECTED_COLUMN_VALUE): self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i]) diff --git a/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/index.md b/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/index.md index aea97c7f905..69540e0f471 100644 --- a/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/index.md +++ b/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/index.md @@ -112,8 +112,10 @@ You can find more information about the `account_usage` schema [here](https://do - **Private Key (Optional)**: If you have configured the key pair authentication for the given user you will have to pass the private key associated with the user in this field. 
You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication. - The multi-line key needs to be converted to one line with `\n` for line endings i.e. `-----BEGIN ENCRYPTED PRIVATE KEY-----\nMII...\n...\n-----END ENCRYPTED PRIVATE KEY-----` - **Snowflake Passphrase Key (Optional)**: If you have configured the encrypted key pair authentication for the given user you will have to pass the passphrase associated with the private key in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication. -- **Include Temporary and Transient Tables**: +- **Include Temporary and Transient Tables**: Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables. By default, it will skip the `TRANSIENT` and `TEMPORARY` tables. +- **Include Streams**: +Optional configuration for ingestion of streams. By default, it will skip the streams. - **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration. - **Account Usage Schema Name**: Full name of account usage schema, used in case your user does not have direct access to `SNOWFLAKE.ACCOUNT_USAGE` schema. In such a case you can replicate the tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS` to a custom schema, let's say `CUSTOM_DB.CUSTOM_SCHEMA`, and provide the same name in this field. diff --git a/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/yaml.md b/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/yaml.md index 6d25d5ebb90..2090f1bb93e 100644 --- a/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/yaml.md +++ b/openmetadata-docs/content/v1.7.x-SNAPSHOT/connectors/database/snowflake/yaml.md @@ -164,27 +164,33 @@ When using this field make sure you have all these tables available within your {% /codeInfo %} -{% codeInfo srNumber=39 %} +{% codeInfo srNumber=7 %} -**clientSessionKeepAlive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration. +**includeStreams**: Optional configuration for ingestion of streams. By default, it will skip the streams. {% /codeInfo %} -{% codeInfo srNumber=7 %} +{% codeInfo srNumber=39 %} + +**clientSessionKeepAlive**: Optional configuration to keep the session active in case the ingestion job runs for a longer duration. + +{% /codeInfo %} + +{% codeInfo srNumber=8 %} **privateKey**: If you have configured the key pair authentication for the given user you will have to pass the private key associated with the user in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication. - The multi-line key needs to be converted to one line with `\n` for line endings i.e. `-----BEGIN ENCRYPTED PRIVATE KEY-----\nMII...\n...\n-----END ENCRYPTED PRIVATE KEY-----` {% /codeInfo %} -{% codeInfo srNumber=8 %} +{% codeInfo srNumber=9 %} **snowflakePrivatekeyPassphrase**: If you have configured the encrypted key pair authentication for the given user you will have to pass the passphrase associated with the private key in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication.
{% /codeInfo %} -{% codeInfo srNumber=9 %} +{% codeInfo srNumber=10 %} **role**: You can specify the role of user that you would like to ingest with, if no role is specified the default roles assigned to user will be selected. @@ -198,13 +204,13 @@ When using this field make sure you have all these tables available within your #### Advanced Configuration -{% codeInfo srNumber=10 %} +{% codeInfo srNumber=11 %} **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to database during the connection. These details must be added as Key-Value pairs. {% /codeInfo %} -{% codeInfo srNumber=11 %} +{% codeInfo srNumber=12 %} **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to database during the connection. These details must be added as Key-Value pairs. @@ -245,23 +251,26 @@ source: ```yaml {% srNumber=6 %} includeTransientTables: false ``` +```yaml {% srNumber=7 %} + includeStreams: false +``` ```yaml {% srNumber=39 %} clientSessionKeepAlive: false ``` -```yaml {% srNumber=7 %} +```yaml {% srNumber=8 %} # privateKey: ``` -```yaml {% srNumber=8 %} +```yaml {% srNumber=9 %} # snowflakePrivatekeyPassphrase: ``` -```yaml {% srNumber=9 %} +```yaml {% srNumber=10 %} # role: ``` -```yaml {% srNumber=10 %} +```yaml {% srNumber=11 %} # connectionOptions: # key: value ``` -```yaml {% srNumber=11 %} +```yaml {% srNumber=12 %} # connectionArguments: # key: value ``` diff --git a/openmetadata-docs/content/v1.7.x-SNAPSHOT/main-concepts/metadata-standard/schemas/entity/services/connections/database/snowflakeConnection.md b/openmetadata-docs/content/v1.7.x-SNAPSHOT/main-concepts/metadata-standard/schemas/entity/services/connections/database/snowflakeConnection.md index 2dc628badf3..0d02dc29c65 100644 --- a/openmetadata-docs/content/v1.7.x-SNAPSHOT/main-concepts/metadata-standard/schemas/entity/services/connections/database/snowflakeConnection.md +++ b/openmetadata-docs/content/v1.7.x-SNAPSHOT/main-concepts/metadata-standard/schemas/entity/services/connections/database/snowflakeConnection.md @@ -20,7 +20,8 @@ slug: /main-concepts/metadata-standard/schemas/entity/services/connections/datab - **`queryTag`** *(string)*: Session query tag used to monitor usage on snowflake. To use a query tag snowflake user should have enough privileges to alter the session. - **`privateKey`** *(string, format: password)*: Connection to Snowflake instance via Private Key. - **`snowflakePrivatekeyPassphrase`** *(string, format: password)*: Snowflake Passphrase Key used with Private Key. -- **`includeTransientTables`** *(boolean)*: Optional configuration for ingestion of TRANSIENT tables, By default, it will skip the TRANSIENT tables. Default: `true`. +- **`includeTransientTables`** *(boolean)*: Optional configuration for ingestion of TRANSIENT tables. By default, it will skip the TRANSIENT tables. Default: `false`. +- **`includeStreams`** *(boolean)*: Optional configuration for ingestion of streams. By default, it will skip the streams. Default: `false`. - **`clientSessionKeepAlive`** *(boolean)*: Optional configuration for ingestion to keep the client session active in case the ingestion process runs for longer durations. Default: `false`. - **`connectionOptions`**: Refer to *[../connectionBasicType.json#/definitions/connectionOptions](#/connectionBasicType.json#/definitions/connectionOptions)*.
- **`connectionArguments`**: Refer to *[../connectionBasicType.json#/definitions/connectionArguments](#/connectionBasicType.json#/definitions/connectionArguments)*. diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/database/snowflake.json b/openmetadata-service/src/main/resources/json/data/testConnections/database/snowflake.json index 8c95e534787..321587d8235 100644 --- a/openmetadata-service/src/main/resources/json/data/testConnections/database/snowflake.json +++ b/openmetadata-service/src/main/resources/json/data/testConnections/database/snowflake.json @@ -34,6 +34,12 @@ "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.", "mandatory": false }, + { + "name": "GetStreams", + "description": "From a given schema, list the streams belonging to that schema. If no schema is specified, we'll list the streams of a random schema.", + "errorMessage": "Failed to fetch streams, please validate if the user has enough privilege to fetch streams.", + "mandatory": false + }, { "name": "GetTags", "description": "Check if we can access the snowflake.account_usage.tag_references table to list all available tags.", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index 8f277e3860f..c0559cf884b 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -42,7 +42,8 @@ "Local", "Partitioned", "Foreign", - "Transient" + "Transient", + "Stream" ], "javaEnums": [ { @@ -77,6 +78,9 @@ }, { "name": "Transient" + }, + { + "name": "Stream" } ] }, diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json index 3f6c5587954..928fa82f0a8 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json @@ -90,7 +90,13 @@ "title": "Include Transient Tables", "description": "Optional configuration for ingestion of TRANSIENT tables, By default, it will skip the TRANSIENT tables.", "type": "boolean", - "default": true + "default": false + }, + "includeStreams": { + "title": "Include Streams", + "description": "Optional configuration for ingestion of streams. By default, it will skip the streams.", + "type": "boolean", + "default": false }, "clientSessionKeepAlive": { "title": "Client Session Keep Alive", diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/Snowflake.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/Snowflake.md index 2b6fda8c7b5..74989cbc962 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/Snowflake.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/Snowflake.md @@ -151,10 +151,17 @@ In Snowflake, we have `TRANSIENT` tables, which will be ignored during the ingestion by default. Enable this setting to ingest them during the metadata workflow. $$ +$$section +### Include Streams $(id="includeStreams") + +In Snowflake, we have streams, which will be ignored during the ingestion by default. +Enable this setting to ingest them during the metadata workflow.
+$$ + $$section ### Client Session Keep Alive $(id="clientSessionKeepAlive") -Optional Configuration to keep the session active in case the ingestion job runs for longer duration. +Optional configuration to keep the session active in case the ingestion job runs for a longer duration. $$ $$section diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts index f390c53b2b5..d4473d655b8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts @@ -1275,6 +1275,7 @@ export enum TableType { Partitioned = "Partitioned", Regular = "Regular", SecureView = "SecureView", + Stream = "Stream", Transient = "Transient", View = "View", } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/snowflakeConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/snowflakeConnection.ts index 2ddf2058b9c..f3315a6295b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/snowflakeConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/snowflakeConnection.ts @@ -49,6 +49,10 @@ export interface SnowflakeConnection { * TRANSIENT tables. */ includeTransientTables?: boolean; + /** + * Optional configuration for ingestion of streams. By default, it will skip the streams. + */ + includeStreams?: boolean; /** * Password to connect to Snowflake. */
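
Usage sketch (illustrative note, not part of the patch): importing `metadata.ingestion.source.database.snowflake.metadata` applies the dialect and inspector monkey-patches shown above, after which the new stream hooks can be called on any SQLAlchemy inspector bound to a Snowflake engine. A minimal sketch, assuming a reachable Snowflake account; the connection URL and the `PUBLIC` schema are placeholders:

    # Minimal sketch of the new stream reflection hooks (placeholder credentials).
    from sqlalchemy import create_engine, inspect

    # Importing this module applies the SnowflakeDialect/Inspector patches above.
    import metadata.ingestion.source.database.snowflake.metadata  # noqa: F401

    engine = create_engine("snowflake://<user>:<password>@<account>/<database>")
    inspector = inspect(engine)

    # Runs SHOW STREAMS IN SCHEMA "PUBLIC"; column 2 (row[1]) of each result row
    # is the stream name, wrapped into the SnowflakeTableList model.
    streams = inspector.get_stream_names(schema="PUBLIC", incremental=None)

    with engine.connect() as conn:
        for stream in streams.get_not_deleted():
            # Runs SELECT GET_DDL('STREAM', 'PUBLIC.<name>') and prints its first column.
            print(stream.name, inspector.get_stream_definition(conn, stream.name, "PUBLIC"))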
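
The SnowflakeTable/SnowflakeTableList models reused by the stream listing can also be exercised on their own. A self-contained sketch (stream names are hypothetical; note the SHOW STREAMS query above always yields `deleted=None`, so the deleted branch only fires for the incremental table and view listings):

    from datetime import datetime

    from metadata.ingestion.source.database.snowflake.models import (
        SnowflakeTable,
        SnowflakeTableList,
    )

    streams = SnowflakeTableList(
        tables=[
            SnowflakeTable(name="orders_stream", deleted=None),
            SnowflakeTable(name="retired_stream", deleted=datetime(2025, 3, 1)),
        ]
    )

    # In _get_stream_names_and_types, deleted entries are appended to the global
    # deleted_tables context, while live entries become TableNameAndType rows
    # with type TableType.Stream.
    assert [s.name for s in streams.get_not_deleted()] == ["orders_stream"]
    assert [s.name for s in streams.get_deleted()] == ["retired_stream"]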