Minor: Snowflake - Add Support For Materialised Views (#14664)

This commit is contained in:
Mayur Singal 2024-01-10 18:36:42 +05:30 committed by GitHub
parent f9ebdfc722
commit 8197fe56ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 6 deletions

View File

@ -249,6 +249,23 @@ class CommonDbSourceService(
for table_name in self.inspector.get_table_names(schema_name) or [] for table_name in self.inspector.get_table_names(schema_name) or []
] ]
def query_view_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""
Connect to the source database to get the view
name and type. By default, use the inspector method
to get the names and pass the View type.
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., material views,...
"""
return [
TableNameAndType(name=table_name, type_=TableType.View)
for table_name in self.inspector.get_view_names(schema_name) or []
]
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
""" """
Handle table and views. Handle table and views.
@ -288,8 +305,10 @@ class CommonDbSourceService(
yield table_name, table_and_type.type_ yield table_name, table_and_type.type_
if self.source_config.includeViews: if self.source_config.includeViews:
for view_name in self.inspector.get_view_names(schema_name): for view_and_type in self.query_view_names_and_types(schema_name):
view_name = self.standardize_table_name(schema_name, view_name) view_name = self.standardize_table_name(
schema_name, view_and_type.name
)
view_fqn = fqn.build( view_fqn = fqn.build(
self.metadata, self.metadata,
entity_type=Table, entity_type=Table,
@ -310,7 +329,7 @@ class CommonDbSourceService(
"Table Filtered Out", "Table Filtered Out",
) )
continue continue
yield view_name, TableType.View yield view_name, view_and_type.type_
except Exception as err: except Exception as err:
logger.warning( logger.warning(
f"Fetching tables names failed for schema {schema_name} due to - {err}" f"Fetching tables names failed for schema {schema_name} due to - {err}"
@ -320,7 +339,7 @@ class CommonDbSourceService(
def get_view_definition( def get_view_definition(
self, table_type: str, table_name: str, schema_name: str, inspector: Inspector self, table_type: str, table_name: str, schema_name: str, inspector: Inspector
) -> Optional[str]: ) -> Optional[str]:
if table_type == TableType.View: if table_type in (TableType.View, TableType.MaterializedView):
try: try:
view_definition = inspector.get_view_definition(table_name, schema_name) view_definition = inspector.get_view_definition(table_name, schema_name)
view_definition = ( view_definition = (

View File

@ -88,6 +88,7 @@ from metadata.ingestion.source.database.snowflake.utils import (
get_unique_constraints, get_unique_constraints,
get_view_definition, get_view_definition,
get_view_names, get_view_names,
get_view_names_reflection,
normalize_names, normalize_names,
) )
from metadata.ingestion.source.database.stored_procedures_mixin import ( from metadata.ingestion.source.database.stored_procedures_mixin import (
@ -121,6 +122,7 @@ SnowflakeDialect._get_schema_columns = ( # pylint: disable=protected-access
get_schema_columns get_schema_columns
) )
Inspector.get_table_names = get_table_names_reflection Inspector.get_table_names = get_table_names_reflection
Inspector.get_view_names = get_view_names_reflection
SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-access SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-access
_current_database_schema _current_database_schema
) )
@ -518,6 +520,33 @@ class SnowflakeSource(
) )
) )
def query_view_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""
Connect to the source database to get the view
name and type. By default, use the inspector method
to get the names and pass the View type.
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., material views,...
"""
regular_views = [
TableNameAndType(name=view_name, type_=TableType.View)
for view_name in self.inspector.get_view_names(schema_name) or []
]
materialized_views = [
TableNameAndType(name=view_name, type_=TableType.MaterializedView)
for view_name in self.inspector.get_view_names(
schema_name, materialized_views=True
)
or []
]
return regular_views + materialized_views
def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]: def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]:
"""List Snowflake stored procedures""" """List Snowflake stored procedures"""
if self.source_config.includeStoredProcedures: if self.source_config.includeStoredProcedures:

View File

@ -67,6 +67,12 @@ SNOWFLAKE_GET_VIEW_NAMES = """
select TABLE_NAME from information_schema.tables select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW' where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW'
""" """
SNOWFLAKE_GET_MVIEW_NAMES = """
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'MATERIALIZED VIEW'
"""
SNOWFLAKE_GET_TRANSIENT_NAMES = """ SNOWFLAKE_GET_TRANSIENT_NAMES = """
select TABLE_NAME from information_schema.tables select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' where TABLE_SCHEMA = '{}'

View File

@ -25,6 +25,7 @@ from sqlalchemy.types import FLOAT
from metadata.ingestion.source.database.snowflake.queries import ( from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_COMMENTS, SNOWFLAKE_GET_COMMENTS,
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES, SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_GET_MVIEW_NAMES,
SNOWFLAKE_GET_SCHEMA_COLUMNS, SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_TRANSIENT_NAMES, SNOWFLAKE_GET_TRANSIENT_NAMES,
SNOWFLAKE_GET_VIEW_NAMES, SNOWFLAKE_GET_VIEW_NAMES,
@ -74,6 +75,20 @@ def get_table_names_reflection(self, schema=None, **kw):
) )
def get_view_names_reflection(self, schema=None, **kw):
"""Return all view names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
For special quoting, use :class:`.quoted_name`.
"""
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_view_names(
conn, schema, info_cache=self.info_cache, **kw
)
def get_table_names(self, connection, schema, **kw): def get_table_names(self, connection, schema, **kw):
query = SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES query = SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES
if kw.get("include_transient_tables"): if kw.get("include_transient_tables"):
@ -86,8 +101,12 @@ def get_table_names(self, connection, schema, **kw):
return result return result
def get_view_names(self, connection, schema, **kw): # pylint: disable=unused-argument def get_view_names(self, connection, schema, **kw):
cursor = connection.execute(SNOWFLAKE_GET_VIEW_NAMES.format(schema)) if kw.get("materialized_views"):
query = SNOWFLAKE_GET_MVIEW_NAMES
else:
query = SNOWFLAKE_GET_VIEW_NAMES
cursor = connection.execute(query.format(schema))
result = [self.normalize_name(row[0]) for row in cursor] result = [self.normalize_name(row[0]) for row in cursor]
return result return result