fix: improve query and minimize snflk <-> OM backand and forth (#24228)

This commit is contained in:
Teddy 2025-11-07 14:54:37 +01:00 committed by GitHub
parent 4e398d003b
commit f3ef29a117
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 78 additions and 99 deletions

View File

@ -94,10 +94,9 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_DATABASE_COMMENTS,
SNOWFLAKE_GET_DATABASES,
SNOWFLAKE_GET_EXTERNAL_LOCATIONS,
SNOWFLAKE_GET_FUNCTIONS,
SNOWFLAKE_GET_ORGANIZATION_NAME,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_STORED_PROCEDURES,
SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS,
SNOWFLAKE_GET_STREAM,
SNOWFLAKE_LIFE_CYCLE_QUERY,
SNOWFLAKE_SESSION_TAG_QUERY,
@ -558,18 +557,17 @@ class SnowflakeSource(
def _get_table_names_and_types(
self, schema_name: str, table_type: TableType = TableType.Regular
) -> List[TableNameAndType]:
table_type_to_params_map = {
TableType.Regular: {},
TableType.External: {"external_tables": True},
TableType.Transient: {"include_transient_tables": True},
TableType.Dynamic: {"dynamic_tables": True},
}
snowflake_tables = self.inspector.get_table_names(
schema=schema_name,
incremental=self.incremental,
account_usage=self.service_connection.accountUsageSchema,
**table_type_to_params_map[table_type],
include_views=self.source_config.includeViews,
**(
{"include_transient_tables": True}
if self.service_connection.includeTransientTables
else {}
),
)
self.context.get_global().deleted_tables.extend(
@ -587,7 +585,7 @@ class SnowflakeSource(
)
return [
TableNameAndType(name=table.name, type_=table_type)
TableNameAndType(name=table.name, type_=table.type_)
for table in snowflake_tables.get_not_deleted()
]
@ -631,21 +629,6 @@ class SnowflakeSource(
"""
table_list = self._get_table_names_and_types(schema_name)
table_list.extend(
self._get_table_names_and_types(schema_name, table_type=TableType.External)
)
table_list.extend(
self._get_table_names_and_types(schema_name, table_type=TableType.Dynamic)
)
if self.service_connection.includeTransientTables:
table_list.extend(
self._get_table_names_and_types(
schema_name, table_type=TableType.Transient
)
)
if self.service_connection.includeStreams:
table_list.extend(self._get_stream_names_and_types(schema_name))
@ -742,39 +725,6 @@ class SnowflakeSource(
logger.error(f"Unable to get procedure source url: {exc}")
return None
def _get_view_names_and_types(
self, schema_name: str, materialized_views: bool = False
) -> List[TableNameAndType]:
table_type = (
TableType.MaterializedView if materialized_views else TableType.View
)
snowflake_views = self.inspector.get_view_names(
schema=schema_name,
incremental=self.incremental,
account_usage=self.service_connection.accountUsageSchema,
materialized_views=materialized_views,
)
self.context.get_global().deleted_tables.extend(
[
fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
table_name=view.name,
)
for view in snowflake_views.get_deleted()
]
)
return [
TableNameAndType(name=view.name, type_=table_type)
for view in snowflake_views.get_not_deleted()
]
def query_view_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
@ -786,12 +736,7 @@ class SnowflakeSource(
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., material views,...
"""
views = self._get_view_names_and_types(schema_name)
views.extend(
self._get_view_names_and_types(schema_name, materialized_views=True)
)
return views
return []
def _get_stored_procedures_internal(
self, query: str
@ -823,9 +768,8 @@ class SnowflakeSource(
"""List Snowflake stored procedures"""
if self.source_config.includeStoredProcedures:
yield from self._get_stored_procedures_internal(
SNOWFLAKE_GET_STORED_PROCEDURES
SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS
)
yield from self._get_stored_procedures_internal(SNOWFLAKE_GET_FUNCTIONS)
def describe_procedure_definition(
self, stored_procedure: SnowflakeStoredProcedure

View File

@ -21,6 +21,7 @@ from sqlalchemy import text
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.storedProcedure import Language
from metadata.generated.schema.entity.data.table import TableType
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_QUERY_LOG_QUERY,
)
@ -92,6 +93,7 @@ class SnowflakeTable(BaseModel):
name: str
deleted: Optional[datetime] = None
type_: Optional[TableType] = None
class SnowflakeTableList(BaseModel):

View File

@ -14,6 +14,33 @@ SQL Queries used during ingestion
import textwrap
SNOWFLAKE_GET_TABLE_NAMES = """
select TABLE_NAME, NULL, TABLE_TYPE from information_schema.tables
where TABLE_SCHEMA = '{schema}'
AND COALESCE(IS_TRANSIENT, 'NO') != '{is_transient}'
AND {include_views}
"""
SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES = """
select TABLE_NAME, DELETED, TABLE_TYPE
from (
select
TABLE_NAME,
DELETED,
TABLE_TYPE,
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and COALESCE(IS_TRANSIENT, 'NO') != '{is_transient}'
and DATE_PART(epoch_millisecond, LAST_DDL) >= '{date}'
and {include_views}
)
where ROW_NUMBER = 1
"""
SNOWFLAKE_SQL_STATEMENT = textwrap.dedent(
"""
SELECT
@ -317,7 +344,7 @@ and table_catalog = '{database_name}'
"""
)
SNOWFLAKE_GET_STORED_PROCEDURES = textwrap.dedent(
SNOWFLAKE_GET_STORED_PROCEDURES_AND_FUNCTIONS = textwrap.dedent(
"""
SELECT
PROCEDURE_NAME AS name,
@ -331,11 +358,9 @@ FROM {account_usage}.PROCEDURES
WHERE PROCEDURE_CATALOG = '{database_name}'
AND PROCEDURE_SCHEMA = '{schema_name}'
AND DELETED IS NULL
"""
)
SNOWFLAKE_GET_FUNCTIONS = textwrap.dedent(
"""
UNION ALL
SELECT
FUNCTION_NAME AS name,
FUNCTION_OWNER AS owner,

View File

@ -24,6 +24,7 @@ from sqlalchemy.engine import reflection
from sqlalchemy.sql import text
from sqlalchemy.types import FLOAT
from metadata.generated.schema.entity.data.table import TableType
from metadata.ingestion.source.database.incremental_metadata_extraction import (
IncrementalConfig,
)
@ -33,25 +34,19 @@ from metadata.ingestion.source.database.snowflake.models import (
)
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_COMMENTS,
SNOWFLAKE_GET_DYNAMIC_TABLE_NAMES,
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_GET_MVIEW_NAMES,
SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_STREAM_DEFINITION,
SNOWFLAKE_GET_STREAM_NAMES,
SNOWFLAKE_GET_TABLE_DDL,
SNOWFLAKE_GET_TRANSIENT_NAMES,
SNOWFLAKE_GET_TABLE_NAMES,
SNOWFLAKE_GET_VIEW_DDL,
SNOWFLAKE_GET_VIEW_DEFINITION,
SNOWFLAKE_GET_VIEW_NAMES,
SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_MVIEW_NAMES,
SNOWFLAKE_INCREMENTAL_GET_STREAM_NAMES,
SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES,
SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES,
SNOWFLAKE_INCREMENTAL_GET_VIEW_NAMES,
SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
)
from metadata.utils import fqn
from metadata.utils.sqlalchemy_utils import (
@ -67,16 +62,10 @@ QueryMap = Dict[str, Query]
TABLE_QUERY_MAPS = {
"full": {
"default": SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
"transient_tables": SNOWFLAKE_GET_TRANSIENT_NAMES,
"external_tables": SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
"dynamic_tables": SNOWFLAKE_GET_DYNAMIC_TABLE_NAMES,
"default": SNOWFLAKE_GET_TABLE_NAMES,
},
"incremental": {
"default": SNOWFLAKE_INCREMENTAL_GET_WITHOUT_TRANSIENT_TABLE_NAMES,
"transient_tables": SNOWFLAKE_INCREMENTAL_GET_TRANSIENT_NAMES,
"external_tables": SNOWFLAKE_INCREMENTAL_GET_EXTERNAL_TABLE_NAMES,
"dynamic_tables": SNOWFLAKE_INCREMENTAL_GET_DYNAMIC_TABLE_NAMES,
"default": SNOWFLAKE_INCREMENTAL_GET_TABLE_NAMES,
},
}
@ -194,9 +183,15 @@ def _get_query_parameters(
schema: str,
incremental: Optional[IncrementalConfig],
account_usage: Optional[str] = None,
include_transient_tables: Optional[bool] = False,
include_views: Optional[bool] = False,
):
"""Returns the proper query parameters depending if the extraction is Incremental or Full"""
parameters = {"schema": fqn.unquote_name(schema)}
parameters = {
"schema": fqn.unquote_name(schema),
"is_transient": "YES" if include_transient_tables else "NO",
"include_views": "TRUE" if include_views else "TABLE_TYPE != 'VIEW'",
}
if incremental and incremental.enabled:
database, _ = self._current_database_schema(connection) # pylint: disable=W0212
@ -217,30 +212,43 @@ def get_table_names(self, connection, schema: str, **kw):
queries = _get_query_map(incremental, TABLE_QUERY_MAPS)
parameters = _get_query_parameters(
self, connection, schema, incremental, account_usage
self,
connection,
schema,
incremental,
account_usage,
include_transient_tables=kw.get("include_transient_tables", False),
include_views=kw.get("include_views", False),
)
query = queries["default"]
if kw.get("include_transient_tables"):
query = queries["transient_tables"]
if kw.get("external_tables"):
query = queries["external_tables"]
if kw.get("dynamic_tables"):
query = queries["dynamic_tables"]
cursor = connection.execute(query.format(**parameters))
result = SnowflakeTableList(
tables=[
SnowflakeTable(name=self.normalize_name(row[0]), deleted=row[1])
SnowflakeTable(
name=self.normalize_name(row[0]),
deleted=row[1],
type_=_get_table_type(row[2] if row[2] else "BASE TABLE"),
)
for row in cursor
]
)
return result
def _get_table_type(table_type: str) -> TableType:
table_type_map = {
"BASE TABLE": TableType.Regular,
"VIEW": TableType.View,
"MATERIALIZED VIEW": TableType.MaterializedView,
"EXTERNAL TABLE": TableType.External,
"TRANSIENT TABLE": TableType.Transient,
"DYNAMIC TABLE": TableType.Dynamic,
}
return table_type_map.get(table_type, TableType.Regular)
def get_view_names(self, connection, schema, **kw):
incremental = kw.get("incremental")
account_usage = kw.get("account_usage")