diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 17230ea152..ee63a4bd6d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -69,7 +69,7 @@ class FlowContainerSubTypes(StrEnum): class JobContainerSubTypes(StrEnum): NIFI_PROCESS_GROUP = "Process Group" MSSQL_JOBSTEP = "Job Step" - MSSQL_STORED_PROCEDURE = "Stored Procedure" + STORED_PROCEDURE = "Stored Procedure" class BIAssetSubTypes(StrEnum): diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py index 84ccd2c67c..f9bd7933a3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py @@ -54,6 +54,7 @@ class SnowflakeObjectDomain(StrEnum): COLUMN = "column" ICEBERG_TABLE = "iceberg table" STREAM = "stream" + PROCEDURE = "procedure" GENERIC_PERMISSION_ERROR_KEY = "permission-error" diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 94ce51c031..c5250521e0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -100,7 +100,15 @@ class SnowflakeFilterConfig(SQLFilterConfig): stream_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. 
to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + description="Regex patterns for streams to filter in ingestion. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + ) + + procedure_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for procedures to filter in ingestion. " + "Specify regex to match the entire procedure name in database.schema.procedure format. " + "e.g. to match all procedures starting with customer in Customer database and public schema," + " use the regex 'Customer.public.customer.*'", ) match_fully_qualified_names: bool = Field( @@ -284,6 +292,11 @@ class SnowflakeV2Config( description="If enabled, streams will be ingested as separate entities from tables/views.", ) + include_procedures: bool = Field( + default=True, + description="If enabled, procedures will be ingested as pipelines/tasks.", + ) + structured_property_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description=( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 139ce02c18..235117d7d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -164,6 +164,23 @@ class SnowflakeQuery: and table_type in ('BASE TABLE', 'EXTERNAL TABLE') order by table_schema, table_name""" + @staticmethod + def procedures_for_database(db_name: Optional[str]) -> str: + db_clause = f'"{db_name}".' 
if db_name is not None else "" + return f""" + SELECT procedure_catalog AS "PROCEDURE_CATALOG", + procedure_schema AS "PROCEDURE_SCHEMA", + procedure_name AS "PROCEDURE_NAME", + procedure_language AS "PROCEDURE_LANGUAGE", + argument_signature AS "ARGUMENT_SIGNATURE", + data_type AS "PROCEDURE_RETURN_TYPE", + procedure_definition AS "PROCEDURE_DEFINITION", + created AS "CREATED", + last_altered AS "LAST_ALTERED", + comment AS "COMMENT" + FROM {db_clause}information_schema.procedures + order by procedure_schema, procedure_name""" + @staticmethod def get_all_tags(): return """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index bd558bc72f..f7e3f74494 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -105,6 +105,7 @@ class SnowflakeV2Report( databases_scanned: int = 0 tags_scanned: int = 0 streams_scanned: int = 0 + procedures_scanned: int = 0 include_usage_stats: bool = False include_operational_stats: bool = False @@ -163,6 +164,8 @@ class SnowflakeV2Report( self.tags_scanned += 1 elif ent_type == "stream": self.streams_scanned += 1 + elif ent_type == "procedure": + self.procedures_scanned += 1 else: raise KeyError(f"Unknown entity {ent_type}.") diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index 2701411da6..38a8295a96 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -14,6 +14,7 @@ from datahub.ingestion.source.snowflake.snowflake_query import ( SnowflakeQuery, ) from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView +from 
datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.prefix_batch_builder import PrefixGroup, build_prefix_batches from datahub.utilities.serialized_lru_cache import serialized_lru_cache @@ -714,3 +715,31 @@ class SnowflakeDataDictionary(SupportsAsObj): stream_pagination_marker = stream_name return streams + + @serialized_lru_cache(maxsize=1) + def get_procedures_for_database( + self, db_name: str + ) -> Dict[str, List[BaseProcedure]]: + procedures: Dict[str, List[BaseProcedure]] = {} + cur = self.connection.query( + SnowflakeQuery.procedures_for_database(db_name), + ) + + for procedure in cur: + if procedure["PROCEDURE_SCHEMA"] not in procedures: + procedures[procedure["PROCEDURE_SCHEMA"]] = [] + + procedures[procedure["PROCEDURE_SCHEMA"]].append( + BaseProcedure( + name=procedure["PROCEDURE_NAME"], + language=procedure["PROCEDURE_LANGUAGE"], + argument_signature=procedure["ARGUMENT_SIGNATURE"], + return_type=procedure["PROCEDURE_RETURN_TYPE"], + procedure_definition=procedure["PROCEDURE_DEFINITION"], + created=procedure["CREATED"], + last_altered=procedure["LAST_ALTERED"], + comment=procedure["COMMENT"], + extra_properties=None, + ) + ) + return procedures diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 03f83a4e73..08d4bbe118 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -41,6 +41,7 @@ from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report from datahub.ingestion.source.snowflake.snowflake_schema import ( SCHEMA_PARALLELISM, + BaseProcedure, SnowflakeColumn, SnowflakeDatabase, 
SnowflakeDataDictionary, @@ -63,12 +64,14 @@ from datahub.ingestion.source.snowflake.snowflake_utils import ( from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, - gen_database_key, gen_schema_container, - gen_schema_key, get_dataplatform_instance_aspect, get_domain_wu, ) +from datahub.ingestion.source.sql.stored_procedures.base import ( + generate_procedure_container_workunits, + generate_procedure_workunits, +) from datahub.ingestion.source_report.ingestion_stage import ( EXTERNAL_TABLE_DDL_LINEAGE, LINEAGE_EXTRACTION, @@ -448,10 +451,15 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): if self.config.include_streams: self.report.num_get_streams_for_schema_queries += 1 streams = self.fetch_streams_for_schema( - snowflake_schema, db_name, schema_name + snowflake_schema, + db_name, ) yield from self._process_streams(streams, snowflake_schema, db_name) + if self.config.include_procedures: + procedures = self.fetch_procedures_for_schema(snowflake_schema, db_name) + yield from self._process_procedures(procedures, snowflake_schema, db_name) + if self.config.include_technical_schema and snowflake_schema.tags: yield from self._process_tags_in_schema(snowflake_schema) @@ -536,6 +544,26 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): for stream in streams: yield from self._process_stream(stream, snowflake_schema, db_name) + def _process_procedures( + self, + procedures: List[BaseProcedure], + snowflake_schema: SnowflakeSchema, + db_name: str, + ) -> Iterable[MetadataWorkUnit]: + if self.config.include_technical_schema: + if procedures: + yield from generate_procedure_container_workunits( + self.identifiers.gen_database_key( + db_name, + ), + self.identifiers.gen_schema_key( + db_name=db_name, + schema_name=snowflake_schema.name, + ), + ) + for procedure in procedures: + yield from self._process_procedure(procedure, snowflake_schema, db_name) + def _process_tags_in_schema( self, 
snowflake_schema: SnowflakeSchema ) -> Iterable[MetadataWorkUnit]: @@ -819,13 +847,7 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): entityUrn=dataset_urn, aspect=dataset_properties ).as_workunit() - schema_container_key = gen_schema_key( - db_name=self.snowflake_identifier(db_name), - schema=self.snowflake_identifier(schema_name), - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + schema_container_key = self.identifiers.gen_schema_key(db_name, schema_name) if self.config.extract_tags_as_structured_properties: yield from self.gen_column_tags_as_structured_properties(dataset_urn, table) @@ -1094,11 +1116,8 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): def gen_database_containers( self, database: SnowflakeDatabase ) -> Iterable[MetadataWorkUnit]: - database_container_key = gen_database_key( - self.snowflake_identifier(database.name), - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, + database_container_key = self.identifiers.gen_database_key( + database.name, ) yield from gen_database_container( @@ -1147,21 +1166,9 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): def gen_schema_containers( self, schema: SnowflakeSchema, db_name: str ) -> Iterable[MetadataWorkUnit]: - schema_name = self.snowflake_identifier(schema.name) - database_container_key = gen_database_key( - database=self.snowflake_identifier(db_name), - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + database_container_key = self.identifiers.gen_database_key(db_name) - schema_container_key = gen_schema_key( - db_name=self.snowflake_identifier(db_name), - schema=schema_name, - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + schema_container_key = self.identifiers.gen_schema_key(db_name, schema.name) yield from gen_schema_container( 
name=schema.name, @@ -1290,13 +1297,13 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): ) def fetch_streams_for_schema( - self, snowflake_schema: SnowflakeSchema, db_name: str, schema_name: str + self, snowflake_schema: SnowflakeSchema, db_name: str ) -> List[SnowflakeStream]: try: streams: List[SnowflakeStream] = [] - for stream in self.get_streams_for_schema(schema_name, db_name): + for stream in self.get_streams_for_schema(snowflake_schema.name, db_name): stream_identifier = self.identifiers.get_dataset_identifier( - stream.name, schema_name, db_name + stream.name, snowflake_schema.name, db_name ) self.report.report_entity_scanned(stream_identifier, "stream") @@ -1310,16 +1317,15 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): snowflake_schema.streams = [stream.name for stream in streams] return streams except Exception as e: - if isinstance(e, SnowflakePermissionError): - error_msg = f"Failed to get streams for schema {db_name}.{schema_name}. Please check permissions." 
- raise SnowflakePermissionError(error_msg) from e.__cause__ - else: - self.structured_reporter.warning( - "Failed to get streams for schema", - f"{db_name}.{schema_name}", - exc=e, - ) - return [] + self.structured_reporter.warning( + title="Failed to get streams for schema", + message="Please check permissions" + if isinstance(e, SnowflakePermissionError) + else "", + context=f"{db_name}.{snowflake_schema.name}", + exc=e, + ) + return [] def get_streams_for_schema( self, schema_name: str, db_name: str @@ -1328,6 +1334,42 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): return streams.get(schema_name, []) + def fetch_procedures_for_schema( + self, snowflake_schema: SnowflakeSchema, db_name: str + ) -> List[BaseProcedure]: + try: + procedures: List[BaseProcedure] = [] + for procedure in self.get_procedures_for_schema(snowflake_schema, db_name): + procedure_qualified_name = self.identifiers.get_dataset_identifier( + procedure.name, snowflake_schema.name, db_name + ) + self.report.report_entity_scanned(procedure_qualified_name, "procedure") + + if self.filters.is_procedure_allowed(procedure_qualified_name): + procedures.append(procedure) + else: + self.report.report_dropped(procedure_qualified_name) + return procedures + except Exception as e: + self.structured_reporter.warning( + title="Failed to get procedures for schema", + message="Please check permissions" + if isinstance(e, SnowflakePermissionError) + else "", + context=f"{db_name}.{snowflake_schema.name}", + exc=e, + ) + return [] + + def get_procedures_for_schema( + self, + snowflake_schema: SnowflakeSchema, + db_name: str, + ) -> List[BaseProcedure]: + procedures = self.data_dictionary.get_procedures_for_database(db_name) + + return procedures.get(snowflake_schema.name, []) + def _process_stream( self, stream: SnowflakeStream, @@ -1350,6 +1392,34 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin): "Failed to get columns for stream:", stream.name, exc=e ) + def 
_process_procedure( + self, + procedure: BaseProcedure, + snowflake_schema: SnowflakeSchema, + db_name: str, + ) -> Iterable[MetadataWorkUnit]: + try: + # TODO: For CLL, we should process procedures after all tables are processed + yield from generate_procedure_workunits( + procedure, + database_key=self.identifiers.gen_database_key( + db_name, + ), + schema_key=self.identifiers.gen_schema_key( + db_name, snowflake_schema.name + ), + schema_resolver=( + self.aggregator._schema_resolver if self.aggregator else None + ), + ) + except Exception as e: + self.structured_reporter.warning( + title="Failed to ingest stored procedure", + message="", + context=procedure.name, + exc=e, + ) + def get_columns_for_stream( self, source_object: str, # Qualified name of source table/view diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 1e95597061..5b56bf9a2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -3,7 +3,10 @@ from functools import cached_property from typing import ClassVar, List, Literal, Optional, Tuple from datahub.configuration.pattern_utils import is_schema_allowed -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance +from datahub.emitter.mce_builder import ( + make_dataset_urn_with_platform_instance, +) +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey from datahub.ingestion.api.source import SourceReport from datahub.ingestion.source.snowflake.constants import ( SNOWFLAKE_REGION_CLOUD_REGION_MAPPING, @@ -16,6 +19,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import ( SnowflakeV2Config, ) from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report +from datahub.ingestion.source.sql.sql_utils import gen_database_key, gen_schema_key class 
SnowflakeStructuredReportMixin(abc.ABC): @@ -180,6 +184,9 @@ class SnowflakeFilter: return True + def is_procedure_allowed(self, procedure_name: str) -> bool: + return self.filter_config.procedure_pattern.allowed(procedure_name) + def _combine_identifier_parts( *, table_name: str, schema_name: str, db_name: str @@ -330,6 +337,23 @@ class SnowflakeIdentifierBuilder: else user_name ) + def gen_schema_key(self, db_name: str, schema_name: str) -> SchemaKey: + return gen_schema_key( + db_name=self.snowflake_identifier(db_name), + schema=self.snowflake_identifier(schema_name), + platform=self.platform, + platform_instance=self.identifier_config.platform_instance, + env=self.identifier_config.env, + ) + + def gen_database_key(self, db_name: str) -> DatabaseKey: + return gen_database_key( + database=self.snowflake_identifier(db_name), + platform=self.platform, + platform_instance=self.identifier_config.platform_instance, + env=self.identifier_config.env, + ) + class SnowflakeCommonMixin(SnowflakeStructuredReportMixin): platform = "snowflake" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py index f30f898325..2398453a9d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py @@ -15,6 +15,7 @@ from datahub.ingestion.source.common.subtypes import ( FlowContainerSubTypes, JobContainerSubTypes, ) +from datahub.ingestion.source.sql.stored_procedures.base import BaseProcedure from datahub.metadata.schema_classes import ( ContainerClass, DataFlowInfoClass, @@ -135,6 +136,19 @@ class StoredProcedure: def escape_full_name(self) -> str: return f"[{self.db}].[{self.schema}].[{self.formatted_name}]" + def to_base_procedure(self) -> BaseProcedure: + return BaseProcedure( + name=self.formatted_name, + procedure_definition=self.code, + created=None, + last_altered=None, + 
comment=None, + argument_signature=None, + return_type=None, + language="SQL", + extra_properties=None, + ) + @dataclass class JobStep: @@ -222,7 +236,7 @@ class MSSQLDataJob: type = ( JobContainerSubTypes.MSSQL_JOBSTEP if isinstance(self.entity, JobStep) - else JobContainerSubTypes.MSSQL_STORED_PROCEDURE + else JobContainerSubTypes.STORED_PROCEDURE ) return SubTypesClass( typeNames=[type], diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index ed53c34a1a..adcd20408d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -37,9 +37,6 @@ from datahub.ingestion.source.sql.mssql.job_models import ( ProcedureParameter, StoredProcedure, ) -from datahub.ingestion.source.sql.mssql.stored_procedure_lineage import ( - generate_procedure_lineage, -) from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, SqlWorkUnit, @@ -50,6 +47,9 @@ from datahub.ingestion.source.sql.sql_config import ( make_sqlalchemy_uri, ) from datahub.ingestion.source.sql.sql_report import SQLSourceReport +from datahub.ingestion.source.sql.stored_procedures.base import ( + generate_procedure_lineage, +) from datahub.utilities.file_backed_collections import FileBackedList logger: logging.Logger = logging.getLogger(__name__) @@ -65,6 +65,8 @@ class SQLServerConfig(BasicSQLAlchemyConfig): # defaults host_port: str = Field(default="localhost:1433", description="MSSQL host URL.") scheme: str = Field(default="mssql+pytds", description="", hidden_from_docs=True) + + # TODO: rename to include_procedures ? include_stored_procedures: bool = Field( default=True, description="Include ingest of stored procedures. 
Requires access to the 'sys' schema.", @@ -763,9 +765,11 @@ class SQLServerSource(SQLAlchemySource): yield from auto_workunit( generate_procedure_lineage( schema_resolver=self.get_schema_resolver(), - procedure=procedure, + procedure=procedure.to_base_procedure(), procedure_job_urn=MSSQLDataJob(entity=procedure).urn, is_temp_table=self.is_temp_table, + default_db=procedure.db, + default_schema=procedure.schema, ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/base.py b/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/base.py new file mode 100644 index 0000000000..c694632817 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/base.py @@ -0,0 +1,242 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, Dict, Iterable, Optional + +from datahub.emitter.mce_builder import ( + DEFAULT_ENV, + datahub_guid, + make_data_flow_urn, + make_data_job_urn, + make_data_platform_urn, + make_dataplatform_instance_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey +from datahub.ingestion.api.source_helpers import auto_workunit +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.subtypes import ( + FlowContainerSubTypes, + JobContainerSubTypes, +) +from datahub.ingestion.source.sql.stored_procedures.lineage import parse_procedure_code +from datahub.metadata.schema_classes import ( + ContainerClass, + DataFlowInfoClass, + DataJobInfoClass, + DataPlatformInstanceClass, + DataTransformClass, + DataTransformLogicClass, + QueryStatementClass, + SubTypesClass, +) +from 
datahub.sql_parsing.schema_resolver import SchemaResolver + + +@dataclass +class BaseProcedure: + name: str + procedure_definition: Optional[str] + created: Optional[datetime] + last_altered: Optional[datetime] + comment: Optional[str] + argument_signature: Optional[str] + return_type: Optional[str] + language: str + extra_properties: Optional[Dict[str, str]] + + def get_procedure_identifier( + self, + ) -> str: + if self.argument_signature: + argument_signature_hash = datahub_guid( + dict(argument_signature=self.argument_signature) + ) + return f"{self.name}_{argument_signature_hash}" + + return self.name + + def to_urn(self, database_key: DatabaseKey, schema_key: Optional[SchemaKey]) -> str: + return make_data_job_urn( + orchestrator=database_key.platform, + flow_id=_get_procedure_flow_name(database_key, schema_key), + job_id=self.get_procedure_identifier(), + cluster=database_key.env or DEFAULT_ENV, + platform_instance=database_key.instance, + ) + + +def _generate_flow_workunits( + database_key: DatabaseKey, schema_key: Optional[SchemaKey] +) -> Iterable[MetadataWorkUnit]: + """Generate flow workunits for database and schema""" + + procedure_flow_name = _get_procedure_flow_name(database_key, schema_key) + + flow_urn = make_data_flow_urn( + orchestrator=database_key.platform, + flow_id=procedure_flow_name, + cluster=database_key.env or DEFAULT_ENV, + platform_instance=database_key.instance, + ) + + yield MetadataChangeProposalWrapper( + entityUrn=flow_urn, + aspect=DataFlowInfoClass( + name=procedure_flow_name, + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=flow_urn, + aspect=SubTypesClass( + typeNames=[FlowContainerSubTypes.MSSQL_PROCEDURE_CONTAINER], + ), + ).as_workunit() + + if database_key.instance: + yield MetadataChangeProposalWrapper( + entityUrn=flow_urn, + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(database_key.platform), + instance=make_dataplatform_instance_urn( + platform=database_key.platform, + 
instance=database_key.instance, + ), + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=flow_urn, + aspect=ContainerClass(container=database_key.as_urn()), + ).as_workunit() + + +def _get_procedure_flow_name( + database_key: DatabaseKey, schema_key: Optional[SchemaKey] +) -> str: + if schema_key: + procedure_flow_name = ( + f"{schema_key.database}.{schema_key.db_schema}.stored_procedures" + ) + else: + procedure_flow_name = f"{database_key.database}.stored_procedures" + return procedure_flow_name + + +def _generate_job_workunits( + database_key: DatabaseKey, + schema_key: Optional[SchemaKey], + procedure: BaseProcedure, +) -> Iterable[MetadataWorkUnit]: + """Generate job workunits for database, schema and procedure""" + + job_urn = procedure.to_urn(database_key, schema_key) + + yield MetadataChangeProposalWrapper( + entityUrn=job_urn, + aspect=DataJobInfoClass( + name=procedure.name, + type=JobContainerSubTypes.STORED_PROCEDURE, + description=procedure.comment, + customProperties=procedure.extra_properties, + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=job_urn, + aspect=SubTypesClass( + typeNames=[JobContainerSubTypes.STORED_PROCEDURE], + ), + ).as_workunit() + + if database_key.instance: + yield MetadataChangeProposalWrapper( + entityUrn=job_urn, + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(database_key.platform), + instance=make_dataplatform_instance_urn( + platform=database_key.platform, + instance=database_key.instance, + ), + ), + ).as_workunit() + + container_key = schema_key or database_key # database_key for 2-tier + yield MetadataChangeProposalWrapper( + entityUrn=job_urn, + aspect=ContainerClass(container=container_key.as_urn()), + ).as_workunit() + + # TODO: Config whether to ingest procedure code + if procedure.procedure_definition: + yield MetadataChangeProposalWrapper( + entityUrn=job_urn, + aspect=DataTransformLogicClass( + transforms=[ + DataTransformClass( + 
queryStatement=QueryStatementClass( + value=procedure.procedure_definition, + language=procedure.language, + ), + ) + ] + ), + ).as_workunit() + + +def generate_procedure_lineage( + *, + schema_resolver: SchemaResolver, + procedure: BaseProcedure, + procedure_job_urn: str, + default_db: Optional[str] = None, + default_schema: Optional[str] = None, + is_temp_table: Callable[[str], bool] = lambda _: False, + raise_: bool = False, +) -> Iterable[MetadataChangeProposalWrapper]: + if procedure.procedure_definition and procedure.language == "SQL": + datajob_input_output = parse_procedure_code( + schema_resolver=schema_resolver, + default_db=default_db, + default_schema=default_schema, + code=procedure.procedure_definition, + is_temp_table=is_temp_table, + raise_=raise_, + ) + + if datajob_input_output: + yield MetadataChangeProposalWrapper( + entityUrn=procedure_job_urn, + aspect=datajob_input_output, + ) + + +def generate_procedure_container_workunits( + database_key: DatabaseKey, + schema_key: Optional[SchemaKey], +) -> Iterable[MetadataWorkUnit]: + """Generate container workunits for database and schema""" + + yield from _generate_flow_workunits(database_key, schema_key) + + +def generate_procedure_workunits( + procedure: BaseProcedure, + database_key: DatabaseKey, + schema_key: Optional[SchemaKey], + schema_resolver: Optional[SchemaResolver], +) -> Iterable[MetadataWorkUnit]: + yield from _generate_job_workunits(database_key, schema_key, procedure) + + if schema_resolver: + job_urn = procedure.to_urn(database_key, schema_key) + + yield from auto_workunit( + generate_procedure_lineage( + schema_resolver=schema_resolver, + procedure=procedure, + procedure_job_urn=job_urn, + default_db=database_key.database, + default_schema=schema_key.db_schema if schema_key else None, + ) + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/stored_procedure_lineage.py b/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/lineage.py similarity 
index 63% rename from metadata-ingestion/src/datahub/ingestion/source/sql/mssql/stored_procedure_lineage.py rename to metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/lineage.py index b979a270a5..1efa569a8f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/stored_procedure_lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/stored_procedures/lineage.py @@ -1,8 +1,6 @@ import logging -from typing import Callable, Iterable, Optional +from typing import Callable, Optional -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure from datahub.metadata.schema_classes import DataJobInputOutputClass from datahub.sql_parsing.datajob import to_datajob_input_output from datahub.sql_parsing.schema_resolver import SchemaResolver @@ -56,29 +54,3 @@ def parse_procedure_code( mcps=mcps, ignore_extra_mcps=True, ) - - -# Is procedure handling generic enough to be added to SqlParsingAggregator? 
-def generate_procedure_lineage( - *, - schema_resolver: SchemaResolver, - procedure: StoredProcedure, - procedure_job_urn: str, - is_temp_table: Callable[[str], bool] = lambda _: False, - raise_: bool = False, -) -> Iterable[MetadataChangeProposalWrapper]: - if procedure.code: - datajob_input_output = parse_procedure_code( - schema_resolver=schema_resolver, - default_db=procedure.db, - default_schema=procedure.schema, - code=procedure.code, - is_temp_table=is_temp_table, - raise_=raise_, - ) - - if datajob_input_output: - yield MetadataChangeProposalWrapper( - entityUrn=procedure_job_urn, - aspect=datajob_input_output, - ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/split_statements.py b/metadata-ingestion/src/datahub/sql_parsing/split_statements.py index a8d87d37d2..dfaf946d0d 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/split_statements.py +++ b/metadata-ingestion/src/datahub/sql_parsing/split_statements.py @@ -1,7 +1,9 @@ +import logging import re from enum import Enum from typing import Iterator, List, Tuple +logger = logging.getLogger(__name__) SELECT_KEYWORD = "SELECT" CASE_KEYWORD = "CASE" END_KEYWORD = "END" @@ -120,7 +122,9 @@ class _StatementSplitter: # Reset current_statement-specific state. self.does_select_mean_new_statement = False if self.current_case_statements != 0: - breakpoint() + logger.warning( + f"Unexpected END keyword. 
Current case statements: {self.current_case_statements}" + ) self.current_case_statements = 0 def process(self) -> Iterator[str]: diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index bf83dd8148..b83b573da5 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -754,4 +754,31 @@ def default_query_results( # noqa: C901 "DOMAIN": "DATABASE", }, ] + elif query == SnowflakeQuery.procedures_for_database("TEST_DB"): + return [ + { + "PROCEDURE_CATALOG": "TEST_DB", + "PROCEDURE_SCHEMA": "TEST_SCHEMA", + "PROCEDURE_NAME": "my_procedure", + "PROCEDURE_LANGUAGE": "SQL", + "ARGUMENT_SIGNATURE": "(arg1 VARCHAR, arg2 VARCHAR)", + "PROCEDURE_RETURN_TYPE": "VARCHAR", + "PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END", + "CREATED": "2021-01-01T00:00:00.000Z", + "LAST_ALTERED": "2021-01-01T00:00:00.000Z", + "COMMENT": "This is a test procedure", + }, + { + "PROCEDURE_CATALOG": "TEST_DB", + "PROCEDURE_SCHEMA": "TEST_SCHEMA", + "PROCEDURE_NAME": "my_procedure", + "PROCEDURE_LANGUAGE": "SQL", + "ARGUMENT_SIGNATURE": "(arg1 VARCHAR)", + "PROCEDURE_RETURN_TYPE": "VARCHAR", + "PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END", + "CREATED": "2021-01-01T00:00:00.000Z", + "LAST_ALTERED": "2021-01-01T00:00:00.000Z", + "COMMENT": "This is a test procedure 2", + }, + ] raise ValueError(f"Unexpected query: {query}") diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index 311682df83..9434580906 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -4268,6 +4268,23 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": 
"urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "test_db.test_schema.stored_procedures" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_32-j8ymga", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", @@ -4291,6 +4308,61 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Procedures Container" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_32-j8ymga", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_32-j8ymga", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": 
"dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", @@ -4314,6 +4386,105 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_32-j8ymga", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + 
"language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_26-18_25_36-8s62oa", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure 2", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)", @@ -4337,6 +4508,113 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_26-18_25_36-8s62oa", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + }, + { + "id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c", + "urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + }, + { + "id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c", + "urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)", @@ -4858,6 +5136,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_1\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -5113,6 +5392,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_10\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -5362,6 +5642,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_2\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -5608,6 +5889,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_3\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -5857,6 +6139,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_4\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -6106,6 +6389,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_5\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -6355,6 +6639,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_6\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -6604,6 +6889,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_7\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -6853,6 +7139,7 @@ "aspectName": 
"queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_8\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -7102,6 +7389,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_9\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" @@ -7351,6 +7639,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1", "language": "SQL" @@ -7630,6 +7919,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2", "language": "SQL" @@ -8357,6 +8647,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_32-j8ymga", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:1d69efa7115792a07468c8b3846a4c0b3ba8620c1f2263374c2efbb2e56e6200", @@ -8421,6 +8727,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:a79e59b5f5248b9247f88eaecbe3a296f788634282edc9cdca80ba1bfb504f37", 
@@ -8485,6 +8807,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_40-4vdgrh", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:d37a930ca6d2dd7100fd81bbf3d96a8cfe9f30e3469de363650d7e3146c3e4e8", diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json index ccd4564df1..2319f2455e 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json @@ -4486,6 +4486,23 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "test_db.test_schema.stored_procedures" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29", @@ -4493,6 +4510,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1", "language": "SQL" @@ -4514,6 +4532,78 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": 
"urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Procedures Container" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:900b1327253068cb1537b1b3c807ddab" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": 
"urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29", @@ -4597,6 +4687,258 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + }, + { + "id": "urn:li:container:900b1327253068cb1537b1b3c807ddab", + "urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-hyhd0l", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure 2", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + 
"systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + }, + { + "id": "urn:li:container:900b1327253068cb1537b1b3c807ddab", + "urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab" + }, + { + "id": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f", + "urn": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, 
+ "runId": "snowflake-2022_06_07-17_00_00-hyhd0l", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,instance1)" + }, + { + "id": "urn:li:container:900b1327253068cb1537b1b3c807ddab", + "urn": "urn:li:container:900b1327253068cb1537b1b3c807ddab" + }, + { + "id": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f", + "urn": "urn:li:container:eac598ee71ef1b5e24448d650c08aa5f" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29", @@ -4801,6 +5143,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2", "language": "SQL" @@ -4921,6 +5264,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": 
"dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-2xyj8f", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cinstance1.test_db.test_schema.view_1%2CPROD%29", @@ -4952,5 +5311,37 @@ "runId": "snowflake-2022_06_07-17_00_00", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,instance1.test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00-tcd1vn", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json index 7e4885de4e..f6c32b788e 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json @@ 
-140,8 +140,7 @@ "lastModified": { "time": 1615443388097, "actor": "urn:li:corpuser:datahub" - }, - "filterStatus": "DISABLED" + } } }, "systemMetadata": { @@ -2996,8 +2995,7 @@ "lastModified": { "time": 1615443388097, "actor": "urn:li:corpuser:datahub" - }, - "filterStatus": "DISABLED" + } } }, "systemMetadata": { @@ -3337,8 +3335,7 @@ "lastModified": { "time": 1615443388097, "actor": "urn:li:corpuser:datahub" - }, - "filterStatus": "DISABLED" + } } }, "systemMetadata": { @@ -3367,8 +3364,7 @@ "lastModified": { "time": 1615443388097, "actor": "urn:li:corpuser:datahub" - }, - "filterStatus": "DISABLED" + } } }, "systemMetadata": { @@ -3397,8 +3393,7 @@ "lastModified": { "time": 1615443388097, "actor": "urn:li:corpuser:datahub" - }, - "filterStatus": "DISABLED" + } } }, "systemMetadata": { @@ -4173,6 +4168,23 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "test_db.test_schema.stored_procedures" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_55-tlf3vq", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29", @@ -4180,6 +4192,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "create view view_1 as select * from table_1", "language": "SQL" @@ -4201,6 +4214,61 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Procedures Container" + ] + } + }, + "systemMetadata": { + 
"lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_55-tlf3vq", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_55-tlf3vq", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29", @@ -4284,6 +4352,105 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_55-tlf3vq", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_26-18_25_51-lc2w8p", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "my_procedure", + "description": "This is a test procedure 2", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + 
"lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29", @@ -4300,6 +4467,113 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "BEGIN RETURN 'Hello World'; END", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_26-18_25_51-lc2w8p", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": 
"browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + }, + { + "id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c", + "urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:5e359958be02ce647cd9ac196dbd4585", + "urn": "urn:li:container:5e359958be02ce647cd9ac196dbd4585" + }, + { + "id": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c", + "urn": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)", @@ -4484,6 +4758,7 @@ "aspectName": "queryProperties", "aspect": { "json": { + "customProperties": {}, "statement": { "value": "create view view_2 as select * from table_2", "language": "SQL" @@ -4604,6 +4879,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_25-15_03_55-tlf3vq", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": 
"urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Ctest_db.test_schema.view_1%2CPROD%29", @@ -4668,6 +4959,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_d72dbca43341d7a41a7e01b76e2a25d4)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "structuredProperty", "entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_0", @@ -4731,5 +5038,21 @@ "runId": "snowflake-2025_01_07-13_38_56-3fo398", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,test_db.test_schema.stored_procedures,PROD),my_procedure_90e45fd04db3e8c275484efab14dab7b)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "snowflake-2025_03_24-20_29_59-jvnw7v", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py index 178729aa47..5f883b7801 100644 --- a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py +++ b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py @@ -7,7 +7,7 @@ from pathlib import Path import pytest from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure -from datahub.ingestion.source.sql.mssql.stored_procedure_lineage import ( +from datahub.ingestion.source.sql.stored_procedures.base import ( generate_procedure_lineage, ) from datahub.sql_parsing.schema_resolver import 
def test_split_statement_with_merge_query_fails():
    """Pin the current (incorrect) splitting of a single MERGE statement.

    The MERGE below is one statement, but the splitter does not return it
    whole: it is cut after each ``THEN``. The assertions record that exact
    output so any change in the splitter's behaviour shows up here.
    """
    merge_sql = """\
MERGE INTO myTable AS t
USING myTable2 AS s
ON t.a = s.a
WHEN MATCHED THEN
    UPDATE SET t.b = s.b
WHEN NOT MATCHED THEN
    INSERT (a, b) VALUES (s.a, s.b)"""

    # Fragments the splitter currently produces, in order.
    merge_head = """MERGE INTO myTable AS t
USING myTable2 AS s
ON t.a = s.a
WHEN MATCHED THEN"""
    update_branch = """UPDATE SET t.b = s.b
WHEN NOT MATCHED THEN"""
    insert_branch = """INSERT (a, b) VALUES (s.a, s.b)"""

    pieces = [piece.strip() for piece in split_statements(merge_sql)]

    # The statement is not kept as a single unit...
    assert pieces != [merge_sql]
    # ...it comes back as three fragments instead.
    assert pieces == [merge_head, update_branch, insert_branch]
"urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_lineage.sql)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD)" + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD),id)" + ], + "confidenceScore": 0.2 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),value)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_1,PROD),value)" + ], + "confidenceScore": 0.2 + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multi_statements.json b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multi_statements.json new file mode 100644 index 0000000000..1c767f3e4d --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multi_statements.json @@ -0,0 +1,44 @@ +[ +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_multi_statements.sql)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table3,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_delete,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD)" + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD),id)" + ], + "confidenceScore": 0.2 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD),column2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD),column2)" + ], + "confidenceScore": 0.2 + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multitable_lineage.json b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multitable_lineage.json new file mode 100644 index 0000000000..c44dc44f37 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multitable_lineage.json @@ -0,0 +1,3 @@ +[ + +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_temp_lineage.json b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_temp_lineage.json new file mode 100644 index 
0000000000..d46f325e6a --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_temp_lineage.json @@ -0,0 +1,42 @@ +[ +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_temp_lineage.sql)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD)" + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD),id)" + ], + "confidenceScore": 0.2 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.processed_transactions,PROD),value)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.public.new_table_2,PROD),value)" + ], + "confidenceScore": 0.2 + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_transaction.json b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_transaction.json new file mode 100644 index 0000000000..f41246a47d --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_transaction.json @@ -0,0 +1,53 @@ +[ +{ + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_transaction.sql)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD)" + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),query_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),query_id)" + ], + "confidenceScore": 0.2 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),start_time)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),start_time)" + ], + "confidenceScore": 0.2 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake.account_usage.query_history,PROD),user)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.query_history,PROD),user)" + ], + "confidenceScore": 0.2 + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_lineage.sql b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_lineage.sql new file mode 100644 index 0000000000..a0be959ff2 --- /dev/null +++ 
b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_lineage.sql @@ -0,0 +1,5 @@ + +BEGIN + CREATE TABLE TEST_DB.PUBLIC.new_table_1 as select id, value from TEST_DB.PUBLIC.processed_transactions; + RETURN message; +END; diff --git a/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multi_statements.sql b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multi_statements.sql new file mode 100644 index 0000000000..120bfdcfb5 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multi_statements.sql @@ -0,0 +1,25 @@ +BEGIN + -- merge query with when matched and when not matched + MERGE INTO target_table tgt + USING ( + SELECT id, column1 + FROM source_table1 + ) src + ON (tgt.id = src.id) + WHEN MATCHED THEN + UPDATE SET + tgt.column1 = src.column1 + WHEN NOT MATCHED THEN + INSERT (id, column1) + VALUES (src.id, src.column1); + + INSERT INTO target_table_insert (id, column2) + SELECT id, column2 + FROM source_table2; + + DELETE FROM target_table_delete + WHERE id IN ( + SELECT id + FROM source_table3 + ) +END diff --git a/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multitable_lineage.sql b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multitable_lineage.sql new file mode 100644 index 0000000000..0709d224e9 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_multitable_lineage.sql @@ -0,0 +1,19 @@ +BEGIN + -- merge query with when matched and when not matched + MERGE INTO target_table tgt + USING ( + SELECT s1.id, s1.column1, s2.column2, s3.column3 + FROM source_table1 s1 + JOIN source_table2 s2 ON s1.id = s2.id + JOIN source_table3 s3 ON s1.id = s3.id + ) src + ON (tgt.id = src.id) + WHEN MATCHED THEN + UPDATE SET + tgt.column1 = src.column1, + tgt.column2 = src.column2, + tgt.column3 = src.column3 
+ WHEN NOT MATCHED THEN + INSERT (id, column1, column2, column3) + VALUES (src.id, src.column1, src.column2, src.column3) +END diff --git a/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_temp_lineage.sql b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_temp_lineage.sql new file mode 100644 index 0000000000..69a6afaeb9 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_temp_lineage.sql @@ -0,0 +1,5 @@ +BEGIN + CREATE TEMPORARY TABLE temp1 as select id, value from TEST_DB.PUBLIC.processed_transactions; + CREATE TABLE TEST_DB.PUBLIC.new_table_2 as select * from temp1; + RETURN message; +END \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_transaction.sql b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_transaction.sql new file mode 100644 index 0000000000..4960286cd3 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/procedures/snowflake/procedure_with_transaction.sql @@ -0,0 +1,14 @@ +BEGIN +-- snowflake stored procedure with begin transaction and commit, write to table in transaction +BEGIN transaction +insert into "QUERY_HISTORY" +SELECT query_id, start_time, user FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"; +COMMIT +RETURN 'Success'; +EXCEPTION +WHEN OTHER THEN +ROLLBACK; +raise; +--RETURN 'Error'; +END; + diff --git a/metadata-ingestion/tests/unit/stored_procedure/test_procedure_lineage.py b/metadata-ingestion/tests/unit/stored_procedure/test_procedure_lineage.py new file mode 100644 index 0000000000..05e2b9ddb5 --- /dev/null +++ b/metadata-ingestion/tests/unit/stored_procedure/test_procedure_lineage.py @@ -0,0 +1,64 @@ +import pathlib +from pathlib import Path + +import pytest + +from datahub.ingestion.source.sql.stored_procedures.base import ( + BaseProcedure, + generate_procedure_lineage, +) +from 
# Fixture layout: procedures/<platform>/<name>.sql is compared against
# golden_files/<platform>/<name>.json.
PROCEDURE_SQLS_DIR = pathlib.Path(__file__).parent / "procedures"
PROCEDURES_GOLDEN_DIR = pathlib.Path(__file__).parent / "golden_files"

# Path.iterdir() and Path.glob() yield entries in arbitrary order, so both are
# sorted to keep the parametrized cases in a stable, reproducible order.
platforms = sorted(
    folder.name for folder in PROCEDURE_SQLS_DIR.iterdir() if folder.is_dir()
)
procedure_sqls = [
    (platform, sql_file.name, f"{platform}/{sql_file.name}")
    for platform in platforms
    for sql_file in sorted(PROCEDURE_SQLS_DIR.glob(f"{platform}/*.sql"))
]


@pytest.mark.parametrize("platform, sql_file_name, procedure_sql_file", procedure_sqls)
@pytest.mark.integration
def test_stored_procedure_lineage(
    platform: str, sql_file_name: str, procedure_sql_file: str
) -> None:
    """Generate lineage for one SQL fixture and compare it with its golden file.

    Args:
        platform: Name of the fixture sub-directory; used as the platform for
            the schema resolver and inside the data job URN.
        sql_file_name: File name of the fixture (including ``.sql``); used as
            the procedure name and as the job id in the URN.
        procedure_sql_file: ``<platform>/<file name>`` path relative to the
            fixtures directory; also selects the golden file by swapping the
            suffix for ``.json``.
    """
    sql_file_path = PROCEDURE_SQLS_DIR / procedure_sql_file
    # Read with an explicit encoding so the result does not depend on the
    # locale of the machine running the tests.
    procedure_code = sql_file_path.read_text(encoding="utf-8")

    name = sql_file_name
    db = "default_db"
    schema = "default_schema"

    # Only the name and the SQL body are populated; the rest is left empty.
    procedure = BaseProcedure(
        name=name,
        procedure_definition=procedure_code,
        created=None,
        last_altered=None,
        comment=None,
        argument_signature=None,
        return_type=None,
        language="SQL",
        extra_properties=None,
    )
    data_job_urn = f"urn:li:dataJob:(urn:li:dataFlow:({platform},{db}.{schema}.stored_procedures,PROD),{name})"

    schema_resolver = SchemaResolver(platform=platform)

    # Materialize the generator so every emitted item is checked.
    mcps = list(
        generate_procedure_lineage(
            schema_resolver=schema_resolver,
            procedure=procedure,
            procedure_job_urn=data_job_urn,
            default_db=db,
            default_schema=schema,
        )
    )
    mce_helpers.check_goldens_stream(
        outputs=mcps,
        golden_path=(
            PROCEDURES_GOLDEN_DIR / Path(procedure_sql_file).with_suffix(".json")
        ),
    )