From 3d1bbb1037f941e73b1b057e33ea1fc43e148f69 Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Tue, 28 Mar 2023 09:59:59 +0530 Subject: [PATCH] Add a configuration to skip Snowflake transient and tmp tables (#10665) * Add a configuration to skip Snowflake transient and tmp tables * Fix Python checkstyle * add separate query for transient tables * Move skipTempTables into SnowflakeConnection * Fix merge conflict * change skip word to include * Add title in json file --------- Co-authored-by: Pere Miquel Brull --- .../source/database/snowflake/metadata.py | 34 ++++++++++++++- .../source/database/snowflake/queries.py | 4 ++ .../source/database/snowflake/utils.py | 42 ++++++++++++++++++- .../connectors/database/snowflake/airflow.md | 2 + .../connectors/database/snowflake/cli.md | 2 + .../database/snowflakeConnection.json | 6 +++ 6 files changed, 87 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 35cdfaffdf8..75a1163b880 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -39,7 +39,10 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type -from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.common_db_source import ( + CommonDbSourceService, + TableNameAndType, +) from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_FETCH_ALL_TAGS, SNOWFLAKE_GET_CLUSTER_KEY, @@ -52,6 +55,7 @@ from
metadata.ingestion.source.database.snowflake.utils import ( get_schema_columns, get_table_comment, get_table_names, + get_table_names_reflection, get_unique_constraints, get_view_definition, get_view_names, @@ -82,6 +86,7 @@ SnowflakeDialect.get_unique_constraints = get_unique_constraints SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access get_schema_columns ) +Inspector.get_table_names = get_table_names_reflection class SnowflakeSource(CommonDbSourceService): @@ -301,3 +306,30 @@ class SnowflakeSource(CommonDbSourceService): description="SNOWFLAKE TAG VALUE", ), ) + + def query_table_names_and_types( + self, schema_name: str + ) -> Iterable[TableNameAndType]: + """ + Connect to the source database to get the table + name and type. By default, use the inspector method + to get the names and pass the Regular type. + + This is useful for sources where we need fine-grained + logic on how to handle table types, e.g., external, foreign,... + """ + + if self.config.serviceConnection.__root__.config.includeTempTables: + + return [ + TableNameAndType(name=table_name) + for table_name in self.inspector.get_table_names( + schema=schema_name, include_temp_tables=True + ) + or [] + ] + + return [ + TableNameAndType(name=table_name) + for table_name in self.inspector.get_table_names(schema=schema_name) or [] + ] diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index b2a5cfdb3cc..ad67fdd7708 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -49,6 +49,10 @@ SNOWFLAKE_GET_TABLE_NAMES = """ select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE' """ +SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES = """ +select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE' and IS_TRANSIENT =
'NO' +""" + SNOWFLAKE_GET_VIEW_NAMES = """ select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW' """ diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index 80b01fa98dd..29467f24e30 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -25,6 +25,7 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_SCHEMA_COLUMNS, SNOWFLAKE_GET_TABLE_NAMES, SNOWFLAKE_GET_VIEW_NAMES, + SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES, ) from metadata.utils.sqlalchemy_utils import ( get_display_datatype, @@ -32,8 +33,45 @@ from metadata.utils.sqlalchemy_utils import ( ) -def get_table_names(self, connection, schema, **kw): # pylint: disable=unused-argument - cursor = connection.execute(SNOWFLAKE_GET_TABLE_NAMES.format(schema)) +def get_table_names_reflection(self, schema=None, **kw): + """Return all table names referred to within a particular schema. + + The names are expected to be real tables only, not views. + Views are instead returned using the + :meth:`_reflection.Inspector.get_view_names` + method. + + + :param schema: Schema name. If ``schema`` is left at ``None``, the + database's default schema is + used, else the named schema is searched. If the database does not + support named schemas, behavior is undefined if ``schema`` is not + passed as ``None``. For special quoting, use :class:`.quoted_name`. + + ..
seealso:: + + :meth:`_reflection.Inspector.get_sorted_table_and_fkc_names` + + :attr:`_schema.MetaData.sorted_tables` + + """ + + with self._operation_context() as conn: # pylint: disable=protected-access + return self.dialect.get_table_names( + conn, schema, info_cache=self.info_cache, **kw + ) + + +def get_table_names(self, connection, schema, **kw): + + if kw.get("include_temp_tables"): + cursor = connection.execute(SNOWFLAKE_GET_TABLE_NAMES.format(schema)) + result = [self.normalize_name(row[0]) for row in cursor] + return result + + cursor = connection.execute( + SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES.format(schema) + ) result = [self.normalize_name(row[0]) for row in cursor] return result diff --git a/openmetadata-docs/content/connectors/database/snowflake/airflow.md b/openmetadata-docs/content/connectors/database/snowflake/airflow.md index 0cdbeac00c0..670eb2a25f9 100644 --- a/openmetadata-docs/content/connectors/database/snowflake/airflow.md +++ b/openmetadata-docs/content/connectors/database/snowflake/airflow.md @@ -133,6 +133,7 @@ source: warehouse: account: # database: + includeTempTables: false # hostPort: account.region.service.snowflakecomputing.com # privateKey: # snowflakePrivatekeyPassphrase: @@ -184,6 +185,7 @@ workflowConfig: - **warehouse**: Warehouse name. - **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases. - **privateKey**: Connection to Snowflake instance via Private Key. +- **includeTempTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables. By default, it will skip the TRANSIENT and TEMPORARY tables. - **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with Private Key. - **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection.
These details must be added as Key-Value pairs. - **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. diff --git a/openmetadata-docs/content/connectors/database/snowflake/cli.md b/openmetadata-docs/content/connectors/database/snowflake/cli.md index 3644824513c..d7eb98c0467 100644 --- a/openmetadata-docs/content/connectors/database/snowflake/cli.md +++ b/openmetadata-docs/content/connectors/database/snowflake/cli.md @@ -132,6 +132,7 @@ source: warehouse: account: # database: + includeTempTables: false # hostPort: account.region.service.snowflakecomputing.com # privateKey: | # @@ -186,6 +187,7 @@ workflowConfig: - **database**: The database of the data source is an optional parameter, if you would like to restrict the metadata reading to a single database. If left blank, OpenMetadata ingestion attempts to scan all the databases. - **privateKey**: Connection to Snowflake instance via Private Key instead of a Password. - The multi-line key needs to be correctly formatted in YAML so a literal block scalar which retains new lines is recommended (`|`). +- **includeTempTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables. By default, it will skip the TRANSIENT and TEMPORARY tables. - **snowflakePrivatekeyPassphrase**: Snowflake Passphrase Key used with and encrypted Private Key. - **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs. - **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Snowflake during the connection. These details must be added as Key-Value pairs.
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json index 695253f89fe..1191ab1dabd 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json @@ -80,6 +80,12 @@ "type": "string", "format": "password" }, + "includeTempTables": { + "title": "Include Temporary and Transient Tables", + "description": "Optional configuration for ingestion of TRANSIENT and TEMPORARY tables. By default, it will skip the TRANSIENT and TEMPORARY tables.", + "type": "boolean", + "default": false + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions"