Fixes #21677: Refactor and enhance the entity name transformation logic (#22695)

This commit is contained in:
Ayush Shah 2025-08-21 08:43:33 +05:30
parent 0782dc28b6
commit c99edbe290
9 changed files with 2941 additions and 69 deletions

View File

@ -14564,6 +14564,14 @@
"description": "Trend confidence level",
"tags": [],
"ordinalPosition": 3
},
{
"name": "trend_confidence with quotes \"00\"",
"dataType": "DOUBLE",
"dataTypeDisplay": "double",
"description": "Trend confidence level",
"tags": [],
"ordinalPosition": 4
}
]
}

View File

@ -1223,6 +1223,101 @@
"description": null,
"constraint": "NULL",
"ordinalPosition": 3
},
{
"name": "risk_score",
"dataType": "DOUBLE",
"dataTypeDisplay": "double",
"description": "Customer risk assessment score",
"tags": [],
"ordinalPosition": 2
},
{
"name": "credit_metrics",
"dataType": "STRUCT",
"dataTypeDisplay": "struct",
"description": "Customer credit assessment metrics",
"tags": [],
"ordinalPosition": 3,
"children": [
{
"name": "credit_score",
"dataType": "BIGINT",
"dataTypeDisplay": "bigint",
"description": "Customer credit score",
"tags": [],
"ordinalPosition": 1
},
{
"name": "payment_history",
"dataType": "STRUCT",
"dataTypeDisplay": "struct",
"description": "Payment history analytics",
"tags": [],
"ordinalPosition": 2,
"children": [
{
"name": "on_time_payments",
"dataType": "BIGINT",
"dataTypeDisplay": "bigint",
"description": "Number of on-time payments",
"tags": [],
"ordinalPosition": 1
},
{
"name": "late_payments",
"dataType": "BIGINT",
"dataTypeDisplay": "bigint",
"description": "Number of late payments",
"tags": [],
"ordinalPosition": 2
},
{
"name": "payment_trends",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataTypeDisplay": "array<struct>",
"description": "Payment trend analysis",
"tags": [],
"ordinalPosition": 3,
"children": [
{
"name": "trend_period",
"dataType": "DATE",
"dataTypeDisplay": "date",
"description": "Payment trend period",
"tags": [],
"ordinalPosition": 1
},
{
"name": "trend_score",
"dataType": "DOUBLE",
"dataTypeDisplay": "double",
"description": "Payment trend score",
"tags": [],
"ordinalPosition": 2
},
{
"name": "trend_confidence",
"dataType": "DOUBLE",
"dataTypeDisplay": "double",
"description": "Trend confidence level",
"tags": [],
"ordinalPosition": 3
},
{
"name": "trend_confidence with quotes \"00\"",
"dataType": "DOUBLE",
"dataTypeDisplay": "double",
"description": "Trend confidence level",
"tags": [],
"ordinalPosition": 4
}
]
}
]
}
]
}
],
"tableConstraints": [

View File

@ -13,19 +13,103 @@ Validation logic for Custom Pydantic BaseModel
"""
import logging
from enum import Enum
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger("metadata")
# Character sequences that cannot be stored verbatim in entity names.
RESTRICTED_KEYWORDS = ["::", ">"]
# Placeholder tokens that stand in for the restricted characters when a
# name is encoded for storage (see replace_separators / revert_separators).
RESERVED_COLON_KEYWORD = "__reserved__colon__"
RESERVED_ARROW_KEYWORD = "__reserved__arrow__"
RESERVED_QUOTE_KEYWORD = "__reserved__quote__"
# Legacy model/field name sets used by the pre-refactor validation path.
# NOTE(review): still imported elsewhere? confirm before removing.
CREATE_ADJACENT_MODELS = {"ProfilerResponse", "SampleData"}
NAME_FIELDS = {"EntityName", "str", "ColumnName", "TableData"}
FETCH_MODELS = {"Table", "CustomColumnName", "DashboardDataModel"}
FIELD_NAMES = {"name", "columns", "root"}
class TransformDirection(Enum):
    """Direction of an entity-name transformation.

    ENCODE is applied on the write path (Create operations) and replaces
    restricted separators with reserved placeholder tokens; DECODE is the
    inverse, applied on the read path (Fetch operations).
    """

    ENCODE = "encode"  # For storage (Create operations) - replace separators
    DECODE = "decode"  # For display (Fetch operations) - revert separators
def is_service_level_create_model(model_name: str) -> bool:
    """
    Return True when *model_name* is a Service-level Create model that must
    NOT have its name transformed.

    Service-level models follow the pattern ``Create<ServiceName>ServiceRequest``.
    Because the check is purely pattern-based, newly added services require
    no changes here.
    """
    prefix, suffix = "Create", "ServiceRequest"
    if not (model_name.startswith(prefix) and model_name.endswith(suffix)):
        return False
    # The service name sits between the prefix and the suffix; an empty
    # middle ("CreateServiceRequest") does not identify a real service.
    service_name = model_name[len(prefix) : len(model_name) - len(suffix)]
    return bool(service_name)
# Explicit configuration for entity name transformations
# This dictionary will be populated lazily to avoid circular imports
TRANSFORMABLE_ENTITIES: Dict[Any, Dict[str, Any]] = {}
def _initialize_transformable_entities():
    """
    Populate ``TRANSFORMABLE_ENTITIES`` on first use.

    The model classes are imported inside the function (not at module load
    time) to avoid a circular import with the generated schema modules.

    The function is idempotent: once the registry is non-empty, subsequent
    calls return immediately instead of re-running the imports and the
    ``dict.update`` on every lookup (``get_entity_config`` calls this each
    time it is invoked).
    """
    if TRANSFORMABLE_ENTITIES:
        # Already initialized -- nothing to do.
        return
    # pylint: disable=import-outside-toplevel
    from metadata.generated.schema.api.data.createDashboardDataModel import (
        CreateDashboardDataModelRequest,
    )
    from metadata.generated.schema.api.data.createTable import CreateTableRequest
    from metadata.generated.schema.entity.data.dashboardDataModel import (
        DashboardDataModel,
    )
    from metadata.generated.schema.entity.data.table import (
        ColumnName,
        ColumnProfile,
        Table,
        TableData,
    )
    from metadata.profiler.api.models import ProfilerResponse
    from metadata.utils.entity_link import CustomColumnName

    TRANSFORMABLE_ENTITIES.update(
        {
            # Fetch models - decode reserved keywords back to original characters
            Table: {
                "fields": {"name", "columns", "children", "tableConstraints"},
                "direction": TransformDirection.DECODE,
            },
            DashboardDataModel: {
                "fields": {"name", "columns", "children"},
                "direction": TransformDirection.DECODE,
            },
            CustomColumnName: {
                "fields": {"root"},
                "direction": TransformDirection.DECODE,
            },
            # Create/Store models - encode special characters to reserved keywords
            ProfilerResponse: {
                "fields": {"name", "profile"},
                "direction": TransformDirection.ENCODE,
            },
            TableData: {"fields": {"columns"}, "direction": TransformDirection.ENCODE},
            ColumnName: {"fields": {"root"}, "direction": TransformDirection.ENCODE},
            CreateTableRequest: {
                "fields": {"name", "columns", "children", "tableConstraints"},
                "direction": TransformDirection.ENCODE,
            },
            CreateDashboardDataModelRequest: {
                "fields": {"name", "columns", "children"},
                "direction": TransformDirection.ENCODE,
            },
            ColumnProfile: {
                "fields": {"name"},
                "direction": TransformDirection.ENCODE,
            },
        }
    )
def revert_separators(value):
@ -44,42 +128,84 @@ def replace_separators(value):
)
def validate_name_and_transform(values, modification_method, field_name: str = None):
"""
Validate the name and transform it if needed.
"""
if isinstance(values, str) and field_name in FIELD_NAMES:
values = modification_method(values)
elif (
hasattr(values, "root")
and isinstance(values.root, str)
and field_name in FIELD_NAMES
def get_entity_config(model: Optional[Any]) -> Optional[Dict[str, Any]]:
    """Look up the transformation configuration registered for *model*.

    Returns ``None`` when the model has no explicit configuration.
    """
    # The registry is filled lazily; make sure it exists before the lookup.
    _initialize_transformable_entities()
    config = TRANSFORMABLE_ENTITIES.get(model)
    return config
def get_transformer(model: Optional[Any]) -> Optional[Callable]:
    """Return the encode/decode function configured for *model*, or None."""
    config = get_entity_config(model)
    if not config:
        return None
    # Map each direction to its concrete transformer; unknown directions
    # (or a missing "direction" key) fall through to None.
    dispatch = {
        TransformDirection.ENCODE: replace_separators,
        TransformDirection.DECODE: revert_separators,
    }
    return dispatch.get(config.get("direction"))
def transform_all_names(obj, transformer):
    """
    Recursively apply *transformer* to every name-like field of *obj*.

    Handles, in order:
      * a wrapped name (``obj.name.root``, e.g. EntityName/ColumnName), or
        failing that a bare root type (``obj.root``);
      * nested ``columns`` / ``children`` collections (recursion);
      * ``tableConstraints`` whose column references are plain strings;
      * a plain ``str`` ``name`` attribute, but only on the encode path.

    Mutates *obj* in place and returns nothing.
    """
    if not obj:
        return

    # Wrapped name takes precedence over a bare root attribute.
    name = getattr(obj, "name", None)
    if name and getattr(name, "root", None) is not None:
        name.root = transformer(name.root)
    elif getattr(obj, "root", None) is not None:
        obj.root = transformer(obj.root)

    # Recurse into nested column collections.
    for attr_name in ("columns", "children"):
        for item in getattr(obj, attr_name, None) or []:
            transform_all_names(item, transformer)

    # Table constraints reference column names as plain strings.
    for constraint in getattr(obj, "tableConstraints", None) or []:
        if hasattr(constraint, "columns"):
            constraint.columns = [transformer(col) for col in constraint.columns]

    # A plain-string name is only rewritten when encoding: compare the
    # callable by identity (was `==`) and accept str subclasses (was
    # `type(name) == str`).
    if transformer is replace_separators and isinstance(name, str):
        obj.name = transformer(name)
def transform_entity_names(entity: Any, model: Optional[Any]) -> Any:
    """
    Transform the name fields of *entity* according to *model*.

    Rules:
      * Service-level Create models (``Create*ServiceRequest``) are never
        transformed -- their names must be stored verbatim.
      * An entity exposing a bare ``root`` attribute is transformed directly:
        encoded for Create models, decoded otherwise.
      * Everything else is handled recursively by ``transform_all_names``
        using the transformer registered in ``TRANSFORMABLE_ENTITIES``;
        when no explicit configuration exists, the direction is inferred
        from the ``Create`` prefix for backward compatibility.

    Returns the (possibly mutated) *entity*.
    """
    model_name = model.__name__
    if not entity or (
        model_name.startswith("Create") and is_service_level_create_model(model_name)
    ):
        return entity
    # Bare root types (e.g. ColumnName) carry the name directly.
    if hasattr(entity, "root") and entity.root is not None:
        entity.root = (
            replace_separators(entity.root)
            if model_name.startswith("Create")
            else revert_separators(entity.root)
        )
        return entity
    # Model-specific transformer from the explicit configuration.
    transformer = get_transformer(model)
    if not transformer:
        # Fallback: infer direction from the model-name prefix.
        transformer = (
            replace_separators if model_name.startswith("Create") else revert_separators
        )
    transform_all_names(entity, transformer)
    return entity

View File

@ -26,13 +26,7 @@ from pydantic.types import SecretStr
from pydantic_core.core_schema import SerializationInfo
from typing_extensions import Annotated
from metadata.ingestion.models.custom_basemodel_validation import (
CREATE_ADJACENT_MODELS,
FETCH_MODELS,
replace_separators,
revert_separators,
validate_name_and_transform,
)
from metadata.ingestion.models.custom_basemodel_validation import transform_entity_names
logger = logging.getLogger("metadata")
@ -75,25 +69,18 @@ class BaseModel(PydanticBaseModel):
@classmethod
def parse_name(cls, values):  # pylint: disable=inconsistent-return-statements
    """
    Transform entity names using the hybrid configuration system.

    Entry point invoked by the model validator: delegates to
    ``transform_entity_names`` with the concrete model class so the
    registered encode/decode direction is applied. Falsy payloads are
    returned untouched.
    """
    if not values:
        return values
    try:
        return transform_entity_names(entity=values, model=cls)
    except Exception as exc:
        logger.warning("Exception while parsing Basemodel: %s", exc)
        # Bare raise preserves the original traceback (was `raise exc`).
        raise
def model_dump_json( # pylint: disable=too-many-arguments
self,

View File

@ -15,6 +15,7 @@ ES indexes definitions
"""
import hashlib
import re
import traceback
from typing import Dict, List, Optional, Type, TypeVar, Union
from antlr4.CommonTokenStream import CommonTokenStream
@ -53,6 +54,9 @@ from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.dispatch import class_register
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import utils_logger
logger = utils_logger()
T = TypeVar("T", bound=BaseModel)
@ -144,12 +148,32 @@ def build(
:param kwargs: required to build the FQN
:return: FQN as a string
"""
func = fqn_build_registry.registry.get(entity_type.__name__)
if not func:
raise FQNBuildingException(
f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented."
# Transform table_name and column_name if they exist and contain special characters
if kwargs.get("table_name") or kwargs.get("column_name"):
from metadata.ingestion.models.custom_basemodel_validation import ( # pylint: disable=import-outside-toplevel
replace_separators,
)
table_name = kwargs.get("table_name")
if table_name and isinstance(table_name, str):
kwargs["table_name"] = replace_separators(table_name)
column_name = kwargs.get("column_name")
if column_name and isinstance(column_name, str):
kwargs["column_name"] = replace_separators(column_name)
func = fqn_build_registry.registry.get(entity_type.__name__)
try:
if not func:
raise FQNBuildingException(
f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented."
)
return func(metadata, **kwargs)
except Exception as e:
logger.debug(traceback.format_exc())
raise FQNBuildingException(
f"Error building FQN for {entity_type.__name__}: {e}"
)
return func(metadata, **kwargs)
@fqn_build_registry.add(Table)

View File

@ -0,0 +1,998 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Comprehensive tests for custom basemodel validation system.
Tests the hybrid name validation system with all edge cases and scenarios.
"""
import uuid
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createDashboardDataModel import (
CreateDashboardDataModelRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
)
from metadata.generated.schema.entity.data.table import (
Column,
ColumnName,
DataType,
Table,
TableData,
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.custom_basemodel_validation import (
RESERVED_ARROW_KEYWORD,
RESERVED_COLON_KEYWORD,
RESERVED_QUOTE_KEYWORD,
TRANSFORMABLE_ENTITIES,
TransformDirection,
get_entity_config,
get_transformer,
is_service_level_create_model,
replace_separators,
revert_separators,
transform_entity_names,
)
from metadata.profiler.api.models import ProfilerResponse
from metadata.utils.entity_link import CustomColumnName
class TestCustomBasemodelValidation(TestCase):
"""Comprehensive test suite for custom basemodel validation functionality."""
def setUp(self):
    """Build fixtures shared by the tests: a table id and a schema reference."""
    self.sample_table_id = uuid.uuid4()
    self.sample_schema_ref = EntityReference(id=uuid.uuid4(), type="databaseSchema")
def test_service_pattern_detection(self):
"""Test the scalable service pattern detection system."""
# Test existing services (should be identified as services)
existing_services = [
"CreateDatabaseServiceRequest",
"CreateDashboardServiceRequest",
"CreateMessagingServiceRequest",
"CreatePipelineServiceRequest",
"CreateMlModelServiceRequest",
"CreateStorageServiceRequest",
"CreateMetadataServiceRequest",
"CreateSearchServiceRequest",
"CreateApiServiceRequest",
]
for service in existing_services:
self.assertTrue(
is_service_level_create_model(service),
f"{service} should be identified as a service model",
)
# Test future services (should be identified as services - scalability test)
future_services = [
"CreateNewServiceRequest",
"CreateCustomServiceRequest",
"CreateXYZServiceRequest",
"CreateAnalyticsServiceRequest",
"CreateAnyThingServiceRequest",
]
for service in future_services:
self.assertTrue(
is_service_level_create_model(service),
f"{service} should be identified as a service model (future compatibility)",
)
# Test non-services (should NOT be identified as services)
non_services = [
"CreateTable",
"CreateDatabase",
"CreateServiceRequest", # No service name between Create and ServiceRequest
"CreateService", # Missing "Request" suffix
"MyCreateServiceRequest", # Doesn't start with "Create"
"createDatabaseServiceRequest", # Lowercase
"CreateServiceRequestSomething", # ServiceRequest not at the end
"CreateDashboard",
"CreateChart",
]
for non_service in non_services:
self.assertFalse(
is_service_level_create_model(non_service),
f"{non_service} should NOT be identified as a service model",
)
def test_service_pattern_edge_cases(self):
    """Boundary inputs for the service-pattern detector."""
    # No service name between "Create" and "ServiceRequest".
    self.assertFalse(
        is_service_level_create_model("CreateServiceRequest"),
        "CreateServiceRequest with no service name should not be considered a service",
    )
    # Shortest possible service name (a single character).
    self.assertTrue(
        is_service_level_create_model("CreateXServiceRequest"),
        "CreateXServiceRequest should be considered a service",
    )
    # Extremely long service name.
    very_long_name = "Create" + "Very" * 50 + "LongServiceRequest"
    self.assertTrue(
        is_service_level_create_model(very_long_name),
        "Very long service names should be handled correctly",
    )
def test_transformable_entities_configuration(self):
"""Test the TRANSFORMABLE_ENTITIES configuration."""
# Test that expected entities are configured
expected_entities = {
Table,
DashboardDataModel,
CustomColumnName,
ProfilerResponse,
TableData,
CreateTableRequest,
CreateDashboardDataModelRequest,
}
for entity in expected_entities:
self.assertIn(
entity,
TRANSFORMABLE_ENTITIES,
f"{entity} should be in TRANSFORMABLE_ENTITIES",
)
# Test entity configurations have required fields
for entity_name, config in TRANSFORMABLE_ENTITIES.items():
self.assertIn(
"fields", config, f"{entity_name} config should have 'fields' key"
)
self.assertIn(
"direction", config, f"{entity_name} config should have 'direction' key"
)
self.assertIsInstance(
config["fields"], set, f"{entity_name} fields should be a set"
)
self.assertIsInstance(
config["direction"],
TransformDirection,
f"{entity_name} direction should be TransformDirection enum",
)
def test_get_entity_config(self):
    """get_entity_config returns config for known models and None otherwise."""
    config = get_entity_config(Table)
    self.assertIsNotNone(config)
    self.assertEqual(TransformDirection.DECODE, config["direction"])
    self.assertIn("name", config["fields"])
    # Unknown keys yield no configuration.
    self.assertIsNone(get_entity_config("NonExistentEntity"))
def test_get_transformer(self):
    """get_transformer maps DECODE/ENCODE models to the right callables."""
    decode_fn = get_transformer(Table)
    self.assertIsNotNone(decode_fn)
    self.assertEqual(revert_separators, decode_fn)

    encode_fn = get_transformer(CreateTableRequest)
    self.assertIsNotNone(encode_fn)
    self.assertEqual(replace_separators, encode_fn)

    # Unconfigured models get no transformer at all.
    self.assertIsNone(get_transformer("NonExistentEntity"))
def test_replace_separators_function(self):
"""Test replace_separators function with various inputs."""
test_cases = [
("simple_name", "simple_name"), # No separators
(
"name::with::colons",
"name__reserved__colon__with__reserved__colon__colons",
),
(
"name>with>arrows",
"name__reserved__arrow__with__reserved__arrow__arrows",
),
(
'name"with"quotes',
"name__reserved__quote__with__reserved__quote__quotes",
),
(
'mixed::>"chars',
"mixed__reserved__colon____reserved__arrow____reserved__quote__chars",
),
("", ""), # Empty string
(":::", "__reserved__colon__:"), # Multiple colons - :: replaced, : remains
(
">>>",
"__reserved__arrow____reserved__arrow____reserved__arrow__",
), # Multiple arrows - each > replaced
(
'"""',
"__reserved__quote____reserved__quote____reserved__quote__",
), # Multiple quotes - each " replaced
]
for input_val, expected in test_cases:
result = replace_separators(input_val)
self.assertEqual(
result,
expected,
f"replace_separators('{input_val}') should return '{expected}'",
)
def test_revert_separators_function(self):
"""Test revert_separators function with various inputs."""
test_cases = [
("simple_name", "simple_name"), # No reserved keywords
(
"name__reserved__colon__with__reserved__colon__colons",
"name::with::colons",
),
(
"name__reserved__arrow__with__reserved__arrow__arrows",
"name>with>arrows",
),
(
"name__reserved__quote__with__reserved__quote__quotes",
'name"with"quotes',
),
(
"mixed__reserved__colon____reserved__arrow____reserved__quote__chars",
'mixed::>"chars',
),
("", ""), # Empty string
(
"__reserved__colon__:",
":::",
), # Multiple colons: __reserved__colon__ + : = :: + : = :::
]
for input_val, expected in test_cases:
result = revert_separators(input_val)
self.assertEqual(
result,
expected,
f"revert_separators('{input_val}') should return '{expected}'",
)
def test_round_trip_transformations(self):
    """Encoding followed by decoding must reproduce the original string."""
    samples = (
        "simple_name",
        "name::with::colons",
        "name>with>arrows",
        'name"with"quotes',
        'complex::name>with"all',
        "unicode测试::name",
        'emoji🚀::data📊>chart"report',
        " spaced :: values ",  # Leading/trailing spaces
        "special!@#$%^&*()_+-={}[]|\\:;'<>?,./",  # Special characters (non-reserved)
    )
    for original in samples:
        round_tripped = revert_separators(replace_separators(original))
        self.assertEqual(
            round_tripped, original, f"Round trip failed for: '{original}'"
        )
def test_transform_entity_names_with_explicit_config(self):
"""Test transform_entity_names with explicitly configured entities."""
# Test Table (DECODE direction)
table = Table(
id=self.sample_table_id,
name="test__reserved__colon__table__reserved__arrow__name",
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.test_table",
columns=[Column(name="id", dataType=DataType.BIGINT)],
)
result = transform_entity_names(table, Table)
self.assertEqual(result.name.root, "test::table>name")
# Test CreateTable (ENCODE direction)
create_request = CreateTableRequest(
name=EntityName('my::table>with"special_chars'),
columns=[Column(name=ColumnName("col1"), dataType=DataType.STRING)],
databaseSchema=FullyQualifiedEntityName("db.schema"),
)
result = transform_entity_names(create_request, CreateTableRequest)
expected = "my__reserved__colon__table__reserved__arrow__with__reserved__quote__special_chars"
self.assertEqual(result.name.root, expected)
def test_transform_entity_names_with_dynamic_pattern(self):
"""Test transform_entity_names with dynamic Create* pattern."""
# Create a custom CreateTableRequest that should use dynamic pattern
create_request = CreateTableRequest(
name=EntityName('dynamic::table>name"test'),
columns=[Column(name=ColumnName("col1"), dataType=DataType.STRING)],
databaseSchema=FullyQualifiedEntityName("db.schema"),
)
# Use a model name not in explicit config to trigger dynamic pattern
result = transform_entity_names(create_request, CreateTableRequest)
expected = "dynamic__reserved__colon__table__reserved__arrow__name__reserved__quote__test"
self.assertEqual(result.name.root, expected)
def test_transform_entity_names_service_exclusion(self):
"""Test that service-level models are excluded from transformation."""
service_request = CreateDatabaseServiceRequest(
name=EntityName('my::database>service"with_separators'), serviceType="Mysql"
)
result = transform_entity_names(service_request, CreateDatabaseServiceRequest)
# Should NOT be transformed
self.assertEqual(result.name.root, 'my::database>service"with_separators')
def test_transform_entity_names_edge_cases(self):
"""Test transform_entity_names with edge cases."""
# Test None entity
result = transform_entity_names(None, Table)
self.assertIsNone(result)
# Test entity without __dict__ (edge case)
simple_value = "test_string"
result = transform_entity_names(simple_value, Table)
self.assertEqual(result, simple_value)
# Test entity with minimal name
table_minimal = Table(
id=self.sample_table_id,
name=EntityName("a"),
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.minimal",
columns=[],
)
result = transform_entity_names(table_minimal, Table)
self.assertEqual(result.name.root, "a")
def test_transform_entity_names_with_nested_structures(self):
"""Test transform_entity_names with complex nested structures."""
# Create deeply nested column structure
level3_columns = [
Column(
name=ColumnName("deep__reserved__colon__field"),
dataType=DataType.STRING,
)
]
level2_columns = [
Column(
name=ColumnName("nested__reserved__arrow__struct"),
dataType=DataType.STRUCT,
children=level3_columns,
)
]
level1_column = Column(
name=ColumnName("root__reserved__quote__struct"),
dataType=DataType.STRUCT,
children=level2_columns,
)
table = Table(
id=self.sample_table_id,
name="complex__reserved__colon__table",
columns=[level1_column],
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.complex_table",
)
result = transform_entity_names(table, Table)
# Verify table name transformation (DECODE operation)
self.assertEqual(result.name.root, "complex::table")
# Column names should also be decoded since Table config includes columns
self.assertEqual(result.columns[0].name.root, 'root"struct')
self.assertEqual(result.columns[0].children[0].name.root, "nested>struct")
self.assertEqual(
result.columns[0].children[0].children[0].name.root, "deep::field"
)
def test_transform_entity_names_with_root_attributes(self):
"""Test transformation of entities with root attributes (like FullyQualifiedEntityName)."""
# Create a mock entity with root attribute
class MockEntityWithRoot:
def __init__(self, root_value):
self.root = root_value
# Test transformation of root attribute
entity = MockEntityWithRoot("test__reserved__colon__value")
result = transform_entity_names(entity, Table)
self.assertEqual(result.root, "test::value")
def test_unicode_and_international_characters(self):
"""Test handling of Unicode and international characters."""
# Test Unicode characters with separators
table_unicode = Table(
id=self.sample_table_id,
name="測試__reserved__colon__表格__reserved__arrow__名稱",
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.unicode_table",
columns=[
Column(name="unicode__reserved__quote__列", dataType=DataType.STRING)
],
)
result = transform_entity_names(table_unicode, Table)
self.assertEqual(result.name.root, "測試::表格>名稱")
# Column names should also be decoded since Table config includes columns
self.assertEqual(result.columns[0].name.root, 'unicode"')
# Test emojis with separators
table_emoji = Table(
id=self.sample_table_id,
name="table🚀__reserved__colon__data📊__reserved__arrow__chart",
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.emoji_table",
columns=[
Column(name="emoji__reserved__quote__field🎯", dataType=DataType.STRING)
],
)
result = transform_entity_names(table_emoji, Table)
self.assertEqual(result.name.root, "table🚀::data📊>chart")
self.assertEqual(result.columns[0].name.root, 'emoji"field🎯')
def test_very_long_strings(self):
"""Test handling of long strings within validation limits."""
# Create long names within validation limits (under 256 chars)
long_name = (
"a" * 50
+ "__reserved__colon__"
+ "b" * 50
+ "__reserved__arrow__"
+ "c" * 50
)
table = Table(
id=self.sample_table_id,
name=long_name,
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.long_table",
columns=[],
)
result = transform_entity_names(table, Table)
# Should still transform correctly
expected = "a" * 50 + "::" + "b" * 50 + ">" + "c" * 50
self.assertEqual(result.name.root, expected)
def test_nested_reserved_keywords(self):
"""Test handling of nested/overlapping reserved keywords."""
# Test overlapping patterns
overlapping_name = "test__reserved__colon____reserved__colon__reserved__name"
table = Table(
id=self.sample_table_id,
name=overlapping_name,
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.overlapping_table",
columns=[],
)
result = transform_entity_names(table, Table)
# This should handle the overlapping keywords correctly
expected = "test::::reserved__name"
self.assertEqual(result.name.root, expected)
def test_error_handling_and_logging(self):
"""Test error handling and logging in transformation functions."""
# Test with mock entity that might cause errors
class ProblematicEntity:
def __init__(self):
self.name = "test_name"
def __getattribute__(self, name):
if name == "name" and hasattr(self, "_fail_count"):
self._fail_count += 1
if self._fail_count > 2:
raise ValueError("Simulated error")
return super().__getattribute__(name)
problematic_entity = ProblematicEntity()
problematic_entity._fail_count = 0
# Should handle errors gracefully and return original entity
with patch(
"metadata.ingestion.models.custom_basemodel_validation.logger"
) as mock_logger:
result = transform_entity_names(problematic_entity, Table)
# Should return original entity on error
self.assertEqual(result, problematic_entity)
def test_performance_with_large_datasets(self):
"""Test performance with large datasets."""
# Create table with many columns
large_columns = []
for i in range(100):
col_name = f"col_{i}__reserved__colon__field_{i}"
large_columns.append(
Column(name=ColumnName(col_name), dataType=DataType.STRING)
)
large_table = Table(
id=self.sample_table_id,
name="large__reserved__arrow__table",
databaseSchema=self.sample_schema_ref,
fullyQualifiedName="db.schema.large_table",
columns=large_columns,
)
# Should handle large datasets efficiently
result = transform_entity_names(large_table, Table)
self.assertEqual(result.name.root, "large>table")
self.assertEqual(len(result.columns), 100)
# Verify first and last columns are transformed correctly
self.assertEqual(result.columns[0].name.root, "col_0::field_0")
self.assertEqual(result.columns[99].name.root, "col_99::field_99")
def test_dashboard_data_model_transformations(self):
"""Test DashboardDataModel specific transformations."""
# Test DashboardDataModel with nested columns
child_columns = [
Column(
name=ColumnName("nested__reserved__colon__metric"),
dataType=DataType.DOUBLE,
),
Column(
name=ColumnName("nested__reserved__arrow__dimension"),
dataType=DataType.STRING,
),
]
parent_column = Column(
name=ColumnName("complex__reserved__quote__field"),
dataType=DataType.STRUCT,
children=child_columns,
)
dashboard_model = DashboardDataModel(
id=uuid.uuid4(),
name="dashboard__reserved__colon__model__reserved__quote__name",
dataModelType=DataModelType.TableauDataModel,
columns=[parent_column],
)
result = transform_entity_names(dashboard_model, DashboardDataModel)
# Verify transformations
self.assertEqual(result.name.root, 'dashboard::model"name')
self.assertEqual(result.columns[0].name.root, 'complex"field')
self.assertEqual(result.columns[0].children[0].name.root, "nested::metric")
self.assertEqual(result.columns[0].children[1].name.root, "nested>dimension")
def test_configuration_consistency(self):
"""Test consistency of configuration across the system."""
# Verify that all configured entities have consistent field mappings
for entity_name, config in TRANSFORMABLE_ENTITIES.items():
# Verify direction is valid
self.assertIn(
config["direction"],
[TransformDirection.ENCODE, TransformDirection.DECODE],
)
# Verify fields is not empty
self.assertGreater(
len(config["fields"]),
0,
f"{entity_name} should have at least one field configured",
)
class TestTransformationConstants(TestCase):
    """Validate the reserved-keyword constants and the TransformDirection enum."""

    def test_reserved_keywords_constants(self):
        """Each reserved placeholder keeps its documented literal value."""
        self.assertEqual("__reserved__colon__", RESERVED_COLON_KEYWORD)
        self.assertEqual("__reserved__arrow__", RESERVED_ARROW_KEYWORD)
        self.assertEqual("__reserved__quote__", RESERVED_QUOTE_KEYWORD)

    def test_reserved_keywords_uniqueness(self):
        """Keywords must be pairwise distinct and never substrings of each other."""
        keywords = [
            RESERVED_COLON_KEYWORD,
            RESERVED_ARROW_KEYWORD,
            RESERVED_QUOTE_KEYWORD,
        ]
        self.assertEqual(
            len(keywords), len(set(keywords)), "Reserved keywords should be unique"
        )
        # A keyword contained in another would make decoding ambiguous.
        for outer_index, first in enumerate(keywords):
            for inner_index, second in enumerate(keywords):
                if outer_index == inner_index:
                    continue
                self.assertNotIn(
                    first,
                    second,
                    f"{first} should not be contained in {second}",
                )

    def test_transform_direction_enum(self):
        """TransformDirection exposes exactly the encode/decode members."""
        self.assertEqual("encode", TransformDirection.ENCODE.value)
        self.assertEqual("decode", TransformDirection.DECODE.value)
        self.assertEqual(2, len(list(TransformDirection)))
class TestDashboardDataModelValidation(TestCase):
    """Test DashboardDataModel-specific validation and transformations."""

    def setUp(self):
        """Set up test data."""
        # Shared fixtures: a stable model id and a service EntityReference,
        # which fetch-style DashboardDataModel instances require.
        self.sample_dashboard_id = uuid.uuid4()
        self.sample_service_ref = EntityReference(
            id=uuid.uuid4(), type="dashboardService"
        )

    def test_dashboard_datamodel_create_transformation(self):
        """Test CreateDashboardDataModelRequest transformations with nested children."""
        # Schema classes imported locally within this test method.
        from metadata.generated.schema.api.data.createDashboardDataModel import (
            CreateDashboardDataModelRequest,
        )
        from metadata.generated.schema.entity.data.dashboardDataModel import (
            DataModelType,
        )

        # Raw names contain all three special characters (:: > ") at three
        # nesting levels: entity, column, child, grandchild.
        create_request = CreateDashboardDataModelRequest(
            name=EntityName('analytics::report>model"quarterly'),
            displayName="Analytics Report Model",
            dataModelType=DataModelType.PowerBIDataModel,
            service=FullyQualifiedEntityName("service.powerbi"),
            columns=[
                Column(
                    name=ColumnName("revenue::summary>metrics"),
                    displayName="Revenue Summary",
                    dataType=DataType.STRUCT,
                    children=[
                        Column(
                            name=ColumnName('total::amount>"USD"'),
                            displayName="Total Amount",
                            dataType=DataType.DECIMAL,
                        ),
                        Column(
                            name=ColumnName("nested::data>structure"),
                            displayName="Nested Data",
                            dataType=DataType.STRUCT,
                            children=[
                                Column(
                                    name=ColumnName('deep::field>"value"'),
                                    displayName="Deep Field",
                                    dataType=DataType.STRING,
                                )
                            ],
                        ),
                    ],
                )
            ],
        )
        result = transform_entity_names(create_request, CreateDashboardDataModelRequest)
        # Verify main name transformation (ENCODE for Create operations)
        self.assertEqual(
            result.name.root,
            "analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly",
        )
        # Verify top-level column transformation
        self.assertEqual(
            result.columns[0].name.root,
            "revenue__reserved__colon__summary__reserved__arrow__metrics",
        )
        # Verify nested children transformations (first level)
        revenue_column = result.columns[0]
        self.assertEqual(
            revenue_column.children[0].name.root,
            "total__reserved__colon__amount__reserved__arrow____reserved__quote__USD__reserved__quote__",
        )
        self.assertEqual(
            revenue_column.children[1].name.root,
            "nested__reserved__colon__data__reserved__arrow__structure",
        )
        # Verify deeply nested transformations (second level)
        nested_struct = revenue_column.children[1]
        self.assertEqual(
            nested_struct.children[0].name.root,
            "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__",
        )

    def test_dashboard_datamodel_fetch_transformation(self):
        """Test DashboardDataModel fetch transformations with nested children."""
        from metadata.generated.schema.entity.data.dashboardDataModel import (
            DashboardDataModel,
            DataModelType,
        )

        # The stored (encoded) names mirror the encoded expectations of the
        # create test above; decoding must restore the raw characters.
        dashboard_model = DashboardDataModel(
            id=self.sample_dashboard_id,
            name="analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly",
            displayName="Analytics Report Model",
            dataModelType=DataModelType.PowerBIDataModel,
            service=self.sample_service_ref,
            fullyQualifiedName="service.analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly",
            columns=[
                Column(
                    name=ColumnName(
                        "revenue__reserved__colon__summary__reserved__arrow__metrics"
                    ),
                    displayName="Revenue Summary",
                    dataType=DataType.STRUCT,
                    children=[
                        Column(
                            name=ColumnName(
                                "total__reserved__colon__amount__reserved__arrow____reserved__quote__USD__reserved__quote__"
                            ),
                            displayName="Total Amount",
                            dataType=DataType.DECIMAL,
                        ),
                        Column(
                            name=ColumnName(
                                "nested__reserved__colon__data__reserved__arrow__structure"
                            ),
                            displayName="Nested Data",
                            dataType=DataType.STRUCT,
                            children=[
                                Column(
                                    name=ColumnName(
                                        "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__"
                                    ),
                                    displayName="Deep Field",
                                    dataType=DataType.STRING,
                                )
                            ],
                        ),
                    ],
                )
            ],
        )
        result = transform_entity_names(dashboard_model, DashboardDataModel)
        # Verify main name transformation (DECODE for fetch operations)
        self.assertEqual(result.name.root, 'analytics::report>model"quarterly')
        # Verify top-level column transformation
        self.assertEqual(result.columns[0].name.root, "revenue::summary>metrics")
        # Verify nested children transformations (first level)
        revenue_column = result.columns[0]
        self.assertEqual(revenue_column.children[0].name.root, 'total::amount>"USD"')
        self.assertEqual(revenue_column.children[1].name.root, "nested::data>structure")
        # Verify deeply nested transformations (second level)
        nested_struct = revenue_column.children[1]
        self.assertEqual(nested_struct.children[0].name.root, 'deep::field>"value"')

    def test_dashboard_datamodel_edge_cases(self):
        """Test edge cases for DashboardDataModel transformations."""
        from metadata.generated.schema.entity.data.dashboardDataModel import (
            DashboardDataModel,
            DataModelType,
        )

        # Test with empty children
        model_empty_children = DashboardDataModel(
            id=self.sample_dashboard_id,
            name="test__reserved__colon__model",
            displayName="Test Model",
            dataModelType=DataModelType.PowerBIDataModel,
            service=self.sample_service_ref,
            fullyQualifiedName="service.test__reserved__colon__model",
            columns=[
                Column(
                    name=ColumnName("parent__reserved__arrow__column"),
                    displayName="Parent Column",
                    dataType=DataType.STRUCT,
                    children=[],  # Empty children list
                )
            ],
        )
        result_empty = transform_entity_names(model_empty_children, DashboardDataModel)
        self.assertEqual(result_empty.name.root, "test::model")
        self.assertEqual(result_empty.columns[0].name.root, "parent>column")
        # Test with None children
        model_none_children = DashboardDataModel(
            id=self.sample_dashboard_id,
            name="test__reserved__quote__model",
            displayName="Test Model",
            dataModelType=DataModelType.PowerBIDataModel,
            service=self.sample_service_ref,
            fullyQualifiedName="service.test__reserved__quote__model",
            columns=[
                Column(
                    name=ColumnName("parent__reserved__quote__column"),
                    displayName="Parent Column",
                    dataType=DataType.STRING,
                    children=None,  # None children
                )
            ],
        )
        result_none = transform_entity_names(model_none_children, DashboardDataModel)
        self.assertEqual(result_none.name.root, 'test"model')
        self.assertEqual(result_none.columns[0].name.root, 'parent"column')

    def test_dashboard_datamodel_complex_nested_structures(self):
        """Test complex nested structures with multiple levels and various datatypes."""
        from metadata.generated.schema.entity.data.dashboardDataModel import (
            DashboardDataModel,
            DataModelType,
        )

        # Four nesting levels, mixing STRUCT and ARRAY<STRUCT> containers.
        complex_model = DashboardDataModel(
            id=self.sample_dashboard_id,
            name="complex__reserved__colon__model__reserved__arrow__test",
            displayName="Complex Test Model",
            dataModelType=DataModelType.PowerBIDataModel,
            service=self.sample_service_ref,
            fullyQualifiedName="service.complex__reserved__colon__model__reserved__arrow__test",
            columns=[
                Column(
                    name=ColumnName(
                        "level1__reserved__colon__struct__reserved__arrow__data"
                    ),
                    displayName="Level 1 Struct",
                    dataType=DataType.STRUCT,
                    children=[
                        Column(
                            name=ColumnName(
                                "level2__reserved__quote__array__reserved__colon__items"
                            ),
                            displayName="Level 2 Array",
                            dataType=DataType.ARRAY,
                            arrayDataType=DataType.STRUCT,
                            children=[
                                Column(
                                    name=ColumnName(
                                        "level3__reserved__arrow__nested__reserved__quote__field"
                                    ),
                                    displayName="Level 3 Nested",
                                    dataType=DataType.STRUCT,
                                    children=[
                                        Column(
                                            name=ColumnName(
                                                "level4__reserved__colon__deep__reserved__arrow__value"
                                            ),
                                            displayName="Level 4 Deep",
                                            dataType=DataType.STRING,
                                        )
                                    ],
                                )
                            ],
                        ),
                        Column(
                            name=ColumnName("simple__reserved__quote__field"),
                            displayName="Simple Field",
                            dataType=DataType.INT,
                        ),
                    ],
                )
            ],
        )
        result = transform_entity_names(complex_model, DashboardDataModel)
        # Verify transformations at each level
        self.assertEqual(result.name.root, "complex::model>test")
        self.assertEqual(result.columns[0].name.root, "level1::struct>data")
        # Level 2
        level1_struct = result.columns[0]
        self.assertEqual(level1_struct.children[0].name.root, 'level2"array::items')
        self.assertEqual(level1_struct.children[1].name.root, 'simple"field')
        # Level 3
        level2_array = level1_struct.children[0]
        self.assertEqual(level2_array.children[0].name.root, 'level3>nested"field')
        # Level 4
        level3_nested = level2_array.children[0]
        self.assertEqual(level3_nested.children[0].name.root, "level4::deep>value")

    def test_dashboard_datamodel_round_trip_validation(self):
        """Test round-trip validation for DashboardDataModel transformations."""
        from metadata.generated.schema.api.data.createDashboardDataModel import (
            CreateDashboardDataModelRequest,
        )
        from metadata.generated.schema.entity.data.dashboardDataModel import (
            DashboardDataModel,
            DataModelType,
        )

        # Test data with mixed special characters
        # Each pair is (raw name, its encoded form); encode then decode must
        # reproduce the raw name exactly.
        test_cases = [
            ("simple::name", "simple__reserved__colon__name"),
            (
                'complex::name>with"quotes',
                "complex__reserved__colon__name__reserved__arrow__with__reserved__quote__quotes",
            ),
            (
                'edge::case>test"data',
                "edge__reserved__colon__case__reserved__arrow__test__reserved__quote__data",
            ),
        ]
        for original_name, encoded_name in test_cases:
            with self.subTest(original_name=original_name):
                # Create request (should encode)
                create_request = CreateDashboardDataModelRequest(
                    name=EntityName(original_name),
                    displayName="Test Model",
                    dataModelType=DataModelType.PowerBIDataModel,
                    service=FullyQualifiedEntityName("service.test"),
                    columns=[
                        Column(
                            name=ColumnName(original_name),
                            displayName="Test Column",
                            dataType=DataType.STRING,
                        )
                    ],
                )
                create_result = transform_entity_names(
                    create_request, CreateDashboardDataModelRequest
                )
                self.assertEqual(create_result.name.root, encoded_name)
                self.assertEqual(create_result.columns[0].name.root, encoded_name)
                # Fetch model (should decode)
                fetch_model = DashboardDataModel(
                    id=self.sample_dashboard_id,
                    name=encoded_name,
                    displayName="Test Model",
                    dataModelType=DataModelType.PowerBIDataModel,
                    service=self.sample_service_ref,
                    fullyQualifiedName=f"service.{encoded_name}",
                    columns=[
                        Column(
                            name=ColumnName(encoded_name),
                            displayName="Test Column",
                            dataType=DataType.STRING,
                        )
                    ],
                )
                fetch_result = transform_entity_names(fetch_model, DashboardDataModel)
                self.assertEqual(fetch_result.name.root, original_name)
                self.assertEqual(fetch_result.columns[0].name.root, original_name)
if __name__ == "__main__":
    # Allow executing this test module directly with the stdlib runner.
    from unittest import main

    main()

View File

@ -1,11 +1,14 @@
import uuid
from typing import List
from typing import List, Optional
from unittest import TestCase
from metadata.generated.schema.api.data.createDashboardDataModel import (
CreateDashboardDataModelRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
@ -16,6 +19,7 @@ from metadata.generated.schema.entity.data.table import (
DataType,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.type.basic import (
EntityExtension,
@ -227,3 +231,936 @@ def test_model_dump_json_secrets():
).root_secret.get_secret_value()
== "root_password"
)
# Additional comprehensive tests for enhanced functionality
class ExtendedCustomPydanticValidationTest(TestCase):
    """Extended test suite for comprehensive validation of custom Pydantic functionality.

    These tests assert directly on the constructed models: the reserved-keyword
    encoding/decoding is expected to run during model validation, so no explicit
    transform call is made.
    """

    def setUp(self):
        """Set up test data for extended tests."""
        self.sample_table_id = uuid.uuid4()
        self.sample_schema_ref = EntityReference(id=uuid.uuid4(), type="databaseSchema")

    def test_service_level_models_not_transformed(self):
        """Test that service-level Create models are not transformed."""
        # Test database service creation (should NOT be transformed)
        service_request = CreateDatabaseServiceRequest(
            name=EntityName('my::database>service"with_separators'), serviceType="Mysql"
        )
        # Service names should remain unchanged (not transformed)
        assert service_request.name.root == 'my::database>service"with_separators'

    def test_edge_cases_empty_and_none_values(self):
        """Test handling of edge cases like empty strings and None values."""
        # Test minimal name (empty string not allowed by EntityName validation)
        table_empty = Table(
            id=self.sample_table_id,
            name=EntityName("a"),
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.empty",
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        )
        assert table_empty.name.root == "a"
        # Test table with no columns (edge case)
        table_no_columns = Table(
            id=self.sample_table_id,
            name="test__reserved__colon__table",
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.empty",
            columns=[],
        )
        assert table_no_columns.name.root == "test::table"
        assert len(table_no_columns.columns) == 0

    def test_complex_nested_structures(self):
        """Test complex nested column structures with multiple levels."""
        # Create deeply nested structure
        level3_columns = [
            Column(
                name=ColumnName("deep__reserved__colon__field"),
                dataType=DataType.STRING,
            )
        ]
        level2_columns = [
            Column(
                name=ColumnName("nested__reserved__arrow__struct"),
                dataType=DataType.STRUCT,
                children=level3_columns,
            )
        ]
        level1_column = Column(
            name=ColumnName("root__reserved__quote__struct"),
            dataType=DataType.STRUCT,
            children=level2_columns,
        )
        table = Table(
            id=self.sample_table_id,
            name="complex__reserved__colon__table",
            columns=[level1_column],
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.complex",
        )
        # Verify transformations at all levels
        assert table.name.root == "complex::table"
        assert table.columns[0].name.root == 'root"struct'
        assert table.columns[0].children[0].name.root == "nested>struct"
        assert table.columns[0].children[0].children[0].name.root == "deep::field"

    def test_unicode_and_special_characters(self):
        """Test handling of Unicode and international characters."""
        # Test Unicode with separators
        table_unicode = Table(
            id=self.sample_table_id,
            name="測試__reserved__colon__表格__reserved__arrow__名稱",
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.unicode",
            columns=[
                Column(name="unicode__reserved__quote__列", dataType=DataType.STRING)
            ],
        )
        assert table_unicode.name.root == "測試::表格>名稱"
        # FIX: decoding only substitutes the reserved keyword with '"', so the
        # trailing CJK character must survive (as it does in the table name above).
        assert table_unicode.columns[0].name.root == 'unicode"列'
        # Test emojis with separators
        table_emoji = Table(
            id=self.sample_table_id,
            name="table🚀__reserved__colon__data📊",
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.emoji",
            columns=[
                Column(name="emoji__reserved__arrow__field🎯", dataType=DataType.STRING)
            ],
        )
        assert table_emoji.name.root == "table🚀::data📊"
        assert table_emoji.columns[0].name.root == "emoji>field🎯"

    def test_all_separator_combinations(self):
        """Test all combinations of separators in various scenarios."""
        # Test all separators together; an already-present "__reserved__" run in
        # the input must pass through untouched.
        complex_name = 'test::colon>arrow"quote__reserved__mixed'
        create_request = CreateTableRequest(
            name=EntityName(complex_name),
            columns=[Column(name=ColumnName("simple_col"), dataType=DataType.STRING)],
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        expected = "test__reserved__colon__colon__reserved__arrow__arrow__reserved__quote__quote__reserved__mixed"
        assert create_request.name.root == expected

    def test_table_types_and_properties(self):
        """Test different table types and properties with name transformations."""
        # Test with comprehensive table properties
        table_full = Table(
            id=self.sample_table_id,
            name="full__reserved__colon__table__reserved__arrow__test",
            displayName="Full Test Table",
            description=Markdown(root="A comprehensive test table"),
            tableType=TableType.Regular,
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="test.db.schema.full_table",
            columns=[
                Column(
                    name=ColumnName("id__reserved__quote__primary"),
                    displayName="ID Primary",
                    dataType=DataType.BIGINT,
                    description=Markdown(root="Primary key column"),
                ),
                Column(
                    name=ColumnName("data__reserved__arrow__field"),
                    displayName="Data Field",
                    dataType=DataType.STRING,
                    description=Markdown(root="Data field column"),
                ),
            ],
            tableConstraints=[
                TableConstraint(
                    constraintType="PRIMARY_KEY",
                    columns=["id__reserved__quote__primary"],
                )
            ],
        )
        # Verify all transformations, including the constraint's column list,
        # which must stay in sync with the decoded column names.
        assert table_full.name.root == "full::table>test"
        assert table_full.columns[0].name.root == 'id"primary'
        assert table_full.columns[1].name.root == "data>field"
        assert table_full.tableConstraints[0].columns[0] == 'id"primary'

    def test_dashboard_data_model_comprehensive(self):
        """Test comprehensive DashboardDataModel scenarios."""
        # Test with all data model types
        data_model_types = [
            DataModelType.TableauDataModel,
            DataModelType.PowerBIDataModel,
            DataModelType.SupersetDataModel,
            DataModelType.MetabaseDataModel,
        ]
        for model_type in data_model_types:
            dashboard_model = DashboardDataModel(
                id=uuid.uuid4(),
                name=f"model__reserved__colon__{model_type.value.lower()}",
                dataModelType=model_type,
                columns=[
                    Column(
                        name=ColumnName(
                            f"metric__reserved__arrow__{model_type.value.lower()}"
                        ),
                        dataType=DataType.DOUBLE,
                    )
                ],
            )
            expected_name = f"model::{model_type.value.lower()}"
            expected_col = f"metric>{model_type.value.lower()}"
            assert dashboard_model.name.root == expected_name
            assert dashboard_model.columns[0].name.root == expected_col

    def test_create_requests_comprehensive(self):
        """Test comprehensive CreateRequest scenarios."""
        # Test CreateTableRequest with all possible fields. Note that column
        # names already containing reserved keywords pass through unchanged.
        comprehensive_request = CreateTableRequest(
            name=EntityName('comprehensive::table>name"test'),
            displayName='Comprehensive"Table>Test::Name',
            description=Markdown(root="A comprehensive test table with all fields"),
            tableType=TableType.Regular,
            columns=[
                Column(
                    name=ColumnName("primary__reserved__quote__key"),
                    displayName="Primary Key",
                    dataType=DataType.BIGINT,
                    constraint="NOT_NULL",
                    ordinalPosition=1,
                ),
                Column(
                    name=ColumnName("foreign__reserved__arrow__key"),
                    displayName="Foreign Key",
                    dataType=DataType.BIGINT,
                    constraint="NOT_NULL",
                    ordinalPosition=2,
                ),
                Column(
                    name=ColumnName("nested__reserved__colon__struct"),
                    displayName="Nested Struct",
                    dataType=DataType.STRUCT,
                    children=[
                        Column(
                            name=ColumnName("child__reserved__quote__field"),
                            dataType=DataType.STRING,
                        )
                    ],
                ),
            ],
            tableConstraints=[
                TableConstraint(
                    constraintType="PRIMARY_KEY",
                    columns=["primary__reserved__quote__key"],
                ),
                TableConstraint(
                    constraintType="UNIQUE", columns=["foreign__reserved__arrow__key"]
                ),
            ],
            databaseSchema=FullyQualifiedEntityName("test__reserved__colon__db.schema"),
        )
        # Verify transformations
        assert (
            comprehensive_request.name.root
            == "comprehensive__reserved__colon__table__reserved__arrow__name__reserved__quote__test"
        )
        assert (
            comprehensive_request.columns[0].name.root
            == "primary__reserved__quote__key"
        )
        assert (
            comprehensive_request.columns[1].name.root
            == "foreign__reserved__arrow__key"
        )
        assert (
            comprehensive_request.columns[2].name.root
            == "nested__reserved__colon__struct"
        )
        assert (
            comprehensive_request.columns[2].children[0].name.root
            == "child__reserved__quote__field"
        )

    def test_mixed_separator_edge_cases(self):
        """Test edge cases with mixed separators."""
        edge_cases = [
            # Consecutive separators
            (
                'test::>>""name',
                "test__reserved__colon____reserved__arrow____reserved__arrow____reserved__quote____reserved__quote__name",
            ),
            # Separators at start and end
            (
                '::test>name"',
                "__reserved__colon__test__reserved__arrow__name__reserved__quote__",
            ),
            # Only separators
            ('::>"', "__reserved__colon____reserved__arrow____reserved__quote__"),
            # Empty between separators
            (
                'test::>"name',
                "test__reserved__colon____reserved__arrow____reserved__quote__name",
            ),
        ]
        for input_name, expected in edge_cases:
            create_request = CreateTableRequest(
                name=EntityName(input_name),
                columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)],
                databaseSchema=FullyQualifiedEntityName("db.schema"),
            )
            assert (
                create_request.name.root == expected
            ), f"Failed for input: {input_name}"

    def test_very_long_names_performance(self):
        """Test performance with very long names."""
        # Create very long names to test performance
        long_base_name = "very_long_table_name_" * 3
        long_name_with_separators = (
            f'{long_base_name}::separator>{long_base_name}"quote{long_base_name}'
        )
        create_request = CreateTableRequest(
            name=EntityName(long_name_with_separators),
            columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)],
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        # Should handle long names without issues
        result_name = create_request.name.root
        assert "__reserved__colon__" in result_name
        assert "__reserved__arrow__" in result_name
        assert "__reserved__quote__" in result_name

    def test_happy_path_simple_names(self):
        """Test happy path with simple names that don't need transformation."""
        # Test simple names without special characters
        simple_create = CreateTableRequest(
            name=EntityName("simple_table_name"),
            columns=[
                Column(name=ColumnName("simple_column"), dataType=DataType.STRING)
            ],
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        # Names should remain unchanged
        assert simple_create.name.root == "simple_table_name"
        assert simple_create.columns[0].name.root == "simple_column"
        # Test simple fetch model
        simple_table = Table(
            id=self.sample_table_id,
            name="simple_table",
            databaseSchema=self.sample_schema_ref,
            fullyQualifiedName="db.schema.simple_table",
            columns=[Column(name="simple_col", dataType=DataType.STRING)],
        )
        assert simple_table.name.root == "simple_table"
        assert simple_table.columns[0].name.root == "simple_col"

    def test_error_handling_invalid_models(self):
        """Test error handling with objects outside the transformable registry."""

        # FIX: removed a dead `result = None` assignment that was never used.
        # Objects that are not registered entity models must be constructible
        # and left untouched — no transformation, no exception.
        class InvalidModel:
            def __init__(self):
                self.invalid_attr = "test"

        invalid_obj = InvalidModel()
        # Should handle gracefully without transformation
        assert hasattr(invalid_obj, "invalid_attr")
        assert invalid_obj.invalid_attr == "test"

    def test_boundary_conditions(self):
        """Test boundary conditions and edge cases."""
        # Test single character names
        single_char_create = CreateTableRequest(
            name=EntityName("a"),
            columns=[Column(name=ColumnName("b"), dataType=DataType.STRING)],
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        assert single_char_create.name.root == "a"
        # Test names with only separators
        separator_only = CreateTableRequest(
            name=EntityName("::"),
            columns=[Column(name=ColumnName(">"), dataType=DataType.STRING)],
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        assert separator_only.name.root == "__reserved__colon__"
        assert separator_only.columns[0].name.root == "__reserved__arrow__"

    def test_whitespace_handling(self):
        """Test handling of whitespace in various scenarios."""
        whitespace_cases = [
            # Leading/trailing spaces
            (" test::name ", " test__reserved__colon__name "),
            # Spaces around separators
            (" test :: name ", " test __reserved__colon__ name "),
            # Multiple spaces
            ("test  ::  name", "test  __reserved__colon__  name"),
            # Tabs and newlines (should be preserved)
            ("test\t::\nname", "test\t__reserved__colon__\nname"),
        ]
        for input_name, expected in whitespace_cases:
            create_request = CreateTableRequest(
                name=EntityName(input_name),
                columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)],
                databaseSchema=FullyQualifiedEntityName("db.schema"),
            )
            assert (
                create_request.name.root == expected
            ), f"Failed for input: '{input_name}'"

    def test_table_constraints_comprehensive(self):
        """Test comprehensive table constraints scenarios."""
        constraint_types = ["PRIMARY_KEY", "UNIQUE", "FOREIGN_KEY"]
        constraints = []
        columns = []
        for i, constraint_type in enumerate(constraint_types):
            col_name = f"col_{i}__reserved__colon__constraint"
            columns.append(Column(name=ColumnName(col_name), dataType=DataType.STRING))
            constraints.append(
                TableConstraint(constraintType=constraint_type, columns=[col_name])
            )
        create_request = CreateTableRequest(
            name=EntityName("constraints__reserved__arrow__test"),
            columns=columns,
            tableConstraints=constraints,
            databaseSchema=FullyQualifiedEntityName("db.schema"),
        )
        # Verify all constraints have transformed column names
        for i, constraint in enumerate(create_request.tableConstraints):
            expected_col = f"col_{i}__reserved__colon__constraint"
            assert constraint.columns[0] == expected_col

    def test_entity_references_and_relationships(self):
        """Test entity references and relationship handling."""
        # Test with complex entity references
        table_with_refs = Table(
            id=self.sample_table_id,
            name="table__reserved__colon__with__reserved__arrow__refs",
            databaseSchema=EntityReference(
                id=uuid.uuid4(),
                type="databaseSchema",
                name="schema__reserved__quote__name",
            ),
            fullyQualifiedName="service.db.schema__reserved__quote__name.table",
            columns=[
                Column(
                    name=ColumnName("ref__reserved__colon__column"),
                    dataType=DataType.STRING,
                )
            ],
        )
        # Verify transformations
        assert table_with_refs.name.root == "table::with>refs"
        assert table_with_refs.columns[0].name.root == "ref::column"
        # Entity references should not be transformed (they're separate entities)
        assert table_with_refs.databaseSchema.name == "schema__reserved__quote__name"
class CustomSecretStrExtendedTest(TestCase):
    """Extended test suite for CustomSecretStr functionality."""

    def test_secret_creation_and_access(self):
        """Test CustomSecretStr creation and value access."""
        secret = CustomSecretStr("test_password")
        # The real value is reachable only through get_secret_value();
        # str()/repr() must always mask it.
        assert secret.get_secret_value() == "test_password"
        assert str(secret) == "**********"
        assert repr(secret) == "SecretStr('**********')"

    def test_empty_and_none_secrets(self):
        """Test handling of empty and None secret values."""
        # Test empty secret
        # An empty secret stringifies to "" rather than the mask.
        empty_secret = CustomSecretStr("")
        assert empty_secret.get_secret_value() == ""
        assert str(empty_secret) == ""
        # Test None secret handling
        # Deliberate best-effort: either None round-trips or construction
        # raises one of the listed errors — both are acceptable.
        try:
            none_secret = CustomSecretStr(None)
            assert none_secret.get_secret_value() is None
        except (TypeError, ValueError, AttributeError):
            # This is acceptable behavior for None values
            pass

    def test_long_secrets(self):
        """Test handling of very long secret values."""
        long_secret_value = "a" * 1000
        long_secret = CustomSecretStr(long_secret_value)
        assert long_secret.get_secret_value() == long_secret_value
        assert (
            str(long_secret) == "**********"
        )  # Should still mask regardless of length

    def test_special_character_secrets(self):
        """Test secrets with special characters."""
        special_chars = "!@#$%^&*()_+-=[]{}|;':,.<>?/~`"
        special_secret = CustomSecretStr(special_chars)
        assert special_secret.get_secret_value() == special_chars
        assert str(special_secret) == "**********"

    def test_unicode_secrets(self):
        """Test secrets with Unicode characters."""
        unicode_secret = CustomSecretStr("密码测试🔒")
        assert unicode_secret.get_secret_value() == "密码测试🔒"
        assert str(unicode_secret) == "**********"

    def test_secret_equality_and_hashing(self):
        """Test secret equality and hashing behavior."""
        secret1 = CustomSecretStr("password123")
        secret2 = CustomSecretStr("password123")
        secret3 = CustomSecretStr("different_password")
        # Test equality
        assert secret1.get_secret_value() == secret2.get_secret_value()
        assert secret1.get_secret_value() != secret3.get_secret_value()
        # Test that string representation is always masked
        assert str(secret1) == str(secret2) == str(secret3) == "**********"

    def test_secret_in_nested_models_deep(self):
        """Test secrets in deeply nested model structures."""

        # NOTE(review): `mask_secrets` appears to be a project extension to
        # pydantic's model_dump — confirm against the CustomSecretStr mixin.
        class Level3Model(BaseModel):
            deep_secret: CustomSecretStr
            deep_value: str

        class Level2Model(BaseModel):
            mid_secret: CustomSecretStr
            level3: Level3Model

        class Level1Model(BaseModel):
            top_secret: CustomSecretStr
            level2: Level2Model

        deep_data = {
            "top_secret": "top_password",
            "level2": {
                "mid_secret": "mid_password",
                "level3": {"deep_secret": "deep_password", "deep_value": "not_secret"},
            },
        }
        deep_model = Level1Model(**deep_data)
        # Test masked dump
        masked = deep_model.model_dump(mask_secrets=True)
        assert masked["top_secret"] == "**********"
        assert masked["level2"]["mid_secret"] == "**********"
        assert masked["level2"]["level3"]["deep_secret"] == "**********"
        assert masked["level2"]["level3"]["deep_value"] == "not_secret"
        # Test unmasked dump
        unmasked = deep_model.model_dump(mask_secrets=False)
        assert unmasked["top_secret"] == "top_password"
        assert unmasked["level2"]["mid_secret"] == "mid_password"
        assert unmasked["level2"]["level3"]["deep_secret"] == "deep_password"

    def test_secret_with_optional_fields(self):
        """Test secrets with optional fields."""

        class OptionalSecretModel(BaseModel):
            required_secret: CustomSecretStr
            optional_secret: Optional[CustomSecretStr] = None
            optional_value: Optional[str] = None

        # Test with all fields
        full_model = OptionalSecretModel(
            required_secret="required_pass",
            optional_secret="optional_pass",
            optional_value="some_value",
        )
        masked_full = full_model.model_dump(mask_secrets=True)
        assert masked_full["required_secret"] == "**********"
        assert masked_full["optional_secret"] == "**********"
        assert masked_full["optional_value"] == "some_value"
        # Test with only required fields
        # Unset optional secrets must dump as None, not as a mask.
        minimal_model = OptionalSecretModel(required_secret="required_pass")
        masked_minimal = minimal_model.model_dump(mask_secrets=True)
        assert masked_minimal["required_secret"] == "**********"
        assert masked_minimal["optional_secret"] is None
        assert masked_minimal["optional_value"] is None

    def test_secret_lists_and_dictionaries(self):
        """Test secrets in lists and dictionaries."""

        class ComplexSecretModel(BaseModel):
            secret_list: List[CustomSecretStr]
            nested_secrets: List[dict]

        complex_data = {
            "secret_list": ["password1", "password2", "password3"],
            "nested_secrets": [
                {"name": "config1", "secret": CustomSecretStr("secret1")},
                {"name": "config2", "secret": CustomSecretStr("secret2")},
            ],
        }
        complex_model = ComplexSecretModel(**complex_data)
        # Test that list secrets are handled
        assert len(complex_model.secret_list) == 3
        assert all(str(secret) == "**********" for secret in complex_model.secret_list)
        assert all(
            secret.get_secret_value() in ["password1", "password2", "password3"]
            for secret in complex_model.secret_list
        )
class DashboardDataModelTransformationTest(TestCase):
"""Test DashboardDataModel transformations with nested children and reserved keywords."""
def setUp(self):
    """Build the shared service FQN fixture used by the transformation tests."""
    service_fqn = 'TestService.PowerBI."Analysis>Services::Environment"'
    self.sample_service = FullyQualifiedEntityName(root=service_fqn)
def test_create_dashboard_datamodel_with_nested_children(self):
    """Test CreateDashboardDataModelRequest with nested children containing reserved keywords."""
    # NOTE: no explicit transform call here — the encoded values are asserted
    # directly on the constructed request, so encoding is expected to happen
    # during request validation/construction.
    create_request = CreateDashboardDataModelRequest(
        name=EntityName('financial::report>model"quarterly'),
        displayName="Financial Report Model",
        description=Markdown(
            root="Financial reporting model with special characters"
        ),
        dataModelType=DataModelType.PowerBIDataModel,
        service=self.sample_service,
        columns=[
            Column(
                name=ColumnName("revenue::metrics>summary"),
                displayName="Revenue Metrics",
                dataType=DataType.STRUCT,
                description=Markdown(root="Revenue metrics structure"),
                children=[
                    Column(
                        name=ColumnName("total::revenue>amount"),
                        displayName="Total Revenue",
                        dataType=DataType.DECIMAL,
                        description=Markdown(root="Total revenue amount"),
                    ),
                    Column(
                        name=ColumnName('currency::code>"USD"'),
                        displayName="Currency Code",
                        dataType=DataType.STRING,
                        description=Markdown(root="Currency code with quotes"),
                    ),
                    Column(
                        name=ColumnName("nested::struct>data"),
                        displayName="Nested Structure",
                        dataType=DataType.STRUCT,
                        children=[
                            Column(
                                name=ColumnName('deep::field>"value"'),
                                displayName="Deep Field",
                                dataType=DataType.STRING,
                            )
                        ],
                    ),
                ],
            ),
            Column(
                name=ColumnName("expenses::breakdown>categories"),
                displayName="Expense Breakdown",
                dataType=DataType.ARRAY,
                arrayDataType=DataType.STRUCT,
                children=[
                    Column(
                        name=ColumnName('category::name>"operations"'),
                        displayName="Category Name",
                        dataType=DataType.STRING,
                    ),
                    Column(
                        name=ColumnName("amount::value>total"),
                        displayName="Amount Value",
                        dataType=DataType.DECIMAL,
                    ),
                ],
            ),
        ],
    )
    # Verify main entity name transformation (ENCODE for Create operations)
    assert (
        create_request.name.root
        == "financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly"
    )
    # Verify top-level column name transformations
    assert (
        create_request.columns[0].name.root
        == "revenue__reserved__colon__metrics__reserved__arrow__summary"
    )
    assert (
        create_request.columns[1].name.root
        == "expenses__reserved__colon__breakdown__reserved__arrow__categories"
    )
    # Verify nested children transformations (first level)
    revenue_column = create_request.columns[0]
    assert (
        revenue_column.children[0].name.root
        == "total__reserved__colon__revenue__reserved__arrow__amount"
    )
    assert (
        revenue_column.children[1].name.root
        == "currency__reserved__colon__code__reserved__arrow____reserved__quote__USD__reserved__quote__"
    )
    assert (
        revenue_column.children[2].name.root
        == "nested__reserved__colon__struct__reserved__arrow__data"
    )
    # Verify deeply nested children transformations (second level)
    nested_struct = revenue_column.children[2]
    assert (
        nested_struct.children[0].name.root
        == "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__"
    )
    # Verify array children transformations
    expenses_column = create_request.columns[1]
    assert (
        expenses_column.children[0].name.root
        == "category__reserved__colon__name__reserved__arrow____reserved__quote__operations__reserved__quote__"
    )
    assert (
        expenses_column.children[1].name.root
        == "amount__reserved__colon__value__reserved__arrow__total"
    )
def test_fetch_dashboard_datamodel_with_nested_children(self):
    """Test DashboardDataModel fetch with nested children containing encoded reserved keywords.

    Every name in the fixture below is in its stored (ENCODED) form, using the
    __reserved__colon__ / __reserved__arrow__ / __reserved__quote__ markers.
    Instantiating the entity simulates a fetch, which is expected to DECODE the
    markers back to the original special characters (::, >, ").
    """
    dashboard_model = DashboardDataModel(
        id=uuid.uuid4(),
        name="financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly",
        displayName="Financial Report Model",
        dataModelType=DataModelType.PowerBIDataModel,
        service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
        # NOTE(review): fullyQualifiedName keeps the encoded form — presumably
        # only `name` fields are decoded on fetch; confirm against the validator.
        fullyQualifiedName="service.financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly",
        columns=[
            # STRUCT column with two levels of nested children.
            Column(
                name=ColumnName(
                    "revenue__reserved__colon__metrics__reserved__arrow__summary"
                ),
                displayName="Revenue Metrics",
                dataType=DataType.STRUCT,
                children=[
                    Column(
                        name=ColumnName(
                            "total__reserved__colon__revenue__reserved__arrow__amount"
                        ),
                        displayName="Total Revenue",
                        dataType=DataType.DECIMAL,
                    ),
                    Column(
                        name=ColumnName(
                            "currency__reserved__colon__code__reserved__arrow____reserved__quote__USD__reserved__quote__"
                        ),
                        displayName="Currency Code",
                        dataType=DataType.STRING,
                    ),
                    Column(
                        name=ColumnName(
                            "nested__reserved__colon__struct__reserved__arrow__data"
                        ),
                        displayName="Nested Structure",
                        dataType=DataType.STRUCT,
                        children=[
                            Column(
                                name=ColumnName(
                                    "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__"
                                ),
                                displayName="Deep Field",
                                dataType=DataType.STRING,
                            )
                        ],
                    ),
                ],
            ),
            # ARRAY<STRUCT> column — children of array columns must decode too.
            Column(
                name=ColumnName(
                    "expenses__reserved__colon__breakdown__reserved__arrow__categories"
                ),
                displayName="Expense Breakdown",
                dataType=DataType.ARRAY,
                arrayDataType=DataType.STRUCT,
                children=[
                    Column(
                        name=ColumnName(
                            "category__reserved__colon__name__reserved__arrow____reserved__quote__operations__reserved__quote__"
                        ),
                        displayName="Category Name",
                        dataType=DataType.STRING,
                    ),
                    Column(
                        name=ColumnName(
                            "amount__reserved__colon__value__reserved__arrow__total"
                        ),
                        displayName="Amount Value",
                        dataType=DataType.DECIMAL,
                    ),
                ],
            ),
        ],
    )
    # Verify main entity name transformation (DECODE for fetch operations)
    assert dashboard_model.name.root == 'financial::report>model"quarterly'
    # Verify top-level column name transformations
    assert dashboard_model.columns[0].name.root == "revenue::metrics>summary"
    assert dashboard_model.columns[1].name.root == "expenses::breakdown>categories"
    # Verify nested children transformations (first level)
    revenue_column = dashboard_model.columns[0]
    assert revenue_column.children[0].name.root == "total::revenue>amount"
    assert revenue_column.children[1].name.root == 'currency::code>"USD"'
    assert revenue_column.children[2].name.root == "nested::struct>data"
    # Verify deeply nested children transformations (second level)
    nested_struct = revenue_column.children[2]
    assert nested_struct.children[0].name.root == 'deep::field>"value"'
    # Verify array children transformations
    expenses_column = dashboard_model.columns[1]
    assert expenses_column.children[0].name.root == 'category::name>"operations"'
    assert expenses_column.children[1].name.root == "amount::value>total"
def test_dashboard_datamodel_round_trip_transformation(self):
    """Test round-trip transformation: Create -> Fetch -> Create maintains data integrity."""
    # Step 1: build a create request whose names carry the raw special characters.
    create_request = CreateDashboardDataModelRequest(
        name=EntityName('analytics::dashboard>model"test'),
        displayName="Analytics Dashboard Model",
        dataModelType=DataModelType.PowerBIDataModel,
        service=self.sample_service,
        columns=[
            Column(
                name=ColumnName("metrics::summary>report"),
                dataType=DataType.STRUCT,
                children=[
                    Column(
                        name=ColumnName('total::count>"records"'),
                        dataType=DataType.INT,
                    )
                ],
            )
        ],
    )

    # Step 2: capture what would actually be persisted — the encoded names.
    encoded_entity_name = create_request.name.root
    encoded_column_name = create_request.columns[0].name.root
    encoded_child_name = create_request.columns[0].children[0].name.root

    # Step 3: simulate a fetch by instantiating the entity from the stored values.
    fetched = DashboardDataModel(
        id=uuid.uuid4(),
        name=encoded_entity_name,
        displayName="Analytics Dashboard Model",
        dataModelType=DataModelType.PowerBIDataModel,
        service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
        fullyQualifiedName=f"service.{encoded_entity_name}",
        columns=[
            Column(
                name=ColumnName(encoded_column_name),
                dataType=DataType.STRUCT,
                children=[
                    Column(
                        name=ColumnName(encoded_child_name), dataType=DataType.INT
                    )
                ],
            )
        ],
    )

    # The fetch path must decode back to the original special characters.
    assert fetched.name.root == 'analytics::dashboard>model"test'
    assert fetched.columns[0].name.root == "metrics::summary>report"
    assert fetched.columns[0].children[0].name.root == 'total::count>"records"'

    # The create path must have produced the encoded (storage) form.
    assert (
        encoded_entity_name
        == "analytics__reserved__colon__dashboard__reserved__arrow__model__reserved__quote__test"
    )
    assert (
        encoded_column_name
        == "metrics__reserved__colon__summary__reserved__arrow__report"
    )
    assert (
        encoded_child_name
        == "total__reserved__colon__count__reserved__arrow____reserved__quote__records__reserved__quote__"
    )
def test_dashboard_datamodel_edge_cases(self):
    """Test edge cases for DashboardDataModel transformations."""
    # An empty children list must not break name decoding.
    with_empty_children = DashboardDataModel(
        id=uuid.uuid4(),
        name="test__reserved__colon__model",
        dataModelType=DataModelType.PowerBIDataModel,
        service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
        fullyQualifiedName="service.test__reserved__colon__model",
        columns=[
            Column(
                name=ColumnName("parent__reserved__arrow__column"),
                dataType=DataType.STRUCT,
                children=[],
            )
        ],
    )
    assert with_empty_children.name.root == "test::model"
    assert with_empty_children.columns[0].name.root == "parent>column"

    # children=None must be tolerated just like an empty list.
    with_no_children = DashboardDataModel(
        id=uuid.uuid4(),
        name="test__reserved__quote__model",
        dataModelType=DataModelType.PowerBIDataModel,
        service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
        fullyQualifiedName="service.test__reserved__quote__model",
        columns=[
            Column(
                name=ColumnName("parent__reserved__quote__column"),
                dataType=DataType.STRING,
                children=None,
            )
        ],
    )
    assert with_no_children.name.root == 'test"model'
    assert with_no_children.columns[0].name.root == 'parent"column'

View File

@ -16,8 +16,13 @@ from unittest.mock import MagicMock
import pytest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.models.custom_basemodel_validation import (
RESERVED_ARROW_KEYWORD,
RESERVED_COLON_KEYWORD,
RESERVED_QUOTE_KEYWORD,
)
from metadata.ingestion.ometa.utils import quote
from metadata.utils import fqn
@ -158,3 +163,128 @@ class TestFqn(TestCase):
assert quote("a.b.c") == "a.b.c"
assert quote(FullyQualifiedEntityName('"foo.bar".baz')) == "%22foo.bar%22.baz"
assert quote('"foo.bar/baz".hello') == "%22foo.bar%2Fbaz%22.hello"
def test_table_with_quotes(self):
    """Test FQN building for table names containing quotes"""
    # Stub the client so fqn.build resolves purely from the name parts.
    metadata_stub = MagicMock()
    metadata_stub.es_search_from_fqn.return_value = None

    quoted_table = 'users "2024"'
    built_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Table,
        service_name="mysql",
        database_name="test_db",
        schema_name="public",
        table_name=quoted_table,
        skip_es_search=True,
    )

    expected = f"mysql.test_db.public.users {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}"
    self.assertEqual(built_fqn, expected)
def test_column_with_special_chars(self):
    """Test FQN building for column names with multiple special characters"""
    metadata_stub = MagicMock()
    metadata_stub.es_search_from_fqn.return_value = None

    # A column name mixing all three reserved characters: ::, > and ".
    tricky_column = 'data::type>"info"'
    built_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Column,
        service_name="postgres",
        database_name="analytics",
        schema_name="reporting",
        table_name="metrics",
        column_name=tricky_column,
    )

    expected = f"postgres.analytics.reporting.metrics.data{RESERVED_COLON_KEYWORD}type{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}info{RESERVED_QUOTE_KEYWORD}"
    self.assertEqual(built_fqn, expected)
def test_both_table_and_column_special_chars(self):
    """Test FQN building when both table and column have special characters"""
    metadata_stub = MagicMock()
    metadata_stub.es_search_from_fqn.return_value = None

    # Both levels need transforming: table carries ::, column carries > and ".
    built_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Column,
        service_name="snowflake",
        database_name="warehouse",
        schema_name="analytics",
        table_name="report::daily",
        column_name='value>"USD"',
    )

    expected = f"snowflake.warehouse.analytics.report{RESERVED_COLON_KEYWORD}daily.value{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}USD{RESERVED_QUOTE_KEYWORD}"
    self.assertEqual(built_fqn, expected)
def test_no_transformation_needed(self):
    """Test FQN building for names without special characters"""
    metadata_stub = MagicMock()
    metadata_stub.es_search_from_fqn.return_value = None

    # Plain identifiers must pass through completely untouched.
    built_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Table,
        service_name="mysql",
        database_name="test_db",
        schema_name="public",
        table_name="normal_table_name",
        skip_es_search=True,
    )
    self.assertEqual(built_fqn, "mysql.test_db.public.normal_table_name")
def test_real_world_scenarios(self):
    """Test FQN building for real-world database scenarios"""
    metadata_stub = MagicMock()
    metadata_stub.es_search_from_fqn.return_value = None

    # Snowflake: quotes mark a case-sensitive identifier.
    snowflake_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Table,
        service_name="snowflake",
        database_name="ANALYTICS",
        schema_name="PUBLIC",
        table_name='"MixedCase_Table"',
        skip_es_search=True,
    )
    self.assertEqual(
        snowflake_fqn,
        f"snowflake.ANALYTICS.PUBLIC.{RESERVED_QUOTE_KEYWORD}MixedCase_Table{RESERVED_QUOTE_KEYWORD}",
    )

    # PostgreSQL: a type cast embedded in a column name.
    postgres_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Column,
        service_name="postgres",
        database_name="mydb",
        schema_name="public",
        table_name="events",
        column_name="created_at::timestamp",
    )
    self.assertEqual(
        postgres_fqn,
        f"postgres.mydb.public.events.created_at{RESERVED_COLON_KEYWORD}timestamp",
    )

    # BigQuery: partition decorator with a quoted suffix.
    bigquery_fqn = fqn.build(
        metadata=metadata_stub,
        entity_type=Table,
        service_name="bigquery",
        database_name="my-project",
        schema_name="dataset",
        table_name='events_2024$"daily"',
        skip_es_search=True,
    )
    self.assertEqual(
        bigquery_fqn,
        f"bigquery.my-project.dataset.events_2024${RESERVED_QUOTE_KEYWORD}daily{RESERVED_QUOTE_KEYWORD}",
    )

View File

@ -0,0 +1,567 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Comprehensive tests for FQN building with special characters in table and column names.
Tests happy paths, edge cases, error scenarios, and boundaries.
"""
import unittest
from unittest.mock import Mock, patch
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.ingestion.models.custom_basemodel_validation import (
RESERVED_ARROW_KEYWORD,
RESERVED_COLON_KEYWORD,
RESERVED_QUOTE_KEYWORD,
)
from metadata.utils import fqn
from metadata.utils.fqn import FQNBuildingException
class TestFQNSpecialCharacters(unittest.TestCase):
    """Test FQN building with special characters.

    Exercises fqn.build() with table/column names that contain the reserved
    characters ``::``, ``>`` and ``"``, which are replaced by the
    RESERVED_COLON_KEYWORD / RESERVED_ARROW_KEYWORD / RESERVED_QUOTE_KEYWORD
    markers before being joined into the dotted FQN.
    """

    def setUp(self):
        """Set up test fixtures"""
        # Mocked OpenMetadata client; individual tests configure ES behavior.
        self.mock_metadata = Mock()

    def tearDown(self):
        """Clean up after tests"""
        # Reset any mocks
        self.mock_metadata.reset_mock()

    # ========== HAPPY PATH TESTS ==========
    def test_table_name_with_quotes(self):
        """Test table name containing quotes"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test_db",
            schema_name="public",
            table_name='users "2024"',
            skip_es_search=True,
        )
        expected = f"mysql.test_db.public.users {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_table_name_with_colons(self):
        """Test table name containing double colons"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="postgres",
            database_name="analytics",
            schema_name="reporting",
            table_name="report::daily_summary",
            skip_es_search=True,
        )
        expected = (
            f"postgres.analytics.reporting.report{RESERVED_COLON_KEYWORD}daily_summary"
        )
        self.assertEqual(result, expected)

    def test_table_name_with_arrows(self):
        """Test table name containing arrow characters"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="snowflake",
            database_name="warehouse",
            schema_name="staging",
            table_name="stage>production_data",
            skip_es_search=True,
        )
        expected = (
            f"snowflake.warehouse.staging.stage{RESERVED_ARROW_KEYWORD}production_data"
        )
        self.assertEqual(result, expected)

    def test_column_name_with_quotes(self):
        """Test column name containing quotes"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="mysql",
            database_name="test_db",
            schema_name="public",
            table_name="users",
            column_name='data "value"',
        )
        expected = f"mysql.test_db.public.users.data {RESERVED_QUOTE_KEYWORD}value{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_column_name_with_multiple_special_chars(self):
        """Test column name with combination of special characters"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="analytics",
            schema_name="public",
            table_name="metrics",
            column_name='metric::type>"category"',
        )
        expected = (
            f"postgres.analytics.public.metrics.metric{RESERVED_COLON_KEYWORD}"
            f"type{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}category{RESERVED_QUOTE_KEYWORD}"
        )
        self.assertEqual(result, expected)

    def test_both_table_and_column_with_special_chars(self):
        """Test both table and column names with special characters"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="mysql",
            database_name="test",
            schema_name="schema",
            table_name='table "2024"',
            column_name="column::data>info",
        )
        # Each name part is transformed independently, then dot-joined.
        table_transformed = (
            f"table {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}"
        )
        column_transformed = (
            f"column{RESERVED_COLON_KEYWORD}data{RESERVED_ARROW_KEYWORD}info"
        )
        expected = f"mysql.test.schema.{table_transformed}.{column_transformed}"
        self.assertEqual(result, expected)

    # ========== EDGE CASES ==========
    def test_empty_special_chars_only(self):
        """Test names that are only special characters"""
        # Just quotes
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test",
            schema_name="public",
            table_name='""',
            skip_es_search=True,
        )
        expected = f"mysql.test.public.{RESERVED_QUOTE_KEYWORD}{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)
        # Just colons
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="mysql",
            database_name="test",
            schema_name="public",
            table_name="users",
            column_name="::",
        )
        expected = f"mysql.test.public.users.{RESERVED_COLON_KEYWORD}"
        self.assertEqual(result, expected)

    def test_consecutive_special_chars(self):
        """Test consecutive special characters"""
        # Name is: data + '::' + '::' + '"' + '"' + five '>' characters.
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="postgres",
            database_name="db",
            schema_name="schema",
            table_name='data::::"">>>>>',
            skip_es_search=True,
        )
        # Each special char should be replaced
        transformed = (
            f"data{RESERVED_COLON_KEYWORD}{RESERVED_COLON_KEYWORD}{RESERVED_QUOTE_KEYWORD}"
            f"{RESERVED_QUOTE_KEYWORD}{RESERVED_ARROW_KEYWORD}"
            f"{RESERVED_ARROW_KEYWORD}{RESERVED_ARROW_KEYWORD}"
            f"{RESERVED_ARROW_KEYWORD}{RESERVED_ARROW_KEYWORD}"
        )
        expected = f"postgres.db.schema.{transformed}"
        self.assertEqual(result, expected)

    def test_special_chars_at_boundaries(self):
        """Test special characters at start and end of names"""
        # Special char at start
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name='"table_name',
            skip_es_search=True,
        )
        expected = f"mysql.db.schema.{RESERVED_QUOTE_KEYWORD}table_name"
        self.assertEqual(result, expected)
        # Special char at end
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name="table",
            column_name="column_name::",
        )
        expected = f"mysql.db.schema.table.column_name{RESERVED_COLON_KEYWORD}"
        self.assertEqual(result, expected)

    def test_unicode_with_special_chars(self):
        """Test Unicode characters mixed with special characters"""
        # Non-ASCII text must survive the transformation untouched.
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test",
            schema_name="public",
            table_name='測試::table>"数据"',
            skip_es_search=True,
        )
        transformed = f"測試{RESERVED_COLON_KEYWORD}table{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}数据{RESERVED_QUOTE_KEYWORD}"
        expected = f"mysql.test.public.{transformed}"
        self.assertEqual(result, expected)

    def test_emoji_with_special_chars(self):
        """Test emojis mixed with special characters"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="fun",
            schema_name="emoji",
            table_name="data",
            column_name='🚀::rocket>"launch"',
        )
        transformed = f"🚀{RESERVED_COLON_KEYWORD}rocket{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}launch{RESERVED_QUOTE_KEYWORD}"
        expected = f"postgres.fun.emoji.data.{transformed}"
        self.assertEqual(result, expected)

    # ========== NULL/NONE HANDLING ==========
    def test_none_table_name(self):
        """Test with None table name - should not transform"""
        # Database FQNs take no table/column parts at all.
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Database,
            service_name="mysql",
            database_name="test_db",
        )
        # Should work without transformation
        expected = "mysql.test_db"
        self.assertEqual(result, expected)

    def test_none_column_name(self):
        """Test with None column name - should handle gracefully"""
        # NOTE(review): entity_type here is Table while a column_name kwarg is
        # passed — presumably the unexpected kwarg triggers the exception;
        # confirm against fqn.build's signature handling.
        with self.assertRaises(FQNBuildingException):
            fqn.build(
                metadata=self.mock_metadata,
                entity_type=Table,
                service_name="mysql",
                database_name="db",
                schema_name="schema",
                table_name="table_name",
                column_name=None,
                skip_es_search=True,
            )

    def test_empty_string_names(self):
        """Test with empty string names"""
        # Empty table name should still be processed
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name="",
            skip_es_search=True,
        )
        # Empty string is valid (note the trailing dot in the result).
        expected = "mysql.db.schema."
        self.assertEqual(result, expected)

    # ========== OTHER ENTITY TYPES (No Transformation) ==========
    def test_database_name_with_quotes_should_raise_error(self):
        """Test that Database entities don't get transformed"""
        with self.assertRaises(FQNBuildingException):
            fqn.build(
                metadata=self.mock_metadata,
                entity_type=Database,
                service_name="mysql",
                database_name='db "name"',
            )

    def test_schema_name_with_quotes_should_raise_error(self):
        """Test that DatabaseSchema entities don't get transformed"""
        with self.assertRaises(FQNBuildingException):
            fqn.build(
                metadata=self.mock_metadata,
                entity_type=DatabaseSchema,
                service_name="postgres",
                database_name="db",
                schema_name='schema::"name"',
                skip_es_search=True,
            )

    def test_stored_procedure_name_with_quotes_should_not_transform(self):
        """Test that StoredProcedure entities don't get transformed"""
        # NOTE(review): name says "should_not_transform" but the test asserts
        # an exception is raised — consider renaming to "..._should_raise_error"
        # for consistency with the two tests above.
        with self.assertRaises(FQNBuildingException):
            fqn.build(
                metadata=self.mock_metadata,
                entity_type=StoredProcedure,
                service_name="mysql",
                database_name="db",
                schema_name="schema",
                procedure_name='proc>"name"',
            )

    # ========== INTEGRATION WITH EXISTING BEHAVIOR ==========
    def test_names_without_special_chars_unchanged(self):
        """Test that names without special characters remain unchanged"""
        # Table without special chars
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test_db",
            schema_name="public",
            table_name="normal_table_name",
            skip_es_search=True,
        )
        expected = "mysql.test_db.public.normal_table_name"
        self.assertEqual(result, expected)
        # Column without special chars
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="db",
            schema_name="schema",
            table_name="table",
            column_name="normal_column_name",
        )
        expected = "postgres.db.schema.table.normal_column_name"
        self.assertEqual(result, expected)

    def test_dots_in_names_still_quoted(self):
        """Test that dots in names still trigger quoting"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name="table.with.dots",
            skip_es_search=True,
        )
        # Dots should still trigger quoting in quote_name
        self.assertIn('"table.with.dots"', result)

    # ========== ERROR SCENARIOS ==========
    def test_invalid_entity_type_still_fails(self):
        """Test that invalid entity types still raise exceptions"""

        class InvalidEntity:
            pass

        with self.assertRaises(FQNBuildingException) as context:
            fqn.build(
                metadata=self.mock_metadata,
                entity_type=InvalidEntity,
                service_name="mysql",
            )
        self.assertIn("Invalid Entity Type", str(context.exception))

    def test_transformation_with_es_search(self):
        """Test transformation works with ES search enabled"""
        # Mock ES search to return None (entity not found)
        self.mock_metadata.es_search_from_fqn.return_value = []
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name='table "name"',
            skip_es_search=False,
        )
        # Even with ES search, transformation should happen
        expected = f"mysql.db.schema.table {RESERVED_QUOTE_KEYWORD}name{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    # ========== PERFORMANCE AND SCALE ==========
    def test_very_long_names_with_special_chars(self):
        """Test very long names with special characters"""
        long_name = "a" * 100 + "::" + "b" * 100 + '>"' + "c" * 100 + '"'
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name=long_name,
            skip_es_search=True,
        )
        # Should handle long names: markers present and all segments intact.
        self.assertIn(RESERVED_COLON_KEYWORD, result)
        self.assertIn(RESERVED_ARROW_KEYWORD, result)
        self.assertIn(RESERVED_QUOTE_KEYWORD, result)
        self.assertIn("a" * 100, result)
        self.assertIn("b" * 100, result)
        self.assertIn("c" * 100, result)

    def test_reserved_keywords_in_names(self):
        """Test that reserved keywords themselves are handled"""
        # What if someone has __reserved__colon__ in their table name?
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="db",
            schema_name="schema",
            table_name=f"table{RESERVED_COLON_KEYWORD}weird",
            skip_es_search=True,
        )
        # Should not double-transform
        expected = f"mysql.db.schema.table{RESERVED_COLON_KEYWORD}weird"
        self.assertEqual(result, expected)

    # ========== IMPORT ERROR HANDLING ==========
    @patch("metadata.utils.fqn.build")
    def test_import_error_handling(self, mock_build):
        """Test handling when custom_basemodel_validation import fails"""
        # NOTE(review): fqn.build itself is patched here, so this only
        # exercises the mock's side_effect — it does not reach real code.

        def side_effect(*args, **kwargs):
            # Simulate import error
            if kwargs.get("table_name") or kwargs.get("column_name"):
                raise ImportError("Cannot import custom_basemodel_validation")
            return "mysql.db.schema.table"

        mock_build.side_effect = side_effect
        # Should raise the import error
        with self.assertRaises(ImportError):
            mock_build(
                metadata=self.mock_metadata,
                entity_type=Table,
                service_name="mysql",
                database_name="db",
                schema_name="schema",
                table_name='table "name"',
            )
class TestFQNSpecialCharsRealWorldScenarios(unittest.TestCase):
    """Test real-world scenarios from actual database systems.

    Each test mirrors an identifier style seen in a specific engine
    (Snowflake, PostgreSQL, BigQuery, MySQL) and checks the FQN the
    reserved-keyword transformation produces for it.
    """

    def setUp(self):
        """Set up test fixtures"""
        # ES lookups return no hits, so fqn.build falls back to name parts.
        self.mock_metadata = Mock()
        self.mock_metadata.es_search_from_fqn.return_value = []

    def test_snowflake_quoted_identifiers(self):
        """Test Snowflake-style quoted identifiers"""
        # Snowflake uses quotes for case-sensitive identifiers
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="snowflake",
            database_name="ANALYTICS",
            schema_name="PUBLIC",
            table_name='"MixedCase_Table"',
            skip_es_search=True,
        )
        expected = f"snowflake.ANALYTICS.PUBLIC.{RESERVED_QUOTE_KEYWORD}MixedCase_Table{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_postgres_special_schemas(self):
        """Test PostgreSQL special schema names"""
        # A "::" type cast embedded in a column name must be encoded.
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="mydb",
            schema_name="pg_catalog",
            table_name="pg_type",
            column_name="typname::text",
        )
        expected = (
            f"postgres.mydb.pg_catalog.pg_type.typname{RESERVED_COLON_KEYWORD}text"
        )
        self.assertEqual(result, expected)

    def test_bigquery_dataset_table_notation(self):
        """Test BigQuery dataset.table notation"""
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Table,
            service_name="bigquery",
            database_name="my-project",
            schema_name="dataset",
            table_name='table_2024_01_01$"partition"',
            skip_es_search=True,
        )
        # Dollar signs are not transformed, only quotes
        expected = f"bigquery.my-project.dataset.table_2024_01_01${RESERVED_QUOTE_KEYWORD}partition{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_mysql_backtick_conversion(self):
        """Test MySQL backtick identifiers (already handled by parser)"""
        # Assuming backticks are converted to quotes before reaching FQN
        result = fqn.build(
            metadata=self.mock_metadata,
            entity_type=Column,
            service_name="mysql",
            database_name="test",
            schema_name="public",
            table_name="orders",
            column_name='"order-date"',  # Backticks converted to quotes
        )
        expected = f"mysql.test.public.orders.{RESERVED_QUOTE_KEYWORD}order-date{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)
# Allow running this test module directly (python <file>.py) in addition
# to discovery via pytest/unittest.
if __name__ == "__main__":
    unittest.main()