Fixes #19534: Snowflake stream ingestion support (#20278)

This commit is contained in:
Mohit Tilala 2025-04-01 13:02:37 +05:30 committed by GitHub
parent 7ad97afa62
commit 06ab82170b
24 changed files with 392 additions and 49 deletions

View File

@ -446,6 +446,11 @@ class ESMixin(Generic[T]):
"tableType": TableType.Dynamic.value
}
},
{
"term": {
"tableType": TableType.Stream.value
}
},
]
}
}

View File

@ -509,6 +509,7 @@ class CommonDbSourceService(
foreign_columns,
) = self.get_columns_and_constraints(
schema_name=schema_name,
table_type=table_type,
table_name=table_name,
db_name=self.context.get().database,
inspector=self.inspector,

View File

@ -224,8 +224,13 @@ class DorisSource(CommonDbSourceService):
return table_columns, primary_columns
def get_columns_and_constraints(
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
def get_columns_and_constraints( # pylint: disable=too-many-locals
self,
schema_name: str,
table_name: str,
db_name: str,
inspector: Inspector,
table_type: str = None,
) -> Tuple[
Optional[List[Column]], Optional[List[TableConstraint]], Optional[List[Dict]]
]:

View File

@ -47,6 +47,7 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_DATABASES,
SNOWFLAKE_TEST_FETCH_TAG,
SNOWFLAKE_TEST_GET_QUERIES,
SNOWFLAKE_TEST_GET_STREAMS,
SNOWFLAKE_TEST_GET_TABLES,
SNOWFLAKE_TEST_GET_VIEWS,
)
@ -192,6 +193,11 @@ def test_connection(
statement=SNOWFLAKE_TEST_GET_VIEWS,
engine_wrapper=engine_wrapper,
),
"GetStreams": partial(
test_table_query,
statement=SNOWFLAKE_TEST_GET_STREAMS,
engine_wrapper=engine_wrapper,
),
"GetQueries": partial(
test_query,
statement=SNOWFLAKE_TEST_GET_QUERIES.format(

View File

@ -0,0 +1,48 @@
# Copyright 2025 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Snowflake constants
"""
from sqlalchemy.sql.sqltypes import BOOLEANTYPE, VARCHAR
DEFAULT_STREAM_COLUMNS = [
{
"name": "METADATA$ACTION",
"type": VARCHAR(length=16777216),
"nullable": True,
"default": None,
"autoincrement": False,
"system_data_type": "TEXT(16777216)",
"comment": None,
"primary_key": False,
},
{
"name": "METADATA$ISUPDATE",
"type": BOOLEANTYPE,
"nullable": True,
"default": None,
"autoincrement": False,
"system_data_type": "BOOLEAN",
"comment": None,
"primary_key": False,
},
{
"name": "METADATA$ROW_ID",
"type": VARCHAR(length=16777216),
"nullable": True,
"default": None,
"autoincrement": False,
"system_data_type": "TEXT(16777216)",
"comment": None,
"primary_key": False,
},
]

View File

@ -69,6 +69,9 @@ from metadata.ingestion.source.database.incremental_metadata_extraction import (
IncrementalConfig,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.snowflake.constants import (
DEFAULT_STREAM_COLUMNS,
)
from metadata.ingestion.source.database.snowflake.models import (
STORED_PROC_LANGUAGE_MAP,
SnowflakeStoredProcedure,
@ -86,6 +89,7 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_ORGANIZATION_NAME,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_STORED_PROCEDURES,
SNOWFLAKE_GET_STREAM,
SNOWFLAKE_LIFE_CYCLE_QUERY,
SNOWFLAKE_SESSION_TAG_QUERY,
)
@ -96,6 +100,9 @@ from metadata.ingestion.source.database.snowflake.utils import (
get_pk_constraint,
get_schema_columns,
get_schema_foreign_keys,
get_stream_definition,
get_stream_names,
get_stream_names_reflection,
get_table_comment,
get_table_ddl,
get_table_names,
@ -123,6 +130,7 @@ logger = ingestion_logger()
SnowflakeDialect._json_deserializer = json.loads # pylint: disable=protected-access
SnowflakeDialect.get_table_names = get_table_names
SnowflakeDialect.get_view_names = get_view_names
SnowflakeDialect.get_stream_names = get_stream_names
SnowflakeDialect.get_all_table_comments = get_all_table_comments
SnowflakeDialect.normalize_name = normalize_names
SnowflakeDialect.get_table_comment = get_table_comment
@ -133,6 +141,7 @@ SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access
)
Inspector.get_table_names = get_table_names_reflection
Inspector.get_view_names = get_view_names_reflection
Inspector.get_stream_names = get_stream_names_reflection
SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-access
_current_database_schema
)
@ -141,6 +150,7 @@ SnowflakeDialect.get_foreign_keys = get_foreign_keys
SnowflakeDialect.get_columns = get_columns
Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
Inspector.get_stream_definition = get_stream_definition
SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys
@ -304,9 +314,11 @@ class SnowflakeSource(
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_database,
(
database_fqn
if self.source_config.useFqnForFiltering
else new_database
),
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
@ -495,6 +507,33 @@ class SnowflakeSource(
for table in snowflake_tables.get_not_deleted()
]
def _get_stream_names_and_types(self, schema_name: str) -> List[TableNameAndType]:
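"""Return the schema's streams as tables of type Stream, tracking deleted streams so they can later be marked as deleted."""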
table_type = TableType.Stream
snowflake_streams = self.inspector.get_stream_names(
schema=schema_name,
incremental=self.incremental,
)
self.context.get_global().deleted_tables.extend(
[
fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
table_name=stream.name,
)
for stream in snowflake_streams.get_deleted()
]
)
return [
TableNameAndType(name=stream.name, type_=table_type)
for stream in snowflake_streams.get_not_deleted()
]
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
@ -523,6 +562,9 @@ class SnowflakeSource(
)
)
if self.service_connection.includeStreams:
table_list.extend(self._get_stream_names_and_types(schema_name))
return table_list
def _get_org_name(self) -> Optional[str]:
@ -750,3 +792,71 @@ class SnowflakeSource(
)
else:
yield from super().mark_tables_as_deleted()
def _get_columns_internal(
self,
schema_name: str,
table_name: str,
db_name: str,
inspector: Inspector,
table_type: TableType = None,
):
"""
Get columns of table/view/stream
"""
if table_type == TableType.Stream:
cursor = self.connection.execute(
SNOWFLAKE_GET_STREAM.format(stream_name=table_name, schema=schema_name)
)
try:
result = cursor.fetchone()
if result:
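# Column 6 of the SHOW STREAMS output is the stream's source table (fully qualified); keep only the table part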
table_name = result[6].split(".")[-1]
except Exception:
pass
columns = inspector.get_columns(
table_name, schema_name, table_type=table_type, db_name=db_name
)
if table_type == TableType.Stream:
columns = [*columns, *DEFAULT_STREAM_COLUMNS]
return columns
def get_schema_definition(
self,
table_type: TableType,
table_name: str,
schema_name: str,
inspector: Inspector,
) -> Optional[str]:
"""
Get the DDL statement, View Definition or Stream Definition for a table
"""
try:
schema_definition = None
if table_type in (TableType.View, TableType.MaterializedView):
schema_definition = inspector.get_view_definition(
table_name, schema_name
)
elif table_type == TableType.Stream:
schema_definition = inspector.get_stream_definition(
self.connection, table_name, schema_name
)
elif hasattr(inspector, "get_table_ddl") and self.source_config.includeDDL:
schema_definition = inspector.get_table_ddl(
self.connection, table_name, schema_name
)
schema_definition = (
str(schema_definition).strip()
if schema_definition is not None
else None
)
return schema_definition
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Failed to fetch schema definition for {table_name}: {exc}")
return None

View File

@ -85,8 +85,8 @@ class SnowflakeStoredProcedure(BaseModel):
class SnowflakeTable(BaseModel):
"""Models the items returned from the Table and View Queries used to get the entities to process.
:name: Holds the table/view name.
"""Models the items returned from the Table, View and Stream Queries used to get the entities to process.
:name: Holds the table/view/stream name.
:deleted: Holds either a datetime if the table was deleted or None.
"""
@ -95,7 +95,7 @@ class SnowflakeTable(BaseModel):
class SnowflakeTableList(BaseModel):
"""Understands how to return the deleted and not deleted tables/views from a given list."""
"""Understands how to return the deleted and not deleted tables/views/streams from a given list."""
tables: List[SnowflakeTable]

View File

@ -143,6 +143,18 @@ from (
where ROW_NUMBER = 1
"""
SNOWFLAKE_GET_STREAM_NAMES = """
SHOW STREAMS IN SCHEMA "{schema}"
"""
SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES = """
SHOW STREAMS IN SCHEMA "{schema}"
"""
SNOWFLAKE_GET_STREAM = """
SHOW STREAMS LIKE '{stream_name}' IN SCHEMA "{schema}"
"""
SNOWFLAKE_GET_TRANSIENT_NAMES = """
select TABLE_NAME, NULL from information_schema.tables
where TABLE_SCHEMA = '{schema}'
@ -250,6 +262,10 @@ SNOWFLAKE_TEST_GET_VIEWS = """
SELECT TABLE_NAME FROM "{database_name}".information_schema.views LIMIT 1
"""
SNOWFLAKE_TEST_GET_STREAMS = """
SHOW STREAMS IN DATABASE "{database_name}"
"""
SNOWFLAKE_GET_DATABASES = "SHOW DATABASES"
@ -389,6 +405,15 @@ ORDER BY PROCEDURE_START_TIME DESC
SNOWFLAKE_GET_TABLE_DDL = """
SELECT GET_DDL('TABLE','{table_name}') AS \"text\"
"""
SNOWFLAKE_GET_VIEW_DEFINITION = """
SELECT GET_DDL('VIEW','{view_name}') AS \"text\"
"""
SNOWFLAKE_GET_STREAM_DEFINITION = """
SELECT GET_DDL('STREAM','{stream_name}') AS \"text\"
"""
SNOWFLAKE_QUERY_LOG_QUERY = """
SELECT
QUERY_ID,

View File

@ -10,7 +10,7 @@
# limitations under the License.
"""
Module to define overriden dialect methods
Module to define overridden dialect methods
"""
import operator
from functools import reduce
@ -37,13 +37,17 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_GET_MVIEW_NAMES,
SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_STREAM_DEFINITION,
SNOWFLAKE_GET_STREAM_NAMES,
SNOWFLAKE_GET_TABLE_DDL,
SNOWFLAKE_GET_TRANSIENT_NAMES,
SNOWFLAKE_GET_VIEW_DEFINITION,
SNOWFLAKE_GET_VIEW_NAMES,
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_MVIEW_NAMES,
SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES,
SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES,
SNOWFLAKE_INCREMENTAL_GET_VIEW_NAMES,
SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
@ -85,6 +89,15 @@ VIEW_QUERY_MAPS = {
},
}
STREAM_QUERY_MAPS = {
"full": {
"default": SNOWFLAKE_GET_STREAM_NAMES,
},
"incremental": {
"default": SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES,
},
}
def _denormalize_quote_join(*idents):
ip = dialect.identifier_preparer
@ -150,6 +163,20 @@ def get_view_names_reflection(self, schema=None, **kw):
)
def get_stream_names_reflection(self, schema=None, **kw):
"""Return all stream names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
For special quoting, use :class:`.quoted_name`.
"""
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_stream_names(
conn, schema, info_cache=self.info_cache, **kw
)
def _get_query_map(
incremental: Optional[IncrementalConfig], query_maps: Dict[str, QueryMap]
):
@ -162,7 +189,7 @@ def _get_query_map(
def _get_query_parameters(
self, connection, schema: str, incremental: Optional[IncrementalConfig]
):
"""Returns the proper query parameters depending if the extraciton is Incremental or Full"""
"""Returns the proper query parameters depending if the extraction is Incremental or Full"""
parameters = {"schema": fqn.unquote_name(schema)}
if incremental and incremental.enabled:
@ -225,6 +252,24 @@ def get_view_names(self, connection, schema, **kw):
return result
def get_stream_names(self, connection, schema, **kw):
incremental = kw.get("incremental")
queries = _get_query_map(incremental, STREAM_QUERY_MAPS)
parameters = _get_query_parameters(self, connection, schema, incremental)
query = queries["default"]
cursor = connection.execute(query.format(**parameters))
result = SnowflakeTableList(
tables=[
SnowflakeTable(name=self.normalize_name(row[1]), deleted=None)
for row in cursor
]
)
return result
@reflection.cache
def get_view_definition( # pylint: disable=unused-argument
self, connection, view_name, schema=None, **kw
@ -233,12 +278,10 @@ def get_view_definition( # pylint: disable=unused-argument
Gets the view definition
"""
schema = schema or self.default_schema_name
if schema:
cursor = connection.execute(
f"SELECT GET_DDL('VIEW','{schema}.{view_name}') AS \"text\""
)
else:
cursor = connection.execute(f"SELECT GET_DDL('VIEW','{view_name}') AS \"text\"")
view_name = f"{schema}.{view_name}" if schema else view_name
cursor = connection.execute(
SNOWFLAKE_GET_VIEW_DEFINITION.format(view_name=view_name)
)
n2i = self.__class__._map_name_to_idx(cursor) # pylint: disable=protected-access
try:
ret = cursor.fetchone()
@ -249,6 +292,27 @@ def get_view_definition( # pylint: disable=unused-argument
return None
@reflection.cache
def get_stream_definition( # pylint: disable=unused-argument
self, connection, stream_name, schema=None, **kw
):
"""
Gets the stream definition
"""
schema = schema or self.default_schema_name
stream_name = f"{schema}.{stream_name}" if schema else stream_name
cursor = connection.execute(
SNOWFLAKE_GET_STREAM_DEFINITION.format(stream_name=stream_name)
)
try:
result = cursor.fetchone()
if result:
return result[0]
except Exception:
pass
return None
@reflection.cache
def get_table_comment(
self, connection, table_name, schema=None, **kw
@ -346,11 +410,13 @@ def get_schema_columns(self, connection, schema, **kw):
),
"comment": comment,
"primary_key": (
column_name
in schema_primary_keys[table_name]["constrained_columns"]
)
if current_table_pks
else False,
(
column_name
in schema_primary_keys[table_name]["constrained_columns"]
)
if current_table_pks
else False
),
}
)
if is_identity == "YES":

View File

@ -24,6 +24,7 @@ from metadata.generated.schema.entity.data.table import (
ConstraintType,
DataType,
TableConstraint,
TableType,
)
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils.execution_time_tracker import calculate_execution_time
@ -202,9 +203,30 @@ class SqlColumnHandlerMixin:
]
return Column(**parsed_string)
def _get_columns_internal(
self,
schema_name: str,
table_name: str,
db_name: str,
inspector: Inspector,
table_type: TableType = None,
):
"""
Get columns list
"""
return inspector.get_columns(
table_name, schema_name, table_type=table_type, db_name=db_name
)
@calculate_execution_time()
def get_columns_and_constraints( # pylint: disable=too-many-locals
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
self,
schema_name: str,
table_name: str,
db_name: str,
inspector: Inspector,
table_type: TableType = None,
) -> Tuple[
Optional[List[Column]], Optional[List[TableConstraint]], Optional[List[Dict]]
]:
@ -246,7 +268,9 @@ class SqlColumnHandlerMixin:
table_columns = []
columns = inspector.get_columns(table_name, schema_name, db_name=db_name)
columns = self._get_columns_internal(
schema_name, table_name, db_name, inspector, table_type
)
def process_column(column: dict):
(

View File

@ -676,7 +676,7 @@ class BigqueryUnitTest(TestCase):
] # pylint: disable=cell-var-from-loop
)
self.bq_source.inspector.get_columns = (
lambda table_name, schema, db_name: MOCK_COLUMN_DATA[
lambda table_name, schema, table_type, db_name: MOCK_COLUMN_DATA[
i
] # pylint: disable=cell-var-from-loop
)

View File

@ -1,6 +1,7 @@
"""
Test Cockroach using the topology
"""
import types
from unittest import TestCase
from unittest.mock import patch
@ -9,7 +10,12 @@ from sqlalchemy.types import VARCHAR
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Constraint, DataType
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
DataType,
TableType,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
@ -242,14 +248,14 @@ class cockroachUnitTest(TestCase):
def test_datatype(self):
inspector = types.SimpleNamespace()
inspector.get_columns = (
lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
)
inspector.get_pk_constraint = lambda table_name, schema_name: []
inspector.get_unique_constraints = lambda table_name, schema_name: []
inspector.get_foreign_keys = lambda table_name, schema_name: []
result, _, _ = self.cockroach_source.get_columns_and_constraints(
"public", "user", "cockroach", inspector
"public", "user", "cockroach", inspector, TableType.Regular
)
for i, _ in enumerate(EXPECTED_COLUMN_VALUE):
self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i])

View File

@ -369,7 +369,7 @@ class HiveUnitTest(TestCase):
def test_yield_table(self):
self.hive.inspector.get_columns = (
lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
)
assert EXPECTED_TABLE == [
either.right

View File

@ -308,7 +308,9 @@ class MssqlUnitTest(TestCase):
self.mssql._inspector_map[self.thread_id] = types.SimpleNamespace()
self.mssql._inspector_map[
self.thread_id
].get_columns = lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
].get_columns = (
lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
)
self.mssql._inspector_map[
self.thread_id
].get_pk_constraint = lambda table_name, schema_name: []

View File

@ -21,7 +21,12 @@ from sqlalchemy.types import VARCHAR
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Constraint, DataType
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
DataType,
TableType,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
@ -306,13 +311,13 @@ class PostgresUnitTest(TestCase):
def test_datatype(self):
inspector = types.SimpleNamespace()
inspector.get_columns = (
lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
)
inspector.get_pk_constraint = lambda table_name, schema_name: []
inspector.get_unique_constraints = lambda table_name, schema_name: []
inspector.get_foreign_keys = lambda table_name, schema_name: []
result, _, _ = self.postgres_source.get_columns_and_constraints(
"public", "user", "postgres", inspector
"public", "user", "postgres", inspector, TableType.Regular
)
for i, _ in enumerate(EXPECTED_COLUMN_VALUE):
self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i])

View File

@ -112,8 +112,10 @@ You can find more information about the `account_usage` schema [here](https://do
- **Private Key (Optional)**: If you have configured key pair authentication for the given user, you will have to pass the private key associated with the user in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication.
- The multi-line key needs to be converted to one line with `\n` for line endings i.e. `-----BEGIN ENCRYPTED PRIVATE KEY-----\nMII...\n...\n-----END ENCRYPTED PRIVATE KEY-----`
- **Snowflake Passphrase Key (Optional)**: If you have configured encrypted key pair authentication for the given user, you will have to pass the passphrase associated with the private key in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication.
- **Include Temporary and Transient Tables**:
- **Include Temporary and Transient Tables**:
Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables. By default, it will skip the `TRANSIENT` and `TEMPORARY` tables.
- **Include Streams**:
Optional configuration for ingestion of streams. By default, it will skip the streams (see the example configuration after this list).
- **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
- **Account Usage Schema Name**: Full name of the account usage schema, used in case your user does not have direct access to the `SNOWFLAKE.ACCOUNT_USAGE` schema. In such a case you can replicate the tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, and `FUNCTIONS` to a custom schema, say `CUSTOM_DB.CUSTOM_SCHEMA`, and provide that schema name in this field.
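For reference, here is a minimal sketch of how these toggles might look in an ingestion workflow YAML. The `serviceConnection`/`config` nesting and the credential placeholders are illustrative only; the options themselves (`includeStreams`, `includeTransientTables`, `clientSessionKeepAlive`) are the ones described above, and streams stay excluded unless `includeStreams` is enabled.

```yaml
serviceConnection:
  config:
    type: Snowflake
    username: <username>            # illustrative placeholder
    password: <password>            # illustrative placeholder
    account: <account_identifier>   # illustrative placeholder
    warehouse: <warehouse>          # illustrative placeholder
    # Streams are skipped by default; opt in explicitly:
    includeStreams: true
    # Transient/temporary tables are also skipped by default:
    includeTransientTables: false
    clientSessionKeepAlive: false
```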

View File

@ -164,27 +164,33 @@ When using this field make sure you have all these tables available within your
{% /codeInfo %}
{% codeInfo srNumber=39 %}
{% codeInfo srNumber=7 %}
**clientSessionKeepAlive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
**includeStreams**: Optional configuration for ingestion of streams. By default, it will skip the streams.
{% /codeInfo %}
{% codeInfo srNumber=7 %}
{% codeInfo srNumber=39 %}
**clientSessionKeepAlive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
{% /codeInfo %}
{% codeInfo srNumber=8 %}
**privateKey**: If you have configured key pair authentication for the given user, you will have to pass the private key associated with the user in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication.
- The multi-line key needs to be converted to one line with `\n` for line endings i.e. `-----BEGIN ENCRYPTED PRIVATE KEY-----\nMII...\n...\n-----END ENCRYPTED PRIVATE KEY-----`
{% /codeInfo %}
{% codeInfo srNumber=8 %}
{% codeInfo srNumber=9 %}
**snowflakePrivatekeyPassphrase**: If you have configured encrypted key pair authentication for the given user, you will have to pass the passphrase associated with the private key in this field. You can check out [this](https://docs.snowflake.com/en/user-guide/key-pair-auth) doc to get more details about key-pair authentication.
{% /codeInfo %}
{% codeInfo srNumber=9 %}
{% codeInfo srNumber=10 %}
**role**: You can specify the role of the user that you would like to ingest with; if no role is specified, the default roles assigned to the user will be selected.
@ -198,13 +204,13 @@ When using this field make sure you have all these tables available within your
#### Advanced Configuration
{% codeInfo srNumber=10 %}
{% codeInfo srNumber=11 %}
**Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to database during the connection. These details must be added as Key-Value pairs.
{% /codeInfo %}
{% codeInfo srNumber=11 %}
{% codeInfo srNumber=12 %}
**Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to database during the connection. These details must be added as Key-Value pairs.
@ -245,23 +251,26 @@ source:
```yaml {% srNumber=6 %}
includeTransientTables: false
```
```yaml {% srNumber=7 %}
includeStreams: false
```
```yaml {% srNumber=39 %}
clientSessionKeepAlive: false
```
```yaml {% srNumber=7 %}
```yaml {% srNumber=8 %}
# privateKey: <privateKey>
```
```yaml {% srNumber=8 %}
```yaml {% srNumber=9 %}
# snowflakePrivatekeyPassphrase: <passphrase>
```
```yaml {% srNumber=9 %}
```yaml {% srNumber=10 %}
# role: <role>
```
```yaml {% srNumber=10 %}
```yaml {% srNumber=11 %}
# connectionOptions:
# key: value
```
```yaml {% srNumber=11 %}
```yaml {% srNumber=12 %}
# connectionArguments:
# key: value
```

View File

@ -20,7 +20,8 @@ slug: /main-concepts/metadata-standard/schemas/entity/services/connections/datab
- **`queryTag`** *(string)*: Session query tag used to monitor usage on snowflake. To use a query tag snowflake user should have enough privileges to alter the session.
- **`privateKey`** *(string, format: password)*: Connection to Snowflake instance via Private Key.
- **`snowflakePrivatekeyPassphrase`** *(string, format: password)*: Snowflake Passphrase Key used with Private Key.
- **`includeTransientTables`** *(boolean)*: Optional configuration for ingestion of TRANSIENT tables, By default, it will skip the TRANSIENT tables. Default: `true`.
- **`includeTransientTables`** *(boolean)*: Optional configuration for ingestion of TRANSIENT tables, By default, it will skip the TRANSIENT tables. Default: `false`.
- **`includeStreams`** *(boolean)*: Optional configuration for ingestion of streams, By default, it will skip the streams. Default: `false`.
- **`clientSessionKeepAlive`** *(boolean)*: Optional configuration for ingestion to keep the client session active in case the ingestion process runs for longer durations. Default: `false`.
- **`connectionOptions`**: Refer to *[../connectionBasicType.json#/definitions/connectionOptions](#/connectionBasicType.json#/definitions/connectionOptions)*.
- **`connectionArguments`**: Refer to *[../connectionBasicType.json#/definitions/connectionArguments](#/connectionBasicType.json#/definitions/connectionArguments)*.

View File

@ -34,6 +34,12 @@
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
},
{
"name": "GetStreams",
"description": "From a given schema, list the streams belonging to that schema. If no schema is specified, we'll list the streams of a random schema.",
"errorMessage": "Failed to fetch streams, please validate if the user has enough privilege to fetch streams.",
"mandatory": false
},
{
"name": "GetTags",
"description": "Check if we can access the snowflake.account_usage.tag_references table to list all available tags.",

View File

@ -42,7 +42,8 @@
"Local",
"Partitioned",
"Foreign",
"Transient"
"Transient",
"Stream"
],
"javaEnums": [
{
@ -77,6 +78,9 @@
},
{
"name": "Transient"
},
{
"name": "Stream"
}
]
},

View File

@ -90,7 +90,13 @@
"title": "Include Transient Tables",
"description": "Optional configuration for ingestion of TRANSIENT tables, By default, it will skip the TRANSIENT tables.",
"type": "boolean",
"default": true
"default": false
},
"includeStreams": {
"title": "Include Streams",
"description": "Optional configuration for ingestion of streams, By default, it will skip the streams.",
"type": "boolean",
"default": false
},
"clientSessionKeepAlive": {
"title": "Client Session Keep Alive",

View File

@ -151,10 +151,17 @@ In Snowflake, we have `TRANSIENT` tables, which will be ignored during the inges
Enable this setting to ingest them during the metadata workflow.
$$
$$section
### Include Streams $(id="includeStreams")
In Snowflake, we have streams, which will be ignored during the ingestion by default.
Enable this setting to ingest them during the metadata workflow.
$$
$$section
### Client Session Keep Alive $(id="clientSessionKeepAlive")
Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
$$
$$section

View File

@ -1275,6 +1275,7 @@ export enum TableType {
Partitioned = "Partitioned",
Regular = "Regular",
SecureView = "SecureView",
Stream = "Stream",
Transient = "Transient",
View = "View",
}

View File

@ -49,6 +49,10 @@ export interface SnowflakeConnection {
* TRANSIENT tables.
*/
includeTransientTables?: boolean;
/**
* Optional configuration for ingestion of streams, By default, it will skip the streams.
*/
includeStreams?: boolean;
/**
* Password to connect to Snowflake.
*/