Add a configurationto skip Snowflake transient and tmp tables (#10665)

* Add a configurationto skip  Snowflake transient and tmp tables

* Fix Python checkstyle

* add separate query for transient tables

* Move skipTempTables into SnowflakeConnection

* Fix merge conflict

* change skip word to include

* Add title in json file

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Milan Bariya 2023-03-28 09:59:59 +05:30 committed by GitHub
parent 78d7dd8789
commit 3d1bbb1037
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 3 deletions

View File

@ -39,7 +39,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.snowflake.queries import ( from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_FETCH_ALL_TAGS, SNOWFLAKE_FETCH_ALL_TAGS,
SNOWFLAKE_GET_CLUSTER_KEY, SNOWFLAKE_GET_CLUSTER_KEY,
@ -52,6 +55,7 @@ from metadata.ingestion.source.database.snowflake.utils import (
get_schema_columns, get_schema_columns,
get_table_comment, get_table_comment,
get_table_names, get_table_names,
get_table_names_reflection,
get_unique_constraints, get_unique_constraints,
get_view_definition, get_view_definition,
get_view_names, get_view_names,
@ -82,6 +86,7 @@ SnowflakeDialect.get_unique_constraints = get_unique_constraints
SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access
get_schema_columns get_schema_columns
) )
Inspector.get_table_names = get_table_names_reflection
class SnowflakeSource(CommonDbSourceService): class SnowflakeSource(CommonDbSourceService):
@ -301,3 +306,30 @@ class SnowflakeSource(CommonDbSourceService):
description="SNOWFLAKE TAG VALUE", description="SNOWFLAKE TAG VALUE",
), ),
) )
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""
Connect to the source database to get the table
name and type. By default, use the inspector method
to get the names and pass the Regular type.
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., external, foreign,...
"""
if self.config.serviceConnection.__root__.config.includeTempTables:
return [
TableNameAndType(name=table_name)
for table_name in self.inspector.get_table_names(
schema=schema_name, include_temp_tables="True"
)
or []
]
return [
TableNameAndType(name=table_name)
for table_name in self.inspector.get_table_names(schema=schema_name) or []
]

View File

@ -49,6 +49,10 @@ SNOWFLAKE_GET_TABLE_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE' select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE'
""" """
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and IS_TRANSIENT = 'NO'
"""
SNOWFLAKE_GET_VIEW_NAMES = """ SNOWFLAKE_GET_VIEW_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW' select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW'
""" """

View File

@ -25,6 +25,7 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_SCHEMA_COLUMNS, SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_TABLE_NAMES, SNOWFLAKE_GET_TABLE_NAMES,
SNOWFLAKE_GET_VIEW_NAMES, SNOWFLAKE_GET_VIEW_NAMES,
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
) )
from metadata.utils.sqlalchemy_utils import ( from metadata.utils.sqlalchemy_utils import (
get_display_datatype, get_display_datatype,
@ -32,8 +33,45 @@ from metadata.utils.sqlalchemy_utils import (
) )
def get_table_names(self, connection, schema, **kw): # pylint: disable=unused-argument def get_table_names_reflection(self, schema=None, **kw):
cursor = connection.execute(SNOWFLAKE_GET_TABLE_NAMES.format(schema)) """Return all table names in referred to within a particular schema.
The names are expected to be real tables only, not views.
Views are instead returned using the
:meth:`_reflection.Inspector.get_view_names`
method.
:param schema: Schema name. If ``schema`` is left at ``None``, the
database's default schema is
used, else the named schema is searched. If the database does not
support named schemas, behavior is undefined if ``schema`` is not
passed as ``None``. For special quoting, use :class:`.quoted_name`.
.. seealso::
:meth:`_reflection.Inspector.get_sorted_table_and_fkc_names`
:attr:`_schema.MetaData.sorted_tables`
"""
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_table_names(
conn, schema, info_cache=self.info_cache, **kw
)
def get_table_names(self, connection, schema, **kw):
if kw.get("include_temp_tables"):
cursor = connection.execute(SNOWFLAKE_GET_TABLE_NAMES.format(schema))
result = [self.normalize_name(row[0]) for row in cursor]
return result
cursor = connection.execute(
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES.format(schema)
)
result = [self.normalize_name(row[0]) for row in cursor] result = [self.normalize_name(row[0]) for row in cursor]
return result return result

View File

@ -133,6 +133,7 @@ source:
warehouse: <warehouse> warehouse: <warehouse>
account: <account> account: <account>
# database: <database> # database: <database>
includeTempTables: false
# hostPort: account.region.service.snowflakecomputing.com # hostPort: account.region.service.snowflakecomputing.com
# privateKey: <privateKey> # privateKey: <privateKey>
# snowflakePrivatekeyPassphrase: <passphrase> # snowflakePrivatekeyPassphrase: <passphrase>
@ -184,6 +185,7 @@ workflowConfig:
- **warehouse**: Warehouse name. - **warehouse**: Warehouse name.
- **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases. - **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases.
- **privateKey**: Connection to Snowflake instance via Private Key. - **privateKey**: Connection to Snowflake instance via Private Key.
- **includeTempTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables.
- **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with Private Key. - **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with Private Key.
- **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. - **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs.
- **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. - **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs.

View File

@ -132,6 +132,7 @@ source:
warehouse: <warehouse> warehouse: <warehouse>
account: <account> account: <account>
# database: <database> # database: <database>
includeTempTables: false
# hostPort: account.region.service.snowflakecomputing.com # hostPort: account.region.service.snowflakecomputing.com
# privateKey: | # privateKey: |
# <privateKey> # <privateKey>
@ -186,6 +187,7 @@ workflowConfig:
- **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases. - **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases.
- **privateKey**: Connection to Snowflake instance via Private Key instead of a Password. - **privateKey**: Connection to Snowflake instance via Private Key instead of a Password.
- The multi-line key needs to be correctly formatted in YAML so a literal block scalar which retains new lines is recommended (`|`). - The multi-line key needs to be correctly formatted in YAML so a literal block scalar which retains new lines is recommended (`|`).
- **includeTempTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables.
- **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with and encrypted Private Key. - **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with and encrypted Private Key.
- **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. - **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs.
- **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. - **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs.

View File

@ -80,6 +80,12 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"includeTempTables": {
"title": "Include Temporary and Transient Tables",
"description": "Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables.",
"type": "boolean",
"default": false
},
"connectionOptions": { "connectionOptions": {
"title": "Connection Options", "title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions" "$ref": "../connectionBasicType.json#/definitions/connectionOptions"