feat(ingestion/snowflake): add streams as a new dataset type with lineage and properties (#12318)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Brock Griffey 2025-02-05 00:18:37 -05:00 committed by GitHub
parent 7f6e3995d3
commit ac13f255e6
17 changed files with 5142 additions and 3170 deletions

View File

@ -15,6 +15,8 @@ grant operate, usage on warehouse "<your-warehouse>" to role datahub_role;
grant usage on DATABASE "<your-database>" to role datahub_role;
grant usage on all schemas in database "<your-database>" to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;
grant select on all streams in database "<your-database>" to role datahub_role;
grant select on future streams in database "<your-database>" to role datahub_role;
// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
grant references on all tables in database "<your-database>" to role datahub_role;
@ -50,9 +52,12 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
If the warehouse is already running during ingestion or has auto-resume enabled,
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
- `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream.
- If metadata is required only on some schemas, then you can grant the usage privileges only on a particular schema, like:
```sql
grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
```
- `select` on `streams` is required for stream definitions to be available. This does not allow selecting the stream's data (which is not required) unless the underlying dataset also has select access. The grant can also be scoped to a particular schema, as in the example below.
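A minimal sketch of the schema-scoped variant, assuming the same placeholder database/schema names used above:
```sql
grant select on all streams in schema "<your-database>"."<your-schema>" to role datahub_role;
grant select on future streams in schema "<your-database>"."<your-schema>" to role datahub_role;
```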

View File

@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
SAC_LIVE_DATA_MODEL = "Live Data Model"
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"
# TODO: Create separate entity...
NOTEBOOK = "Notebook"

View File

@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
SCHEMA = "schema"
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"
GENERIC_PERMISSION_ERROR_KEY = "permission-error"

View File

@ -98,6 +98,11 @@ class SnowflakeFilterConfig(SQLFilterConfig):
)
# table_pattern and view_pattern are inherited from SQLFilterConfig
stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@ -274,6 +279,11 @@ class SnowflakeV2Config(
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)
include_streams: bool = Field(
default=True,
description="If enabled, streams will be ingested as separate entities from tables/views.",
)
structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(

View File

@ -49,6 +49,7 @@ from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
ObservedQuery,
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
@ -241,7 +242,13 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
use_cached_audit_log = audit_log_file.exists()
queries: FileBackedList[
Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
Union[
KnownLineageMapping,
PreparsedQuery,
TableRename,
TableSwap,
ObservedQuery,
]
]
if use_cached_audit_log:
logger.info("Using cached audit log")
@ -252,7 +259,13 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
shared_connection = ConnectionWrapper(audit_log_file)
queries = FileBackedList(shared_connection)
entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
entry: Union[
KnownLineageMapping,
PreparsedQuery,
TableRename,
TableSwap,
ObservedQuery,
]
with self.report.copy_history_fetch_timer:
for entry in self.fetch_copy_history():
@ -329,7 +342,7 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
def fetch_query_log(
self, users: UsersMapping
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap, ObservedQuery]]:
query_log_query = _build_enriched_query_log_query(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
@ -362,7 +375,7 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
def _parse_audit_log_row(
self, row: Dict[str, Any], users: UsersMapping
) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]:
) -> Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]]:
json_fields = {
"DIRECT_OBJECTS_ACCESSED",
"OBJECTS_MODIFIED",
@ -398,6 +411,34 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
pass
else:
return None
user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)
# Use direct_objects_accessed instead of objects_modified because
# objects_modified returns $SYS_VIEW_X with no mapping
has_stream_objects = any(
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)
# If a stream is used, default to query parsing.
if has_stream_objects:
logger.debug("Found matching stream object")
return ObservedQuery(
query=res["query_text"],
session_id=res["session_id"],
timestamp=res["query_start_time"].astimezone(timezone.utc),
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
res["query_text"], self.identifiers.platform, fast=True
),
)
upstreams = []
column_usage = {}
@ -460,12 +501,6 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
)
)
user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)
timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)

View File

@ -9,6 +9,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import (
from datahub.utilities.prefix_batch_builder import PrefixGroup
SHOW_VIEWS_MAX_PAGE_SIZE = 10000
SHOW_STREAM_MAX_PAGE_SIZE = 10000
def create_deny_regex_sql_filter(
@ -36,6 +37,7 @@ class SnowflakeQuery:
SnowflakeObjectDomain.VIEW.capitalize(),
SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
SnowflakeObjectDomain.STREAM.capitalize(),
}
ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
@ -44,7 +46,8 @@ class SnowflakeQuery:
ACCESS_HISTORY_TABLE_DOMAINS_FILTER = (
"("
f"'{SnowflakeObjectDomain.TABLE.capitalize()}',"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}'"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}',"
f"'{SnowflakeObjectDomain.STREAM.capitalize()}',"
")"
)
@ -963,3 +966,19 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
@staticmethod
def get_all_users() -> str:
return """SELECT name as "NAME", email as "EMAIL" FROM SNOWFLAKE.ACCOUNT_USAGE.USERS"""
@staticmethod
def streams_for_database(
db_name: str,
limit: int = SHOW_STREAM_MAX_PAGE_SIZE,
stream_pagination_marker: Optional[str] = None,
) -> str:
# SHOW STREAMS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-streams#usage-notes
assert limit <= SHOW_STREAM_MAX_PAGE_SIZE
# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else ""
)
return f"""SHOW STREAMS IN DATABASE {db_name} LIMIT {limit} {from_clause};"""

View File

@ -104,6 +104,7 @@ class SnowflakeV2Report(
schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0
streams_scanned: int = 0
include_usage_stats: bool = False
include_operational_stats: bool = False
@ -113,6 +114,7 @@ class SnowflakeV2Report(
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0
num_structured_property_templates_created: int = 0
@ -131,6 +133,8 @@ class SnowflakeV2Report(
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0
num_get_streams_for_schema_queries: int = 0
rows_zero_objects_modified: int = 0
_processed_tags: MutableSet[str] = field(default_factory=set)
@ -157,6 +161,8 @@ class SnowflakeV2Report(
return
self._scanned_tags.add(name)
self.tags_scanned += 1
elif ent_type == "stream":
self.streams_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

View File

@ -14,7 +14,7 @@ from datahub.ingestion.source.snowflake.snowflake_query import (
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.prefix_batch_builder import build_prefix_batches
from datahub.utilities.prefix_batch_builder import PrefixGroup, build_prefix_batches
from datahub.utilities.serialized_lru_cache import serialized_lru_cache
logger: logging.Logger = logging.getLogger(__name__)
@ -118,6 +118,7 @@ class SnowflakeSchema:
comment: Optional[str]
tables: List[str] = field(default_factory=list)
views: List[str] = field(default_factory=list)
streams: List[str] = field(default_factory=list)
tags: Optional[List[SnowflakeTag]] = None
@ -131,6 +132,29 @@ class SnowflakeDatabase:
tags: Optional[List[SnowflakeTag]] = None
@dataclass
class SnowflakeStream:
name: str
created: datetime
owner: str
source_type: str
type: str
stale: str
mode: str
invalid_reason: str
owner_role_type: str
database_name: str
schema_name: str
table_name: str
comment: Optional[str]
columns: List[SnowflakeColumn] = field(default_factory=list)
stale_after: Optional[datetime] = None
base_tables: Optional[str] = None
tags: Optional[List[SnowflakeTag]] = None
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
last_altered: Optional[datetime] = None
class _SnowflakeTagCache:
def __init__(self) -> None:
# self._database_tags[<database_name>] = list of tags applied to database
@ -208,6 +232,7 @@ class SnowflakeDataDictionary(SupportsAsObj):
self.get_tables_for_database,
self.get_views_for_database,
self.get_columns_for_schema,
self.get_streams_for_database,
self.get_pk_constraints_for_schema,
self.get_fk_constraints_for_schema,
]
@ -431,9 +456,18 @@ class SnowflakeDataDictionary(SupportsAsObj):
# For massive schemas, use a FileBackedDict to avoid memory issues.
columns = FileBackedDict()
object_batches = build_prefix_batches(
all_objects, max_batch_size=10000, max_groups_in_batch=5
)
# Single-object case (used when fetching columns for a stream's source table)
if len(all_objects) == 1:
object_batches = [
[PrefixGroup(prefix=all_objects[0], names=[], exact_match=True)]
]
else:
# Build batches for full schema scan
object_batches = build_prefix_batches(
all_objects, max_batch_size=10000, max_groups_in_batch=5
)
# Process batches
for batch_index, object_batch in enumerate(object_batches):
if batch_index > 0:
logger.info(
@ -611,3 +645,63 @@ class SnowflakeDataDictionary(SupportsAsObj):
tags[column_name].append(snowflake_tag)
return tags
@serialized_lru_cache(maxsize=1)
def get_streams_for_database(
self, db_name: str
) -> Dict[str, List[SnowflakeStream]]:
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE
streams: Dict[str, List[SnowflakeStream]] = {}
first_iteration = True
stream_pagination_marker: Optional[str] = None
while first_iteration or stream_pagination_marker is not None:
cur = self.connection.query(
SnowflakeQuery.streams_for_database(
db_name,
limit=page_limit,
stream_pagination_marker=stream_pagination_marker,
)
)
first_iteration = False
stream_pagination_marker = None
result_set_size = 0
for stream in cur:
result_set_size += 1
stream_name = stream["name"]
schema_name = stream["schema_name"]
if schema_name not in streams:
streams[schema_name] = []
streams[stream["schema_name"]].append(
SnowflakeStream(
name=stream["name"],
created=stream["created_on"],
owner=stream["owner"],
comment=stream["comment"],
source_type=stream["source_type"],
type=stream["type"],
stale=stream["stale"],
mode=stream["mode"],
database_name=stream["database_name"],
schema_name=stream["schema_name"],
invalid_reason=stream["invalid_reason"],
owner_role_type=stream["owner_role_type"],
stale_after=stream["stale_after"],
table_name=stream["table_name"],
base_tables=stream["base_tables"],
last_altered=stream["created_on"],
)
)
if result_set_size >= page_limit:
# If we hit the limit, we need to send another request to get the next page.
logger.info(
f"Fetching next page of streams for {db_name} - after {stream_name}"
)
stream_pagination_marker = stream_name
return streams

View File

@ -48,6 +48,7 @@ from datahub.ingestion.source.snowflake.snowflake_schema import (
SnowflakeFK,
SnowflakePK,
SnowflakeSchema,
SnowflakeStream,
SnowflakeTable,
SnowflakeTag,
SnowflakeView,
@ -58,6 +59,7 @@ from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
SnowflakeStructuredReportMixin,
SnowsightUrlBuilder,
split_qualified_name,
)
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
@ -70,6 +72,7 @@ from datahub.ingestion.source.sql.sql_utils import (
)
from datahub.ingestion.source_report.ingestion_stage import (
EXTERNAL_TABLE_DDL_LINEAGE,
LINEAGE_EXTRACTION,
METADATA_EXTRACTION,
PROFILING,
)
@ -81,6 +84,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.common import (
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
DatasetProperties,
ViewProperties,
)
@ -420,73 +424,120 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
schema_name = snowflake_schema.name
if self.config.extract_tags != TagOption.skip:
snowflake_schema.tags = self.tag_extractor.get_tags_on_object(
schema_name=schema_name, db_name=db_name, domain="schema"
)
self._process_tags(snowflake_schema, schema_name, db_name, domain="schema")
if self.config.include_technical_schema:
yield from self.gen_schema_containers(snowflake_schema, db_name)
# We need to do this first so that we can use it when fetching columns.
tables, views, streams = [], [], []
if self.config.include_tables:
tables = self.fetch_tables_for_schema(
snowflake_schema, db_name, schema_name
)
db_tables[schema_name] = tables
yield from self._process_tables(
tables, snowflake_schema, db_name, schema_name
)
if self.config.include_views:
views = self.fetch_views_for_schema(snowflake_schema, db_name, schema_name)
yield from self._process_views(
views, snowflake_schema, db_name, schema_name
)
if self.config.include_tables:
db_tables[schema_name] = tables
if self.config.include_technical_schema:
data_reader = self.make_data_reader()
for table in tables:
table_wu_generator = self._process_table(
table, snowflake_schema, db_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[db_name, schema_name, table.name],
)
if self.config.include_views:
if self.aggregator:
for view in views:
view_identifier = self.identifiers.get_dataset_identifier(
view.name, schema_name, db_name
)
if view.is_secure and not view.view_definition:
view.view_definition = self.fetch_secure_view_definition(
view.name, schema_name, db_name
)
if view.view_definition:
self.aggregator.add_view_definition(
view_urn=self.identifiers.gen_dataset_urn(view_identifier),
view_definition=view.view_definition,
default_db=db_name,
default_schema=schema_name,
)
elif view.is_secure:
self.report.num_secure_views_missing_definition += 1
if self.config.include_technical_schema:
for view in views:
yield from self._process_view(view, snowflake_schema, db_name)
if self.config.include_streams:
self.report.num_get_streams_for_schema_queries += 1
streams = self.fetch_streams_for_schema(
snowflake_schema, db_name, schema_name
)
yield from self._process_streams(streams, snowflake_schema, db_name)
if self.config.include_technical_schema and snowflake_schema.tags:
for tag in snowflake_schema.tags:
yield from self._process_tag(tag)
yield from self._process_tags_in_schema(snowflake_schema)
if not snowflake_schema.views and not snowflake_schema.tables:
if (
not snowflake_schema.views
and not snowflake_schema.tables
and not snowflake_schema.streams
):
self.structured_reporter.info(
title="No tables/views found in schema",
message="If tables exist, please grant REFERENCES or SELECT permissions on them.",
title="No tables/views/streams found in schema",
message="If objects exist, please grant REFERENCES or SELECT permissions on them.",
context=f"{db_name}.{schema_name}",
)
def _process_tags(self, snowflake_schema, schema_name, db_name, domain):
snowflake_schema.tags = self.tag_extractor.get_tags_on_object(
schema_name=schema_name, db_name=db_name, domain=domain
)
def _process_tables(
self,
tables: List[SnowflakeTable],
snowflake_schema: SnowflakeSchema,
db_name: str,
schema_name: str,
) -> Iterable[MetadataWorkUnit]:
if self.config.include_technical_schema:
data_reader = self.make_data_reader()
for table in tables:
table_wu_generator = self._process_table(
table, snowflake_schema, db_name
)
yield from classification_workunit_processor(
table_wu_generator,
self.classification_handler,
data_reader,
[db_name, schema_name, table.name],
)
def _process_views(
self,
views: List[SnowflakeView],
snowflake_schema: SnowflakeSchema,
db_name: str,
schema_name: str,
) -> Iterable[MetadataWorkUnit]:
if self.aggregator:
for view in views:
view_identifier = self.identifiers.get_dataset_identifier(
view.name, schema_name, db_name
)
if view.is_secure and not view.view_definition:
view.view_definition = self.fetch_secure_view_definition(
view.name, schema_name, db_name
)
if view.view_definition:
self.aggregator.add_view_definition(
view_urn=self.identifiers.gen_dataset_urn(view_identifier),
view_definition=view.view_definition,
default_db=db_name,
default_schema=schema_name,
)
elif view.is_secure:
self.report.num_secure_views_missing_definition += 1
if self.config.include_technical_schema:
for view in views:
yield from self._process_view(view, snowflake_schema, db_name)
def _process_streams(
self,
streams: List[SnowflakeStream],
snowflake_schema: SnowflakeSchema,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
for stream in streams:
yield from self._process_stream(stream, snowflake_schema, db_name)
def _process_tags_in_schema(
self, snowflake_schema: SnowflakeSchema
) -> Iterable[MetadataWorkUnit]:
if snowflake_schema.tags:
for tag in snowflake_schema.tags:
yield from self._process_tag(tag)
def fetch_secure_view_definition(
self, table_name: str, schema_name: str, db_name: str
) -> Optional[str]:
@ -729,7 +780,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
def gen_dataset_workunits(
self,
table: Union[SnowflakeTable, SnowflakeView],
table: Union[SnowflakeTable, SnowflakeView, SnowflakeStream],
schema_name: str,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
@ -788,7 +839,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
subTypes = SubTypes(
typeNames=(
[DatasetSubTypes.VIEW]
[DatasetSubTypes.SNOWFLAKE_STREAM]
if isinstance(table, SnowflakeStream)
else [DatasetSubTypes.VIEW]
if isinstance(table, SnowflakeView)
else [DatasetSubTypes.TABLE]
)
@ -843,28 +896,50 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
def get_dataset_properties(
self,
table: Union[SnowflakeTable, SnowflakeView],
table: Union[SnowflakeTable, SnowflakeView, SnowflakeStream],
schema_name: str,
db_name: str,
) -> DatasetProperties:
custom_properties = {}
if isinstance(table, SnowflakeTable):
if table.clustering_key:
custom_properties["CLUSTERING_KEY"] = table.clustering_key
if table.is_hybrid:
custom_properties["IS_HYBRID"] = "true"
if table.is_dynamic:
custom_properties["IS_DYNAMIC"] = "true"
if table.is_iceberg:
custom_properties["IS_ICEBERG"] = "true"
custom_properties.update(
{
k: v
for k, v in {
"CLUSTERING_KEY": table.clustering_key,
"IS_HYBRID": "true" if table.is_hybrid else None,
"IS_DYNAMIC": "true" if table.is_dynamic else None,
"IS_ICEBERG": "true" if table.is_iceberg else None,
}.items()
if v
}
)
if isinstance(table, SnowflakeView) and table.is_secure:
custom_properties["IS_SECURE"] = "true"
elif isinstance(table, SnowflakeStream):
custom_properties.update(
{
k: v
for k, v in {
"SOURCE_TYPE": table.source_type,
"TYPE": table.type,
"STALE": table.stale,
"MODE": table.mode,
"INVALID_REASON": table.invalid_reason,
"OWNER_ROLE_TYPE": table.owner_role_type,
"TABLE_NAME": table.table_name,
"BASE_TABLES": table.base_tables,
"STALE_AFTER": table.stale_after.isoformat()
if table.stale_after
else None,
}.items()
if v
}
)
return DatasetProperties(
name=table.name,
created=(
@ -909,7 +984,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
).as_workunit()
def gen_column_tags_as_structured_properties(
self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView]
self,
dataset_urn: str,
table: Union[SnowflakeTable, SnowflakeView, SnowflakeStream],
) -> Iterable[MetadataWorkUnit]:
for column_name in table.column_tags:
schema_field_urn = SchemaFieldUrn(dataset_urn, column_name).urn()
@ -922,7 +999,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
def gen_schema_metadata(
self,
table: Union[SnowflakeTable, SnowflakeView],
table: Union[SnowflakeTable, SnowflakeView, SnowflakeStream],
schema_name: str,
db_name: str,
) -> SchemaMetadata:
@ -1214,3 +1291,158 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
"External table ddl lineage extraction failed",
exc=e,
)
def fetch_streams_for_schema(
self, snowflake_schema: SnowflakeSchema, db_name: str, schema_name: str
) -> List[SnowflakeStream]:
try:
streams: List[SnowflakeStream] = []
for stream in self.get_streams_for_schema(schema_name, db_name):
stream_identifier = self.identifiers.get_dataset_identifier(
stream.name, schema_name, db_name
)
self.report.report_entity_scanned(stream_identifier, "stream")
if not self.filters.is_dataset_pattern_allowed(
stream_identifier, SnowflakeObjectDomain.STREAM
):
self.report.report_dropped(stream_identifier)
else:
streams.append(stream)
snowflake_schema.streams = [stream.name for stream in streams]
return streams
except Exception as e:
if isinstance(e, SnowflakePermissionError):
error_msg = f"Failed to get streams for schema {db_name}.{schema_name}. Please check permissions."
raise SnowflakePermissionError(error_msg) from e.__cause__
else:
self.structured_reporter.warning(
"Failed to get streams for schema",
f"{db_name}.{schema_name}",
exc=e,
)
return []
def get_streams_for_schema(
self, schema_name: str, db_name: str
) -> List[SnowflakeStream]:
streams = self.data_dictionary.get_streams_for_database(db_name)
return streams.get(schema_name, [])
def _process_stream(
self,
stream: SnowflakeStream,
snowflake_schema: SnowflakeSchema,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
schema_name = snowflake_schema.name
try:
# Retrieve and register the schema without metadata to prevent columns from mapping upstream
stream.columns = self.get_columns_for_stream(stream.table_name)
yield from self.gen_dataset_workunits(stream, schema_name, db_name)
if self.config.include_column_lineage:
with self.report.new_stage(f"*: {LINEAGE_EXTRACTION}"):
self.populate_stream_upstreams(stream, db_name, schema_name)
except Exception as e:
self.structured_reporter.warning(
"Failed to get columns for stream:", stream.name, exc=e
)
def get_columns_for_stream(
self,
source_object: str, # Qualified name of source table/view
) -> List[SnowflakeColumn]:
"""
Get column information for a stream by fetching the source object's columns and appending the stream metadata columns.
A stream exposes all columns from its source object plus metadata columns such as:
- METADATA$ACTION
- METADATA$ISUPDATE
- METADATA$ROW_ID
"""
columns: List[SnowflakeColumn] = []
source_parts = split_qualified_name(source_object)
source_db, source_schema, source_name = source_parts
# Get columns from source object
source_columns = self.data_dictionary.get_columns_for_schema(
source_schema, source_db, itertools.chain([source_name])
).get(source_name, [])
# Add all source columns
columns.extend(source_columns)
# Add standard stream metadata columns
metadata_columns = [
SnowflakeColumn(
name="METADATA$ACTION",
ordinal_position=len(columns) + 1,
is_nullable=False,
data_type="VARCHAR",
comment="Type of DML operation (INSERT/DELETE)",
character_maximum_length=10,
numeric_precision=None,
numeric_scale=None,
),
SnowflakeColumn(
name="METADATA$ISUPDATE",
ordinal_position=len(columns) + 2,
is_nullable=False,
data_type="BOOLEAN",
comment="Whether row is from UPDATE operation",
character_maximum_length=None,
numeric_precision=None,
numeric_scale=None,
),
SnowflakeColumn(
name="METADATA$ROW_ID",
ordinal_position=len(columns) + 3,
is_nullable=False,
data_type="NUMBER",
comment="Unique row identifier",
character_maximum_length=None,
numeric_precision=38,
numeric_scale=0,
),
]
columns.extend(metadata_columns)
return columns
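For context, these metadata columns mirror what Snowflake itself returns when a stream is queried; a hypothetical query against the stream (names are placeholders) would look like:
```sql
-- The stream exposes the source object's columns plus the change-tracking metadata columns
SELECT col_1, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM TEST_DB.TEST_SCHEMA.STREAM_1;
```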
def populate_stream_upstreams(
self, stream: SnowflakeStream, db_name: str, schema_name: str
) -> None:
"""
Populate the stream's upstream lineage (the source table/view it was created on).
"""
self.report.num_streams_with_known_upstreams += 1
if self.aggregator:
source_parts = split_qualified_name(stream.table_name)
source_db, source_schema, source_name = source_parts
dataset_identifier = self.identifiers.get_dataset_identifier(
stream.name, schema_name, db_name
)
dataset_urn = self.identifiers.gen_dataset_urn(dataset_identifier)
upstream_identifier = self.identifiers.get_dataset_identifier(
source_name, source_schema, source_db
)
upstream_urn = self.identifiers.gen_dataset_urn(upstream_identifier)
logger.debug(
f"""upstream_urn: {upstream_urn}, downstream_urn: {dataset_urn}"""
)
self.aggregator.add_known_lineage_mapping(
upstream_urn=upstream_urn,
downstream_urn=dataset_urn,
lineage_type=DatasetLineageTypeClass.COPY,
)
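For reference, the COPY lineage edge registered here mirrors the stream's definition in Snowflake, where a stream is created on a source table or view; a hypothetical example (placeholder names):
```sql
-- TABLE_1 is the stream's source object and therefore its upstream in lineage
CREATE STREAM TEST_DB.TEST_SCHEMA.STREAM_1
  ON TABLE TEST_DB.TEST_SCHEMA.TABLE_1;
```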

View File

@ -124,19 +124,20 @@ class SnowflakeFilter:
SnowflakeObjectDomain.VIEW,
SnowflakeObjectDomain.MATERIALIZED_VIEW,
SnowflakeObjectDomain.ICEBERG_TABLE,
SnowflakeObjectDomain.STREAM,
):
return False
if _is_sys_table(dataset_name):
return False
dataset_params = _split_qualified_name(dataset_name)
dataset_params = split_qualified_name(dataset_name)
if len(dataset_params) != 3:
self.structured_reporter.info(
title="Unexpected dataset pattern",
message=f"Found a {dataset_type} with an unexpected number of parts. Database and schema filtering will not work as expected, but table filtering will still work.",
context=dataset_name,
)
# We fall-through here so table/view filtering still works.
# We fall-through here so table/view/stream filtering still works.
if (
len(dataset_params) >= 1
@ -169,6 +170,14 @@ class SnowflakeFilter:
):
return False
if (
dataset_type.lower() == SnowflakeObjectDomain.STREAM
and not self.filter_config.stream_pattern.allowed(
_cleanup_qualified_name(dataset_name, self.structured_reporter)
)
):
return False
return True
@ -183,17 +192,17 @@ def _is_sys_table(table_name: str) -> bool:
return table_name.lower().startswith("sys$")
def _split_qualified_name(qualified_name: str) -> List[str]:
def split_qualified_name(qualified_name: str) -> List[str]:
"""
Split a qualified name into its constituent parts.
>>> _split_qualified_name("db.my_schema.my_table")
>>> split_qualified_name("db.my_schema.my_table")
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('"db"."my_schema"."my_table"')
>>> split_qualified_name('"db"."my_schema"."my_table"')
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
>>> split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
['TEST_DB', 'TEST_SCHEMA', 'TABLE.WITH.DOTS']
>>> _split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
>>> split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
['TEST_DB', 'SCHEMA.WITH.DOTS', 'MY_TABLE']
"""
@ -231,7 +240,7 @@ def _split_qualified_name(qualified_name: str) -> List[str]:
def _cleanup_qualified_name(
qualified_name: str, structured_reporter: SourceReport
) -> str:
name_parts = _split_qualified_name(qualified_name)
name_parts = split_qualified_name(qualified_name)
if len(name_parts) != 3:
if not _is_sys_table(qualified_name):
structured_reporter.info(

View File

@ -539,15 +539,27 @@ class SnowflakeV2Source(
for schema in db.schemas
for table_name in schema.views
]
discovered_streams: List[str] = [
self.identifiers.get_dataset_identifier(stream_name, schema.name, db.name)
for db in databases
for schema in db.schemas
for stream_name in schema.streams
]
if len(discovered_tables) == 0 and len(discovered_views) == 0:
if (
len(discovered_tables) == 0
and len(discovered_views) == 0
and len(discovered_streams) == 0
):
self.structured_reporter.failure(
GENERIC_PERMISSION_ERROR_KEY,
"No tables/views found. Please check permissions.",
"No tables/views/streams found. Please check permissions.",
)
return
self.discovered_datasets = discovered_tables + discovered_views
self.discovered_datasets = (
discovered_tables + discovered_views + discovered_streams
)
if self.config.use_queries_v2:
with self.report.new_stage(f"*: {VIEW_PARSING}"):

View File

@ -7,9 +7,11 @@ from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake import snowflake_query
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.utilities.prefix_batch_builder import PrefixGroup
NUM_TABLES = 10
NUM_VIEWS = 2
NUM_STREAMS = 1
NUM_COLS = 10
NUM_OPS = 10
NUM_USAGE = 0
@ -175,6 +177,7 @@ def default_query_results( # noqa: C901
query,
num_tables=NUM_TABLES,
num_views=NUM_VIEWS,
num_streams=NUM_STREAMS,
num_cols=NUM_COLS,
num_ops=NUM_OPS,
num_usages=NUM_USAGE,
@ -272,6 +275,27 @@ def default_query_results( # noqa: C901
for view_idx in range(1, num_views + 1)
if is_secure(view_idx)
]
elif query == SnowflakeQuery.columns_for_schema(
"TEST_SCHEMA",
"TEST_DB",
[PrefixGroup(prefix="TABLE_1", names=[], exact_match=True)],
):
return [
{
"TABLE_CATALOG": "TEST_DB",
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": "TABLE_1",
"COLUMN_NAME": f"COL_{col_idx}",
"ORDINAL_POSITION": col_idx,
"IS_NULLABLE": "NO",
"DATA_TYPE": "TEXT" if col_idx > 1 else "NUMBER",
"COMMENT": "Comment for column",
"CHARACTER_MAXIMUM_LENGTH": 255 if col_idx > 1 else None,
"NUMERIC_PRECISION": None if col_idx > 1 else 38,
"NUMERIC_SCALE": None if col_idx > 1 else 0,
}
for col_idx in range(1, num_cols + 1)
]
elif query == SnowflakeQuery.columns_for_schema("TEST_SCHEMA", "TEST_DB"):
return [
{
@ -293,6 +317,28 @@ def default_query_results( # noqa: C901
)
for col_idx in range(1, num_cols + 1)
]
elif query == SnowflakeQuery.streams_for_database("TEST_DB"):
# TODO: Add tests for stream pagination.
return [
{
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0, tzinfo=timezone.utc),
"name": f"STREAM_{stream_idx}",
"database_name": "TEST_DB",
"schema_name": "TEST_SCHEMA",
"owner": "ACCOUNTADMIN",
"comment": f"Comment for Stream {stream_idx}",
"table_name": f"TEST_DB.TEST_SCHEMA.TABLE_{stream_idx}",
"source_type": "Table",
"base_tables": f"TEST_DB.TEST_SCHEMA.TABLE_{stream_idx}",
"type": "DELTA",
"stale": "false",
"mode": "DEFAULT",
"stale_after": datetime(2021, 6, 22, 0, 0, 0, 0, tzinfo=timezone.utc),
"invalid_reason": None,
"owner_role_type": "ROLE",
}
for stream_idx in range(1, num_streams + 1)
]
elif query in (
SnowflakeQuery.use_database("TEST_DB"),
SnowflakeQuery.show_primary_keys_for_schema("TEST_SCHEMA", "TEST_DB"),

View File

@ -4101,6 +4101,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.view_2,PROD)",
@ -4117,6 +4133,344 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "test_db.test_schema.stream_1",
"platform": "urn:li:dataPlatform:snowflake",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "col_1",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_2",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_3",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_4",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_5",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_6",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_7",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_8",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_9",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_10",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$action",
"nullable": false,
"description": "Type of DML operation (INSERT/DELETE)",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(10)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$isupdate",
"nullable": false,
"description": "Whether row is from UPDATE operation",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "BOOLEAN",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$row_id",
"nullable": false,
"description": "Unique row identifier",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"json": [
{
"op": "add",
"path": "/name",
"value": "STREAM_1"
},
{
"op": "add",
"path": "/description",
"value": "Comment for Stream 1"
},
{
"op": "add",
"path": "/created",
"value": {
"time": 1623110400000
}
},
{
"op": "add",
"path": "/lastModified",
"value": {
"time": 1623110400000
}
},
{
"op": "add",
"path": "/qualifiedName",
"value": "TEST_DB.TEST_SCHEMA.STREAM_1"
},
{
"op": "add",
"path": "/customProperties/SOURCE_TYPE",
"value": "Table"
},
{
"op": "add",
"path": "/customProperties/TYPE",
"value": "DELTA"
},
{
"op": "add",
"path": "/customProperties/STALE",
"value": "false"
},
{
"op": "add",
"path": "/customProperties/MODE",
"value": "DEFAULT"
},
{
"op": "add",
"path": "/customProperties/OWNER_ROLE_TYPE",
"value": "ROLE"
},
{
"op": "add",
"path": "/customProperties/TABLE_NAME",
"value": "TEST_DB.TEST_SCHEMA.TABLE_1"
},
{
"op": "add",
"path": "/customProperties/BASE_TABLES",
"value": "TEST_DB.TEST_SCHEMA.TABLE_1"
},
{
"op": "add",
"path": "/customProperties/STALE_AFTER",
"value": "2021-06-22T00:00:00+00:00"
}
]
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-ftc9zc",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Snowflake Stream"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -4396,6 +4750,35 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
},
{
"id": "urn:li:container:900b1327253068cb1537b1b3c807ddab",
"urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab"
},
{
"id": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f",
"urn": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-foacth",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_2%2CPROD%29",

View File

@ -3729,6 +3729,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_16-14_38_38-5znjnn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_1,PROD)",
@ -3881,6 +3897,279 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "test_db.test_schema.stream_1",
"platform": "urn:li:dataPlatform:snowflake",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "col_1",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_2",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_3",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_4",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_5",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_6",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_7",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_8",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_9",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_10",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$action",
"nullable": false,
"description": "Type of DML operation (INSERT/DELETE)",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(10)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$isupdate",
"nullable": false,
"description": "Whether row is from UPDATE operation",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "BOOLEAN",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$row_id",
"nullable": false,
"description": "Unique row identifier",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_16-14_38_38-5znjnn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_16-14_38_38-5znjnn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Snowflake Stream"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_16-14_38_38-5znjnn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {
"SOURCE_TYPE": "Table",
"TYPE": "DELTA",
"STALE": "false",
"MODE": "DEFAULT",
"OWNER_ROLE_TYPE": "ROLE",
"TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
"BASE_TABLES": "TEST_DB.TEST_SCHEMA.TABLE_1",
"STALE_AFTER": "2021-06-22T00:00:00+00:00"
},
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/view/STREAM_1/",
"name": "STREAM_1",
"qualifiedName": "TEST_DB.TEST_SCHEMA.STREAM_1",
"description": "Comment for Stream 1",
"created": {
"time": 1623110400000
},
"lastModified": {
"time": 1623110400000
},
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_17-13_30_28-vz7bmd",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29",
@ -4008,6 +4297,31 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
},
{
"id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c",
"urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_16-14_38_38-5znjnn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",

View File

@ -180,7 +180,8 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
cache_info = report.data_dictionary_cache.as_obj()
assert cache_info["get_tables_for_database"]["misses"] == 1
assert cache_info["get_views_for_database"]["misses"] == 1
assert cache_info["get_columns_for_schema"]["misses"] == 1
# When a stream fetches columns for its specific source table, that query bypasses the schema-wide cache, resulting in 2 cache misses
assert cache_info["get_columns_for_schema"]["misses"] == 2
assert cache_info["get_pk_constraints_for_schema"]["misses"] == 1
assert cache_info["get_fk_constraints_for_schema"]["misses"] == 1

View File

@ -143,12 +143,16 @@ def test_snowflake_no_tables_causes_pipeline_failure(
[SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB")],
[],
)
sf_cursor.execute.side_effect = query_permission_response_override(
no_views_fn = query_permission_response_override(
no_tables_fn,
[SnowflakeQuery.show_views_for_database("TEST_DB")],
[],
)
sf_cursor.execute.side_effect = query_permission_response_override(
no_views_fn,
[SnowflakeQuery.streams_for_database("TEST_DB")],
[],
)
pipeline = Pipeline(snowflake_pipeline_config)
pipeline.run()
assert "permission-error" in [