Fix correct snowflake object types in source url (#23612)

This commit is contained in:
Mohit Tilala 2025-09-29 21:01:10 +05:30 committed by GitHub
parent ae503c26e5
commit 22a0925cd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 181 additions and 7 deletions

View File

@ -14,6 +14,9 @@ Snowflake constants
from sqlalchemy.sql.sqltypes import BOOLEANTYPE, VARCHAR
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureType
from metadata.generated.schema.entity.data.table import TableType
DEFAULT_STREAM_COLUMNS = [
{
"name": "METADATA$ACTION",
@ -49,3 +52,18 @@ DEFAULT_STREAM_COLUMNS = [
SNOWFLAKE_TAG_DESCRIPTION = "SNOWFLAKE TAG VALUE"
SNOWFLAKE_CLASSIFICATION_DESCRIPTION = "SNOWFLAKE TAG NAME"
TABLE_TYPE_URL_MAP = {
TableType.Regular: "table",
TableType.Transient: "table",
TableType.Dynamic: "dynamic-table",
TableType.External: "external-table",
TableType.View: "view",
TableType.MaterializedView: "materialized-view",
TableType.Stream: "stream",
}
PROCEDURE_TYPE_URL_MAP = {
StoredProcedureType.StoredProcedure: "procedure",
StoredProcedureType.UDF: "user-function",
}

View File

@ -74,8 +74,10 @@ from metadata.ingestion.source.database.incremental_metadata_extraction import (
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.snowflake.constants import (
DEFAULT_STREAM_COLUMNS,
PROCEDURE_TYPE_URL_MAP,
SNOWFLAKE_CLASSIFICATION_DESCRIPTION,
SNOWFLAKE_TAG_DESCRIPTION,
TABLE_TYPE_URL_MAP,
)
from metadata.ingestion.source.database.snowflake.models import (
STORED_PROC_LANGUAGE_MAP,
@ -688,11 +690,11 @@ class SnowflakeSource(
table_type: Optional[TableType] = None,
) -> Optional[str]:
"""
Method to get the source url for snowflake
Method to get the source url for snowflake tables
"""
try:
if self.account and self.org_name:
tab_type = "view" if table_type == TableType.View else "table"
tab_type = TABLE_TYPE_URL_MAP.get(table_type, "table")
url = self._get_source_url_root(
database_name=database_name, schema_name=schema_name
)
@ -704,6 +706,41 @@ class SnowflakeSource(
logger.error(f"Unable to get source url: {exc}")
return None
def get_procedure_source_url(
self,
database_name: Optional[str] = None,
schema_name: Optional[str] = None,
procedure_name: Optional[str] = None,
procedure_signature: Optional[str] = None,
procedure_type: Optional[str] = None,
) -> Optional[str]:
"""
Method to get the source url for snowflake stored procedures
"""
try:
if self.account and self.org_name:
url = self._get_source_url_root(
database_name=database_name, schema_name=schema_name
)
# Convert string procedure type to enum and get URL mapping
proc_type_enum = (
StoredProcedureType(procedure_type)
if procedure_type
else StoredProcedureType.StoredProcedure
)
tab_type = PROCEDURE_TYPE_URL_MAP.get(proc_type_enum, "procedure")
if procedure_name:
full_name = f"{procedure_name}{procedure_signature or ''}"
url = f"{url}/{tab_type}/{full_name}"
return url
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get procedure source url: {exc}")
return None
def _get_view_names_and_types(
self, schema_name: str, materialized_views: bool = False
) -> List[TableNameAndType]:
@ -844,12 +881,13 @@ class SnowflakeSource(
schema_name=self.context.get().database_schema,
),
sourceUrl=SourceUrl(
self._get_source_url_root(
self.get_procedure_source_url(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
procedure_name=stored_procedure.name,
procedure_signature=stored_procedure.signature,
procedure_type=stored_procedure.procedure_type,
)
+ f"/procedure/{stored_procedure.name}"
+ f"{stored_procedure.signature if stored_procedure.signature else ''}"
),
)
yield Either(right=stored_procedure_request)

View File

@ -140,10 +140,30 @@ MOCK_SCHEMA_NAME_1 = "INFORMATION_SCHEMA"
MOCK_SCHEMA_NAME_2 = "TPCDS_SF10TCL"
MOCK_VIEW_NAME = "COLUMNS"
MOCK_TABLE_NAME = "CALL_CENTER"
EXPECTED_SNOW_URL_VIEW = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/view/COLUMNS"
# Table URLs
EXPECTED_SNOW_URL_TABLE = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/TPCDS_SF10TCL/table/CALL_CENTER"
EXPECTED_SNOW_URL_VIEW_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/view/COLUMNS"
EXPECTED_SNOW_URL_TABLE_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/TPCDS_SF10TCL/table/CALL_CENTER"
EXPECTED_SNOW_URL_TRANSIENT = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/table/TEST_TRANSIENT"
EXPECTED_SNOW_URL_TRANSIENT_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/table/TEST_TRANSIENT"
EXPECTED_SNOW_URL_DYNAMIC = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/dynamic-table/TEST_DYNAMIC"
EXPECTED_SNOW_URL_DYNAMIC_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/dynamic-table/TEST_DYNAMIC"
EXPECTED_SNOW_URL_EXTERNAL = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/external-table/TEST_EXTERNAL"
EXPECTED_SNOW_URL_EXTERNAL_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/external-table/TEST_EXTERNAL"
EXPECTED_SNOW_URL_VIEW = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/view/COLUMNS"
EXPECTED_SNOW_URL_VIEW_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/view/COLUMNS"
EXPECTED_SNOW_URL_MATERIALIZED_VIEW = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/materialized-view/TEST_MV"
EXPECTED_SNOW_URL_MATERIALIZED_VIEW_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/materialized-view/TEST_MV"
EXPECTED_SNOW_URL_STREAM = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/stream/TEST_STREAM"
EXPECTED_SNOW_URL_STREAM_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/stream/TEST_STREAM"
# Procedure URLs
EXPECTED_SNOW_URL_PROCEDURE = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/procedure/TEST_PROC(VARCHAR)"
EXPECTED_SNOW_URL_PROCEDURE_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/procedure/TEST_PROC(VARCHAR)"
EXPECTED_SNOW_URL_UDF = "https://app.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/user-function/TEST_UDF(NUMBER)"
EXPECTED_SNOW_URL_UDF_CUSTOM = "https://custom.snowflake.com/random_org/random_account/#/data/databases/SNOWFLAKE_SAMPLE_DATA/schemas/INFORMATION_SCHEMA/user-function/TEST_UDF(NUMBER)"
def get_snowflake_sources():
@ -225,10 +245,28 @@ class SnowflakeUnitTest(TestCase):
for source_key, source in self.sources.items():
if source_key == "custom_host":
expected_table_url = EXPECTED_SNOW_URL_TABLE_CUSTOM
expected_transient_url = EXPECTED_SNOW_URL_TRANSIENT_CUSTOM
expected_dynamic_url = EXPECTED_SNOW_URL_DYNAMIC_CUSTOM
expected_external_url = EXPECTED_SNOW_URL_EXTERNAL_CUSTOM
expected_view_url = EXPECTED_SNOW_URL_VIEW_CUSTOM
expected_materialized_view_url = (
EXPECTED_SNOW_URL_MATERIALIZED_VIEW_CUSTOM
)
expected_stream_url = EXPECTED_SNOW_URL_STREAM_CUSTOM
expected_procedure_url = EXPECTED_SNOW_URL_PROCEDURE_CUSTOM
expected_udf_url = EXPECTED_SNOW_URL_UDF_CUSTOM
else:
expected_table_url = EXPECTED_SNOW_URL_TABLE
expected_transient_url = EXPECTED_SNOW_URL_TRANSIENT
expected_dynamic_url = EXPECTED_SNOW_URL_DYNAMIC
expected_external_url = EXPECTED_SNOW_URL_EXTERNAL
expected_view_url = EXPECTED_SNOW_URL_VIEW
expected_materialized_view_url = EXPECTED_SNOW_URL_MATERIALIZED_VIEW
expected_stream_url = EXPECTED_SNOW_URL_STREAM
expected_procedure_url = EXPECTED_SNOW_URL_PROCEDURE
expected_udf_url = EXPECTED_SNOW_URL_UDF
# Test regular table URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
@ -239,6 +277,40 @@ class SnowflakeUnitTest(TestCase):
expected_table_url,
)
# Test transient table URL (should use 'table')
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
table_name="TEST_TRANSIENT",
table_type=TableType.Transient,
),
expected_transient_url,
)
# Test dynamic table URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
table_name="TEST_DYNAMIC",
table_type=TableType.Dynamic,
),
expected_dynamic_url,
)
# Test external table URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
table_name="TEST_EXTERNAL",
table_type=TableType.External,
),
expected_external_url,
)
# Test view URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
@ -249,6 +321,52 @@ class SnowflakeUnitTest(TestCase):
expected_view_url,
)
# Test materialized view URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
table_name="TEST_MV",
table_type=TableType.MaterializedView,
),
expected_materialized_view_url,
)
# Test stream URL
self.assertEqual(
source.get_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
table_name="TEST_STREAM",
table_type=TableType.Stream,
),
expected_stream_url,
)
# Test stored procedure URL
self.assertEqual(
source.get_procedure_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
procedure_name="TEST_PROC",
procedure_signature="(VARCHAR)",
procedure_type="StoredProcedure",
),
expected_procedure_url,
)
# Test UDF URL
self.assertEqual(
source.get_procedure_source_url(
database_name=MOCK_DB_NAME,
schema_name=MOCK_SCHEMA_NAME_1,
procedure_name="TEST_UDF",
procedure_signature="(NUMBER)",
procedure_type="UDF",
),
expected_udf_url,
)
def test_source_url(self):
"""
method to test source url