Fix: Snowflake support extarnal tables (#11466)

This commit is contained in:
Mayur Singal 2023-05-08 15:54:42 +05:30 committed by GitHub
parent a897954afc
commit 074418eac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 21 deletions

View File

@ -26,7 +26,11 @@ from metadata.generated.schema.api.classification.createClassification import (
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import IntervalType, TablePartition
from metadata.generated.schema.entity.data.table import (
IntervalType,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
@ -319,16 +323,21 @@ class SnowflakeSource(CommonDbSourceService):
logic on how to handle table types, e.g., external, foreign,...
"""
if self.config.serviceConnection.__root__.config.includeTempTables:
return [
TableNameAndType(name=table_name)
for table_name in self.inspector.get_table_names(
schema=schema_name, include_temp_tables="True"
)
or []
]
return [
regular_tables = [
TableNameAndType(name=table_name)
for table_name in self.inspector.get_table_names(schema=schema_name) or []
for table_name in self.inspector.get_table_names(
schema=schema_name,
include_temp_tables=self.service_connection.includeTempTables,
)
or []
]
external_tables = [
TableNameAndType(name=table_name, type_=TableType.External)
for table_name in self.inspector.get_table_names(
schema=schema_name, external_tables=True
)
or []
]
return regular_tables + external_tables

View File

@ -46,15 +46,25 @@ SNOWFLAKE_FETCH_ALL_TAGS = textwrap.dedent(
)
SNOWFLAKE_GET_TABLE_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE'
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'BASE TABLE'
"""
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES = """
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' AND TABLE_TYPE = 'EXTERNAL TABLE'
"""
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and IS_TRANSIENT = 'NO'
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}'
AND TABLE_TYPE = 'BASE TABLE'
AND IS_TRANSIENT != 'YES'
"""
SNOWFLAKE_GET_VIEW_NAMES = """
select TABLE_NAME from information_schema.tables where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW'
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW'
"""
SNOWFLAKE_GET_COMMENTS = textwrap.dedent(

View File

@ -22,6 +22,7 @@ from sqlalchemy.types import FLOAT
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_COMMENTS,
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_TABLE_NAMES,
SNOWFLAKE_GET_VIEW_NAMES,
@ -63,14 +64,14 @@ def get_table_names_reflection(self, schema=None, **kw):
def get_table_names(self, connection, schema, **kw):
query = SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES
if kw.get("include_temp_tables"):
cursor = connection.execute(SNOWFLAKE_GET_TABLE_NAMES.format(schema))
result = [self.normalize_name(row[0]) for row in cursor]
return result
query = SNOWFLAKE_GET_TABLE_NAMES
cursor = connection.execute(
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES.format(schema)
)
if kw.get("external_tables"):
query = SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES
cursor = connection.execute(query.format(schema))
result = [self.normalize_name(row[0]) for row in cursor]
return result