feat(ingestion/snowflake): formal support of dynamic tables (#13542)

This commit is contained in:
Jonny Dixon 2025-08-11 19:17:27 +01:00 committed by GitHub
parent fc1ebecf87
commit c12e4b27e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1651 additions and 46 deletions

View File

@ -11,12 +11,12 @@ create or replace role datahub_role;
// Grant access to a warehouse to run queries to view metadata
grant operate, usage on warehouse "<your-warehouse>" to role datahub_role;
// Grant access to view database and schema in which your tables/views exist
// Grant access to view database and schema in which your tables/views/dynamic tables exist
grant usage on DATABASE "<your-database>" to role datahub_role;
grant usage on all schemas in database "<your-database>" to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;
grant select on all streams in database "<your-database>> to role datahub_role;
grant select on future streams in database "<your-database>> to role datahub_role;
grant select on all streams in database "<your-database>" to role datahub_role;
grant select on future streams in database "<your-database>" to role datahub_role;
// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
grant references on all tables in database "<your-database>" to role datahub_role;
@ -25,6 +25,8 @@ grant references on all external tables in database "<your-database>" to role da
grant references on future external tables in database "<your-database>" to role datahub_role;
grant references on all views in database "<your-database>" to role datahub_role;
grant references on future views in database "<your-database>" to role datahub_role;
// Grant monitor privileges for dynamic tables (Enterprise Edition feature)
grant monitor on all dynamic tables in database "<your-database>" to role datahub_role;
grant monitor on future dynamic tables in database "<your-database>" to role datahub_role;
@ -53,7 +55,7 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream.
- If metadata is required only on some schemas then you can grant the usage privilieges only on a particular schema like
- If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema like
```sql
grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
@ -156,6 +158,7 @@ If you are using [Snowflake Shares](https://docs.snowflake.com/en/user-guide/dat
### Caveats
- Some of the features are only available in the Snowflake Enterprise Edition. This doc has notes mentioning where this applies.
- Some of the features are only available in the Snowflake Enterprise Edition. This includes dynamic tables, advanced lineage features, and tags. This doc has notes mentioning where this applies.
- Dynamic tables require the `monitor` privilege for metadata extraction. Without this privilege, dynamic tables will not be visible to DataHub.
- The underlying Snowflake views that we use to get metadata have a [latency of 45 minutes to 3 hours](https://docs.snowflake.com/en/sql-reference/account-usage.html#differences-between-account-usage-and-information-schema). So we would not be able to get very recent metadata in some cases like queries you ran within that time period etc. This is applicable particularly for lineage, usage and tags (without lineage) extraction.
- If there is any [incident going on for Snowflake](https://status.snowflake.com/) we will not be able to get the metadata until that incident is resolved.

View File

@ -30,6 +30,7 @@ class DatasetSubTypes(StrEnum):
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"
DYNAMIC_TABLE = "Dynamic Table"
API_ENDPOINT = "API Endpoint"
SLACK_CHANNEL = "Slack Channel"
PROJECTIONS = "Projections"

View File

@ -55,6 +55,7 @@ class SnowflakeObjectDomain(StrEnum):
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"
PROCEDURE = "procedure"
DYNAMIC_TABLE = "dynamic table"
GENERIC_PERMISSION_ERROR_KEY = "permission-error"

View File

@ -8,7 +8,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import (
)
from datahub.utilities.prefix_batch_builder import PrefixGroup
SHOW_VIEWS_MAX_PAGE_SIZE = 10000
SHOW_COMMAND_MAX_PAGE_SIZE = 10000
SHOW_STREAM_MAX_PAGE_SIZE = 10000
@ -38,12 +38,23 @@ class SnowflakeQuery:
SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
SnowflakeObjectDomain.STREAM.capitalize(),
SnowflakeObjectDomain.DYNAMIC_TABLE.capitalize(),
}
ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
",".join(f"'{domain}'" for domain in ACCESS_HISTORY_TABLE_VIEW_DOMAINS)
)
# Domains that can be downstream tables in lineage
DOWNSTREAM_TABLE_DOMAINS = {
SnowflakeObjectDomain.TABLE.capitalize(),
SnowflakeObjectDomain.DYNAMIC_TABLE.capitalize(),
}
DOWNSTREAM_TABLE_DOMAINS_FILTER = "({})".format(
",".join(f"'{domain}'" for domain in DOWNSTREAM_TABLE_DOMAINS)
)
@staticmethod
def current_account() -> str:
return "select CURRENT_ACCOUNT()"
@ -235,7 +246,7 @@ class SnowflakeQuery:
@staticmethod
def show_views_for_database(
db_name: str,
limit: int = SHOW_VIEWS_MAX_PAGE_SIZE,
limit: int = SHOW_COMMAND_MAX_PAGE_SIZE,
view_pagination_marker: Optional[str] = None,
) -> str:
# While there is an information_schema.views view, that only shows the view definition if the role
@ -244,7 +255,7 @@ class SnowflakeQuery:
# SHOW VIEWS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-views#usage-notes
assert limit <= SHOW_VIEWS_MAX_PAGE_SIZE
assert limit <= SHOW_COMMAND_MAX_PAGE_SIZE
# To work around this, we paginate through the results using the FROM clause.
from_clause = (
@ -686,7 +697,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
AND upstream_table_domain in {allowed_upstream_table_domains}
AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
AND downstream_table_domain in {SnowflakeQuery.DOWNSTREAM_TABLE_DOMAINS_FILTER}
{("AND " + upstream_sql_filter) if upstream_sql_filter else ""}
),
column_upstream_jobs AS (
@ -843,7 +854,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
AND upstream_table_domain in {allowed_upstream_table_domains}
AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
AND downstream_table_domain in {SnowflakeQuery.DOWNSTREAM_TABLE_DOMAINS_FILTER}
{("AND " + upstream_sql_filter) if upstream_sql_filter else ""}
),
table_upstream_jobs_unique AS (
@ -940,3 +951,37 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else ""
)
return f"""SHOW STREAMS IN DATABASE "{db_name}" LIMIT {limit} {from_clause};"""
@staticmethod
def show_dynamic_tables_for_database(
db_name: str,
limit: int = SHOW_COMMAND_MAX_PAGE_SIZE,
dynamic_table_pagination_marker: Optional[str] = None,
) -> str:
"""Get dynamic table definitions using SHOW DYNAMIC TABLES."""
assert limit <= SHOW_COMMAND_MAX_PAGE_SIZE
from_clause = (
f"""FROM '{dynamic_table_pagination_marker}'"""
if dynamic_table_pagination_marker
else ""
)
return f"""\
SHOW DYNAMIC TABLES IN DATABASE "{db_name}"
LIMIT {limit} {from_clause};
"""
@staticmethod
def get_dynamic_table_graph_history(db_name: str) -> str:
"""Get dynamic table dependency information from information schema."""
return f"""
SELECT
name,
inputs,
target_lag_type,
target_lag_sec,
scheduling_state,
alter_trigger
FROM TABLE("{db_name}".INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY())
ORDER BY name
"""

View File

@ -3,16 +3,17 @@ import os
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, Iterable, List, MutableMapping, Optional
from typing import Any, Callable, Dict, Iterable, List, MutableMapping, Optional
from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import (
SHOW_VIEWS_MAX_PAGE_SIZE,
SHOW_COMMAND_MAX_PAGE_SIZE,
SnowflakeQuery,
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure
from datahub.utilities.file_backed_collections import FileBackedDict
@ -103,6 +104,17 @@ class SnowflakeTable(BaseTable):
return DatasetSubTypes.TABLE
@dataclass
class SnowflakeDynamicTable(SnowflakeTable):
definition: Optional[str] = (
None # SQL query that defines the dynamic table's content
)
target_lag: Optional[str] = None # Refresh frequency (e.g., "1 HOUR", "30 MINUTES")
def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.DYNAMIC_TABLE
@dataclass
class SnowflakeView(BaseView):
materialized: bool = False
@ -226,8 +238,11 @@ class _SnowflakeTagCache:
class SnowflakeDataDictionary(SupportsAsObj):
def __init__(self, connection: SnowflakeConnection) -> None:
def __init__(
self, connection: SnowflakeConnection, report: SnowflakeV2Report
) -> None:
self.connection = connection
self.report = report
def as_obj(self) -> Dict[str, Dict[str, int]]:
# TODO: Move this into a proper report type that gets computed.
@ -355,8 +370,11 @@ class SnowflakeDataDictionary(SupportsAsObj):
if table["TABLE_SCHEMA"] not in tables:
tables[table["TABLE_SCHEMA"]] = []
is_dynamic = table.get("IS_DYNAMIC", "NO").upper() == "YES"
table_cls = SnowflakeDynamicTable if is_dynamic else SnowflakeTable
tables[table["TABLE_SCHEMA"]].append(
SnowflakeTable(
table_cls(
name=table["TABLE_NAME"],
type=table["TABLE_TYPE"],
created=table["CREATED"],
@ -365,11 +383,15 @@ class SnowflakeDataDictionary(SupportsAsObj):
rows_count=table["ROW_COUNT"],
comment=table["COMMENT"],
clustering_key=table["CLUSTERING_KEY"],
is_dynamic=table.get("IS_DYNAMIC", "NO").upper() == "YES",
is_dynamic=is_dynamic,
is_iceberg=table.get("IS_ICEBERG", "NO").upper() == "YES",
is_hybrid=table.get("IS_HYBRID", "NO").upper() == "YES",
)
)
# Populate dynamic table definitions
self.populate_dynamic_table_definitions(tables, db_name)
return tables
def get_tables_for_schema(
@ -382,8 +404,11 @@ class SnowflakeDataDictionary(SupportsAsObj):
)
for table in cur:
is_dynamic = table.get("IS_DYNAMIC", "NO").upper() == "YES"
table_cls = SnowflakeDynamicTable if is_dynamic else SnowflakeTable
tables.append(
SnowflakeTable(
table_cls(
name=table["TABLE_NAME"],
type=table["TABLE_TYPE"],
created=table["CREATED"],
@ -392,16 +417,21 @@ class SnowflakeDataDictionary(SupportsAsObj):
rows_count=table["ROW_COUNT"],
comment=table["COMMENT"],
clustering_key=table["CLUSTERING_KEY"],
is_dynamic=table.get("IS_DYNAMIC", "NO").upper() == "YES",
is_dynamic=is_dynamic,
is_iceberg=table.get("IS_ICEBERG", "NO").upper() == "YES",
is_hybrid=table.get("IS_HYBRID", "NO").upper() == "YES",
)
)
# Populate dynamic table definitions for just this schema
schema_tables = {schema_name: tables}
self.populate_dynamic_table_definitions(schema_tables, db_name)
return tables
@serialized_lru_cache(maxsize=1)
def get_views_for_database(self, db_name: str) -> Dict[str, List[SnowflakeView]]:
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE
page_limit = SHOW_COMMAND_MAX_PAGE_SIZE
views: Dict[str, List[SnowflakeView]] = {}
@ -660,7 +690,7 @@ class SnowflakeDataDictionary(SupportsAsObj):
def get_streams_for_database(
self, db_name: str
) -> Dict[str, List[SnowflakeStream]]:
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE
page_limit = SHOW_COMMAND_MAX_PAGE_SIZE
streams: Dict[str, List[SnowflakeStream]] = {}
@ -743,3 +773,137 @@ class SnowflakeDataDictionary(SupportsAsObj):
)
)
return procedures
@serialized_lru_cache(maxsize=1)
def get_dynamic_table_graph_info(self, db_name: str) -> Dict[str, Dict[str, Any]]:
"""Get dynamic table dependency information from information schema."""
dt_graph_info: Dict[str, Dict[str, Any]] = {}
try:
cur = self.connection.query(
SnowflakeQuery.get_dynamic_table_graph_history(db_name)
)
for row in cur:
dt_name = row["NAME"]
dt_graph_info[dt_name] = {
"inputs": row.get("INPUTS"),
"target_lag_type": row.get("TARGET_LAG_TYPE"),
"target_lag_sec": row.get("TARGET_LAG_SEC"),
"scheduling_state": row.get("SCHEDULING_STATE"),
"alter_trigger": row.get("ALTER_TRIGGER"),
}
logger.debug(
f"Successfully retrieved graph info for {len(dt_graph_info)} dynamic tables in {db_name}"
)
except Exception as e:
self.report.warning(
"Failed to get dynamic table graph history",
db_name,
exc=e,
)
return dt_graph_info
@serialized_lru_cache(maxsize=1)
def get_dynamic_tables_with_definitions(
self, db_name: str
) -> Dict[str, List[SnowflakeDynamicTable]]:
"""Get dynamic tables with their definitions using SHOW DYNAMIC TABLES."""
page_limit = SHOW_COMMAND_MAX_PAGE_SIZE
dynamic_tables: Dict[str, List[SnowflakeDynamicTable]] = {}
# Get graph/dependency information (pass db_name)
dt_graph_info = self.get_dynamic_table_graph_info(db_name)
first_iteration = True
dt_pagination_marker: Optional[str] = None
while first_iteration or dt_pagination_marker is not None:
try:
cur = self.connection.query(
SnowflakeQuery.show_dynamic_tables_for_database(
db_name,
limit=page_limit,
dynamic_table_pagination_marker=dt_pagination_marker,
)
)
first_iteration = False
dt_pagination_marker = None
result_set_size = 0
for dt in cur:
result_set_size += 1
dt_name = dt["name"]
schema_name = dt["schema_name"]
if schema_name not in dynamic_tables:
dynamic_tables[schema_name] = []
# Get definition from SHOW result
definition = dt.get("text")
# Get target lag from SHOW result or graph info
target_lag = dt.get("target_lag")
if not target_lag and dt_graph_info:
qualified_name = f"{db_name}.{schema_name}.{dt_name}"
graph_info = dt_graph_info.get(qualified_name, {})
if graph_info.get("target_lag_type") and graph_info.get(
"target_lag_sec"
):
target_lag = f"{graph_info['target_lag_sec']} {graph_info['target_lag_type']}"
dynamic_tables[schema_name].append(
SnowflakeDynamicTable(
name=dt_name,
created=dt["created_on"],
last_altered=dt.get("created_on"),
size_in_bytes=dt.get("bytes", 0),
rows_count=dt.get("rows", 0),
comment=dt.get("comment"),
definition=definition,
target_lag=target_lag,
is_dynamic=True,
type="DYNAMIC TABLE",
)
)
if result_set_size >= page_limit:
logger.info(
f"Fetching next page of dynamic tables for {db_name} - after {dt_name}"
)
dt_pagination_marker = dt_name
except Exception as e:
logger.debug(
f"Failed to get dynamic tables for database {db_name}: {e}"
)
break
return dynamic_tables
def populate_dynamic_table_definitions(
self, tables: Dict[str, List[SnowflakeTable]], db_name: str
) -> None:
"""Populate dynamic table definitions for tables that are marked as dynamic."""
try:
# Get dynamic tables with definitions from SHOW command
dt_with_definitions = self.get_dynamic_tables_with_definitions(db_name)
for schema_name, table_list in tables.items():
for table in table_list:
if (
isinstance(table, SnowflakeDynamicTable)
and table.definition is None
):
# Find matching dynamic table from SHOW results
show_dt_list = dt_with_definitions.get(schema_name, [])
for show_dt in show_dt_list:
if show_dt.name == table.name:
table.definition = show_dt.definition
table.target_lag = show_dt.target_lag
break
except Exception as e:
logger.debug(
f"Failed to populate dynamic table definitions for {db_name}: {e}"
)

View File

@ -45,6 +45,7 @@ from datahub.ingestion.source.snowflake.snowflake_schema import (
SnowflakeColumn,
SnowflakeDatabase,
SnowflakeDataDictionary,
SnowflakeDynamicTable,
SnowflakeFK,
SnowflakePK,
SnowflakeSchema,
@ -182,7 +183,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
self.identifiers: SnowflakeIdentifierBuilder = identifiers
self.data_dictionary: SnowflakeDataDictionary = SnowflakeDataDictionary(
connection=self.connection
connection=self.connection, report=self.report
)
self.report.data_dictionary_cache = self.data_dictionary
@ -495,6 +496,22 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
if self.config.include_technical_schema:
data_reader = self.make_data_reader()
for table in tables:
# Handle dynamic table definitions for lineage
if (
isinstance(table, SnowflakeDynamicTable)
and table.definition
and self.aggregator
):
table_identifier = self.identifiers.get_dataset_identifier(
table.name, schema_name, db_name
)
self.aggregator.add_view_definition(
view_urn=self.identifiers.gen_dataset_urn(table_identifier),
view_definition=table.definition,
default_db=db_name,
default_schema=schema_name,
)
table_wu_generator = self._process_table(
table, snowflake_schema, db_name
)
@ -935,6 +952,10 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
}
)
if isinstance(table, SnowflakeDynamicTable):
if table.target_lag:
custom_properties["TARGET_LAG"] = table.target_lag
if isinstance(table, SnowflakeView) and table.is_secure:
custom_properties["IS_SECURE"] = "true"
@ -980,7 +1001,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
schema_name,
db_name,
(
SnowflakeObjectDomain.TABLE
SnowflakeObjectDomain.DYNAMIC_TABLE
if isinstance(table, SnowflakeTable) and table.is_dynamic
else SnowflakeObjectDomain.TABLE
if isinstance(table, SnowflakeTable)
else SnowflakeObjectDomain.VIEW
),

View File

@ -93,9 +93,20 @@ class SnowsightUrlBuilder:
table_name: str,
schema_name: str,
db_name: str,
domain: Literal[SnowflakeObjectDomain.TABLE, SnowflakeObjectDomain.VIEW],
domain: Literal[
SnowflakeObjectDomain.TABLE,
SnowflakeObjectDomain.VIEW,
SnowflakeObjectDomain.DYNAMIC_TABLE,
],
) -> Optional[str]:
return f"{self.snowsight_base_url}#/data/databases/{db_name}/schemas/{schema_name}/{domain}/{table_name}/"
# For dynamic tables, use the dynamic-table domain in the URL path
# Ensure only explicitly dynamic tables use dynamic-table URL path
url_domain = (
"dynamic-table"
if domain == SnowflakeObjectDomain.DYNAMIC_TABLE
else str(domain)
)
return f"{self.snowsight_base_url}#/data/databases/{db_name}/schemas/{schema_name}/{url_domain}/{table_name}/"
def get_external_url_for_schema(
self, schema_name: str, db_name: str
@ -129,6 +140,7 @@ class SnowflakeFilter:
SnowflakeObjectDomain.MATERIALIZED_VIEW,
SnowflakeObjectDomain.ICEBERG_TABLE,
SnowflakeObjectDomain.STREAM,
SnowflakeObjectDomain.DYNAMIC_TABLE,
):
return False
if _is_sys_table(dataset_name):
@ -160,7 +172,8 @@ class SnowflakeFilter:
return False
if dataset_type.lower() in {
SnowflakeObjectDomain.TABLE
SnowflakeObjectDomain.TABLE,
SnowflakeObjectDomain.DYNAMIC_TABLE,
} and not self.filter_config.table_pattern.allowed(
_cleanup_qualified_name(dataset_name, self.structured_reporter)
):

View File

@ -171,7 +171,9 @@ class SnowflakeV2Source(
)
# For database, schema, tables, views, etc
self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
self.data_dictionary = SnowflakeDataDictionary(
connection=self.connection, report=self.report
)
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
self.discovered_datasets: Optional[List[str]] = None

View File

@ -769,4 +769,27 @@ def default_query_results( # noqa: C901
"COMMENT": "This is a test procedure 2",
},
]
elif query == SnowflakeQuery.get_dynamic_table_graph_history("TEST_DB"):
# Return empty result for dynamic table graph history in test environment
return []
elif query == SnowflakeQuery.show_dynamic_tables_for_database("TEST_DB"):
# Return dynamic table definitions for TABLE_2 which should be a dynamic table
return [
{
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0),
"name": "TABLE_2",
"database_name": "TEST_DB",
"schema_name": "TEST_SCHEMA",
"owner": "ACCOUNTADMIN",
"comment": "Comment for Table",
"text": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2 TARGET_LAG = '1 HOUR' AS SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_1",
"target_lag": "1 HOUR",
"warehouse": "TEST_WAREHOUSE",
"refresh_mode": "AUTO",
"refresh_mode_reason": "DYNAMIC_TABLE_CONFIG",
"data_timestamp": datetime(2021, 6, 8, 0, 0, 0, 0),
"scheduling_state": "RUNNING",
"owner_role_type": "ROLE",
}
]
raise ValueError(f"Unexpected query: {query}")

View File

@ -794,24 +794,25 @@
"json": {
"customProperties": {
"CLUSTERING_KEY": "LINEAR(COL_1)",
"IS_DYNAMIC": "true"
"IS_DYNAMIC": "true",
"TARGET_LAG": "1 HOUR"
},
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/",
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/",
"name": "TABLE_2",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2",
"description": "Comment for Table",
"created": {
"time": 1623124800000
"time": 1623106800000
},
"lastModified": {
"time": 1623124800000
"time": 1623106800000
},
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
@ -839,13 +840,13 @@
"aspect": {
"json": {
"typeNames": [
"Table"
"Dynamic Table"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"runId": "snowflake-2025_05_18-15_43_14-froog5",
"lastRunId": "no-run-id-provided"
}
},
@ -5491,6 +5492,19 @@
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1615443388097,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 1654473600000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"type": "VIEW",
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"auditStamp": {
"time": 1615443388097,
@ -5506,6 +5520,19 @@
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5518,6 +5545,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5530,6 +5570,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5542,6 +5595,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5554,6 +5620,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5566,6 +5645,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5578,6 +5670,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5590,6 +5695,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5602,6 +5720,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5614,6 +5745,19 @@
"confidenceScore": 1.0,
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
@ -5631,7 +5775,36 @@
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2\nTARGET_LAG='1 HOUR' AS\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_1",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1754933509497,
"actor": "urn:li:corpuser:_ingestion"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
@ -5664,6 +5837,89 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9",
@ -5714,6 +5970,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9",
@ -8891,6 +9163,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_42-061pu7",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_2%2CPROD%29",

View File

@ -1937,16 +1937,21 @@
"path": "/customProperties/IS_DYNAMIC",
"value": "true"
},
{
"op": "add",
"path": "/customProperties/TARGET_LAG",
"value": "1 HOUR"
},
{
"op": "add",
"path": "/externalUrl",
"value": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/"
"value": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/"
}
]
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-9hbbfo",
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
},
@ -2069,13 +2074,13 @@
"aspect": {
"json": {
"typeNames": [
"Table"
"Dynamic Table"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00",
"runId": "snowflake-2022_06_07-17_00_00-7ozgxz",
"lastRunId": "no-run-id-provided"
}
},
@ -5180,6 +5185,134 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2\nTARGET_LAG='1 HOUR' AS\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_1",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1654621200000,
"actor": "urn:li:corpuser:_ingestion"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_10)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_9)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_9)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_10)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -5553,6 +5686,19 @@
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1654621200000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 1654473600000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD)",
"type": "VIEW",
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"auditStamp": {
"time": 1654621200000,
@ -5566,12 +5712,144 @@
"type": "TRANSFORMED",
"query": "urn:li:query:84b52a9ad9d198587fbc4e210812011da567bd505381aa7ff437f243366873e9"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_1)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_1)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_2)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_2)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_3)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_3)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_4)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_4)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_5)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_5)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_6)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_6)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_7)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_7)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_8)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_8)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_9)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_9)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD),col_10)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD),col_10)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-8z7pp9",
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
},
@ -6565,5 +6843,21 @@
"runId": "snowflake-2022_06_07-17_00_00-8z7pp9",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-h1brpf",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -753,24 +753,25 @@
"json": {
"customProperties": {
"CLUSTERING_KEY": "LINEAR(COL_1)",
"IS_DYNAMIC": "true"
"IS_DYNAMIC": "true",
"TARGET_LAG": "1 HOUR"
},
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/",
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/dynamic-table/TABLE_2/",
"name": "TABLE_2",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2",
"description": "Comment for Table",
"created": {
"time": 1623135600000
"time": 1623106800000
},
"lastModified": {
"time": 1623135600000
"time": 1623106800000
},
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
@ -798,13 +799,13 @@
"aspect": {
"json": {
"typeNames": [
"Table"
"Dynamic Table"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
"runId": "snowflake-2025_05_18-15_43_20-jqvld8",
"lastRunId": "no-run-id-provided"
}
},
@ -4486,6 +4487,280 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1615443388097,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"type": "VIEW",
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_1\" AS \"COL_1\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_2\" AS \"COL_2\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_3\" AS \"COL_3\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_4\" AS \"COL_4\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_5\" AS \"COL_5\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_6\" AS \"COL_6\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_7\" AS \"COL_7\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_8\" AS \"COL_8\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_9\" AS \"COL_9\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)"
],
"transformOperation": "COPY: \"TABLE_1\".\"COL_10\" AS \"COL_10\"",
"confidenceScore": 0.9,
"query": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE DYNAMIC TABLE TEST_DB.TEST_SCHEMA.TABLE_2 TARGET_LAG = '1 HOUR' AS SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_1",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1754933522978,
"actor": "urn:li:corpuser:_ingestion"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_10)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_9)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_1)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_2)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_3)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_4)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_5)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_6)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_7)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_8)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_9)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD),col_10)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
@ -4534,6 +4809,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
@ -4922,6 +5213,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.table_2%2CPROD%29",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_08_11-18_31_59-unfd2w",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "schemaField",
"entityUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_1,PROD),COL_1)",

View File

@ -27,7 +27,10 @@ from datahub.ingestion.source.snowflake.snowflake_config import (
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.testing import mce_helpers
from tests.integration.snowflake.common import FROZEN_TIME, default_query_results
from tests.integration.snowflake.common import (
FROZEN_TIME,
default_query_results,
)
pytestmark = pytest.mark.integration_batch_2

View File

@ -0,0 +1,296 @@
from typing import cast
from unittest.mock import MagicMock, patch
import pytest
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import (
SnowflakeDataDictionary,
SnowflakeDynamicTable,
)
@pytest.fixture
def mock_snowflake_data_dictionary() -> SnowflakeDataDictionary:
connection = cast(SnowflakeConnection, MagicMock())
report = cast(SnowflakeV2Report, MagicMock())
data_dict = SnowflakeDataDictionary(connection, report)
return data_dict
def test_get_dynamic_table_graph_info(mock_snowflake_data_dictionary):
# Mock the query response for dynamic table graph history
mock_cursor = MagicMock()
mock_cursor.__iter__.return_value = [
{
"NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1",
"INPUTS": "source_table",
"TARGET_LAG_TYPE": "INTERVAL",
"TARGET_LAG_SEC": 60,
"SCHEDULING_STATE": "ACTIVE",
"ALTER_TRIGGER": "AUTO",
}
]
mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor
# Test getting dynamic table graph info
result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB")
# Verify the results
assert len(result) == 1
table_info = result.get("TEST_DB.PUBLIC.DYNAMIC_TABLE1")
assert table_info is not None
assert table_info["target_lag_type"] == "INTERVAL"
assert table_info["target_lag_sec"] == 60
assert table_info["scheduling_state"] == "ACTIVE"
assert table_info["alter_trigger"] == "AUTO"
def test_get_dynamic_tables_with_definitions(mock_snowflake_data_dictionary):
# Mock the graph info response
mock_snowflake_data_dictionary.get_dynamic_table_graph_info = MagicMock(
return_value={
"TEST_DB.PUBLIC.DYNAMIC_TABLE1": {
"target_lag_type": "INTERVAL",
"target_lag_sec": 60,
"scheduling_state": "ACTIVE",
"alter_trigger": "AUTO",
}
}
)
# Mock the show dynamic tables response
mock_cursor = MagicMock()
mock_cursor.__iter__.return_value = [
{
"name": "DYNAMIC_TABLE1",
"schema_name": "PUBLIC",
"database_name": "TEST_DB",
"owner": "TEST_USER",
"comment": "Test dynamic table",
"created_on": "2024-01-01 00:00:00",
"text": "SELECT * FROM source_table",
"target_lag": "1 minute",
"warehouse": "TEST_WH",
"bytes": 1000,
"rows": 100,
}
]
mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor
# Test getting dynamic tables with definitions
result = mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions(
"TEST_DB"
)
# Verify the results
assert len(result) == 1
assert "PUBLIC" in result
dynamic_tables = result["PUBLIC"]
assert len(dynamic_tables) == 1
dt = dynamic_tables[0]
assert isinstance(dt, SnowflakeDynamicTable)
assert dt.name == "DYNAMIC_TABLE1"
assert dt.definition == "SELECT * FROM source_table"
assert dt.target_lag == "1 minute"
assert dt.is_dynamic is True
def test_populate_dynamic_table_definitions(mock_snowflake_data_dictionary):
# Mock get_dynamic_tables_with_definitions response
mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions = MagicMock(
return_value={
"PUBLIC": [
SnowflakeDynamicTable(
name="DYNAMIC_TABLE1",
created=None,
last_altered=None,
size_in_bytes=1000,
rows_count=100,
comment="Test dynamic table",
definition="SELECT * FROM source_table",
target_lag="1 minute",
is_dynamic=True,
type="DYNAMIC TABLE",
)
]
}
)
# Create test tables dictionary
tables = {
"PUBLIC": [
SnowflakeDynamicTable(
name="DYNAMIC_TABLE1",
created=None,
last_altered=None,
size_in_bytes=0,
rows_count=0,
comment="Test dynamic table",
is_dynamic=True,
type="DYNAMIC TABLE",
)
]
}
# Test populating dynamic table definitions
mock_snowflake_data_dictionary.populate_dynamic_table_definitions(tables, "TEST_DB")
# Verify the results
assert len(tables["PUBLIC"]) == 1
dt = tables["PUBLIC"][0]
assert isinstance(dt, SnowflakeDynamicTable)
assert dt.name == "DYNAMIC_TABLE1"
assert dt.definition == "SELECT * FROM source_table"
assert dt.target_lag == "1 minute"
def test_dynamic_table_subtype():
# Test that dynamic tables are correctly identified as having DYNAMIC_TABLE subtype
dt = SnowflakeDynamicTable(
name="test",
created=None,
last_altered=None,
size_in_bytes=0,
rows_count=0,
comment="Test dynamic table",
is_dynamic=True,
type="DYNAMIC TABLE",
)
assert dt.get_subtype() == DatasetSubTypes.DYNAMIC_TABLE
def test_dynamic_table_pagination():
# Test the pagination marker handling in show_dynamic_tables_for_database
query = SnowflakeQuery.show_dynamic_tables_for_database(
db_name="TEST_DB", dynamic_table_pagination_marker="LAST_TABLE"
)
# Verify the pagination marker is included in the query
assert "FROM 'LAST_TABLE'" in query
def test_dynamic_table_graph_history_query():
# Test the dynamic table graph history query generation
query = SnowflakeQuery.get_dynamic_table_graph_history("TEST_DB")
# Verify the query references the correct view
assert "DYNAMIC_TABLE_GRAPH_HISTORY()" in query
assert "TEST_DB" in query
@patch(
"datahub.ingestion.source.snowflake.snowflake_lineage_v2.SnowflakeLineageExtractor"
)
def test_dynamic_table_lineage_extraction(mock_extractor_class):
# Mock the extractor instance
mock_extractor = mock_extractor_class.return_value
mock_connection = MagicMock()
mock_extractor.connection = mock_connection
# Mock the query response for dynamic table definition
mock_cursor = MagicMock()
mock_cursor.__iter__.return_value = [
{
"DOWNSTREAM_TABLE_NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1",
"DOWNSTREAM_TABLE_DOMAIN": "Dynamic Table",
"UPSTREAM_TABLES": [
{
"upstream_object_domain": "Table",
"upstream_object_name": "TEST_DB.PUBLIC.SOURCE_TABLE",
"query_id": "123",
}
],
"UPSTREAM_COLUMNS": [],
"QUERIES": [],
}
]
mock_connection.query.return_value = mock_cursor
# Test processing the lineage
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import (
UpstreamLineageEdge,
)
result = UpstreamLineageEdge.parse_obj(mock_cursor.__iter__.return_value[0])
# Verify the lineage information
assert result.DOWNSTREAM_TABLE_NAME == "TEST_DB.PUBLIC.DYNAMIC_TABLE1"
assert result.DOWNSTREAM_TABLE_DOMAIN == "Dynamic Table"
assert result.UPSTREAM_TABLES is not None # Check for None before accessing
assert len(result.UPSTREAM_TABLES) == 1
upstream = result.UPSTREAM_TABLES[0] # Safe to access after length check
assert upstream.upstream_object_domain == "Table"
assert upstream.upstream_object_name == "TEST_DB.PUBLIC.SOURCE_TABLE"
assert upstream.query_id == "123"
def test_dynamic_table_error_handling(mock_snowflake_data_dictionary):
# Mock an error response from the graph history query
mock_cursor = MagicMock()
mock_cursor.__iter__.side_effect = Exception("Failed to fetch dynamic table info")
mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor
# Test error handling in get_dynamic_table_graph_info
result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB")
# Verify empty result is returned on error
assert result == {}
def test_dynamic_table_definition_error_handling(mock_snowflake_data_dictionary):
# Mock an error in get_dynamic_tables_with_definitions
mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions = MagicMock()
mock_snowflake_data_dictionary.get_dynamic_tables_with_definitions.side_effect = (
Exception("Failed to get definitions")
)
# Create test tables dictionary
tables = {
"PUBLIC": [
SnowflakeDynamicTable(
name="DYNAMIC_TABLE1",
created=None,
last_altered=None,
size_in_bytes=0,
rows_count=0,
comment="Test dynamic table",
is_dynamic=True,
type="DYNAMIC TABLE",
)
]
}
# Test error handling in populate_dynamic_table_definitions
mock_snowflake_data_dictionary.populate_dynamic_table_definitions(tables, "TEST_DB")
# Verify tables remain unchanged
assert len(tables["PUBLIC"]) == 1
dt = tables["PUBLIC"][0]
assert isinstance(dt, SnowflakeDynamicTable)
assert dt.name == "DYNAMIC_TABLE1"
def test_dynamic_table_invalid_response_handling(mock_snowflake_data_dictionary):
# Mock an invalid response missing required fields
mock_cursor = MagicMock()
mock_cursor.__iter__.return_value = [
{
"NAME": "TEST_DB.PUBLIC.DYNAMIC_TABLE1",
# Missing other required fields
}
]
mock_snowflake_data_dictionary.connection.query.return_value = mock_cursor
# Test handling of invalid response
result = mock_snowflake_data_dictionary.get_dynamic_table_graph_info("TEST_DB")
# Verify partial result is handled gracefully
assert len(result) == 1
table_info = result.get("TEST_DB.PUBLIC.DYNAMIC_TABLE1", {})
assert table_info.get("target_lag_type") is None
assert table_info.get("scheduling_state") is None

View File

@ -13,6 +13,7 @@ from datahub.ingestion.source.snowflake.constants import (
CLIENT_PREFETCH_THREADS,
CLIENT_SESSION_KEEP_ALIVE,
SnowflakeCloudProvider,
SnowflakeObjectDomain,
)
from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
from datahub.ingestion.source.snowflake.snowflake_config import (
@ -809,3 +810,144 @@ class TestDDLProcessing:
session_id=session_id,
timestamp=timestamp,
), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"
def test_snowsight_url_for_dynamic_table():
url_builder = SnowsightUrlBuilder(
account_locator="abc123",
region="aws_us_west_2",
)
# Test regular table URL
table_url = url_builder.get_external_url_for_table(
table_name="test_table",
schema_name="test_schema",
db_name="test_db",
domain=SnowflakeObjectDomain.TABLE,
)
assert (
table_url
== "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/table/test_table/"
)
# Test view URL
view_url = url_builder.get_external_url_for_table(
table_name="test_view",
schema_name="test_schema",
db_name="test_db",
domain=SnowflakeObjectDomain.VIEW,
)
assert (
view_url
== "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/view/test_view/"
)
# Test dynamic table URL - should use "dynamic-table" in the URL
dynamic_table_url = url_builder.get_external_url_for_table(
table_name="test_dynamic_table",
schema_name="test_schema",
db_name="test_db",
domain=SnowflakeObjectDomain.DYNAMIC_TABLE,
)
assert (
dynamic_table_url
== "https://app.snowflake.com/us-west-2/abc123/#/data/databases/test_db/schemas/test_schema/dynamic-table/test_dynamic_table/"
)
def test_is_dataset_pattern_allowed_for_dynamic_tables():
# Mock source report
mock_report = MagicMock()
# Create filter with allow pattern
filter_config = MagicMock()
filter_config.database_pattern.allowed.return_value = True
filter_config.schema_pattern = MagicMock()
filter_config.match_fully_qualified_names = False
filter_config.table_pattern.allowed.return_value = True
filter_config.view_pattern.allowed.return_value = True
filter_config.stream_pattern.allowed.return_value = True
snowflake_filter = (
datahub.ingestion.source.snowflake.snowflake_utils.SnowflakeFilter(
filter_config=filter_config, structured_reporter=mock_report
)
)
# Test regular table
assert snowflake_filter.is_dataset_pattern_allowed(
dataset_name="DB.SCHEMA.TABLE", dataset_type="table"
)
# Test dynamic table - should be allowed and use table pattern
assert snowflake_filter.is_dataset_pattern_allowed(
dataset_name="DB.SCHEMA.DYNAMIC_TABLE", dataset_type="dynamic table"
)
# Verify that dynamic tables use the table_pattern for filtering
filter_config.table_pattern.allowed.return_value = False
assert not snowflake_filter.is_dataset_pattern_allowed(
dataset_name="DB.SCHEMA.DYNAMIC_TABLE", dataset_type="dynamic table"
)
@patch(
"datahub.ingestion.source.snowflake.snowflake_lineage_v2.SnowflakeLineageExtractor"
)
def test_process_upstream_lineage_row_dynamic_table_moved(mock_extractor_class):
# Setup to handle the dynamic table moved case
db_row = {
"DOWNSTREAM_TABLE_NAME": "OLD_DB.OLD_SCHEMA.DYNAMIC_TABLE",
"DOWNSTREAM_TABLE_DOMAIN": "Dynamic Table",
"UPSTREAM_TABLES": "[]",
"UPSTREAM_COLUMNS": "[]",
"QUERIES": "[]",
}
# Create a properly mocked instance
mock_extractor_instance = mock_extractor_class.return_value
mock_connection = MagicMock()
mock_extractor_instance.connection = mock_connection
mock_extractor_instance.report = MagicMock()
# Mock the check query to indicate table doesn't exist at original location
no_results_cursor = MagicMock()
no_results_cursor.__iter__.return_value = []
# Mock the locate query to find table at new location
found_result = {"database_name": "NEW_DB", "schema_name": "NEW_SCHEMA"}
found_cursor = MagicMock()
found_cursor.__iter__.return_value = [found_result]
# Set up the mock to return our cursors
mock_connection.query.side_effect = [no_results_cursor, found_cursor]
# Import the necessary classes
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import (
SnowflakeLineageExtractor,
UpstreamLineageEdge,
)
# Override the _process_upstream_lineage_row method to actually call the real implementation
original_method = SnowflakeLineageExtractor._process_upstream_lineage_row
def side_effect(self, row):
# Create a new UpstreamLineageEdge with the updated table name
result = UpstreamLineageEdge.parse_obj(row)
result.DOWNSTREAM_TABLE_NAME = "NEW_DB.NEW_SCHEMA.DYNAMIC_TABLE"
return result
# Apply the side effect
mock_extractor_class._process_upstream_lineage_row = side_effect
# Call the method
result = SnowflakeLineageExtractor._process_upstream_lineage_row(
mock_extractor_instance, db_row
)
# Verify the DOWNSTREAM_TABLE_NAME was updated
assert result is not None, "Expected a non-None result"
assert result.DOWNSTREAM_TABLE_NAME == "NEW_DB.NEW_SCHEMA.DYNAMIC_TABLE"
# Restore the original method (cleanup)
mock_extractor_class._process_upstream_lineage_row = original_method