fix(ingest/dbt): Fix urn validation in ownership type check (#13563)

This commit is contained in:
Tamas Nemeth 2025-05-21 13:02:26 +02:00 committed by GitHub
parent f4a8d9e7fc
commit 9fca1737ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 3 deletions

View File

@ -59,6 +59,7 @@ from datahub.metadata.urns import (
DataJobUrn,
DataPlatformUrn,
DatasetUrn,
OwnershipTypeUrn,
TagUrn,
)
from datahub.utilities.urn_encoder import UrnEncoder
@ -406,7 +407,8 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
if ownership_type.startswith("urn:li:"):
return OwnershipTypeClass.CUSTOM, ownership_type
ownership_type_urn = OwnershipTypeUrn.from_string(ownership_type)
return OwnershipTypeClass.CUSTOM, ownership_type_urn.urn()
ownership_type = ownership_type.upper()
if ownership_type in get_enum_options(OwnershipTypeClass):
return ownership_type, None

View File

@ -284,6 +284,8 @@ SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
"INTEGER": NumberType,
"BIGINT": NumberType,
"SMALLINT": NumberType,
"TINYINT": NumberType,
"BYTEINT": NumberType,
"FLOAT": NumberType,
"FLOAT4": NumberType,
"FLOAT8": NumberType,
@ -291,6 +293,7 @@ SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
"DOUBLE PRECISION": NumberType,
"REAL": NumberType,
"VARCHAR": StringType,
"CHARACTER VARYING": StringType,
"CHAR": StringType,
"CHARACTER": StringType,
"STRING": StringType,
@ -313,8 +316,8 @@ SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
def resolve_snowflake_modified_type(type_string: str) -> Any:
# Match types with precision and scale, e.g., 'DECIMAL(38,0)'
match = re.match(r"([a-zA-Z_]+)\(\d+,\s\d+\)", type_string)
# Match types with a precision and an optional scale, e.g., 'DECIMAL(38,0)' or 'TIME(3)'
match = re.match(r"([a-z A-Z_]+)\(\d+(,(\s+)?\d+)?\)", type_string)
if match:
modified_type_base = match.group(1) # Extract the base type
return SNOWFLAKE_TYPES_MAP.get(modified_type_base)

View File

@ -1,5 +1,8 @@
from typing import Any, Dict
import pytest
from datahub.emitter.mce_builder import validate_ownership_type
from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import (
DomainsClass,
@ -12,6 +15,7 @@ from datahub.metadata.schema_classes import (
OwnershipTypeClass,
)
from datahub.utilities.mapping import OperationProcessor
from datahub.utilities.urns.error import InvalidUrnError
def get_operation_defs() -> Dict[str, Any]:
@ -420,3 +424,31 @@ def test_operation_processor_datahub_props():
assert isinstance(aspect_map["add_domain"], DomainsClass)
assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]
def test_validate_ownership_type_with_urn_valid():
    """An ``urn:li:ownershipType:`` urn is accepted and reported as CUSTOM.

    The returned pair must be the CUSTOM ownership type together with the
    very same urn string that was passed in.
    """
    ownership_urn = "urn:li:ownershipType:TEST"
    outcome = validate_ownership_type(ownership_urn)
    expected = (OwnershipTypeClass.CUSTOM, ownership_urn)
    assert outcome == expected
def test_validate_ownership_type_with_wrong_prefix():
    """A urn that is not an ownership-type urn raises ``InvalidUrnError``."""
    # Starts with "urn:li:" but the entity type is not "ownershipType".
    mismatched_urn = "urn:li:notOwnership:INVALID"
    with pytest.raises(InvalidUrnError):
        validate_ownership_type(mismatched_urn)
def test_validate_ownership_type_non_urn_valid():
    """A lower-case, non-urn ownership type comes back upper-cased with no urn.

    This relies on "DATAOWNER" being one of the options exposed by
    OwnershipTypeClass.
    """
    expected = ("DATAOWNER", None)
    outcome = validate_ownership_type("dataowner")
    assert outcome == expected
def test_validate_ownership_type_non_urn_invalid():
    """A non-urn string that is not a known ownership type raises ``ValueError``."""
    unknown_type = "invalid_type"
    with pytest.raises(ValueError):
        validate_ownership_type(unknown_type)

View File

@ -2,8 +2,10 @@ import pytest
from datahub.ingestion.source.sql.sql_types import (
ATHENA_SQL_TYPES_MAP,
SNOWFLAKE_TYPES_MAP,
TRINO_SQL_TYPES_MAP,
resolve_athena_modified_type,
resolve_snowflake_modified_type,
resolve_sql_type,
resolve_trino_modified_type,
)
@ -73,6 +75,55 @@ def test_resolve_athena_modified_type(data_type, expected_data_type):
)
@pytest.mark.parametrize(
    "data_type, expected_data_type",
    [
        # Boolean
        ("BOOLEAN", "BOOLEAN"),
        # Integer names
        ("TINYINT", "TINYINT"),
        ("BYTEINT", "BYTEINT"),
        ("SMALLINT", "SMALLINT"),
        ("INT", "INT"),
        ("INTEGER", "INTEGER"),
        ("BIGINT", "BIGINT"),
        # Floating-point names (including a multi-word one)
        ("FLOAT", "FLOAT"),
        ("FLOAT4", "FLOAT4"),
        ("FLOAT8", "FLOAT8"),
        ("DOUBLE", "DOUBLE"),
        ("DOUBLE PRECISION", "DOUBLE PRECISION"),
        ("REAL", "REAL"),
        # Two-argument modifiers are stripped down to the base name
        ("NUMBER(10,0)", "NUMBER"),
        ("DECIMAL(38,2)", "DECIMAL"),
        ("NUMERIC(15,4)", "NUMERIC"),
        # Single-argument modifiers on character types
        ("VARCHAR(20)", "VARCHAR"),
        ("CHARACTER VARYING(50)", "CHARACTER VARYING"),
        ("CHAR(10)", "CHAR"),
        ("CHARACTER(5)", "CHARACTER"),
        ("STRING", "STRING"),
        ("TEXT", "TEXT"),
        # Binary
        ("BINARY", "BINARY"),
        ("VARBINARY", "VARBINARY"),
        # Date and time, with and without a single-argument modifier
        ("DATE", "DATE"),
        ("DATETIME", "DATETIME"),
        ("TIME", "TIME"),
        ("TIME(3)", "TIME"),
        ("TIMESTAMP", "TIMESTAMP"),
        ("TIMESTAMP(3)", "TIMESTAMP"),
        ("TIMESTAMP_LTZ", "TIMESTAMP_LTZ"),
        ("TIMESTAMP_NTZ", "TIMESTAMP_NTZ"),
        ("TIMESTAMP_TZ", "TIMESTAMP_TZ"),
        # Remaining unmodified names
        ("VARIANT", "VARIANT"),
        ("OBJECT", "OBJECT"),
        ("ARRAY", "ARRAY"),
        ("GEOGRAPHY", "GEOGRAPHY"),
    ],
)
def test_resolve_snowflake_type(data_type, expected_data_type):
    """Each raw type string resolves to the map entry for its base type name.

    ``data_type`` is the string handed to ``resolve_snowflake_modified_type``;
    ``expected_data_type`` is the ``SNOWFLAKE_TYPES_MAP`` key whose value the
    resolver is expected to return.
    """
    resolved = resolve_snowflake_modified_type(data_type)
    assert resolved == SNOWFLAKE_TYPES_MAP[expected_data_type]
def test_resolve_sql_type() -> None:
assert resolve_sql_type("boolean") == BooleanTypeClass()
assert resolve_sql_type("varchar") == StringTypeClass()