feat(ingest/snowflake): ingest stored procedures (#12929)

Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
This commit is contained in:
Mayuri Nehate 2025-03-26 20:02:55 +05:30 committed by GitHub
parent 29d05c214a
commit ac9997d970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1905 additions and 92 deletions

View File

@ -69,7 +69,7 @@ class FlowContainerSubTypes(StrEnum):
class JobContainerSubTypes(StrEnum):
NIFI_PROCESS_GROUP = "Process Group"
MSSQL_JOBSTEP = "Job Step"
MSSQL_STORED_PROCEDURE = "Stored Procedure"
STORED_PROCEDURE = "Stored Procedure"
class BIAssetSubTypes(StrEnum):

View File

@ -54,6 +54,7 @@ class SnowflakeObjectDomain(StrEnum):
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"
PROCEDURE = "procedure"
GENERIC_PERMISSION_ERROR_KEY = "permission-error"

View File

@ -100,7 +100,15 @@ class SnowflakeFilterConfig(SQLFilterConfig):
stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
description="Regex patterns for streams to filter in ingestion. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
procedure_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for procedures to filter in ingestion. "
"Specify regex to match the entire procedure name in database.schema.procedure format. "
"e.g. to match all procedures starting with customer in Customer database and public schema,"
" use the regex 'Customer.public.customer.*'",
)
match_fully_qualified_names: bool = Field(
@ -284,6 +292,11 @@ class SnowflakeV2Config(
description="If enabled, streams will be ingested as separate entities from tables/views.",
)
include_procedures: bool = Field(
default=True,
description="If enabled, procedures will be ingested as pipelines/tasks.",
)
structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(

View File

@ -164,6 +164,23 @@ class SnowflakeQuery:
and table_type in ('BASE TABLE', 'EXTERNAL TABLE')
order by table_schema, table_name"""
@staticmethod
def procedures_for_database(db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT procedure_catalog AS "PROCEDURE_CATALOG",
procedure_schema AS "PROCEDURE_SCHEMA",
procedure_name AS "PROCEDURE_NAME",
procedure_language AS "PROCEDURE_LANGUAGE",
argument_signature AS "ARGUMENT_SIGNATURE",
data_type AS "PROCEDURE_RETURN_TYPE",
procedure_definition AS "PROCEDURE_DEFINITION",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT"
FROM {db_clause}information_schema.procedures
order by procedure_schema, procedure_name"""
@staticmethod
def get_all_tags():
return """

View File

@ -105,6 +105,7 @@ class SnowflakeV2Report(
databases_scanned: int = 0
tags_scanned: int = 0
streams_scanned: int = 0
procedures_scanned: int = 0
include_usage_stats: bool = False
include_operational_stats: bool = False
@ -163,6 +164,8 @@ class SnowflakeV2Report(
self.tags_scanned += 1
elif ent_type == "stream":
self.streams_scanned += 1
elif ent_type == "procedure":
self.procedures_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

View File

@ -14,6 +14,7 @@ from datahub.ingestion.source.snowflake.snowflake_query import (
SnowflakeQuery,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.prefix_batch_builder import PrefixGroup, build_prefix_batches
from datahub.utilities.serialized_lru_cache import serialized_lru_cache
@ -714,3 +715,31 @@ class SnowflakeDataDictionary(SupportsAsObj):
stream_pagination_marker = stream_name
return streams
@serialized_lru_cache(maxsize=1)
def get_procedures_for_database(
self, db_name: str
) -> Dict[str, List[BaseProcedure]]:
procedures: Dict[str, List[BaseProcedure]] = {}
cur = self.connection.query(
SnowflakeQuery.procedures_for_database(db_name),
)
for procedure in cur:
if procedure["PROCEDURE_SCHEMA"] not in procedures:
procedures[procedure["PROCEDURE_SCHEMA"]] = []
procedures[procedure["PROCEDURE_SCHEMA"]].append(
BaseProcedure(
name=procedure["PROCEDURE_NAME"],
language=procedure["PROCEDURE_LANGUAGE"],
argument_signature=procedure["ARGUMENT_SIGNATURE"],
return_type=procedure["PROCEDURE_RETURN_TYPE"],
procedure_definition=procedure["PROCEDURE_DEFINITION"],
created=procedure["CREATED"],
last_altered=procedure["LAST_ALTERED"],
comment=procedure["COMMENT"],
extra_properties=None,
)
)
return procedures

View File

@ -41,6 +41,7 @@ from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import (
SCHEMA_PARALLELISM,
BaseProcedure,
SnowflakeColumn,
SnowflakeDatabase,
SnowflakeDataDictionary,
@ -63,12 +64,14 @@ from datahub.ingestion.source.snowflake.snowflake_utils import (
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
gen_database_container,
gen_database_key,
gen_schema_container,
gen_schema_key,
get_dataplatform_instance_aspect,
get_domain_wu,
)
from datahub.ingestion.source.sql.stored_procedures.base import (
generate_procedure_container_workunits,
generate_procedure_workunits,
)
from datahub.ingestion.source_report.ingestion_stage import (
EXTERNAL_TABLE_DDL_LINEAGE,
LINEAGE_EXTRACTION,
@ -448,10 +451,15 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
if self.config.include_streams:
self.report.num_get_streams_for_schema_queries += 1
streams = self.fetch_streams_for_schema(
snowflake_schema, db_name, schema_name
snowflake_schema,
db_name,
)
yield from self._process_streams(streams, snowflake_schema, db_name)
if self.config.include_procedures:
procedures = self.fetch_procedures_for_schema(snowflake_schema, db_name)
yield from self._process_procedures(procedures, snowflake_schema, db_name)
if self.config.include_technical_schema and snowflake_schema.tags:
yield from self._process_tags_in_schema(snowflake_schema)
@ -536,6 +544,26 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
for stream in streams:
yield from self._process_stream(stream, snowflake_schema, db_name)
def _process_procedures(
self,
procedures: List[BaseProcedure],
snowflake_schema: SnowflakeSchema,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
if self.config.include_technical_schema:
if procedures:
yield from generate_procedure_container_workunits(
self.identifiers.gen_database_key(
db_name,
),
self.identifiers.gen_schema_key(
db_name=db_name,
schema_name=snowflake_schema.name,
),
)
for procedure in procedures:
yield from self._process_procedure(procedure, snowflake_schema, db_name)
def _process_tags_in_schema(
self, snowflake_schema: SnowflakeSchema
) -> Iterable[MetadataWorkUnit]:
@ -819,13 +847,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
entityUrn=dataset_urn, aspect=dataset_properties
).as_workunit()
schema_container_key = gen_schema_key(
db_name=self.snowflake_identifier(db_name),
schema=self.snowflake_identifier(schema_name),
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
schema_container_key = self.identifiers.gen_schema_key(db_name, schema_name)
if self.config.extract_tags_as_structured_properties:
yield from self.gen_column_tags_as_structured_properties(dataset_urn, table)
@ -1094,11 +1116,8 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
def gen_database_containers(
self, database: SnowflakeDatabase
) -> Iterable[MetadataWorkUnit]:
database_container_key = gen_database_key(
self.snowflake_identifier(database.name),
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
database_container_key = self.identifiers.gen_database_key(
database.name,
)
yield from gen_database_container(
@ -1147,21 +1166,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
def gen_schema_containers(
self, schema: SnowflakeSchema, db_name: str
) -> Iterable[MetadataWorkUnit]:
schema_name = self.snowflake_identifier(schema.name)
database_container_key = gen_database_key(
database=self.snowflake_identifier(db_name),
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
database_container_key = self.identifiers.gen_database_key(db_name)
schema_container_key = gen_schema_key(
db_name=self.snowflake_identifier(db_name),
schema=schema_name,
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
schema_container_key = self.identifiers.gen_schema_key(db_name, schema.name)
yield from gen_schema_container(
name=schema.name,
@ -1290,13 +1297,13 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
)
def fetch_streams_for_schema(
self, snowflake_schema: SnowflakeSchema, db_name: str, schema_name: str
self, snowflake_schema: SnowflakeSchema, db_name: str
) -> List[SnowflakeStream]:
try:
streams: List[SnowflakeStream] = []
for stream in self.get_streams_for_schema(schema_name, db_name):
for stream in self.get_streams_for_schema(snowflake_schema.name, db_name):
stream_identifier = self.identifiers.get_dataset_identifier(
stream.name, schema_name, db_name
stream.name, snowflake_schema.name, db_name
)
self.report.report_entity_scanned(stream_identifier, "stream")
@ -1310,16 +1317,15 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
snowflake_schema.streams = [stream.name for stream in streams]
return streams
except Exception as e:
if isinstance(e, SnowflakePermissionError):
error_msg = f"Failed to get streams for schema {db_name}.{schema_name}. Please check permissions."
raise SnowflakePermissionError(error_msg) from e.__cause__
else:
self.structured_reporter.warning(
"Failed to get streams for schema",
f"{db_name}.{schema_name}",
exc=e,
)
return []
self.structured_reporter.warning(
title="Failed to get streams for schema",
message="Please check permissions"
if isinstance(e, SnowflakePermissionError)
else "",
context=f"{db_name}.{snowflake_schema.name}",
exc=e,
)
return []
def get_streams_for_schema(
self, schema_name: str, db_name: str
@ -1328,6 +1334,42 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
return streams.get(schema_name, [])
def fetch_procedures_for_schema(
self, snowflake_schema: SnowflakeSchema, db_name: str
) -> List[BaseProcedure]:
try:
procedures: List[BaseProcedure] = []
for procedure in self.get_procedures_for_schema(snowflake_schema, db_name):
procedure_qualified_name = self.identifiers.get_dataset_identifier(
procedure.name, snowflake_schema.name, db_name
)
self.report.report_entity_scanned(procedure_qualified_name, "procedure")
if self.filters.is_procedure_allowed(procedure_qualified_name):
procedures.append(procedure)
else:
self.report.report_dropped(procedure_qualified_name)
return procedures
except Exception as e:
self.structured_reporter.warning(
title="Failed to get procedures for schema",
message="Please check permissions"
if isinstance(e, SnowflakePermissionError)
else "",
context=f"{db_name}.{snowflake_schema.name}",
exc=e,
)
return []
def get_procedures_for_schema(
self,
snowflake_schema: SnowflakeSchema,
db_name: str,
) -> List[BaseProcedure]:
procedures = self.data_dictionary.get_procedures_for_database(db_name)
return procedures.get(snowflake_schema.name, [])
def _process_stream(
self,
stream: SnowflakeStream,
@ -1350,6 +1392,34 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
"Failed to get columns for stream:", stream.name, exc=e
)
def _process_procedure(
self,
procedure: BaseProcedure,
snowflake_schema: SnowflakeSchema,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
try:
# TODO: For CLL, we should process procedures after all tables are processed
yield from generate_procedure_workunits(
procedure,
database_key=self.identifiers.gen_database_key(
db_name,
),
schema_key=self.identifiers.gen_schema_key(
db_name, snowflake_schema.name
),
schema_resolver=(
self.aggregator._schema_resolver if self.aggregator else None
),
)
except Exception as e:
self.structured_reporter.warning(
title="Failed to ingest stored procedure",
message="",
context=procedure.name,
exc=e,
)
def get_columns_for_stream(
self,
source_object: str, # Qualified name of source table/view

View File

@ -3,7 +3,10 @@ from functools import cached_property
from typing import ClassVar, List, Literal, Optional, Tuple
from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.snowflake.constants import (
SNOWFLAKE_REGION_CLOUD_REGION_MAPPING,
@ -16,6 +19,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.sql.sql_utils import gen_database_key, gen_schema_key
class SnowflakeStructuredReportMixin(abc.ABC):
@ -180,6 +184,9 @@ class SnowflakeFilter:
return True
def is_procedure_allowed(self, procedure_name: str) -> bool:
return self.filter_config.procedure_pattern.allowed(procedure_name)
def _combine_identifier_parts(
*, table_name: str, schema_name: str, db_name: str
@ -330,6 +337,23 @@ class SnowflakeIdentifierBuilder:
else user_name
)
def gen_schema_key(self, db_name: str, schema_name: str) -> SchemaKey:
return gen_schema_key(
db_name=self.snowflake_identifier(db_name),
schema=self.snowflake_identifier(schema_name),
platform=self.platform,
platform_instance=self.identifier_config.platform_instance,
env=self.identifier_config.env,
)
def gen_database_key(self, db_name: str) -> DatabaseKey:
return gen_database_key(
database=self.snowflake_identifier(db_name),
platform=self.platform,
platform_instance=self.identifier_config.platform_instance,
env=self.identifier_config.env,
)
class SnowflakeCommonMixin(SnowflakeStructuredReportMixin):
platform = "snowflake"

View File

@ -15,6 +15,7 @@ from datahub.ingestion.source.common.subtypes import (
FlowContainerSubTypes,
JobContainerSubTypes,
)
from datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure
from datahub.metadata.schema_classes import (
ContainerClass,
DataFlowInfoClass,
@ -135,6 +136,19 @@ class StoredProcedure:
def escape_full_name(self) -> str:
return f"[{self.db}].[{self.schema}].[{self.formatted_name}]"
def to_base_procedure(self) -> BaseProcedure:
return BaseProcedure(
name=self.formatted_name,
procedure_definition=self.code,
created=None,
last_altered=None,
comment=None,
argument_signature=None,
return_type=None,
language="SQL",
extra_properties=None,
)
@dataclass
class JobStep:
@ -222,7 +236,7 @@ class MSSQLDataJob:
type = (
JobContainerSubTypes.MSSQL_JOBSTEP
if isinstance(self.entity, JobStep)
else JobContainerSubTypes.MSSQL_STORED_PROCEDURE
else JobContainerSubTypes.STORED_PROCEDURE
)
return SubTypesClass(
typeNames=[type],

View File

@ -37,9 +37,6 @@ from datahub.ingestion.source.sql.mssql.job_models import (
ProcedureParameter,
StoredProcedure,
)
from datahub.ingestion.source.sql.mssql.stored_procedure_lineage import (
generate_procedure_lineage,
)
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SqlWorkUnit,
@ -50,6 +47,9 @@ from datahub.ingestion.source.sql.sql_config import (
make_sqlalchemy_uri,
)
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.ingestion.source.sql.stored_procedures.base import (
generate_procedure_lineage,
)
from datahub.utilities.file_backed_collections import FileBackedList
logger: logging.Logger = logging.getLogger(__name__)
@ -65,6 +65,8 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
# defaults
host_port: str = Field(default="localhost:1433", description="MSSQL host URL.")
scheme: str = Field(default="mssql+pytds", description="", hidden_from_docs=True)
# TODO: rename to include_procedures ?
include_stored_procedures: bool = Field(
default=True,
description="Include ingest of stored procedures. Requires access to the 'sys' schema.",
@ -763,9 +765,11 @@ class SQLServerSource(SQLAlchemySource):
yield from auto_workunit(
generate_procedure_lineage(
schema_resolver=self.get_schema_resolver(),
procedure=procedure,
procedure=procedure.to_base_procedure(),
procedure_job_urn=MSSQLDataJob(entity=procedure).urn,
is_temp_table=self.is_temp_table,
default_db=procedure.db,
default_schema=procedure.schema,
)
)

View File

@ -0,0 +1,242 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, Iterable, Optional
from datahub.emitter.mce_builder import (
DEFAULT_ENV,
datahub_guid,
make_data_flow_urn,
make_data_job_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import (
FlowContainerSubTypes,
JobContainerSubTypes,
)
from datahub.ingestion.source.sql.stored_procedures.lineage import parse_procedure_code
from datahub.metadata.schema_classes import (
ContainerClass,
DataFlowInfoClass,
DataJobInfoClass,
DataPlatformInstanceClass,
DataTransformClass,
DataTransformLogicClass,
QueryStatementClass,
SubTypesClass,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
@dataclass
class BaseProcedure:
name: str
procedure_definition: Optional[str]
created: Optional[datetime]
last_altered: Optional[datetime]
comment: Optional[str]
argument_signature: Optional[str]
return_type: Optional[str]
language: str
extra_properties: Optional[Dict[str, str]]
def get_procedure_identifier(
self,
) -> str:
if self.argument_signature:
argument_signature_hash = datahub_guid(
dict(argument_signature=self.argument_signature)
)
return f"{self.name}_{argument_signature_hash}"
return self.name
def to_urn(self, database_key: DatabaseKey, schema_key: Optional[SchemaKey]) -> str:
return make_data_job_urn(
orchestrator=database_key.platform,
flow_id=_get_procedure_flow_name(database_key, schema_key),
job_id=self.get_procedure_identifier(),
cluster=database_key.env or DEFAULT_ENV,
platform_instance=database_key.instance,
)
def _generate_flow_workunits(
database_key: DatabaseKey, schema_key: Optional[SchemaKey]
) -> Iterable[MetadataWorkUnit]:
"""Generate flow workunits for database and schema"""
procedure_flow_name = _get_procedure_flow_name(database_key, schema_key)
flow_urn = make_data_flow_urn(
orchestrator=database_key.platform,
flow_id=procedure_flow_name,
cluster=database_key.env or DEFAULT_ENV,
platform_instance=database_key.instance,
)
yield MetadataChangeProposalWrapper(
entityUrn=flow_urn,
aspect=DataFlowInfoClass(
name=procedure_flow_name,
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=flow_urn,
aspect=SubTypesClass(
typeNames=[FlowContainerSubTypes.MSSQL_PROCEDURE_CONTAINER],
),
).as_workunit()
if database_key.instance:
yield MetadataChangeProposalWrapper(
entityUrn=flow_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(database_key.platform),
instance=make_dataplatform_instance_urn(
platform=database_key.platform,
instance=database_key.instance,
),
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=flow_urn,
aspect=ContainerClass(container=database_key.as_urn()),
).as_workunit()
def _get_procedure_flow_name(
database_key: DatabaseKey, schema_key: Optional[SchemaKey]
) -> str:
if schema_key:
procedure_flow_name = (
f"{schema_key.database}.{schema_key.db_schema}.stored_procedures"
)
else:
procedure_flow_name = f"{database_key.database}.stored_procedures"
return procedure_flow_name
def _generate_job_workunits(
database_key: DatabaseKey,
schema_key: Optional[SchemaKey],
procedure: BaseProcedure,
) -> Iterable[MetadataWorkUnit]:
"""Generate job workunits for database, schema and procedure"""
job_urn = procedure.to_urn(database_key, schema_key)
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataJobInfoClass(
name=procedure.name,
type=JobContainerSubTypes.STORED_PROCEDURE,
description=procedure.comment,
customProperties=procedure.extra_properties,
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=SubTypesClass(
typeNames=[JobContainerSubTypes.STORED_PROCEDURE],
),
).as_workunit()
if database_key.instance:
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(database_key.platform),
instance=make_dataplatform_instance_urn(
platform=database_key.platform,
instance=database_key.instance,
),
),
).as_workunit()
container_key = schema_key or database_key # database_key for 2-tier
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=ContainerClass(container=container_key.as_urn()),
).as_workunit()
# TODO: Config whether to ingest procedure code
if procedure.procedure_definition:
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataTransformLogicClass(
transforms=[
DataTransformClass(
queryStatement=QueryStatementClass(
value=procedure.procedure_definition,
language=procedure.language,
),
)
]
),
).as_workunit()
def generate_procedure_lineage(
*,
schema_resolver: SchemaResolver,
procedure: BaseProcedure,
procedure_job_urn: str,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
is_temp_table: Callable[[str], bool] = lambda _: False,
raise_: bool = False,
) -> Iterable[MetadataChangeProposalWrapper]:
if procedure.procedure_definition and procedure.language == "SQL":
datajob_input_output = parse_procedure_code(
schema_resolver=schema_resolver,
default_db=default_db,
default_schema=default_schema,
code=procedure.procedure_definition,
is_temp_table=is_temp_table,
raise_=raise_,
)
if datajob_input_output:
yield MetadataChangeProposalWrapper(
entityUrn=procedure_job_urn,
aspect=datajob_input_output,
)
def generate_procedure_container_workunits(
database_key: DatabaseKey,
schema_key: Optional[SchemaKey],
) -> Iterable[MetadataWorkUnit]:
"""Generate container workunits for database and schema"""
yield from _generate_flow_workunits(database_key, schema_key)
def generate_procedure_workunits(
procedure: BaseProcedure,
database_key: DatabaseKey,
schema_key: Optional[SchemaKey],
schema_resolver: Optional[SchemaResolver],
) -> Iterable[MetadataWorkUnit]:
yield from _generate_job_workunits(database_key, schema_key, procedure)
if schema_resolver:
job_urn = procedure.to_urn(database_key, schema_key)
yield from auto_workunit(
generate_procedure_lineage(
schema_resolver=schema_resolver,
procedure=procedure,
procedure_job_urn=job_urn,
default_db=database_key.database,
default_schema=schema_key.db_schema if schema_key else None,
)
)

View File

@ -1,8 +1,6 @@
import logging
from typing import Callable, Iterable, Optional
from typing import Callable, Optional
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure
from datahub.metadata.schema_classes import DataJobInputOutputClass
from datahub.sql_parsing.datajob import to_datajob_input_output
from datahub.sql_parsing.schema_resolver import SchemaResolver
@ -56,29 +54,3 @@ def parse_procedure_code(
mcps=mcps,
ignore_extra_mcps=True,
)
# Is procedure handling generic enough to be added to SqlParsingAggregator?
def generate_procedure_lineage(
*,
schema_resolver: SchemaResolver,
procedure: StoredProcedure,
procedure_job_urn: str,
is_temp_table: Callable[[str], bool] = lambda _: False,
raise_: bool = False,
) -> Iterable[MetadataChangeProposalWrapper]:
if procedure.code:
datajob_input_output = parse_procedure_code(
schema_resolver=schema_resolver,
default_db=procedure.db,
default_schema=procedure.schema,
code=procedure.code,
is_temp_table=is_temp_table,
raise_=raise_,
)
if datajob_input_output:
yield MetadataChangeProposalWrapper(
entityUrn=procedure_job_urn,
aspect=datajob_input_output,
)

View File

@ -1,7 +1,9 @@
import logging
import re
from enum import Enum
from typing import Iterator, List, Tuple
logger = logging.getLogger(__name__)
SELECT_KEYWORD = "SELECT"
CASE_KEYWORD = "CASE"
END_KEYWORD = "END"
@ -120,7 +122,9 @@ class _StatementSplitter:
# Reset current_statement-specific state.
self.does_select_mean_new_statement = False
if self.current_case_statements != 0:
breakpoint()
logger.warning(
f"Unexpected END keyword. Current case statements: {self.current_case_statements}"
)
self.current_case_statements = 0
def process(self) -> Iterator[str]:

View File

@ -754,4 +754,31 @@ def default_query_results( # noqa: C901
"DOMAIN": "DATABASE",
},
]
elif query == SnowflakeQuery.procedures_for_database("TEST_DB"):
return [
{
"PROCEDURE_CATALOG": "TEST_DB",
"PROCEDURE_SCHEMA": "TEST_SCHEMA",
"PROCEDURE_NAME": "my_procedure",
"PROCEDURE_LANGUAGE": "SQL",
"ARGUMENT_SIGNATURE": "(arg1 VARCHAR, arg2 VARCHAR)",
"PROCEDURE_RETURN_TYPE": "VARCHAR",
"PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END",
"CREATED": "2021-01-01T00:00:00.000Z",
"LAST_ALTERED": "2021-01-01T00:00:00.000Z",
"COMMENT": "This is a test procedure",
},
{
"PROCEDURE_CATALOG": "TEST_DB",
"PROCEDURE_SCHEMA": "TEST_SCHEMA",
"PROCEDURE_NAME": "my_procedure",
"PROCEDURE_LANGUAGE": "SQL",
"ARGUMENT_SIGNATURE": "(arg1 VARCHAR)",
"PROCEDURE_RETURN_TYPE": "VARCHAR",
"PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END",
"CREATED": "2021-01-01T00:00:00.000Z",
"LAST_ALTERED": "2021-01-01T00:00:00.000Z",
"COMMENT": "This is a test procedure 2",
},
]
raise ValueError(f"Unexpected query: {query}")

View File

@ -4268,6 +4268,23 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "test_db.test_schema.stored_procedures"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_32-j8ymga",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
@ -4291,6 +4308,61 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_32-j8ymga",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_32-j8ymga",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
@ -4314,6 +4386,105 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_32-j8ymga",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_26-18_25_36-8s62oa",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure 2",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)",
@ -4337,6 +4508,113 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_26-18_25_36-8s62oa",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
},
{
"id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c",
"urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
},
{
"id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c",
"urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)",
@ -4858,6 +5136,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_1\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -5113,6 +5392,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_10\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -5362,6 +5642,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_2\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -5608,6 +5889,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_3\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -5857,6 +6139,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_4\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -6106,6 +6389,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_5\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -6355,6 +6639,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_6\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -6604,6 +6889,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_7\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -6853,6 +7139,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_8\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -7102,6 +7389,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_9\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2",
"language": "SQL"
@ -7351,6 +7639,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1",
"language": "SQL"
@ -7630,6 +7919,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2",
"language": "SQL"
@ -8357,6 +8647,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_32-j8ymga",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:1d69efa7115792a07468c8b3846a4c0b3ba8620c1f2263374c2efbb2e56e6200",
@ -8421,6 +8727,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:a79e59b5f5248b9247f88eaecbe3a296f788634282edc9cdca80ba1bfb504f37",
@ -8485,6 +8807,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_40-4vdgrh",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:d37a930ca6d2dd7100fd81bbf3d96a8cfe9f30e3469de363650d7e3146c3e4e8",

View File

@ -4486,6 +4486,23 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "test_db.test_schema.stored_procedures"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -4493,6 +4510,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1",
"language": "SQL"
@ -4514,6 +4532,78 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:900b1327253068cb1537b1b3c807ddab"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -4597,6 +4687,258 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
},
{
"id": "urn:li:container:900b1327253068cb1537b1b3c807ddab",
"urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-hyhd0l",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure 2",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
},
{
"id": "urn:li:container:900b1327253068cb1537b1b3c807ddab",
"urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab"
},
{
"id": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f",
"urn": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-hyhd0l",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)"
},
{
"id": "urn:li:container:900b1327253068cb1537b1b3c807ddab",
"urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab"
},
{
"id": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f",
"urn": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -4801,6 +5143,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2",
"language": "SQL"
@ -4921,6 +5264,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-2xyj8f",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29",
@ -4952,5 +5311,37 @@
"runId": "snowflake-2022_06_07-17_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00-tcd1vn",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -140,8 +140,7 @@
"lastModified": {
"time": 1615443388097,
"actor": "urn:li:corpuser:datahub"
},
"filterStatus": "DISABLED"
}
}
},
"systemMetadata": {
@ -2996,8 +2995,7 @@
"lastModified": {
"time": 1615443388097,
"actor": "urn:li:corpuser:datahub"
},
"filterStatus": "DISABLED"
}
}
},
"systemMetadata": {
@ -3337,8 +3335,7 @@
"lastModified": {
"time": 1615443388097,
"actor": "urn:li:corpuser:datahub"
},
"filterStatus": "DISABLED"
}
}
},
"systemMetadata": {
@ -3367,8 +3364,7 @@
"lastModified": {
"time": 1615443388097,
"actor": "urn:li:corpuser:datahub"
},
"filterStatus": "DISABLED"
}
}
},
"systemMetadata": {
@ -3397,8 +3393,7 @@
"lastModified": {
"time": 1615443388097,
"actor": "urn:li:corpuser:datahub"
},
"filterStatus": "DISABLED"
}
}
},
"systemMetadata": {
@ -4173,6 +4168,23 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "test_db.test_schema.stored_procedures"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_55-tlf3vq",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29",
@ -4180,6 +4192,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "create view view_1 as select * from table_1",
"language": "SQL"
@ -4201,6 +4214,61 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_55-tlf3vq",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_55-tlf3vq",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29",
@ -4284,6 +4352,105 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_55-tlf3vq",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_26-18_25_51-lc2w8p",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "my_procedure",
"description": "This is a test procedure 2",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29",
@ -4300,6 +4467,113 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "BEGIN RETURN 'Hello World'; END",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_26-18_25_51-lc2w8p",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
},
{
"id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c",
"urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585",
"urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585"
},
{
"id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c",
"urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
@ -4484,6 +4758,7 @@
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "create view view_2 as select * from table_2",
"language": "SQL"
@ -4604,6 +4879,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_25-15_03_55-tlf3vq",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29",
@ -4668,6 +4959,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "structuredProperty",
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_0",
@ -4731,5 +5038,21 @@
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_03_24-20_29_59-jvnw7v",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -7,7 +7,7 @@ from pathlib import Path
import pytest
from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure
from datahub.ingestion.source.sql.mssql.stored_procedure_lineage import (
from datahub.ingestion.source.sql.stored_procedures.base import (
generate_procedure_lineage,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
@ -97,9 +97,11 @@ def test_stored_procedure_lineage(procedure_sql_file: str) -> None:
mcps = list(
generate_procedure_lineage(
schema_resolver=schema_resolver,
procedure=procedure,
procedure=procedure.to_base_procedure(),
procedure_job_urn=data_job_urn,
is_temp_table=lambda name: "temp" in name.lower(),
default_db=procedure.db,
default_schema=procedure.schema,
)
)
mce_helpers.check_goldens_stream(

View File

@ -178,3 +178,26 @@ WHERE
statements = [statement.strip() for statement in split_statements(test_sql)]
expected = [test_sql]
assert statements == expected
def test_split_statement_with_merge_query_fails():
test_sql = """\
MERGE INTO myTable AS t
USING myTable2 AS s
ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET t.b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)"""
statements = [statement.strip() for statement in split_statements(test_sql)]
expected = [test_sql]
assert statements != expected
assert statements == [
"""MERGE INTO myTable AS t
USING myTable2 AS s
ON t.a = s.a
WHEN MATCHED THEN""",
"""UPDATE SET t.b = s.b
WHEN NOT MATCHED THEN""",
"""INSERT (a, b) VALUES (s.a, s.b)""",
]

View File

@ -0,0 +1,42 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_lineage.sql)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD)"
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD),id)"
],
"confidenceScore": 0.2
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),value)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD),value)"
],
"confidenceScore": 0.2
}
]
}
}
}
]

View File

@ -0,0 +1,44 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_multi_statements.sql)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_delete,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD)"
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD),id)"
],
"confidenceScore": 0.2
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD),column2)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD),column2)"
],
"confidenceScore": 0.2
}
]
}
}
}
]

View File

@ -0,0 +1,42 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_temp_lineage.sql)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD)"
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD),id)"
],
"confidenceScore": 0.2
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),value)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD),value)"
],
"confidenceScore": 0.2
}
]
}
}
}
]

View File

@ -0,0 +1,53 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_transaction.sql)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD)"
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),query_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),query_id)"
],
"confidenceScore": 0.2
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),start_time)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),start_time)"
],
"confidenceScore": 0.2
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),user)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),user)"
],
"confidenceScore": 0.2
}
]
}
}
}
]

View File

@ -0,0 +1,5 @@
BEGIN
CREATE TABLE TEST_DB.PUBLIC.new_table_1 as select id, value from TEST_DB.PUBLIC.processed_transactions;
RETURN message;
END;

View File

@ -0,0 +1,25 @@
BEGIN
-- merge query with when matched and when not matched
MERGE INTO target_table tgt
USING (
SELECT id, column1
FROM source_table1
) src
ON (tgt.id = src.id)
WHEN MATCHED THEN
UPDATE SET
tgt.column1 = src.column1
WHEN NOT MATCHED THEN
INSERT (id, column1)
VALUES (src.id, src.column1);
INSERT INTO target_table_insert (id, column2)
SELECT id, column2
FROM source_table2;
DELETE FROM target_table_delete
WHERE id IN (
SELECT id
FROM source_table3
)
END

View File

@ -0,0 +1,19 @@
BEGIN
-- merge query with when matched and when not matched
MERGE INTO target_table tgt
USING (
SELECT s1.id, s1.column1, s2.column2, s3.column3
FROM source_table1 s1
JOIN source_table2 s2 ON s1.id = s2.id
JOIN source_table3 s3 ON s1.id = s3.id
) src
ON (tgt.id = src.id)
WHEN MATCHED THEN
UPDATE SET
tgt.column1 = src.column1,
tgt.column2 = src.column2,
tgt.column3 = src.column3
WHEN NOT MATCHED THEN
INSERT (id, column1, column2, column3)
VALUES (src.id, src.column1, src.column2, src.column3)
END

View File

@ -0,0 +1,5 @@
BEGIN
CREATE TEMPORARY TABLE temp1 as select id, value from TEST_DB.PUBLIC.processed_transactions;
CREATE TABLE TEST_DB.PUBLIC.new_table_2 as select * from temp1;
RETURN message;
END

View File

@ -0,0 +1,14 @@
BEGIN
-- snowflake stored procedure with begin transaction and commit, write to table in transaction
BEGIN transaction
insert into "QUERY_HISTORY"
SELECT query_id, start_time, user FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY";
COMMIT
RETURN 'Success';
EXCEPTION
WHEN OTHER THEN
ROLLBACK;
raise;
--RETURN 'Error';
END;

View File

@ -0,0 +1,64 @@
import pathlib
from pathlib import Path
import pytest
from datahub.ingestion.source.sql.stored_procedures.base import (
BaseProcedure,
generate_procedure_lineage,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from tests.test_helpers import mce_helpers
PROCEDURE_SQLS_DIR = pathlib.Path(__file__).parent / "procedures"
PROCEDURES_GOLDEN_DIR = pathlib.Path(__file__).parent / "golden_files"
platforms = [folder.name for folder in PROCEDURE_SQLS_DIR.iterdir() if folder.is_dir()]
procedure_sqls = [
(platform, sql_file.name, f"{platform}/{sql_file.name}")
for platform in platforms
for sql_file in PROCEDURE_SQLS_DIR.glob(f"{platform}/*.sql")
]
@pytest.mark.parametrize("platform, sql_file_name, procedure_sql_file", procedure_sqls)
@pytest.mark.integration
def test_stored_procedure_lineage(
platform: str, sql_file_name: str, procedure_sql_file: str
) -> None:
sql_file_path = PROCEDURE_SQLS_DIR / procedure_sql_file
procedure_code = sql_file_path.read_text()
name = sql_file_name
db = "default_db"
schema = "default_schema"
procedure = BaseProcedure(
name=name,
procedure_definition=procedure_code,
created=None,
last_altered=None,
comment=None,
argument_signature=None,
return_type=None,
language="SQL",
extra_properties=None,
)
data_job_urn = f"urn:li:dataJob:(urn:li:dataFlow:({platform},{db}.{schema}.stored_procedures,PROD),{name})"
schema_resolver = SchemaResolver(platform=platform)
mcps = list(
generate_procedure_lineage(
schema_resolver=schema_resolver,
procedure=procedure,
procedure_job_urn=data_job_urn,
default_db=db,
default_schema=schema,
)
)
mce_helpers.check_goldens_stream(
outputs=mcps,
golden_path=(
PROCEDURES_GOLDEN_DIR / Path(procedure_sql_file).with_suffix(".json")
),
)