MINOR: Configurable account usage for incremental metadata extraction (#21182)

This commit is contained in:
Mayur Singal 2025-05-19 10:15:29 +05:30 committed by GitHub
parent 4c0ce77756
commit 2157337847
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 44 additions and 10 deletions

View File

@ -128,6 +128,15 @@ class LifeCycleQueryMixin:
)
)
def get_life_cycle_query(self):
"""
Get the life cycle query
"""
return self.life_cycle_query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
)
def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
"""
Get the life cycle data of the table

View File

@ -169,6 +169,8 @@ class SnowflakeSource(
Database metadata from Snowflake Source
"""
service_connection: SnowflakeConnection
def __init__(
self,
config,
@ -492,6 +494,7 @@ class SnowflakeSource(
snowflake_tables = self.inspector.get_table_names(
schema=schema_name,
incremental=self.incremental,
account_usage=self.service_connection.accountUsageSchema,
**table_type_to_params_map[table_type],
)
@ -640,6 +643,7 @@ class SnowflakeSource(
snowflake_views = self.inspector.get_view_names(
schema=schema_name,
incremental=self.incremental,
account_usage=self.service_connection.accountUsageSchema,
materialized_views=materialized_views,
)
@ -894,3 +898,13 @@ class SnowflakeSource(
logger.debug(f"Failed to fetch schema definition for {table_name}: {exc}")
return None
def get_life_cycle_query(self):
"""
Get the life cycle query
"""
return self.life_cycle_query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
account_usage=self.service_connection.accountUsageSchema,
)

View File

@ -60,7 +60,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'EXTERNAL TABLE'
@ -86,7 +86,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'BASE TABLE'
@ -111,7 +111,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'VIEW'
@ -134,7 +134,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'MATERIALIZED VIEW'
@ -171,7 +171,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'BASE TABLE'
@ -197,7 +197,7 @@ from (
ROW_NUMBER() over (
partition by TABLE_NAME order by LAST_DDL desc
) as ROW_NUMBER
from snowflake.account_usage.tables
from {account_usage}.tables
where TABLE_CATALOG = '{database}'
and TABLE_SCHEMA = '{schema}'
and TABLE_TYPE = 'BASE TABLE'
@ -297,7 +297,7 @@ SNOWFLAKE_LIFE_CYCLE_QUERY = textwrap.dedent(
select
table_name as table_name,
created as created_at
from snowflake.account_usage.tables
from {account_usage}.tables
where table_schema = '{schema_name}'
and table_catalog = '{database_name}'
"""

View File

@ -188,7 +188,11 @@ def _get_query_map(
def _get_query_parameters(
self, connection, schema: str, incremental: Optional[IncrementalConfig]
self,
connection,
schema: str,
incremental: Optional[IncrementalConfig],
account_usage: Optional[str] = None,
):
"""Returns the proper query parameters depending if the extraction is Incremental or Full"""
parameters = {"schema": fqn.unquote_name(schema)}
@ -199,6 +203,7 @@ def _get_query_parameters(
**parameters,
"date": incremental.start_timestamp,
"database": database,
"account_usage": account_usage or "SNOWFLAKE.ACCOUNT_USAGE",
}
return parameters
@ -207,9 +212,12 @@ def _get_query_parameters(
def get_table_names(self, connection, schema: str, **kw):
"""Return the Table names to process based on the incremental setup."""
incremental = kw.get("incremental")
account_usage = kw.get("account_usage")
queries = _get_query_map(incremental, TABLE_QUERY_MAPS)
parameters = _get_query_parameters(self, connection, schema, incremental)
parameters = _get_query_parameters(
self, connection, schema, incremental, account_usage
)
query = queries["default"]
@ -234,9 +242,12 @@ def get_table_names(self, connection, schema: str, **kw):
def get_view_names(self, connection, schema, **kw):
incremental = kw.get("incremental")
account_usage = kw.get("account_usage")
queries = _get_query_map(incremental, VIEW_QUERY_MAPS)
parameters = _get_query_parameters(self, connection, schema, incremental)
parameters = _get_query_parameters(
self, connection, schema, incremental, account_usage
)
if kw.get("materialized_views"):
query = queries["materialized_views"]