From c12e4b27e7c07a2a8f8fe44047965bc69be5d14e Mon Sep 17 00:00:00 2001 From: Jonny Dixon <45681293+acrylJonny@users.noreply.github.com> Date: Mon, 11 Aug 2025 19:17:27 +0100 Subject: [PATCH] feat(ingestion/snowflake): formal support of dynamic tables (#13542) --- .../docs/sources/snowflake/snowflake_pre.md | 13 +- .../ingestion/source/common/subtypes.py | 1 + .../ingestion/source/snowflake/constants.py | 1 + .../source/snowflake/snowflake_query.py | 55 ++- .../source/snowflake/snowflake_schema.py | 182 +++++++++- .../source/snowflake/snowflake_schema_gen.py | 27 +- .../source/snowflake/snowflake_utils.py | 19 +- .../source/snowflake/snowflake_v2.py | 4 +- .../tests/integration/snowflake/common.py | 23 ++ .../snowflake/snowflake_golden.json | 304 ++++++++++++++++- .../snowflake_privatelink_golden.json | 304 ++++++++++++++++- ...nowflake_structured_properties_golden.json | 321 +++++++++++++++++- .../integration/snowflake/test_snowflake.py | 5 +- .../test_snowflake_dynamic_tables.py | 296 ++++++++++++++++ .../unit/snowflake/test_snowflake_source.py | 142 ++++++++ 15 files changed, 1651 insertions(+), 46 deletions(-) create mode 100644 metadata-ingestion/tests/unit/snowflake/test_snowflake_dynamic_tables.py diff --git a/metadata-ingestion/docs/sources/snowflake/snowflake_pre.md b/metadata-ingestion/docs/sources/snowflake/snowflake_pre.md index 683124a0d8..2a1130b9fe 100644 --- a/metadata-ingestion/docs/sources/snowflake/snowflake_pre.md +++ b/metadata-ingestion/docs/sources/snowflake/snowflake_pre.md @@ -11,12 +11,12 @@ create or replace role datahub_role; // Grant access to a warehouse to run queries to view metadata grant operate, usage on warehouse "" to role datahub_role; -// Grant access to view database and schema in which your tables/views exist +// Grant access to view database and schema in which your tables/views/dynamic tables exist grant usage on DATABASE "" to role datahub_role; grant usage on all schemas in database "" to role datahub_role; grant usage on future schemas in database "" to role datahub_role; -grant select on all streams in database "> to role datahub_role; -grant select on future streams in database "> to role datahub_role; +grant select on all streams in database "" to role datahub_role; +grant select on future streams in database "" to role datahub_role; // If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views grant references on all tables in database "" to role datahub_role; @@ -25,6 +25,8 @@ grant references on all external tables in database "" to role da grant references on future external tables in database "" to role datahub_role; grant references on all views in database "" to role datahub_role; grant references on future views in database "" to role datahub_role; + +// Grant monitor privileges for dynamic tables (Enterprise Edition feature) grant monitor on all dynamic tables in database "" to role datahub_role; grant monitor on future dynamic tables in database "" to role datahub_role; @@ -53,7 +55,7 @@ The details of each granted privilege can be viewed in [snowflake docs](https:// this permission is not required. - `usage` is required for us to run queries using the warehouse - `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream. -- If metadata is required only on some schemas then you can grant the usage privilieges only on a particular schema like +- If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema like ```sql grant usage on schema ""."" to role datahub_role; @@ -156,6 +158,7 @@ If you are using [Snowflake Shares](https://docs.snowflake.com/en/user-guide/dat ### Caveats -- Some of the features are only available in the Snowflake Enterprise Edition. This doc has notes mentioning where this applies. +- Some of the features are only available in the Snowflake Enterprise Edition. This includes dynamic tables, advanced lineage features, and tags. This doc has notes mentioning where this applies. +- Dynamic tables require the `monitor` privilege for metadata extraction. Without this privilege, dynamic tables will not be visible to DataHub. - The underlying Snowflake views that we use to get metadata have a [latency of 45 minutes to 3 hours](https://docs.snowflake.com/en/sql-reference/account-usage.html#differences-between-account-usage-and-information-schema). So we would not be able to get very recent metadata in some cases like queries you ran within that time period etc. This is applicable particularly for lineage, usage and tags (without lineage) extraction. - If there is any [incident going on for Snowflake](https://status.snowflake.com/) we will not be able to get the metadata until that incident is resolved. diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index e1a76e7a2e..a2d9a33189 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -30,6 +30,7 @@ class DatasetSubTypes(StrEnum): NEO4J_NODE = "Neo4j Node" NEO4J_RELATIONSHIP = "Neo4j Relationship" SNOWFLAKE_STREAM = "Snowflake Stream" + DYNAMIC_TABLE = "Dynamic Table" API_ENDPOINT = "API Endpoint" SLACK_CHANNEL = "Slack Channel" PROJECTIONS = "Projections" diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py index f9bd7933a3..76cefbbc6c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py @@ -55,6 +55,7 @@ class SnowflakeObjectDomain(StrEnum): ICEBERG_TABLE = "iceberg table" STREAM = "stream" PROCEDURE = "procedure" + DYNAMIC_TABLE = "dynamic table" GENERIC_PERMISSION_ERROR_KEY = "permission-error" diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index c023ed9c75..c68e6b797f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -8,7 +8,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import ( ) from datahub.utilities.prefix_batch_builder import PrefixGroup -SHOW_VIEWS_MAX_PAGE_SIZE = 10000 +SHOW_COMMAND_MAX_PAGE_SIZE = 10000 SHOW_STREAM_MAX_PAGE_SIZE = 10000 @@ -38,12 +38,23 @@ class SnowflakeQuery: SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(), SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(), SnowflakeObjectDomain.STREAM.capitalize(), + SnowflakeObjectDomain.DYNAMIC_TABLE.capitalize(), } ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format( ",".join(f"'{domain}'" for domain in ACCESS_HISTORY_TABLE_VIEW_DOMAINS) ) + # Domains that can be downstream tables in lineage + DOWNSTREAM_TABLE_DOMAINS = { + SnowflakeObjectDomain.TABLE.capitalize(), + SnowflakeObjectDomain.DYNAMIC_TABLE.capitalize(), + } + + DOWNSTREAM_TABLE_DOMAINS_FILTER = "({})".format( + ",".join(f"'{domain}'" for domain in DOWNSTREAM_TABLE_DOMAINS) + ) + @staticmethod def current_account() -> str: return "select CURRENT_ACCOUNT()" @@ -235,7 +246,7 @@ class SnowflakeQuery: @staticmethod def show_views_for_database( db_name: str, - limit: int = SHOW_VIEWS_MAX_PAGE_SIZE, + limit: int = SHOW_COMMAND_MAX_PAGE_SIZE, view_pagination_marker: Optional[str] = None, ) -> str: # While there is an information_schema.views view, that only shows the view definition if the role @@ -244,7 +255,7 @@ class SnowflakeQuery: # SHOW VIEWS can return a maximum of 10000 rows. # https://docs.snowflake.com/en/sql-reference/sql/show-views#usage-notes - assert limit <= SHOW_VIEWS_MAX_PAGE_SIZE + assert limit <= SHOW_COMMAND_MAX_PAGE_SIZE # To work around this, we paginate through the results using the FROM clause. from_clause = ( @@ -686,7 +697,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}""" AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) AND upstream_table_domain in {allowed_upstream_table_domains} - AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}' + AND downstream_table_domain in {SnowflakeQuery.DOWNSTREAM_TABLE_DOMAINS_FILTER} {("AND " + upstream_sql_filter) if upstream_sql_filter else ""} ), column_upstream_jobs AS ( @@ -843,7 +854,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}""" AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) AND upstream_table_domain in {allowed_upstream_table_domains} - AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}' + AND downstream_table_domain in {SnowflakeQuery.DOWNSTREAM_TABLE_DOMAINS_FILTER} {("AND " + upstream_sql_filter) if upstream_sql_filter else ""} ), table_upstream_jobs_unique AS ( @@ -940,3 +951,37 @@ WHERE table_schema='{schema_name}' AND {extra_clause}""" f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else "" ) return f"""SHOW STREAMS IN DATABASE "{db_name}" LIMIT {limit} {from_clause};""" + + @staticmethod + def show_dynamic_tables_for_database( + db_name: str, + limit: int = SHOW_COMMAND_MAX_PAGE_SIZE, + dynamic_table_pagination_marker: Optional[str] = None, + ) -> str: + """Get dynamic table definitions using SHOW DYNAMIC TABLES.""" + assert limit <= SHOW_COMMAND_MAX_PAGE_SIZE + + from_clause = ( + f"""FROM '{dynamic_table_pagination_marker}'""" + if dynamic_table_pagination_marker + else "" + ) + return f"""\ + SHOW DYNAMIC TABLES IN DATABASE "{db_name}" + LIMIT {limit} {from_clause}; + """ + + @staticmethod + def get_dynamic_table_graph_history(db_name: str) -> str: + """Get dynamic table dependency information from information schema.""" + return f""" + SELECT + name, + inputs, + target_lag_type, + target_lag_sec, + scheduling_state, + alter_trigger + FROM TABLE("{db_name}".INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY()) + ORDER BY name + """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index 38a8295a96..2fb0015803 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -3,16 +3,17 @@ import os from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime -from typing import Callable, Dict, Iterable, List, MutableMapping, Optional +from typing import Any, Callable, Dict, Iterable, List, MutableMapping, Optional from datahub.ingestion.api.report import SupportsAsObj from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection from datahub.ingestion.source.snowflake.snowflake_query import ( - SHOW_VIEWS_MAX_PAGE_SIZE, + SHOW_COMMAND_MAX_PAGE_SIZE, SnowflakeQuery, ) +from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView from datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure from datahub.utilities.file_backed_collections import FileBackedDict @@ -103,6 +104,17 @@ class SnowflakeTable(BaseTable): return DatasetSubTypes.TABLE +@dataclass +class SnowflakeDynamicTable(SnowflakeTable): + definition: Optional[str] = ( + None # SQL query that defines the dynamic table's content + ) + target_lag: Optional[str] = None # Refresh frequency (e.g., "1 HOUR", "30 MINUTES") + + def get_subtype(self) -> DatasetSubTypes: + return DatasetSubTypes.DYNAMIC_TABLE + + @dataclass class SnowflakeView(BaseView): materialized: bool = False @@ -226,8 +238,11 @@ class _SnowflakeTagCache: class SnowflakeDataDictionary(SupportsAsObj): - def __init__(self, connection: SnowflakeConnection) -> None: + def __init__( + self, connection: SnowflakeConnection, report: SnowflakeV2Report + ) -> None: self.connection = connection + self.report = report def as_obj(self) -> Dict[str, Dict[str, int]]: # TODO: Move this into a proper report type that gets computed. @@ -355,8 +370,11 @@ class SnowflakeDataDictionary(SupportsAsObj): if table["TABLE_SCHEMA"] not in tables: tables[table["TABLE_SCHEMA"]] = [] + is_dynamic = table.get("IS_DYNAMIC", "NO").upper() == "YES" + table_cls = SnowflakeDynamicTable if is_dynamic else SnowflakeTable + tables[table["TABLE_SCHEMA"]].append( - SnowflakeTable( + table_cls( name=table["TABLE_NAME"], type=table["TABLE_TYPE"], created=table["CREATED"], @@ -365,11 +383,15 @@ class SnowflakeDataDictionary(SupportsAsObj): rows_count=table["ROW_COUNT"], comment=table["COMMENT"], clustering_key=table["CLUSTERING_KEY"], - is_dynamic=table.get("IS_DYNAMIC", "NO").upper() == "YES", + is_dynamic=is_dynamic, is_iceberg=table.get("IS_ICEBERG", "NO").upper() == "YES", is_hybrid=table.get("IS_HYBRID", "NO").upper() == "YES", ) ) + + # Populate dynamic table definitions + self.populate_dynamic_table_definitions(tables, db_name) + return tables def get_tables_for_schema( @@ -382,8 +404,11 @@ class SnowflakeDataDictionary(SupportsAsObj): ) for table in cur: + is_dynamic = table.get("IS_DYNAMIC", "NO").upper() == "YES" + table_cls = SnowflakeDynamicTable if is_dynamic else SnowflakeTable + tables.append( - SnowflakeTable( + table_cls( name=table["TABLE_NAME"], type=table["TABLE_TYPE"], created=table["CREATED"], @@ -392,16 +417,21 @@ class SnowflakeDataDictionary(SupportsAsObj): rows_count=table["ROW_COUNT"], comment=table["COMMENT"], clustering_key=table["CLUSTERING_KEY"], - is_dynamic=table.get("IS_DYNAMIC", "NO").upper() == "YES", + is_dynamic=is_dynamic, is_iceberg=table.get("IS_ICEBERG", "NO").upper() == "YES", is_hybrid=table.get("IS_HYBRID", "NO").upper() == "YES", ) ) + + # Populate dynamic table definitions for just this schema + schema_tables = {schema_name: tables} + self.populate_dynamic_table_definitions(schema_tables, db_name) + return tables @serialized_lru_cache(maxsize=1) def get_views_for_database(self, db_name: str) -> Dict[str, List[SnowflakeView]]: - page_limit = SHOW_VIEWS_MAX_PAGE_SIZE + page_limit = SHOW_COMMAND_MAX_PAGE_SIZE views: Dict[str, List[SnowflakeView]] = {} @@ -660,7 +690,7 @@ class SnowflakeDataDictionary(SupportsAsObj): def get_streams_for_database( self, db_name: str ) -> Dict[str, List[SnowflakeStream]]: - page_limit = SHOW_VIEWS_MAX_PAGE_SIZE + page_limit = SHOW_COMMAND_MAX_PAGE_SIZE streams: Dict[str, List[SnowflakeStream]] = {} @@ -743,3 +773,137 @@ class SnowflakeDataDictionary(SupportsAsObj): ) ) return procedures + + @serialized_lru_cache(maxsize=1) + def get_dynamic_table_graph_info(self, db_name: str) -> Dict[str, Dict[str, Any]]: + """Get dynamic table dependency information from information schema.""" + dt_graph_info: Dict[str, Dict[str, Any]] = {} + try: + cur = self.connection.query( + SnowflakeQuery.get_dynamic_table_graph_history(db_name) + ) + for row in cur: + dt_name = row["NAME"] + dt_graph_info[dt_name] = { + "inputs": row.get("INPUTS"), + "target_lag_type": row.get("TARGET_LAG_TYPE"), + "target_lag_sec": row.get("TARGET_LAG_SEC"), + "scheduling_state": row.get("SCHEDULING_STATE"), + "alter_trigger": row.get("ALTER_TRIGGER"), + } + logger.debug( + f"Successfully retrieved graph info for {len(dt_graph_info)} dynamic tables in {db_name}" + ) + except Exception as e: + self.report.warning( + "Failed to get dynamic table graph history", + db_name, + exc=e, + ) + + return dt_graph_info + + @serialized_lru_cache(maxsize=1) + def get_dynamic_tables_with_definitions( + self, db_name: str + ) -> Dict[str, List[SnowflakeDynamicTable]]: + """Get dynamic tables with their definitions using SHOW DYNAMIC TABLES.""" + page_limit = SHOW_COMMAND_MAX_PAGE_SIZE + dynamic_tables: Dict[str, List[SnowflakeDynamicTable]] = {} + + # Get graph/dependency information (pass db_name) + dt_graph_info = self.get_dynamic_table_graph_info(db_name) + + first_iteration = True + dt_pagination_marker: Optional[str] = None + + while first_iteration or dt_pagination_marker is not None: + try: + cur = self.connection.query( + SnowflakeQuery.show_dynamic_tables_for_database( + db_name, + limit=page_limit, + dynamic_table_pagination_marker=dt_pagination_marker, + ) + ) + + first_iteration = False + dt_pagination_marker = None + result_set_size = 0 + + for dt in cur: + result_set_size += 1 + + dt_name = dt["name"] + schema_name = dt["schema_name"] + + if schema_name not in dynamic_tables: + dynamic_tables[schema_name] = [] + + # Get definition from SHOW result + definition = dt.get("text") + + # Get target lag from SHOW result or graph info + target_lag = dt.get("target_lag") + if not target_lag and dt_graph_info: + qualified_name = f"{db_name}.{schema_name}.{dt_name}" + graph_info = dt_graph_info.get(qualified_name, {}) + if graph_info.get("target_lag_type") and graph_info.get( + "target_lag_sec" + ): + target_lag = f"{graph_info['target_lag_sec']} {graph_info['target_lag_type']}" + + dynamic_tables[schema_name].append( + SnowflakeDynamicTable( + name=dt_name, + created=dt["created_on"], + last_altered=dt.get("created_on"), + size_in_bytes=dt.get("bytes", 0), + rows_count=dt.get("rows", 0), + comment=dt.get("comment"), + definition=definition, + target_lag=target_lag, + is_dynamic=True, + type="DYNAMIC TABLE", + ) + ) + + if result_set_size >= page_limit: + logger.info( + f"Fetching next page of dynamic tables for {db_name} - after {dt_name}" + ) + dt_pagination_marker = dt_name + + except Exception as e: + logger.debug( + f"Failed to get dynamic tables for database {db_name}: {e}" + ) + break + + return dynamic_tables + + def populate_dynamic_table_definitions( + self, tables: Dict[str, List[SnowflakeTable]], db_name: str + ) -> None: + """Populate dynamic table definitions for tables that are marked as dynamic.""" + try: + # Get dynamic tables with definitions from SHOW command + dt_with_definitions = self.get_dynamic_tables_with_definitions(db_name) + + for schema_name, table_list in tables.items(): + for table in table_list: + if ( + isinstance(table, SnowflakeDynamicTable) + and table.definition is None + ): + # Find matching dynamic table from SHOW results + show_dt_list = dt_with_definitions.get(schema_name, []) + for show_dt in show_dt_list: + if show_dt.name == table.name: + table.definition = show_dt.definition + table.target_lag = show_dt.target_lag + break + except Exception as e: + logger.debug( + f"Failed to populate dynamic table definitions for {db_name}: {e}" + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 08d4bbe118..bb2bd3041b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -45,6 +45,7 @@ from datahub.ingestion.source.snowflake.snowflake_schema import ( SnowflakeColumn, SnowflakeDatabase, SnowflakeDataDictionary, + SnowflakeDynamicTable, SnowflakeFK, SnowflakePK, SnowflakeSchema, @@ -182,7 +183,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): self.identifiers: SnowflakeIdentifierBuilder = identifiers self.data_dictionary: SnowflakeDataDictionary = SnowflakeDataDictionary( - connection=self.connection + connection=self.connection, report=self.report ) self.report.data_dictionary_cache = self.data_dictionary @@ -495,6 +496,22 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): if self.config.include_technical_schema: data_reader = self.make_data_reader() for table in tables: + # Handle dynamic table definitions for lineage + if ( + isinstance(table, SnowflakeDynamicTable) + and table.definition + and self.aggregator + ): + table_identifier = self.identifiers.get_dataset_identifier( + table.name, schema_name, db_name + ) + self.aggregator.add_view_definition( + view_urn=self.identifiers.gen_dataset_urn(table_identifier), + view_definition=table.definition, + default_db=db_name, + default_schema=schema_name, + ) + table_wu_generator = self._process_table( table, snowflake_schema, db_name ) @@ -935,6 +952,10 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): } ) + if isinstance(table, SnowflakeDynamicTable): + if table.target_lag: + custom_properties["TARGET_LAG"] = table.target_lag + if isinstance(table, SnowflakeView) and table.is_secure: custom_properties["IS_SECURE"] = "true" @@ -980,7 +1001,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): schema_name, db_name, ( - SnowflakeObjectDomain.TABLE + SnowflakeObjectDomain.DYNAMIC_TABLE + if isinstance(table, SnowflakeTable) and table.is_dynamic + else SnowflakeObjectDomain.TABLE if isinstance(table, SnowflakeTable) else SnowflakeObjectDomain.VIEW ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 20d55c3f1f..2f792408a8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -93,9 +93,20 @@ class SnowsightUrlBuilder: table_name: str, schema_name: str, db_name: str, - domain: Literal[SnowflakeObjectDomain.TABLE, SnowflakeObjectDomain.VIEW], + domain: Literal[ + SnowflakeObjectDomain.TABLE, + SnowflakeObjectDomain.VIEW, + SnowflakeObjectDomain.DYNAMIC_TABLE, + ], ) -> Optional[str]: - return f"{self.snowsight_base_url}#/data/databases/{db_name}/schemas/{schema_name}/{domain}/{table_name}/" + # For dynamic tables, use the dynamic-table domain in the URL path + # Ensure only explicitly dynamic tables use dynamic-table URL path + url_domain = ( + "dynamic-table" + if domain == SnowflakeObjectDomain.DYNAMIC_TABLE + else str(domain) + ) + return f"{self.snowsight_base_url}#/data/databases/{db_name}/schemas/{schema_name}/{url_domain}/{table_name}/" def get_external_url_for_schema( self, schema_name: str, db_name: str @@ -129,6 +140,7 @@ class SnowflakeFilter: SnowflakeObjectDomain.MATERIALIZED_VIEW, SnowflakeObjectDomain.ICEBERG_TABLE, SnowflakeObjectDomain.STREAM, + SnowflakeObjectDomain.DYNAMIC_TABLE, ): return False if _is_sys_table(dataset_name): @@ -160,7 +172,8 @@ class SnowflakeFilter: return False if dataset_type.lower() in { - SnowflakeObjectDomain.TABLE + SnowflakeObjectDomain.TABLE, + SnowflakeObjectDomain.DYNAMIC_TABLE, } and not self.filter_config.table_pattern.allowed( _cleanup_qualified_name(dataset_name, self.structured_reporter) ): diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 54cce9b8ae..d5c136b3be 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -171,7 +171,9 @@ class SnowflakeV2Source( ) # For database, schema, tables, views, etc - self.data_dictionary = SnowflakeDataDictionary(connection=self.connection) + self.data_dictionary = SnowflakeDataDictionary( + connection=self.connection, report=self.report + ) self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None self.discovered_datasets: Optional[List[str]] = None diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 2380dca64b..79249a2a07 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -769,4 +769,27 @@ def default_query_results( # noqa: C901 "COMMENT": "This is a test procedure 2", }, ] + elif query == SnowflakeQuery.get_dynamic_table_graph_history("TEST_DB"): + # Return empty result for dynamic table graph history in test environment + return [] + elif query == SnowflakeQuery.show_dynamic_tables_for_database("TEST_DB"): + # Return dynamic table definitions for TABLE_2 which should be a dynamic table + return [ + { + "created_on": datetime(2021, 6, 8, 0, 0, 0, 0), + "name": "TABLE_2", + "database_name": "TEST_DB", + "schema_name": "TEST_SCHEMA", + "owner": "ACCOUNTADMIN", + "comment": "Comment for Table", + "text": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2 TARGET_LAG = '1 HOUR' AS SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_1", + "target_lag": "1 HOUR", + "warehouse": "TEST_WAREHOUSE", + "refresh_mode": "AUTO", + "refresh_mode_reason": "DYNAMIC_TABLE_CONFIG", + "data_timestamp": datetime(2021, 6, 8, 0, 0, 0, 0), + "scheduling_state": "RUNNING", + "owner_role_type": "ROLE", + } + ] raise ValueError(f"Unexpected query: {query}") diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index 659eb2e004..c10bc5bca1 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -794,24 +794,25 @@ "json": { "customProperties": { "CLUSTERING_KEY": "LINEAR(COL_1)", - "IS_DYNAMIC": "true" + "IS_DYNAMIC": "true", + "TARGET_LAG": "1 HOUR" }, - "externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/", + "externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/", "name": "TABLE_2", "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2", "description": "Comment for Table", "created": { - "time": 1623124800000 + "time": 1623106800000 }, "lastModified": { - "time": 1623124800000 + "time": 1623106800000 }, "tags": [] } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", + "runId": "snowflake-2025_08_11-18_31_42-061pu7", "lastRunId": "no-run-id-provided" } }, @@ -839,13 +840,13 @@ "aspect": { "json": { "typeNames": [ - "Table" + "Dynamic Table" ] } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", + "runId": "snowflake-2025_05_18-15_43_14-froog5", "lastRunId": "no-run-id-provided" } }, @@ -5491,6 +5492,19 @@ "aspect": { "json": { "upstreams": [ + { + "auditStamp": { + "time": 1615443388097, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 1654473600000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "type": "VIEW", + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "auditStamp": { "time": 1615443388097, @@ -5506,6 +5520,19 @@ } ], "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5518,6 +5545,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5530,6 +5570,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5542,6 +5595,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5554,6 +5620,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5566,6 +5645,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5578,6 +5670,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5590,6 +5695,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5602,6 +5720,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5614,6 +5745,19 @@ "confidenceScore": 1.0, "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, { "upstreamType": "FIELD_SET", "upstreams": [ @@ -5631,7 +5775,36 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_28-00_01_52-5vkne0", + "runId": "snowflake-2025_08_11-18_31_42-061pu7", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2\nTARGET_LAG='1 HOUR' AS\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_1", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1754933509497, + "actor": "urn:li:corpuser:_ingestion" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_42-061pu7", "lastRunId": "no-run-id-provided" } }, @@ -5664,6 +5837,89 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_42-061pu7", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9", @@ -5714,6 +5970,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_42-061pu7", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9", @@ -8891,6 +9163,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_42-061pu7", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_2%2CPROD%29", diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json index 963ae1385f..b9408770a2 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json @@ -1937,16 +1937,21 @@ "path": "/customProperties/IS_DYNAMIC", "value": "true" }, + { + "op": "add", + "path": "/customProperties/TARGET_LAG", + "value": "1 HOUR" + }, { "op": "add", "path": "/externalUrl", - "value": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/" + "value": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/" } ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00-9hbbfo", + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", "lastRunId": "no-run-id-provided" } }, @@ -2069,13 +2074,13 @@ "aspect": { "json": { "typeNames": [ - "Table" + "Dynamic Table" ] } }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-7ozgxz", "lastRunId": "no-run-id-provided" } }, @@ -5180,6 +5185,134 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2\nTARGET_LAG='1 HOUR' AS\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_1", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1654621200000, + "actor": "urn:li:corpuser:_ingestion" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_10)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_9)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_9)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_10)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29", @@ -5553,6 +5686,19 @@ "aspect": { "json": { "upstreams": [ + { + "auditStamp": { + "time": 1654621200000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 1654473600000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD)", + "type": "VIEW", + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, { "auditStamp": { "time": 1654621200000, @@ -5566,12 +5712,144 @@ "type": "TRANSFORMED", "query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_1)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_1)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_2)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_3)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_3)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_4)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_4)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_5)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_5)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_6)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_6)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_7)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_7)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_8)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_8)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_9)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_9)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_10)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_10)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29" + } ] } }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00-8z7pp9", + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", "lastRunId": "no-run-id-provided" } }, @@ -6565,5 +6843,21 @@ "runId": "snowflake-2022_06_07-17_00_00-8z7pp9", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-h1brpf", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json index 5db7ece356..3b17b3480a 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json @@ -753,24 +753,25 @@ "json": { "customProperties": { "CLUSTERING_KEY": "LINEAR(COL_1)", - "IS_DYNAMIC": "true" + "IS_DYNAMIC": "true", + "TARGET_LAG": "1 HOUR" }, - "externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/", + "externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/", "name": "TABLE_2", "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2", "description": "Comment for Table", "created": { - "time": 1623135600000 + "time": 1623106800000 }, "lastModified": { - "time": 1623135600000 + "time": 1623106800000 }, "tags": [] } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_07-13_38_56-3fo398", + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", "lastRunId": "no-run-id-provided" } }, @@ -798,13 +799,13 @@ "aspect": { "json": { "typeNames": [ - "Table" + "Dynamic Table" ] } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2025_01_07-13_38_56-3fo398", + "runId": "snowflake-2025_05_18-15_43_20-jqvld8", "lastRunId": "no-run-id-provided" } }, @@ -4486,6 +4487,280 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1615443388097, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "type": "VIEW", + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)" + ], + "transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"", + "confidenceScore": 0.9, + "query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2 TARGET_LAG = '1 HOUR' AS SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_1", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1754933522978, + "actor": "urn:li:corpuser:_ingestion" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", @@ -4534,6 +4809,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", @@ -4922,6 +5213,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_08_11-18_31_59-unfd2w", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "schemaField", "entityUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_1,PROD),COL_1)", diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index f8b13017f0..b219a527c6 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -27,7 +27,10 @@ from datahub.ingestion.source.snowflake.snowflake_config import ( ) from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report from datahub.testing import mce_helpers -from tests.integration.snowflake.common import FROZEN_TIME, default_query_results +from tests.integration.snowflake.common import ( + FROZEN_TIME, + default_query_results, +) pytestmark = pytest.mark.integration_batch_2 diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_dynamic_tables.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_dynamic_tables.py new file mode 100644 index 0000000000..2e405a4744 --- /dev/null +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_dynamic_tables.py @@ -0,0 +1,296 @@ +from typing import cast +from unittest.mock import MagicMock, patch + +import pytest + +from datahub.ingestion.source.common.subtypes import DatasetSubTypes +from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection +from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery +from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report +from datahub.ingestion.source.snowflake.snowflake_schema import ( + SnowflakeDataDictionary, + SnowflakeDynamicTable, +) + + +@pytest.fixture +def mock_snowflake_data_dictionary() -> SnowflakeDataDictionary: + connection = cast(SnowflakeConnection, MagicMock()) + report = cast(SnowflakeV2Report, MagicMock()) + data_dict = SnowflakeDataDictionary(connection, report) + return data_dict + + +def test_get_dynamic_table_graph_info(mock_snowflake_data_dictionary): + # Mock the query response for dynamic table graph history + mock_cursor = MagicMock() + mock_cursor.__iter__.return_value = [ + { + "NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1", + "INPUTS": "source_table", + "TARGET_LAG_TYPE": "INTERVAL", + "TARGET_LAG_SEC": 60, + "SCHEDULING_STATE": "ACTIVE", + "ALTER_TRIGGER": "AUTO", + } + ] + mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor + + # Test getting dynamic table graph info + result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB") + + # Verify the results + assert len(result) == 1 + table_info = result.get("TEST_DB.PUBLIC.DYNAMIC_TABLE1") + assert table_info is not None + assert table_info["target_lag_type"] == "INTERVAL" + assert table_info["target_lag_sec"] == 60 + assert table_info["scheduling_state"] == "ACTIVE" + assert table_info["alter_trigger"] == "AUTO" + + +def test_get_dynamic_tables_with_definitions(mock_snowflake_data_dictionary): + # Mock the graph info response + mock_snowflake_data_dictionary.get_dynamic_table_graph_info = MagicMock( + return_value={ + "TEST_DB.PUBLIC.DYNAMIC_TABLE1": { + "target_lag_type": "INTERVAL", + "target_lag_sec": 60, + "scheduling_state": "ACTIVE", + "alter_trigger": "AUTO", + } + } + ) + + # Mock the show dynamic tables response + mock_cursor = MagicMock() + mock_cursor.__iter__.return_value = [ + { + "name": "DYNAMIC_TABLE1", + "schema_name": "PUBLIC", + "database_name": "TEST_DB", + "owner": "TEST_USER", + "comment": "Test dynamic table", + "created_on": "2024-01-01 00:00:00", + "text": "SELECT * FROM source_table", + "target_lag": "1 minute", + "warehouse": "TEST_WH", + "bytes": 1000, + "rows": 100, + } + ] + mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor + + # Test getting dynamic tables with definitions + result = mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions( + "TEST_DB" + ) + + # Verify the results + assert len(result) == 1 + assert "PUBLIC" in result + dynamic_tables = result["PUBLIC"] + assert len(dynamic_tables) == 1 + dt = dynamic_tables[0] + assert isinstance(dt, SnowflakeDynamicTable) + assert dt.name == "DYNAMIC_TABLE1" + assert dt.definition == "SELECT * FROM source_table" + assert dt.target_lag == "1 minute" + assert dt.is_dynamic is True + + +def test_populate_dynamic_table_definitions(mock_snowflake_data_dictionary): + # Mock get_dynamic_tables_with_definitions response + mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions = MagicMock( + return_value={ + "PUBLIC": [ + SnowflakeDynamicTable( + name="DYNAMIC_TABLE1", + created=None, + last_altered=None, + size_in_bytes=1000, + rows_count=100, + comment="Test dynamic table", + definition="SELECT * FROM source_table", + target_lag="1 minute", + is_dynamic=True, + type="DYNAMIC TABLE", + ) + ] + } + ) + + # Create test tables dictionary + tables = { + "PUBLIC": [ + SnowflakeDynamicTable( + name="DYNAMIC_TABLE1", + created=None, + last_altered=None, + size_in_bytes=0, + rows_count=0, + comment="Test dynamic table", + is_dynamic=True, + type="DYNAMIC TABLE", + ) + ] + } + + # Test populating dynamic table definitions + mock_snowflake_data_dictionary.populate_dynamic_table_definitions(tables, "TEST_DB") + + # Verify the results + assert len(tables["PUBLIC"]) == 1 + dt = tables["PUBLIC"][0] + assert isinstance(dt, SnowflakeDynamicTable) + assert dt.name == "DYNAMIC_TABLE1" + assert dt.definition == "SELECT * FROM source_table" + assert dt.target_lag == "1 minute" + + +def test_dynamic_table_subtype(): + # Test that dynamic tables are correctly identified as having DYNAMIC_TABLE subtype + dt = SnowflakeDynamicTable( + name="test", + created=None, + last_altered=None, + size_in_bytes=0, + rows_count=0, + comment="Test dynamic table", + is_dynamic=True, + type="DYNAMIC TABLE", + ) + + assert dt.get_subtype() == DatasetSubTypes.DYNAMIC_TABLE + + +def test_dynamic_table_pagination(): + # Test the pagination marker handling in show_dynamic_tables_for_database + query = SnowflakeQuery.show_dynamic_tables_for_database( + db_name="TEST_DB", dynamic_table_pagination_marker="LAST_TABLE" + ) + + # Verify the pagination marker is included in the query + assert "FROM 'LAST_TABLE'" in query + + +def test_dynamic_table_graph_history_query(): + # Test the dynamic table graph history query generation + query = SnowflakeQuery.get_dynamic_table_graph_history("TEST_DB") + + # Verify the query references the correct view + assert "DYNAMIC_TABLE_GRAPH_HISTORY()" in query + assert "TEST_DB" in query + + +@patch( + "datahub.ingestion.source.snowflake.snowflake_lineage_v2.SnowflakeLineageExtractor" +) +def test_dynamic_table_lineage_extraction(mock_extractor_class): + # Mock the extractor instance + mock_extractor = mock_extractor_class.return_value + mock_connection = MagicMock() + mock_extractor.connection = mock_connection + + # Mock the query response for dynamic table definition + mock_cursor = MagicMock() + mock_cursor.__iter__.return_value = [ + { + "DOWNSTREAM_TABLE_NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1", + "DOWNSTREAM_TABLE_DOMAIN": "Dynamic Table", + "UPSTREAM_TABLES": [ + { + "upstream_object_domain": "Table", + "upstream_object_name": "TEST_DB.PUBLIC.SOURCE_TABLE", + "query_id": "123", + } + ], + "UPSTREAM_COLUMNS": [], + "QUERIES": [], + } + ] + mock_connection.query.return_value = mock_cursor + + # Test processing the lineage + from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import ( + UpstreamLineageEdge, + ) + + result = UpstreamLineageEdge.parse_obj(mock_cursor.__iter__.return_value[0]) + + # Verify the lineage information + assert result.DOWNSTREAM_TABLE_NAME == "TEST_DB.PUBLIC.DYNAMIC_TABLE1" + assert result.DOWNSTREAM_TABLE_DOMAIN == "Dynamic Table" + assert result.UPSTREAM_TABLES is not None # Check for None before accessing + assert len(result.UPSTREAM_TABLES) == 1 + upstream = result.UPSTREAM_TABLES[0] # Safe to access after length check + assert upstream.upstream_object_domain == "Table" + assert upstream.upstream_object_name == "TEST_DB.PUBLIC.SOURCE_TABLE" + assert upstream.query_id == "123" + + +def test_dynamic_table_error_handling(mock_snowflake_data_dictionary): + # Mock an error response from the graph history query + mock_cursor = MagicMock() + mock_cursor.__iter__.side_effect = Exception("Failed to fetch dynamic table info") + mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor + + # Test error handling in get_dynamic_table_graph_info + result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB") + + # Verify empty result is returned on error + assert result == {} + + +def test_dynamic_table_definition_error_handling(mock_snowflake_data_dictionary): + # Mock an error in get_dynamic_tables_with_definitions + mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions = MagicMock() + mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions.side_effect = ( + Exception("Failed to get definitions") + ) + + # Create test tables dictionary + tables = { + "PUBLIC": [ + SnowflakeDynamicTable( + name="DYNAMIC_TABLE1", + created=None, + last_altered=None, + size_in_bytes=0, + rows_count=0, + comment="Test dynamic table", + is_dynamic=True, + type="DYNAMIC TABLE", + ) + ] + } + + # Test error handling in populate_dynamic_table_definitions + mock_snowflake_data_dictionary.populate_dynamic_table_definitions(tables, "TEST_DB") + + # Verify tables remain unchanged + assert len(tables["PUBLIC"]) == 1 + dt = tables["PUBLIC"][0] + assert isinstance(dt, SnowflakeDynamicTable) + assert dt.name == "DYNAMIC_TABLE1" + + +def test_dynamic_table_invalid_response_handling(mock_snowflake_data_dictionary): + # Mock an invalid response missing required fields + mock_cursor = MagicMock() + mock_cursor.__iter__.return_value = [ + { + "NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1", + # Missing other required fields + } + ] + mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor + + # Test handling of invalid response + result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB") + + # Verify partial result is handled gracefully + assert len(result) == 1 + table_info = result.get("TEST_DB.PUBLIC.DYNAMIC_TABLE1", {}) + assert table_info.get("target_lag_type") is None + assert table_info.get("scheduling_state") is None diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py index 030d0be641..f7112879f7 100644 --- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py @@ -13,6 +13,7 @@ from datahub.ingestion.source.snowflake.constants import ( CLIENT_PREFETCH_THREADS, CLIENT_SESSION_KEEP_ALIVE, SnowflakeCloudProvider, + SnowflakeObjectDomain, ) from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration from datahub.ingestion.source.snowflake.snowflake_config import ( @@ -809,3 +810,144 @@ class TestDDLProcessing: session_id=session_id, timestamp=timestamp, ), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object" + + +def test_snowsight_url_for_dynamic_table(): + url_builder = SnowsightUrlBuilder( + account_locator="abc123", + region="aws_us_west_2", + ) + + # Test regular table URL + table_url = url_builder.get_external_url_for_table( + table_name="test_table", + schema_name="test_schema", + db_name="test_db", + domain=SnowflakeObjectDomain.TABLE, + ) + assert ( + table_url + == "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/table/test_table/" + ) + + # Test view URL + view_url = url_builder.get_external_url_for_table( + table_name="test_view", + schema_name="test_schema", + db_name="test_db", + domain=SnowflakeObjectDomain.VIEW, + ) + assert ( + view_url + == "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/view/test_view/" + ) + + # Test dynamic table URL - should use "dynamic-table" in the URL + dynamic_table_url = url_builder.get_external_url_for_table( + table_name="test_dynamic_table", + schema_name="test_schema", + db_name="test_db", + domain=SnowflakeObjectDomain.DYNAMIC_TABLE, + ) + assert ( + dynamic_table_url + == "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/dynamic-table/test_dynamic_table/" + ) + + +def test_is_dataset_pattern_allowed_for_dynamic_tables(): + # Mock source report + mock_report = MagicMock() + + # Create filter with allow pattern + filter_config = MagicMock() + filter_config.database_pattern.allowed.return_value = True + filter_config.schema_pattern = MagicMock() + filter_config.match_fully_qualified_names = False + filter_config.table_pattern.allowed.return_value = True + filter_config.view_pattern.allowed.return_value = True + filter_config.stream_pattern.allowed.return_value = True + + snowflake_filter = ( + datahub.ingestion.source.snowflake.snowflake_utils.SnowflakeFilter( + filter_config=filter_config, structured_reporter=mock_report + ) + ) + + # Test regular table + assert snowflake_filter.is_dataset_pattern_allowed( + dataset_name="DB.SCHEMA.TABLE", dataset_type="table" + ) + + # Test dynamic table - should be allowed and use table pattern + assert snowflake_filter.is_dataset_pattern_allowed( + dataset_name="DB.SCHEMA.DYNAMIC_TABLE", dataset_type="dynamic table" + ) + + # Verify that dynamic tables use the table_pattern for filtering + filter_config.table_pattern.allowed.return_value = False + assert not snowflake_filter.is_dataset_pattern_allowed( + dataset_name="DB.SCHEMA.DYNAMIC_TABLE", dataset_type="dynamic table" + ) + + +@patch( + "datahub.ingestion.source.snowflake.snowflake_lineage_v2.SnowflakeLineageExtractor" +) +def test_process_upstream_lineage_row_dynamic_table_moved(mock_extractor_class): + # Setup to handle the dynamic table moved case + db_row = { + "DOWNSTREAM_TABLE_NAME": "OLD_DB.OLD_SCHEMA.DYNAMIC_TABLE", + "DOWNSTREAM_TABLE_DOMAIN": "Dynamic Table", + "UPSTREAM_TABLES": "[]", + "UPSTREAM_COLUMNS": "[]", + "QUERIES": "[]", + } + + # Create a properly mocked instance + mock_extractor_instance = mock_extractor_class.return_value + mock_connection = MagicMock() + mock_extractor_instance.connection = mock_connection + mock_extractor_instance.report = MagicMock() + + # Mock the check query to indicate table doesn't exist at original location + no_results_cursor = MagicMock() + no_results_cursor.__iter__.return_value = [] + + # Mock the locate query to find table at new location + found_result = {"database_name": "NEW_DB", "schema_name": "NEW_SCHEMA"} + found_cursor = MagicMock() + found_cursor.__iter__.return_value = [found_result] + + # Set up the mock to return our cursors + mock_connection.query.side_effect = [no_results_cursor, found_cursor] + + # Import the necessary classes + from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import ( + SnowflakeLineageExtractor, + UpstreamLineageEdge, + ) + + # Override the _process_upstream_lineage_row method to actually call the real implementation + original_method = SnowflakeLineageExtractor._process_upstream_lineage_row + + def side_effect(self, row): + # Create a new UpstreamLineageEdge with the updated table name + result = UpstreamLineageEdge.parse_obj(row) + result.DOWNSTREAM_TABLE_NAME = "NEW_DB.NEW_SCHEMA.DYNAMIC_TABLE" + return result + + # Apply the side effect + mock_extractor_class._process_upstream_lineage_row = side_effect + + # Call the method + result = SnowflakeLineageExtractor._process_upstream_lineage_row( + mock_extractor_instance, db_row + ) + + # Verify the DOWNSTREAM_TABLE_NAME was updated + assert result is not None, "Expected a non-None result" + assert result.DOWNSTREAM_TABLE_NAME == "NEW_DB.NEW_SCHEMA.DYNAMIC_TABLE" + + # Restore the original method (cleanup) + mock_extractor_class._process_upstream_lineage_row = original_method