diff --git a/ingestion/examples/sample_data/dashboards/dashboardDataModels.json b/ingestion/examples/sample_data/dashboards/dashboardDataModels.json index 40b86e81eee..8160cfbf501 100644 --- a/ingestion/examples/sample_data/dashboards/dashboardDataModels.json +++ b/ingestion/examples/sample_data/dashboards/dashboardDataModels.json @@ -14564,6 +14564,14 @@ "description": "Trend confidence level", "tags": [], "ordinalPosition": 3 + }, + { + "name": "trend_confidence with quotes \"00\"", + "dataType": "DOUBLE", + "dataTypeDisplay": "double", + "description": "Trend confidence level", + "tags": [], + "ordinalPosition": 4 } ] } diff --git a/ingestion/examples/sample_data/mysql/tables.json b/ingestion/examples/sample_data/mysql/tables.json index c5d5c00ebf2..14eab9a6e08 100644 --- a/ingestion/examples/sample_data/mysql/tables.json +++ b/ingestion/examples/sample_data/mysql/tables.json @@ -1223,6 +1223,101 @@ "description": null, "constraint": "NULL", "ordinalPosition": 3 + }, + { + "name": "risk_score", + "dataType": "DOUBLE", + "dataTypeDisplay": "double", + "description": "Customer risk assessment score", + "tags": [], + "ordinalPosition": 2 + }, + { + "name": "credit_metrics", + "dataType": "STRUCT", + "dataTypeDisplay": "struct", + "description": "Customer credit assessment metrics", + "tags": [], + "ordinalPosition": 3, + "children": [ + { + "name": "credit_score", + "dataType": "BIGINT", + "dataTypeDisplay": "bigint", + "description": "Customer credit score", + "tags": [], + "ordinalPosition": 1 + }, + { + "name": "payment_history", + "dataType": "STRUCT", + "dataTypeDisplay": "struct", + "description": "Payment history analytics", + "tags": [], + "ordinalPosition": 2, + "children": [ + { + "name": "on_time_payments", + "dataType": "BIGINT", + "dataTypeDisplay": "bigint", + "description": "Number of on-time payments", + "tags": [], + "ordinalPosition": 1 + }, + { + "name": "late_payments", + "dataType": "BIGINT", + "dataTypeDisplay": "bigint", + "description": 
"Number of late payments", + "tags": [], + "ordinalPosition": 2 + }, + { + "name": "payment_trends", + "dataType": "ARRAY", + "arrayDataType": "STRUCT", + "dataTypeDisplay": "array", + "description": "Payment trend analysis", + "tags": [], + "ordinalPosition": 3, + "children": [ + { + "name": "trend_period", + "dataType": "DATE", + "dataTypeDisplay": "date", + "description": "Payment trend period", + "tags": [], + "ordinalPosition": 1 + }, + { + "name": "trend_score", + "dataType": "DOUBLE", + "dataTypeDisplay": "double", + "description": "Payment trend score", + "tags": [], + "ordinalPosition": 2 + }, + { + "name": "trend_confidence", + "dataType": "DOUBLE", + "dataTypeDisplay": "double", + "description": "Trend confidence level", + "tags": [], + "ordinalPosition": 3 + }, + { + "name": "trend_confidence with quotes \"00\"", + "dataType": "DOUBLE", + "dataTypeDisplay": "double", + "description": "Trend confidence level", + "tags": [], + "ordinalPosition": 4 + } + ] + } + ] + } + ] } ], "tableConstraints": [ diff --git a/ingestion/src/metadata/ingestion/models/custom_basemodel_validation.py b/ingestion/src/metadata/ingestion/models/custom_basemodel_validation.py index fa07051e179..1d6b8f0fb18 100644 --- a/ingestion/src/metadata/ingestion/models/custom_basemodel_validation.py +++ b/ingestion/src/metadata/ingestion/models/custom_basemodel_validation.py @@ -13,19 +13,103 @@ Validation logic for Custom Pydantic BaseModel """ import logging +from enum import Enum +from typing import Any, Callable, Dict, Optional logger = logging.getLogger("metadata") - RESTRICTED_KEYWORDS = ["::", ">"] RESERVED_COLON_KEYWORD = "__reserved__colon__" RESERVED_ARROW_KEYWORD = "__reserved__arrow__" RESERVED_QUOTE_KEYWORD = "__reserved__quote__" -CREATE_ADJACENT_MODELS = {"ProfilerResponse", "SampleData"} -NAME_FIELDS = {"EntityName", "str", "ColumnName", "TableData"} -FETCH_MODELS = {"Table", "CustomColumnName", "DashboardDataModel"} -FIELD_NAMES = {"name", "columns", "root"} + +class 
class TransformDirection(Enum):
    """Direction of a name transformation."""

    # For storage (Create operations): separators are replaced by reserved keywords
    ENCODE = "encode"
    # For display (Fetch operations): reserved keywords are reverted to separators
    DECODE = "decode"


# Naming pattern of service-level create requests: Create<ServiceName>ServiceRequest
_SERVICE_CREATE_PREFIX = "Create"
_SERVICE_CREATE_SUFFIX = "ServiceRequest"


def is_service_level_create_model(model_name: str) -> bool:
    """
    Check if a model is a Service-level Create model that should NOT be transformed.

    Service-level models follow the pattern ``Create*ServiceRequest`` where ``*``
    is a non-empty service name, so new services are recognised without having
    to maintain an explicit list.

    :param model_name: class name of the model being validated
    :return: True only when the name starts with "Create", ends with
        "ServiceRequest" and has at least one character in between
    """
    if not (
        model_name.startswith(_SERVICE_CREATE_PREFIX)
        and model_name.endswith(_SERVICE_CREATE_SUFFIX)
    ):
        return False

    # "CreateServiceRequest"         -> service name "" (not a service model)
    # "CreateDatabaseServiceRequest" -> service name "Database" (service model)
    service_name = model_name[
        len(_SERVICE_CREATE_PREFIX) : -len(_SERVICE_CREATE_SUFFIX)
    ]
    return bool(service_name)


# Explicit configuration for entity name transformations, keyed by model class.
# Each value holds the "fields" that carry names and the transform "direction".
# This dictionary is populated lazily to avoid circular imports.
TRANSFORMABLE_ENTITIES: Dict[Any, Dict[str, Any]] = {}


def _initialize_transformable_entities():
    """
    Populate TRANSFORMABLE_ENTITIES lazily to avoid circular imports.

    The function is invoked on every configuration lookup, so it returns
    immediately once the registry has been filled. The registry is only
    updated after all imports succeeded, which means a failed attempt leaves
    it empty and the next call retries.
    """
    if TRANSFORMABLE_ENTITIES:
        # Already initialised: skip the imports and the dictionary rebuild
        return

    # Import all model classes here to avoid circular dependency at module load time
    from metadata.generated.schema.api.data.createDashboardDataModel import (
        CreateDashboardDataModelRequest,
    )
    from metadata.generated.schema.api.data.createTable import CreateTableRequest
    from metadata.generated.schema.entity.data.dashboardDataModel import (
        DashboardDataModel,
    )
    from metadata.generated.schema.entity.data.table import (
        ColumnName,
        ColumnProfile,
        Table,
        TableData,
    )
    from metadata.profiler.api.models import ProfilerResponse
    from metadata.utils.entity_link import CustomColumnName

    # Now populate the dictionary with the imported classes
    TRANSFORMABLE_ENTITIES.update(
        {
            # Fetch models - decode reserved keywords back to original characters
            Table: {
                "fields": {"name", "columns", "children", "tableConstraints"},
                "direction": TransformDirection.DECODE,
            },
            DashboardDataModel: {
                "fields": {"name", "columns", "children"},
                "direction": TransformDirection.DECODE,
            },
            CustomColumnName: {
                "fields": {"root"},
                "direction": TransformDirection.DECODE,
            },
            # Create/Store models - encode special characters to reserved keywords
            ProfilerResponse: {
                "fields": {"name", "profile"},
                "direction": TransformDirection.ENCODE,
            },
            TableData: {"fields": {"columns"}, "direction": TransformDirection.ENCODE},
            ColumnName: {"fields": {"root"}, "direction": TransformDirection.ENCODE},
            CreateTableRequest: {
                "fields": {"name", "columns", "children", "tableConstraints"},
                "direction": TransformDirection.ENCODE,
            },
            CreateDashboardDataModelRequest: {
                "fields": {"name", "columns", "children"},
                "direction": TransformDirection.ENCODE,
            },
            ColumnProfile: {
                "fields": {"name"},
                "direction": TransformDirection.ENCODE,
            },
        }
    )
- """ - if isinstance(values, str) and field_name in FIELD_NAMES: - values = modification_method(values) - elif ( - hasattr(values, "root") - and isinstance(values.root, str) - and field_name in FIELD_NAMES +def get_entity_config(model: Optional[Any]) -> Optional[Dict[str, Any]]: + """Get transformation configuration for entity""" + _initialize_transformable_entities() # Ensure entities are loaded + return TRANSFORMABLE_ENTITIES.get(model) + + +def get_transformer(model: Optional[Any]) -> Optional[Callable]: + """Get the appropriate transformer function for model""" + config = get_entity_config(model) + if not config: + return None + + direction = config.get("direction") + if direction == TransformDirection.ENCODE: + return replace_separators + elif direction == TransformDirection.DECODE: + return revert_separators + return None + + +def transform_all_names(obj, transformer): + """Transform all name fields recursively""" + if not obj: + return + + # Transform name field if it exists (supports both obj.name.root and obj.root) + name = getattr(obj, "name", None) + if name and hasattr(name, "root") and name.root is not None: + name.root = transformer(name.root) + elif hasattr(obj, "root") and obj.root is not None: + obj.root = transformer(obj.root) + + # Transform nested collections in a single loop each + for attr_name in ["columns", "children"]: + if hasattr(obj, attr_name): + attr_value = getattr(obj, attr_name) + if attr_value is not None: + for item in attr_value: + transform_all_names(item, transformer) + + # Transform table constraints + if hasattr(obj, "tableConstraints"): + table_constraints = getattr(obj, "tableConstraints") + if table_constraints is not None: + for constraint in table_constraints: + if hasattr(constraint, "columns"): + constraint.columns = [ + transformer(col) for col in constraint.columns + ] + + if transformer == replace_separators and type(name) == str: + obj.name = transformer(name) + + +def transform_entity_names(entity: Any, model: 
Optional[Any]) -> Any: + """Transform entity names""" + model_name = model.__name__ + if not entity or ( + model_name.startswith("Create") and is_service_level_create_model(model_name) ): - values.root = modification_method(values.root) - elif hasattr(type(values), "model_fields"): - for key in type(values).model_fields.keys(): - if getattr(values, key): - if getattr(values, key).__class__.__name__ in NAME_FIELDS: - setattr( - values, - key, - validate_name_and_transform( - getattr(values, key), - modification_method=modification_method, - field_name=key, - ), - ) - elif isinstance(getattr(values, key), list): - setattr( - values, - key, - [ - validate_name_and_transform( - item, - modification_method=modification_method, - field_name=key, - ) - for item in getattr(values, key) - ], - ) - return values + return entity + + # Root attribute handling + if hasattr(entity, "root") and entity.root is not None: + entity.root = ( + replace_separators(entity.root) + if model_name.startswith("Create") + else revert_separators(entity.root) + ) + return entity + + # Get model-specific transformer + transformer = get_transformer(model) + if not transformer: + # Fallback to original logic for backward compatibility + transformer = ( + replace_separators if model_name.startswith("Create") else revert_separators + ) + + transform_all_names(entity, transformer) + return entity diff --git a/ingestion/src/metadata/ingestion/models/custom_pydantic.py b/ingestion/src/metadata/ingestion/models/custom_pydantic.py index 49cf61434cc..f5d7c8c2490 100644 --- a/ingestion/src/metadata/ingestion/models/custom_pydantic.py +++ b/ingestion/src/metadata/ingestion/models/custom_pydantic.py @@ -26,13 +26,7 @@ from pydantic.types import SecretStr from pydantic_core.core_schema import SerializationInfo from typing_extensions import Annotated -from metadata.ingestion.models.custom_basemodel_validation import ( - CREATE_ADJACENT_MODELS, - FETCH_MODELS, - replace_separators, - revert_separators, - 
validate_name_and_transform, -) +from metadata.ingestion.models.custom_basemodel_validation import transform_entity_names logger = logging.getLogger("metadata") @@ -75,25 +69,18 @@ class BaseModel(PydanticBaseModel): @classmethod def parse_name(cls, values): # pylint: disable=inconsistent-return-statements """ - Primary entry point to process values based on their class. + Transform entity names using hybrid configuration system. """ if not values: - return + return values try: - - if cls.__name__ in CREATE_ADJACENT_MODELS or cls.__name__.startswith( - "Create" - ): - values = validate_name_and_transform(values, replace_separators) - elif cls.__name__ in FETCH_MODELS: - values = validate_name_and_transform(values, revert_separators) - + # Try new hybrid system first + return transform_entity_names(entity=values, model=cls) except Exception as exc: logger.warning("Exception while parsing Basemodel: %s", exc) - raise exc - return values + return values def model_dump_json( # pylint: disable=too-many-arguments self, diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index e29aad26110..e5ad236258a 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -15,6 +15,7 @@ ES indexes definitions """ import hashlib import re +import traceback from typing import Dict, List, Optional, Type, TypeVar, Union from antlr4.CommonTokenStream import CommonTokenStream @@ -53,6 +54,9 @@ from metadata.generated.schema.tests.testSuite import TestSuite from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.dispatch import class_register from metadata.utils.elasticsearch import get_entity_from_es_result +from metadata.utils.logger import utils_logger + +logger = utils_logger() T = TypeVar("T", bound=BaseModel) @@ -144,12 +148,32 @@ def build( :param kwargs: required to build the FQN :return: FQN as a string """ - func = fqn_build_registry.registry.get(entity_type.__name__) - if not func: - 
raise FQNBuildingException( - f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented." + # Transform table_name and column_name if they exist and contain special characters + if kwargs.get("table_name") or kwargs.get("column_name"): + from metadata.ingestion.models.custom_basemodel_validation import ( # pylint: disable=import-outside-toplevel + replace_separators, + ) + + table_name = kwargs.get("table_name") + if table_name and isinstance(table_name, str): + kwargs["table_name"] = replace_separators(table_name) + + column_name = kwargs.get("column_name") + if column_name and isinstance(column_name, str): + kwargs["column_name"] = replace_separators(column_name) + + func = fqn_build_registry.registry.get(entity_type.__name__) + try: + if not func: + raise FQNBuildingException( + f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented." + ) + return func(metadata, **kwargs) + except Exception as e: + logger.debug(traceback.format_exc()) + raise FQNBuildingException( + f"Error building FQN for {entity_type.__name__}: {e}" ) - return func(metadata, **kwargs) @fqn_build_registry.add(Table) diff --git a/ingestion/tests/unit/models/test_custom_basemodel_validation.py b/ingestion/tests/unit/models/test_custom_basemodel_validation.py new file mode 100644 index 00000000000..890df20162b --- /dev/null +++ b/ingestion/tests/unit/models/test_custom_basemodel_validation.py @@ -0,0 +1,998 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Comprehensive tests for custom basemodel validation system. +Tests the hybrid name validation system with all edge cases and scenarios. +""" + +import uuid +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, +) +from metadata.generated.schema.entity.data.table import ( + Column, + ColumnName, + DataType, + Table, + TableData, +) +from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.custom_basemodel_validation import ( + RESERVED_ARROW_KEYWORD, + RESERVED_COLON_KEYWORD, + RESERVED_QUOTE_KEYWORD, + TRANSFORMABLE_ENTITIES, + TransformDirection, + get_entity_config, + get_transformer, + is_service_level_create_model, + replace_separators, + revert_separators, + transform_entity_names, +) +from metadata.profiler.api.models import ProfilerResponse +from metadata.utils.entity_link import CustomColumnName + + +class TestCustomBasemodelValidation(TestCase): + """Comprehensive test suite for custom basemodel validation functionality.""" + + def setUp(self): + """Set up common test data.""" + self.sample_table_id = uuid.uuid4() + self.sample_schema_ref = EntityReference(id=uuid.uuid4(), type="databaseSchema") + + def test_service_pattern_detection(self): + """Test the scalable service pattern detection system.""" + # Test existing services (should be identified as services) + existing_services = [ + 
"CreateDatabaseServiceRequest", + "CreateDashboardServiceRequest", + "CreateMessagingServiceRequest", + "CreatePipelineServiceRequest", + "CreateMlModelServiceRequest", + "CreateStorageServiceRequest", + "CreateMetadataServiceRequest", + "CreateSearchServiceRequest", + "CreateApiServiceRequest", + ] + + for service in existing_services: + self.assertTrue( + is_service_level_create_model(service), + f"{service} should be identified as a service model", + ) + + # Test future services (should be identified as services - scalability test) + future_services = [ + "CreateNewServiceRequest", + "CreateCustomServiceRequest", + "CreateXYZServiceRequest", + "CreateAnalyticsServiceRequest", + "CreateAnyThingServiceRequest", + ] + + for service in future_services: + self.assertTrue( + is_service_level_create_model(service), + f"{service} should be identified as a service model (future compatibility)", + ) + + # Test non-services (should NOT be identified as services) + non_services = [ + "CreateTable", + "CreateDatabase", + "CreateServiceRequest", # No service name between Create and ServiceRequest + "CreateService", # Missing "Request" suffix + "MyCreateServiceRequest", # Doesn't start with "Create" + "createDatabaseServiceRequest", # Lowercase + "CreateServiceRequestSomething", # ServiceRequest not at the end + "CreateDashboard", + "CreateChart", + ] + + for non_service in non_services: + self.assertFalse( + is_service_level_create_model(non_service), + f"{non_service} should NOT be identified as a service model", + ) + + def test_service_pattern_edge_cases(self): + """Test edge cases for service pattern detection.""" + # Test edge case: just "CreateServiceRequest" (no service name) + self.assertFalse( + is_service_level_create_model("CreateServiceRequest"), + "CreateServiceRequest with no service name should not be considered a service", + ) + + # Test minimum valid service name + self.assertTrue( + is_service_level_create_model("CreateXServiceRequest"), + 
"CreateXServiceRequest should be considered a service", + ) + + # Test very long service name + long_service = "Create" + "Very" * 50 + "LongServiceRequest" + self.assertTrue( + is_service_level_create_model(long_service), + "Very long service names should be handled correctly", + ) + + def test_transformable_entities_configuration(self): + """Test the TRANSFORMABLE_ENTITIES configuration.""" + # Test that expected entities are configured + expected_entities = { + Table, + DashboardDataModel, + CustomColumnName, + ProfilerResponse, + TableData, + CreateTableRequest, + CreateDashboardDataModelRequest, + } + + for entity in expected_entities: + self.assertIn( + entity, + TRANSFORMABLE_ENTITIES, + f"{entity} should be in TRANSFORMABLE_ENTITIES", + ) + + # Test entity configurations have required fields + for entity_name, config in TRANSFORMABLE_ENTITIES.items(): + self.assertIn( + "fields", config, f"{entity_name} config should have 'fields' key" + ) + self.assertIn( + "direction", config, f"{entity_name} config should have 'direction' key" + ) + self.assertIsInstance( + config["fields"], set, f"{entity_name} fields should be a set" + ) + self.assertIsInstance( + config["direction"], + TransformDirection, + f"{entity_name} direction should be TransformDirection enum", + ) + + def test_get_entity_config(self): + """Test get_entity_config function.""" + # Test existing entity + table_config = get_entity_config(Table) + self.assertIsNotNone(table_config) + self.assertEqual(table_config["direction"], TransformDirection.DECODE) + self.assertIn("name", table_config["fields"]) + + # Test non-existent entity + non_existent_config = get_entity_config("NonExistentEntity") + self.assertIsNone(non_existent_config) + + def test_get_transformer(self): + """Test get_transformer function.""" + # Test DECODE transformer + table_transformer = get_transformer(Table) + self.assertIsNotNone(table_transformer) + self.assertEqual(table_transformer, revert_separators) + + # Test ENCODE 
transformer + create_table_transformer = get_transformer(CreateTableRequest) + self.assertIsNotNone(create_table_transformer) + self.assertEqual(create_table_transformer, replace_separators) + + # Test non-existent entity + non_existent_transformer = get_transformer("NonExistentEntity") + self.assertIsNone(non_existent_transformer) + + def test_replace_separators_function(self): + """Test replace_separators function with various inputs.""" + test_cases = [ + ("simple_name", "simple_name"), # No separators + ( + "name::with::colons", + "name__reserved__colon__with__reserved__colon__colons", + ), + ( + "name>with>arrows", + "name__reserved__arrow__with__reserved__arrow__arrows", + ), + ( + 'name"with"quotes', + "name__reserved__quote__with__reserved__quote__quotes", + ), + ( + 'mixed::>"chars', + "mixed__reserved__colon____reserved__arrow____reserved__quote__chars", + ), + ("", ""), # Empty string + (":::", "__reserved__colon__:"), # Multiple colons - :: replaced, : remains + ( + ">>>", + "__reserved__arrow____reserved__arrow____reserved__arrow__", + ), # Multiple arrows - each > replaced + ( + '"""', + "__reserved__quote____reserved__quote____reserved__quote__", + ), # Multiple quotes - each " replaced + ] + + for input_val, expected in test_cases: + result = replace_separators(input_val) + self.assertEqual( + result, + expected, + f"replace_separators('{input_val}') should return '{expected}'", + ) + + def test_revert_separators_function(self): + """Test revert_separators function with various inputs.""" + test_cases = [ + ("simple_name", "simple_name"), # No reserved keywords + ( + "name__reserved__colon__with__reserved__colon__colons", + "name::with::colons", + ), + ( + "name__reserved__arrow__with__reserved__arrow__arrows", + "name>with>arrows", + ), + ( + "name__reserved__quote__with__reserved__quote__quotes", + 'name"with"quotes', + ), + ( + "mixed__reserved__colon____reserved__arrow____reserved__quote__chars", + 'mixed::>"chars', + ), + ("", ""), # Empty 
string + ( + "__reserved__colon__:", + ":::", + ), # Multiple colons: __reserved__colon__ + : = :: + : = ::: + ] + + for input_val, expected in test_cases: + result = revert_separators(input_val) + self.assertEqual( + result, + expected, + f"revert_separators('{input_val}') should return '{expected}'", + ) + + def test_round_trip_transformations(self): + """Test that encode->decode round trips preserve original values.""" + test_values = [ + "simple_name", + "name::with::colons", + "name>with>arrows", + 'name"with"quotes', + 'complex::name>with"all', + "unicode测试::name", + 'emoji🚀::data📊>chart"report', + " spaced :: values ", # Leading/trailing spaces + "special!@#$%^&*()_+-={}[]|\\:;'<>?,./", # Special characters (non-reserved) + ] + + for original in test_values: + encoded = replace_separators(original) + decoded = revert_separators(encoded) + self.assertEqual(decoded, original, f"Round trip failed for: '{original}'") + + def test_transform_entity_names_with_explicit_config(self): + """Test transform_entity_names with explicitly configured entities.""" + # Test Table (DECODE direction) + table = Table( + id=self.sample_table_id, + name="test__reserved__colon__table__reserved__arrow__name", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.test_table", + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + result = transform_entity_names(table, Table) + self.assertEqual(result.name.root, "test::table>name") + + # Test CreateTable (ENCODE direction) + create_request = CreateTableRequest( + name=EntityName('my::table>with"special_chars'), + columns=[Column(name=ColumnName("col1"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + result = transform_entity_names(create_request, CreateTableRequest) + expected = "my__reserved__colon__table__reserved__arrow__with__reserved__quote__special_chars" + self.assertEqual(result.name.root, expected) + + def 
test_transform_entity_names_with_dynamic_pattern(self): + """Test transform_entity_names with dynamic Create* pattern.""" + # Create a custom CreateTableRequest that should use dynamic pattern + create_request = CreateTableRequest( + name=EntityName('dynamic::table>name"test'), + columns=[Column(name=ColumnName("col1"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + # Use a model name not in explicit config to trigger dynamic pattern + result = transform_entity_names(create_request, CreateTableRequest) + expected = "dynamic__reserved__colon__table__reserved__arrow__name__reserved__quote__test" + self.assertEqual(result.name.root, expected) + + def test_transform_entity_names_service_exclusion(self): + """Test that service-level models are excluded from transformation.""" + service_request = CreateDatabaseServiceRequest( + name=EntityName('my::database>service"with_separators'), serviceType="Mysql" + ) + + result = transform_entity_names(service_request, CreateDatabaseServiceRequest) + # Should NOT be transformed + self.assertEqual(result.name.root, 'my::database>service"with_separators') + + def test_transform_entity_names_edge_cases(self): + """Test transform_entity_names with edge cases.""" + # Test None entity + result = transform_entity_names(None, Table) + self.assertIsNone(result) + + # Test entity without __dict__ (edge case) + simple_value = "test_string" + result = transform_entity_names(simple_value, Table) + self.assertEqual(result, simple_value) + + # Test entity with minimal name + table_minimal = Table( + id=self.sample_table_id, + name=EntityName("a"), + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.minimal", + columns=[], + ) + result = transform_entity_names(table_minimal, Table) + self.assertEqual(result.name.root, "a") + + def test_transform_entity_names_with_nested_structures(self): + """Test transform_entity_names with complex nested structures.""" + # Create deeply nested 
column structure + level3_columns = [ + Column( + name=ColumnName("deep__reserved__colon__field"), + dataType=DataType.STRING, + ) + ] + + level2_columns = [ + Column( + name=ColumnName("nested__reserved__arrow__struct"), + dataType=DataType.STRUCT, + children=level3_columns, + ) + ] + + level1_column = Column( + name=ColumnName("root__reserved__quote__struct"), + dataType=DataType.STRUCT, + children=level2_columns, + ) + + table = Table( + id=self.sample_table_id, + name="complex__reserved__colon__table", + columns=[level1_column], + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.complex_table", + ) + + result = transform_entity_names(table, Table) + + # Verify table name transformation (DECODE operation) + self.assertEqual(result.name.root, "complex::table") + # Column names should also be decoded since Table config includes columns + self.assertEqual(result.columns[0].name.root, 'root"struct') + self.assertEqual(result.columns[0].children[0].name.root, "nested>struct") + self.assertEqual( + result.columns[0].children[0].children[0].name.root, "deep::field" + ) + + def test_transform_entity_names_with_root_attributes(self): + """Test transformation of entities with root attributes (like FullyQualifiedEntityName).""" + # Create a mock entity with root attribute + class MockEntityWithRoot: + def __init__(self, root_value): + self.root = root_value + + # Test transformation of root attribute + entity = MockEntityWithRoot("test__reserved__colon__value") + result = transform_entity_names(entity, Table) + self.assertEqual(result.root, "test::value") + + def test_unicode_and_international_characters(self): + """Test handling of Unicode and international characters.""" + # Test Unicode characters with separators + table_unicode = Table( + id=self.sample_table_id, + name="測試__reserved__colon__表格__reserved__arrow__名稱", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.unicode_table", + columns=[ + 
Column(name="unicode__reserved__quote__列", dataType=DataType.STRING) + ], + ) + + result = transform_entity_names(table_unicode, Table) + self.assertEqual(result.name.root, "測試::表格>名稱") + # Column names should also be decoded since Table config includes columns + self.assertEqual(result.columns[0].name.root, 'unicode"列') + + # Test emojis with separators + table_emoji = Table( + id=self.sample_table_id, + name="table🚀__reserved__colon__data📊__reserved__arrow__chart", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.emoji_table", + columns=[ + Column(name="emoji__reserved__quote__field🎯", dataType=DataType.STRING) + ], + ) + + result = transform_entity_names(table_emoji, Table) + self.assertEqual(result.name.root, "table🚀::data📊>chart") + self.assertEqual(result.columns[0].name.root, 'emoji"field🎯') + + def test_very_long_strings(self): + """Test handling of long strings within validation limits.""" + # Create long names within validation limits (under 256 chars) + long_name = ( + "a" * 50 + + "__reserved__colon__" + + "b" * 50 + + "__reserved__arrow__" + + "c" * 50 + ) + + table = Table( + id=self.sample_table_id, + name=long_name, + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.long_table", + columns=[], + ) + + result = transform_entity_names(table, Table) + + # Should still transform correctly + expected = "a" * 50 + "::" + "b" * 50 + ">" + "c" * 50 + self.assertEqual(result.name.root, expected) + + def test_nested_reserved_keywords(self): + """Test handling of nested/overlapping reserved keywords.""" + # Test overlapping patterns + overlapping_name = "test__reserved__colon____reserved__colon__reserved__name" + + table = Table( + id=self.sample_table_id, + name=overlapping_name, + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.overlapping_table", + columns=[], + ) + + result = transform_entity_names(table, Table) + # This should handle the overlapping keywords correctly + expected = 
"test::::reserved__name" + self.assertEqual(result.name.root, expected) + + def test_error_handling_and_logging(self): + """Test error handling and logging in transformation functions.""" + # Test with mock entity that might cause errors + class ProblematicEntity: + def __init__(self): + self.name = "test_name" + + def __getattribute__(self, name): + if name == "name" and hasattr(self, "_fail_count"): + self._fail_count += 1 + if self._fail_count > 2: + raise ValueError("Simulated error") + return super().__getattribute__(name) + + problematic_entity = ProblematicEntity() + problematic_entity._fail_count = 0 + + # Should handle errors gracefully and return original entity + with patch( + "metadata.ingestion.models.custom_basemodel_validation.logger" + ) as mock_logger: + result = transform_entity_names(problematic_entity, Table) + # Should return original entity on error + self.assertEqual(result, problematic_entity) + + def test_performance_with_large_datasets(self): + """Test performance with large datasets.""" + # Create table with many columns + large_columns = [] + for i in range(100): + col_name = f"col_{i}__reserved__colon__field_{i}" + large_columns.append( + Column(name=ColumnName(col_name), dataType=DataType.STRING) + ) + + large_table = Table( + id=self.sample_table_id, + name="large__reserved__arrow__table", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.large_table", + columns=large_columns, + ) + + # Should handle large datasets efficiently + result = transform_entity_names(large_table, Table) + + self.assertEqual(result.name.root, "large>table") + self.assertEqual(len(result.columns), 100) + + # Verify first and last columns are transformed correctly + self.assertEqual(result.columns[0].name.root, "col_0::field_0") + self.assertEqual(result.columns[99].name.root, "col_99::field_99") + + def test_dashboard_data_model_transformations(self): + """Test DashboardDataModel specific transformations.""" + # Test DashboardDataModel 
with nested columns + child_columns = [ + Column( + name=ColumnName("nested__reserved__colon__metric"), + dataType=DataType.DOUBLE, + ), + Column( + name=ColumnName("nested__reserved__arrow__dimension"), + dataType=DataType.STRING, + ), + ] + + parent_column = Column( + name=ColumnName("complex__reserved__quote__field"), + dataType=DataType.STRUCT, + children=child_columns, + ) + + dashboard_model = DashboardDataModel( + id=uuid.uuid4(), + name="dashboard__reserved__colon__model__reserved__quote__name", + dataModelType=DataModelType.TableauDataModel, + columns=[parent_column], + ) + + result = transform_entity_names(dashboard_model, DashboardDataModel) + + # Verify transformations + self.assertEqual(result.name.root, 'dashboard::model"name') + self.assertEqual(result.columns[0].name.root, 'complex"field') + self.assertEqual(result.columns[0].children[0].name.root, "nested::metric") + self.assertEqual(result.columns[0].children[1].name.root, "nested>dimension") + + def test_configuration_consistency(self): + """Test consistency of configuration across the system.""" + # Verify that all configured entities have consistent field mappings + for entity_name, config in TRANSFORMABLE_ENTITIES.items(): + # Verify direction is valid + self.assertIn( + config["direction"], + [TransformDirection.ENCODE, TransformDirection.DECODE], + ) + + # Verify fields is not empty + self.assertGreater( + len(config["fields"]), + 0, + f"{entity_name} should have at least one field configured", + ) + + +class TestTransformationConstants(TestCase): + """Test transformation constants and reserved keywords.""" + + def test_reserved_keywords_constants(self): + """Test that reserved keyword constants are properly defined.""" + self.assertEqual(RESERVED_COLON_KEYWORD, "__reserved__colon__") + self.assertEqual(RESERVED_ARROW_KEYWORD, "__reserved__arrow__") + self.assertEqual(RESERVED_QUOTE_KEYWORD, "__reserved__quote__") + + def test_reserved_keywords_uniqueness(self): + """Test that reserved 
keywords are unique and don't conflict.""" + keywords = [ + RESERVED_COLON_KEYWORD, + RESERVED_ARROW_KEYWORD, + RESERVED_QUOTE_KEYWORD, + ] + self.assertEqual( + len(keywords), len(set(keywords)), "Reserved keywords should be unique" + ) + + # Test that keywords don't contain each other + for i, keyword1 in enumerate(keywords): + for j, keyword2 in enumerate(keywords): + if i != j: + self.assertNotIn( + keyword1, + keyword2, + f"{keyword1} should not be contained in {keyword2}", + ) + + def test_transform_direction_enum(self): + """Test TransformDirection enum values.""" + self.assertEqual(TransformDirection.ENCODE.value, "encode") + self.assertEqual(TransformDirection.DECODE.value, "decode") + + # Test enum has exactly two values + self.assertEqual(len(list(TransformDirection)), 2) + + +class TestDashboardDataModelValidation(TestCase): + """Test DashboardDataModel-specific validation and transformations.""" + + def setUp(self): + """Set up test data.""" + self.sample_dashboard_id = uuid.uuid4() + self.sample_service_ref = EntityReference( + id=uuid.uuid4(), type="dashboardService" + ) + + def test_dashboard_datamodel_create_transformation(self): + """Test CreateDashboardDataModelRequest transformations with nested children.""" + from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, + ) + from metadata.generated.schema.entity.data.dashboardDataModel import ( + DataModelType, + ) + + create_request = CreateDashboardDataModelRequest( + name=EntityName('analytics::report>model"quarterly'), + displayName="Analytics Report Model", + dataModelType=DataModelType.PowerBIDataModel, + service=FullyQualifiedEntityName("service.powerbi"), + columns=[ + Column( + name=ColumnName("revenue::summary>metrics"), + displayName="Revenue Summary", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName('total::amount>"USD"'), + displayName="Total Amount", + dataType=DataType.DECIMAL, + ), + Column( + 
name=ColumnName("nested::data>structure"), + displayName="Nested Data", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName('deep::field>"value"'), + displayName="Deep Field", + dataType=DataType.STRING, + ) + ], + ), + ], + ) + ], + ) + + result = transform_entity_names(create_request, CreateDashboardDataModelRequest) + + # Verify main name transformation (ENCODE for Create operations) + self.assertEqual( + result.name.root, + "analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly", + ) + + # Verify top-level column transformation + self.assertEqual( + result.columns[0].name.root, + "revenue__reserved__colon__summary__reserved__arrow__metrics", + ) + + # Verify nested children transformations (first level) + revenue_column = result.columns[0] + self.assertEqual( + revenue_column.children[0].name.root, + "total__reserved__colon__amount__reserved__arrow____reserved__quote__USD__reserved__quote__", + ) + self.assertEqual( + revenue_column.children[1].name.root, + "nested__reserved__colon__data__reserved__arrow__structure", + ) + + # Verify deeply nested transformations (second level) + nested_struct = revenue_column.children[1] + self.assertEqual( + nested_struct.children[0].name.root, + "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__", + ) + + def test_dashboard_datamodel_fetch_transformation(self): + """Test DashboardDataModel fetch transformations with nested children.""" + from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, + ) + + dashboard_model = DashboardDataModel( + id=self.sample_dashboard_id, + name="analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly", + displayName="Analytics Report Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service_ref, + 
fullyQualifiedName="service.analytics__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly", + columns=[ + Column( + name=ColumnName( + "revenue__reserved__colon__summary__reserved__arrow__metrics" + ), + displayName="Revenue Summary", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "total__reserved__colon__amount__reserved__arrow____reserved__quote__USD__reserved__quote__" + ), + displayName="Total Amount", + dataType=DataType.DECIMAL, + ), + Column( + name=ColumnName( + "nested__reserved__colon__data__reserved__arrow__structure" + ), + displayName="Nested Data", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__" + ), + displayName="Deep Field", + dataType=DataType.STRING, + ) + ], + ), + ], + ) + ], + ) + + result = transform_entity_names(dashboard_model, DashboardDataModel) + + # Verify main name transformation (DECODE for fetch operations) + self.assertEqual(result.name.root, 'analytics::report>model"quarterly') + + # Verify top-level column transformation + self.assertEqual(result.columns[0].name.root, "revenue::summary>metrics") + + # Verify nested children transformations (first level) + revenue_column = result.columns[0] + self.assertEqual(revenue_column.children[0].name.root, 'total::amount>"USD"') + self.assertEqual(revenue_column.children[1].name.root, "nested::data>structure") + + # Verify deeply nested transformations (second level) + nested_struct = revenue_column.children[1] + self.assertEqual(nested_struct.children[0].name.root, 'deep::field>"value"') + + def test_dashboard_datamodel_edge_cases(self): + """Test edge cases for DashboardDataModel transformations.""" + from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, + ) + + # Test with empty children + model_empty_children = DashboardDataModel( + id=self.sample_dashboard_id, + 
name="test__reserved__colon__model", + displayName="Test Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service_ref, + fullyQualifiedName="service.test__reserved__colon__model", + columns=[ + Column( + name=ColumnName("parent__reserved__arrow__column"), + displayName="Parent Column", + dataType=DataType.STRUCT, + children=[], # Empty children list + ) + ], + ) + + result_empty = transform_entity_names(model_empty_children, DashboardDataModel) + self.assertEqual(result_empty.name.root, "test::model") + self.assertEqual(result_empty.columns[0].name.root, "parent>column") + + # Test with None children + model_none_children = DashboardDataModel( + id=self.sample_dashboard_id, + name="test__reserved__quote__model", + displayName="Test Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service_ref, + fullyQualifiedName="service.test__reserved__quote__model", + columns=[ + Column( + name=ColumnName("parent__reserved__quote__column"), + displayName="Parent Column", + dataType=DataType.STRING, + children=None, # None children + ) + ], + ) + + result_none = transform_entity_names(model_none_children, DashboardDataModel) + self.assertEqual(result_none.name.root, 'test"model') + self.assertEqual(result_none.columns[0].name.root, 'parent"column') + + def test_dashboard_datamodel_complex_nested_structures(self): + """Test complex nested structures with multiple levels and various datatypes.""" + from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, + ) + + complex_model = DashboardDataModel( + id=self.sample_dashboard_id, + name="complex__reserved__colon__model__reserved__arrow__test", + displayName="Complex Test Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service_ref, + fullyQualifiedName="service.complex__reserved__colon__model__reserved__arrow__test", + columns=[ + Column( + name=ColumnName( + 
"level1__reserved__colon__struct__reserved__arrow__data" + ), + displayName="Level 1 Struct", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "level2__reserved__quote__array__reserved__colon__items" + ), + displayName="Level 2 Array", + dataType=DataType.ARRAY, + arrayDataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "level3__reserved__arrow__nested__reserved__quote__field" + ), + displayName="Level 3 Nested", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "level4__reserved__colon__deep__reserved__arrow__value" + ), + displayName="Level 4 Deep", + dataType=DataType.STRING, + ) + ], + ) + ], + ), + Column( + name=ColumnName("simple__reserved__quote__field"), + displayName="Simple Field", + dataType=DataType.INT, + ), + ], + ) + ], + ) + + result = transform_entity_names(complex_model, DashboardDataModel) + + # Verify transformations at each level + self.assertEqual(result.name.root, "complex::model>test") + self.assertEqual(result.columns[0].name.root, "level1::struct>data") + + # Level 2 + level1_struct = result.columns[0] + self.assertEqual(level1_struct.children[0].name.root, 'level2"array::items') + self.assertEqual(level1_struct.children[1].name.root, 'simple"field') + + # Level 3 + level2_array = level1_struct.children[0] + self.assertEqual(level2_array.children[0].name.root, 'level3>nested"field') + + # Level 4 + level3_nested = level2_array.children[0] + self.assertEqual(level3_nested.children[0].name.root, "level4::deep>value") + + def test_dashboard_datamodel_round_trip_validation(self): + """Test round-trip validation for DashboardDataModel transformations.""" + from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, + ) + from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, + ) + + # Test data with mixed special characters + test_cases = [ + ("simple::name", 
"simple__reserved__colon__name"), + ( + 'complex::name>with"quotes', + "complex__reserved__colon__name__reserved__arrow__with__reserved__quote__quotes", + ), + ( + 'edge::case>test"data', + "edge__reserved__colon__case__reserved__arrow__test__reserved__quote__data", + ), + ] + + for original_name, encoded_name in test_cases: + with self.subTest(original_name=original_name): + # Create request (should encode) + create_request = CreateDashboardDataModelRequest( + name=EntityName(original_name), + displayName="Test Model", + dataModelType=DataModelType.PowerBIDataModel, + service=FullyQualifiedEntityName("service.test"), + columns=[ + Column( + name=ColumnName(original_name), + displayName="Test Column", + dataType=DataType.STRING, + ) + ], + ) + + create_result = transform_entity_names( + create_request, CreateDashboardDataModelRequest + ) + self.assertEqual(create_result.name.root, encoded_name) + self.assertEqual(create_result.columns[0].name.root, encoded_name) + + # Fetch model (should decode) + fetch_model = DashboardDataModel( + id=self.sample_dashboard_id, + name=encoded_name, + displayName="Test Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service_ref, + fullyQualifiedName=f"service.{encoded_name}", + columns=[ + Column( + name=ColumnName(encoded_name), + displayName="Test Column", + dataType=DataType.STRING, + ) + ], + ) + + fetch_result = transform_entity_names(fetch_model, DashboardDataModel) + self.assertEqual(fetch_result.name.root, original_name) + self.assertEqual(fetch_result.columns[0].name.root, original_name) + + +if __name__ == "__main__": + import unittest + + unittest.main() diff --git a/ingestion/tests/unit/models/test_custom_pydantic.py b/ingestion/tests/unit/models/test_custom_pydantic.py index 51f22c2bfa3..b50b801eec1 100644 --- a/ingestion/tests/unit/models/test_custom_pydantic.py +++ b/ingestion/tests/unit/models/test_custom_pydantic.py @@ -1,11 +1,14 @@ import uuid -from typing import List +from typing 
import List, Optional from unittest import TestCase from metadata.generated.schema.api.data.createDashboardDataModel import ( CreateDashboardDataModelRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) from metadata.generated.schema.entity.data.dashboardDataModel import ( DashboardDataModel, DataModelType, @@ -16,6 +19,7 @@ from metadata.generated.schema.entity.data.table import ( DataType, Table, TableConstraint, + TableType, ) from metadata.generated.schema.type.basic import ( EntityExtension, @@ -227,3 +231,936 @@ def test_model_dump_json_secrets(): ).root_secret.get_secret_value() == "root_password" ) + + +# Additional comprehensive tests for enhanced functionality +class ExtendedCustomPydanticValidationTest(TestCase): + """Extended test suite for comprehensive validation of custom Pydantic functionality.""" + + def setUp(self): + """Set up test data for extended tests.""" + self.sample_table_id = uuid.uuid4() + self.sample_schema_ref = EntityReference(id=uuid.uuid4(), type="databaseSchema") + + def test_service_level_models_not_transformed(self): + """Test that service-level Create models are not transformed.""" + # Test database service creation (should NOT be transformed) + service_request = CreateDatabaseServiceRequest( + name=EntityName('my::database>service"with_separators'), serviceType="Mysql" + ) + + # Service names should remain unchanged (not transformed) + assert service_request.name.root == 'my::database>service"with_separators' + + def test_edge_cases_empty_and_none_values(self): + """Test handling of edge cases like empty strings and None values.""" + # Test minimal name (empty string not allowed by EntityName validation) + table_empty = Table( + id=self.sample_table_id, + name=EntityName("a"), + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.empty", + columns=[Column(name="id", 
dataType=DataType.BIGINT)], + ) + assert table_empty.name.root == "a" + + # Test table with no columns (edge case) + table_no_columns = Table( + id=self.sample_table_id, + name="test__reserved__colon__table", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.empty", + columns=[], + ) + assert table_no_columns.name.root == "test::table" + assert len(table_no_columns.columns) == 0 + + def test_complex_nested_structures(self): + """Test complex nested column structures with multiple levels.""" + # Create deeply nested structure + level3_columns = [ + Column( + name=ColumnName("deep__reserved__colon__field"), + dataType=DataType.STRING, + ) + ] + + level2_columns = [ + Column( + name=ColumnName("nested__reserved__arrow__struct"), + dataType=DataType.STRUCT, + children=level3_columns, + ) + ] + + level1_column = Column( + name=ColumnName("root__reserved__quote__struct"), + dataType=DataType.STRUCT, + children=level2_columns, + ) + + table = Table( + id=self.sample_table_id, + name="complex__reserved__colon__table", + columns=[level1_column], + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.complex", + ) + + # Verify transformations at all levels + assert table.name.root == "complex::table" + assert table.columns[0].name.root == 'root"struct' + assert table.columns[0].children[0].name.root == "nested>struct" + assert table.columns[0].children[0].children[0].name.root == "deep::field" + + def test_unicode_and_special_characters(self): + """Test handling of Unicode and international characters.""" + # Test Unicode with separators + table_unicode = Table( + id=self.sample_table_id, + name="測試__reserved__colon__表格__reserved__arrow__名稱", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.unicode", + columns=[ + Column(name="unicode__reserved__quote__列", dataType=DataType.STRING) + ], + ) + assert table_unicode.name.root == "測試::表格>名稱" + assert table_unicode.columns[0].name.root == 'unicode"列' + + # Test emojis with 
separators + table_emoji = Table( + id=self.sample_table_id, + name="table🚀__reserved__colon__data📊", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.emoji", + columns=[ + Column(name="emoji__reserved__arrow__field🎯", dataType=DataType.STRING) + ], + ) + assert table_emoji.name.root == "table🚀::data📊" + assert table_emoji.columns[0].name.root == "emoji>field🎯" + + def test_all_separator_combinations(self): + """Test all combinations of separators in various scenarios.""" + # Test all separators together + complex_name = 'test::colon>arrow"quote__reserved__mixed' + create_request = CreateTableRequest( + name=EntityName(complex_name), + columns=[Column(name=ColumnName("simple_col"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + expected = "test__reserved__colon__colon__reserved__arrow__arrow__reserved__quote__quote__reserved__mixed" + assert create_request.name.root == expected + + def test_table_types_and_properties(self): + """Test different table types and properties with name transformations.""" + # Test with comprehensive table properties + table_full = Table( + id=self.sample_table_id, + name="full__reserved__colon__table__reserved__arrow__test", + displayName="Full Test Table", + description=Markdown(root="A comprehensive test table"), + tableType=TableType.Regular, + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="test.db.schema.full_table", + columns=[ + Column( + name=ColumnName("id__reserved__quote__primary"), + displayName="ID Primary", + dataType=DataType.BIGINT, + description=Markdown(root="Primary key column"), + ), + Column( + name=ColumnName("data__reserved__arrow__field"), + displayName="Data Field", + dataType=DataType.STRING, + description=Markdown(root="Data field column"), + ), + ], + tableConstraints=[ + TableConstraint( + constraintType="PRIMARY_KEY", + columns=["id__reserved__quote__primary"], + ) + ], + ) + + # Verify all transformations + assert 
table_full.name.root == "full::table>test" + assert table_full.columns[0].name.root == 'id"primary' + assert table_full.columns[1].name.root == "data>field" + assert table_full.tableConstraints[0].columns[0] == 'id"primary' + + def test_dashboard_data_model_comprehensive(self): + """Test comprehensive DashboardDataModel scenarios.""" + # Test with all data model types + data_model_types = [ + DataModelType.TableauDataModel, + DataModelType.PowerBIDataModel, + DataModelType.SupersetDataModel, + DataModelType.MetabaseDataModel, + ] + + for model_type in data_model_types: + dashboard_model = DashboardDataModel( + id=uuid.uuid4(), + name=f"model__reserved__colon__{model_type.value.lower()}", + dataModelType=model_type, + columns=[ + Column( + name=ColumnName( + f"metric__reserved__arrow__{model_type.value.lower()}" + ), + dataType=DataType.DOUBLE, + ) + ], + ) + + expected_name = f"model::{model_type.value.lower()}" + expected_col = f"metric>{model_type.value.lower()}" + + assert dashboard_model.name.root == expected_name + assert dashboard_model.columns[0].name.root == expected_col + + def test_create_requests_comprehensive(self): + """Test comprehensive CreateRequest scenarios.""" + # Test CreateTableRequest with all possible fields + comprehensive_request = CreateTableRequest( + name=EntityName('comprehensive::table>name"test'), + displayName='Comprehensive"Table>Test::Name', + description=Markdown(root="A comprehensive test table with all fields"), + tableType=TableType.Regular, + columns=[ + Column( + name=ColumnName("primary__reserved__quote__key"), + displayName="Primary Key", + dataType=DataType.BIGINT, + constraint="NOT_NULL", + ordinalPosition=1, + ), + Column( + name=ColumnName("foreign__reserved__arrow__key"), + displayName="Foreign Key", + dataType=DataType.BIGINT, + constraint="NOT_NULL", + ordinalPosition=2, + ), + Column( + name=ColumnName("nested__reserved__colon__struct"), + displayName="Nested Struct", + dataType=DataType.STRUCT, + children=[ + 
Column( + name=ColumnName("child__reserved__quote__field"), + dataType=DataType.STRING, + ) + ], + ), + ], + tableConstraints=[ + TableConstraint( + constraintType="PRIMARY_KEY", + columns=["primary__reserved__quote__key"], + ), + TableConstraint( + constraintType="UNIQUE", columns=["foreign__reserved__arrow__key"] + ), + ], + databaseSchema=FullyQualifiedEntityName("test__reserved__colon__db.schema"), + ) + + # Verify transformations + assert ( + comprehensive_request.name.root + == "comprehensive__reserved__colon__table__reserved__arrow__name__reserved__quote__test" + ) + assert ( + comprehensive_request.columns[0].name.root + == "primary__reserved__quote__key" + ) + assert ( + comprehensive_request.columns[1].name.root + == "foreign__reserved__arrow__key" + ) + assert ( + comprehensive_request.columns[2].name.root + == "nested__reserved__colon__struct" + ) + assert ( + comprehensive_request.columns[2].children[0].name.root + == "child__reserved__quote__field" + ) + + def test_mixed_separator_edge_cases(self): + """Test edge cases with mixed separators.""" + edge_cases = [ + # Consecutive separators + ( + 'test::>>""name', + "test__reserved__colon____reserved__arrow____reserved__arrow____reserved__quote____reserved__quote__name", + ), + # Separators at start and end + ( + '::test>name"', + "__reserved__colon__test__reserved__arrow__name__reserved__quote__", + ), + # Only separators + ('::>"', "__reserved__colon____reserved__arrow____reserved__quote__"), + # Empty between separators + ( + 'test::>"name', + "test__reserved__colon____reserved__arrow____reserved__quote__name", + ), + ] + + for input_name, expected in edge_cases: + create_request = CreateTableRequest( + name=EntityName(input_name), + columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + assert ( + create_request.name.root == expected + ), f"Failed for input: {input_name}" + + def test_very_long_names_performance(self): + 
"""Test performance with very long names.""" + # Create very long names to test performance + long_base_name = "very_long_table_name_" * 3 + long_name_with_separators = ( + f'{long_base_name}::separator>{long_base_name}"quote{long_base_name}' + ) + + create_request = CreateTableRequest( + name=EntityName(long_name_with_separators), + columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + # Should handle long names without issues + result_name = create_request.name.root + assert "__reserved__colon__" in result_name + assert "__reserved__arrow__" in result_name + assert "__reserved__quote__" in result_name + + def test_happy_path_simple_names(self): + """Test happy path with simple names that don't need transformation.""" + # Test simple names without special characters + simple_create = CreateTableRequest( + name=EntityName("simple_table_name"), + columns=[ + Column(name=ColumnName("simple_column"), dataType=DataType.STRING) + ], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + # Names should remain unchanged + assert simple_create.name.root == "simple_table_name" + assert simple_create.columns[0].name.root == "simple_column" + + # Test simple fetch model + simple_table = Table( + id=self.sample_table_id, + name="simple_table", + databaseSchema=self.sample_schema_ref, + fullyQualifiedName="db.schema.simple_table", + columns=[Column(name="simple_col", dataType=DataType.STRING)], + ) + + assert simple_table.name.root == "simple_table" + assert simple_table.columns[0].name.root == "simple_col" + + def test_error_handling_invalid_models(self): + """Test error handling with None and invalid models.""" + # Test with None entity + result = None + # This would normally be called by the validation system + # Just ensure no exceptions are thrown + + # Test with mock invalid object + class InvalidModel: + def __init__(self): + self.invalid_attr = "test" + + invalid_obj = InvalidModel() 
+ # Should handle gracefully without transformation + assert hasattr(invalid_obj, "invalid_attr") + + def test_boundary_conditions(self): + """Test boundary conditions and edge cases.""" + # Test single character names + single_char_create = CreateTableRequest( + name=EntityName("a"), + columns=[Column(name=ColumnName("b"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + assert single_char_create.name.root == "a" + + # Test names with only separators + separator_only = CreateTableRequest( + name=EntityName("::"), + columns=[Column(name=ColumnName(">"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + assert separator_only.name.root == "__reserved__colon__" + assert separator_only.columns[0].name.root == "__reserved__arrow__" + + def test_whitespace_handling(self): + """Test handling of whitespace in various scenarios.""" + whitespace_cases = [ + # Leading/trailing spaces + (" test::name ", " test__reserved__colon__name "), + # Spaces around separators + (" test :: name ", " test __reserved__colon__ name "), + # Multiple spaces + ("test :: name", "test __reserved__colon__ name"), + # Tabs and newlines (should be preserved) + ("test\t::\nname", "test\t__reserved__colon__\nname"), + ] + + for input_name, expected in whitespace_cases: + create_request = CreateTableRequest( + name=EntityName(input_name), + columns=[Column(name=ColumnName("col"), dataType=DataType.STRING)], + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + assert ( + create_request.name.root == expected + ), f"Failed for input: '{input_name}'" + + def test_table_constraints_comprehensive(self): + """Test comprehensive table constraints scenarios.""" + constraint_types = ["PRIMARY_KEY", "UNIQUE", "FOREIGN_KEY"] + constraints = [] + columns = [] + + for i, constraint_type in enumerate(constraint_types): + col_name = f"col_{i}__reserved__colon__constraint" + columns.append(Column(name=ColumnName(col_name), 
dataType=DataType.STRING)) + constraints.append( + TableConstraint(constraintType=constraint_type, columns=[col_name]) + ) + + create_request = CreateTableRequest( + name=EntityName("constraints__reserved__arrow__test"), + columns=columns, + tableConstraints=constraints, + databaseSchema=FullyQualifiedEntityName("db.schema"), + ) + + # Verify all constraints have transformed column names + for i, constraint in enumerate(create_request.tableConstraints): + expected_col = f"col_{i}__reserved__colon__constraint" + assert constraint.columns[0] == expected_col + + def test_entity_references_and_relationships(self): + """Test entity references and relationship handling.""" + # Test with complex entity references + table_with_refs = Table( + id=self.sample_table_id, + name="table__reserved__colon__with__reserved__arrow__refs", + databaseSchema=EntityReference( + id=uuid.uuid4(), + type="databaseSchema", + name="schema__reserved__quote__name", + ), + fullyQualifiedName="service.db.schema__reserved__quote__name.table", + columns=[ + Column( + name=ColumnName("ref__reserved__colon__column"), + dataType=DataType.STRING, + ) + ], + ) + + # Verify transformations + assert table_with_refs.name.root == "table::with>refs" + assert table_with_refs.columns[0].name.root == "ref::column" + # Entity references should not be transformed (they're separate entities) + assert table_with_refs.databaseSchema.name == "schema__reserved__quote__name" + + +class CustomSecretStrExtendedTest(TestCase): + """Extended test suite for CustomSecretStr functionality.""" + + def test_secret_creation_and_access(self): + """Test CustomSecretStr creation and value access.""" + secret = CustomSecretStr("test_password") + assert secret.get_secret_value() == "test_password" + assert str(secret) == "**********" + assert repr(secret) == "SecretStr('**********')" + + def test_empty_and_none_secrets(self): + """Test handling of empty and None secret values.""" + # Test empty secret + empty_secret = 
CustomSecretStr("") + assert empty_secret.get_secret_value() == "" + assert str(empty_secret) == "" + + # Test None secret handling + try: + none_secret = CustomSecretStr(None) + assert none_secret.get_secret_value() is None + except (TypeError, ValueError, AttributeError): + # This is acceptable behavior for None values + pass + + def test_long_secrets(self): + """Test handling of very long secret values.""" + long_secret_value = "a" * 1000 + long_secret = CustomSecretStr(long_secret_value) + assert long_secret.get_secret_value() == long_secret_value + assert ( + str(long_secret) == "**********" + ) # Should still mask regardless of length + + def test_special_character_secrets(self): + """Test secrets with special characters.""" + special_chars = "!@#$%^&*()_+-=[]{}|;':,.<>?/~`" + special_secret = CustomSecretStr(special_chars) + assert special_secret.get_secret_value() == special_chars + assert str(special_secret) == "**********" + + def test_unicode_secrets(self): + """Test secrets with Unicode characters.""" + unicode_secret = CustomSecretStr("密码测试🔒") + assert unicode_secret.get_secret_value() == "密码测试🔒" + assert str(unicode_secret) == "**********" + + def test_secret_equality_and_hashing(self): + """Test secret equality and hashing behavior.""" + secret1 = CustomSecretStr("password123") + secret2 = CustomSecretStr("password123") + secret3 = CustomSecretStr("different_password") + + # Test equality + assert secret1.get_secret_value() == secret2.get_secret_value() + assert secret1.get_secret_value() != secret3.get_secret_value() + + # Test that string representation is always masked + assert str(secret1) == str(secret2) == str(secret3) == "**********" + + def test_secret_in_nested_models_deep(self): + """Test secrets in deeply nested model structures.""" + + class Level3Model(BaseModel): + deep_secret: CustomSecretStr + deep_value: str + + class Level2Model(BaseModel): + mid_secret: CustomSecretStr + level3: Level3Model + + class Level1Model(BaseModel): + 
top_secret: CustomSecretStr + level2: Level2Model + + deep_data = { + "top_secret": "top_password", + "level2": { + "mid_secret": "mid_password", + "level3": {"deep_secret": "deep_password", "deep_value": "not_secret"}, + }, + } + + deep_model = Level1Model(**deep_data) + + # Test masked dump + masked = deep_model.model_dump(mask_secrets=True) + assert masked["top_secret"] == "**********" + assert masked["level2"]["mid_secret"] == "**********" + assert masked["level2"]["level3"]["deep_secret"] == "**********" + assert masked["level2"]["level3"]["deep_value"] == "not_secret" + + # Test unmasked dump + unmasked = deep_model.model_dump(mask_secrets=False) + assert unmasked["top_secret"] == "top_password" + assert unmasked["level2"]["mid_secret"] == "mid_password" + assert unmasked["level2"]["level3"]["deep_secret"] == "deep_password" + + def test_secret_with_optional_fields(self): + """Test secrets with optional fields.""" + + class OptionalSecretModel(BaseModel): + required_secret: CustomSecretStr + optional_secret: Optional[CustomSecretStr] = None + optional_value: Optional[str] = None + + # Test with all fields + full_model = OptionalSecretModel( + required_secret="required_pass", + optional_secret="optional_pass", + optional_value="some_value", + ) + + masked_full = full_model.model_dump(mask_secrets=True) + assert masked_full["required_secret"] == "**********" + assert masked_full["optional_secret"] == "**********" + assert masked_full["optional_value"] == "some_value" + + # Test with only required fields + minimal_model = OptionalSecretModel(required_secret="required_pass") + + masked_minimal = minimal_model.model_dump(mask_secrets=True) + assert masked_minimal["required_secret"] == "**********" + assert masked_minimal["optional_secret"] is None + assert masked_minimal["optional_value"] is None + + def test_secret_lists_and_dictionaries(self): + """Test secrets in lists and dictionaries.""" + + class ComplexSecretModel(BaseModel): + secret_list: 
List[CustomSecretStr] + nested_secrets: List[dict] + + complex_data = { + "secret_list": ["password1", "password2", "password3"], + "nested_secrets": [ + {"name": "config1", "secret": CustomSecretStr("secret1")}, + {"name": "config2", "secret": CustomSecretStr("secret2")}, + ], + } + + complex_model = ComplexSecretModel(**complex_data) + + # Test that list secrets are handled + assert len(complex_model.secret_list) == 3 + assert all(str(secret) == "**********" for secret in complex_model.secret_list) + assert all( + secret.get_secret_value() in ["password1", "password2", "password3"] + for secret in complex_model.secret_list + ) + + +class DashboardDataModelTransformationTest(TestCase): + """Test DashboardDataModel transformations with nested children and reserved keywords.""" + + def setUp(self): + """Set up test data.""" + self.sample_service = FullyQualifiedEntityName( + root='TestService.PowerBI."Analysis>Services::Environment"' + ) + + def test_create_dashboard_datamodel_with_nested_children(self): + """Test CreateDashboardDataModelRequest with nested children containing reserved keywords.""" + create_request = CreateDashboardDataModelRequest( + name=EntityName('financial::report>model"quarterly'), + displayName="Financial Report Model", + description=Markdown( + root="Financial reporting model with special characters" + ), + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service, + columns=[ + Column( + name=ColumnName("revenue::metrics>summary"), + displayName="Revenue Metrics", + dataType=DataType.STRUCT, + description=Markdown(root="Revenue metrics structure"), + children=[ + Column( + name=ColumnName("total::revenue>amount"), + displayName="Total Revenue", + dataType=DataType.DECIMAL, + description=Markdown(root="Total revenue amount"), + ), + Column( + name=ColumnName('currency::code>"USD"'), + displayName="Currency Code", + dataType=DataType.STRING, + description=Markdown(root="Currency code with quotes"), + ), + Column( + 
name=ColumnName("nested::struct>data"), + displayName="Nested Structure", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName('deep::field>"value"'), + displayName="Deep Field", + dataType=DataType.STRING, + ) + ], + ), + ], + ), + Column( + name=ColumnName("expenses::breakdown>categories"), + displayName="Expense Breakdown", + dataType=DataType.ARRAY, + arrayDataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName('category::name>"operations"'), + displayName="Category Name", + dataType=DataType.STRING, + ), + Column( + name=ColumnName("amount::value>total"), + displayName="Amount Value", + dataType=DataType.DECIMAL, + ), + ], + ), + ], + ) + + # Verify main entity name transformation (ENCODE for Create operations) + assert ( + create_request.name.root + == "financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly" + ) + + # Verify top-level column name transformations + assert ( + create_request.columns[0].name.root + == "revenue__reserved__colon__metrics__reserved__arrow__summary" + ) + assert ( + create_request.columns[1].name.root + == "expenses__reserved__colon__breakdown__reserved__arrow__categories" + ) + + # Verify nested children transformations (first level) + revenue_column = create_request.columns[0] + assert ( + revenue_column.children[0].name.root + == "total__reserved__colon__revenue__reserved__arrow__amount" + ) + assert ( + revenue_column.children[1].name.root + == "currency__reserved__colon__code__reserved__arrow____reserved__quote__USD__reserved__quote__" + ) + assert ( + revenue_column.children[2].name.root + == "nested__reserved__colon__struct__reserved__arrow__data" + ) + + # Verify deeply nested children transformations (second level) + nested_struct = revenue_column.children[2] + assert ( + nested_struct.children[0].name.root + == "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__" + ) + + # Verify array children transformations + 
expenses_column = create_request.columns[1] + assert ( + expenses_column.children[0].name.root + == "category__reserved__colon__name__reserved__arrow____reserved__quote__operations__reserved__quote__" + ) + assert ( + expenses_column.children[1].name.root + == "amount__reserved__colon__value__reserved__arrow__total" + ) + + def test_fetch_dashboard_datamodel_with_nested_children(self): + """Test DashboardDataModel fetch with nested children containing encoded reserved keywords.""" + dashboard_model = DashboardDataModel( + id=uuid.uuid4(), + name="financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly", + displayName="Financial Report Model", + dataModelType=DataModelType.PowerBIDataModel, + service=EntityReference(id=uuid.uuid4(), type="dashboardService"), + fullyQualifiedName="service.financial__reserved__colon__report__reserved__arrow__model__reserved__quote__quarterly", + columns=[ + Column( + name=ColumnName( + "revenue__reserved__colon__metrics__reserved__arrow__summary" + ), + displayName="Revenue Metrics", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "total__reserved__colon__revenue__reserved__arrow__amount" + ), + displayName="Total Revenue", + dataType=DataType.DECIMAL, + ), + Column( + name=ColumnName( + "currency__reserved__colon__code__reserved__arrow____reserved__quote__USD__reserved__quote__" + ), + displayName="Currency Code", + dataType=DataType.STRING, + ), + Column( + name=ColumnName( + "nested__reserved__colon__struct__reserved__arrow__data" + ), + displayName="Nested Structure", + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "deep__reserved__colon__field__reserved__arrow____reserved__quote__value__reserved__quote__" + ), + displayName="Deep Field", + dataType=DataType.STRING, + ) + ], + ), + ], + ), + Column( + name=ColumnName( + "expenses__reserved__colon__breakdown__reserved__arrow__categories" + ), + displayName="Expense Breakdown", + dataType=DataType.ARRAY, 
+ arrayDataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName( + "category__reserved__colon__name__reserved__arrow____reserved__quote__operations__reserved__quote__" + ), + displayName="Category Name", + dataType=DataType.STRING, + ), + Column( + name=ColumnName( + "amount__reserved__colon__value__reserved__arrow__total" + ), + displayName="Amount Value", + dataType=DataType.DECIMAL, + ), + ], + ), + ], + ) + + # Verify main entity name transformation (DECODE for fetch operations) + assert dashboard_model.name.root == 'financial::report>model"quarterly' + + # Verify top-level column name transformations + assert dashboard_model.columns[0].name.root == "revenue::metrics>summary" + assert dashboard_model.columns[1].name.root == "expenses::breakdown>categories" + + # Verify nested children transformations (first level) + revenue_column = dashboard_model.columns[0] + assert revenue_column.children[0].name.root == "total::revenue>amount" + assert revenue_column.children[1].name.root == 'currency::code>"USD"' + assert revenue_column.children[2].name.root == "nested::struct>data" + + # Verify deeply nested children transformations (second level) + nested_struct = revenue_column.children[2] + assert nested_struct.children[0].name.root == 'deep::field>"value"' + + # Verify array children transformations + expenses_column = dashboard_model.columns[1] + assert expenses_column.children[0].name.root == 'category::name>"operations"' + assert expenses_column.children[1].name.root == "amount::value>total" + + def test_dashboard_datamodel_round_trip_transformation(self): + """Test round-trip transformation: Create -> Fetch -> Create maintains data integrity.""" + # Start with create request containing special characters + original_create = CreateDashboardDataModelRequest( + name=EntityName('analytics::dashboard>model"test'), + displayName="Analytics Dashboard Model", + dataModelType=DataModelType.PowerBIDataModel, + service=self.sample_service, + columns=[ + Column( + 
name=ColumnName("metrics::summary>report"), + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName('total::count>"records"'), + dataType=DataType.INT, + ) + ], + ) + ], + ) + + # Simulate storage (encoded form) + stored_name = original_create.name.root # Should be encoded + stored_column_name = original_create.columns[0].name.root # Should be encoded + stored_nested_name = ( + original_create.columns[0].children[0].name.root + ) # Should be encoded + + # Simulate fetch operation (create DashboardDataModel with stored values) + fetched_model = DashboardDataModel( + id=uuid.uuid4(), + name=stored_name, + displayName="Analytics Dashboard Model", + dataModelType=DataModelType.PowerBIDataModel, + service=EntityReference(id=uuid.uuid4(), type="dashboardService"), + fullyQualifiedName=f"service.{stored_name}", + columns=[ + Column( + name=ColumnName(stored_column_name), + dataType=DataType.STRUCT, + children=[ + Column( + name=ColumnName(stored_nested_name), dataType=DataType.INT + ) + ], + ) + ], + ) + + # Verify fetch operation decodes correctly + assert fetched_model.name.root == 'analytics::dashboard>model"test' + assert fetched_model.columns[0].name.root == "metrics::summary>report" + assert ( + fetched_model.columns[0].children[0].name.root == 'total::count>"records"' + ) + + # Verify create operation encodes correctly + assert ( + stored_name + == "analytics__reserved__colon__dashboard__reserved__arrow__model__reserved__quote__test" + ) + assert ( + stored_column_name + == "metrics__reserved__colon__summary__reserved__arrow__report" + ) + assert ( + stored_nested_name + == "total__reserved__colon__count__reserved__arrow____reserved__quote__records__reserved__quote__" + ) + + def test_dashboard_datamodel_edge_cases(self): + """Test edge cases for DashboardDataModel transformations.""" + # Test with empty children + model_empty_children = DashboardDataModel( + id=uuid.uuid4(), + name="test__reserved__colon__model", + 
dataModelType=DataModelType.PowerBIDataModel, + service=EntityReference(id=uuid.uuid4(), type="dashboardService"), + fullyQualifiedName="service.test__reserved__colon__model", + columns=[ + Column( + name=ColumnName("parent__reserved__arrow__column"), + dataType=DataType.STRUCT, + children=[], # Empty children list + ) + ], + ) + + assert model_empty_children.name.root == "test::model" + assert model_empty_children.columns[0].name.root == "parent>column" + + # Test with None children + model_none_children = DashboardDataModel( + id=uuid.uuid4(), + name="test__reserved__quote__model", + dataModelType=DataModelType.PowerBIDataModel, + service=EntityReference(id=uuid.uuid4(), type="dashboardService"), + fullyQualifiedName="service.test__reserved__quote__model", + columns=[ + Column( + name=ColumnName("parent__reserved__quote__column"), + dataType=DataType.STRING, + children=None, # None children + ) + ], + ) + + assert model_none_children.name.root == 'test"model' + assert model_none_children.columns[0].name.root == 'parent"column' diff --git a/ingestion/tests/unit/test_fqn.py b/ingestion/tests/unit/test_fqn.py index fb403db69ab..b4b3b608146 100644 --- a/ingestion/tests/unit/test_fqn.py +++ b/ingestion/tests/unit/test_fqn.py @@ -16,8 +16,13 @@ from unittest.mock import MagicMock import pytest -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.ingestion.models.custom_basemodel_validation import ( + RESERVED_ARROW_KEYWORD, + RESERVED_COLON_KEYWORD, + RESERVED_QUOTE_KEYWORD, +) from metadata.ingestion.ometa.utils import quote from metadata.utils import fqn @@ -158,3 +163,128 @@ class TestFqn(TestCase): assert quote("a.b.c") == "a.b.c" assert quote(FullyQualifiedEntityName('"foo.bar".baz')) == "%22foo.bar%22.baz" assert quote('"foo.bar/baz".hello') == "%22foo.bar%2Fbaz%22.hello" + + def 
test_table_with_quotes(self): + """Test FQN building for table names containing quotes""" + mocked_metadata = MagicMock() + mocked_metadata.es_search_from_fqn.return_value = None + + table_name = 'users "2024"' + result = fqn.build( + metadata=mocked_metadata, + entity_type=Table, + service_name="mysql", + database_name="test_db", + schema_name="public", + table_name=table_name, + skip_es_search=True, + ) + + expected = f"mysql.test_db.public.users {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_column_with_special_chars(self): + """Test FQN building for column names with multiple special characters""" + mocked_metadata = MagicMock() + mocked_metadata.es_search_from_fqn.return_value = None + + column_name = 'data::type>"info"' + result = fqn.build( + metadata=mocked_metadata, + entity_type=Column, + service_name="postgres", + database_name="analytics", + schema_name="reporting", + table_name="metrics", + column_name=column_name, + ) + + expected = f"postgres.analytics.reporting.metrics.data{RESERVED_COLON_KEYWORD}type{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}info{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_both_table_and_column_special_chars(self): + """Test FQN building when both table and column have special characters""" + mocked_metadata = MagicMock() + mocked_metadata.es_search_from_fqn.return_value = None + + table_name = "report::daily" + column_name = 'value>"USD"' + + result = fqn.build( + metadata=mocked_metadata, + entity_type=Column, + service_name="snowflake", + database_name="warehouse", + schema_name="analytics", + table_name=table_name, + column_name=column_name, + ) + + expected = f"snowflake.warehouse.analytics.report{RESERVED_COLON_KEYWORD}daily.value{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}USD{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_no_transformation_needed(self): + """Test FQN building for names without special 
characters""" + mocked_metadata = MagicMock() + mocked_metadata.es_search_from_fqn.return_value = None + + result = fqn.build( + metadata=mocked_metadata, + entity_type=Table, + service_name="mysql", + database_name="test_db", + schema_name="public", + table_name="normal_table_name", + skip_es_search=True, + ) + + self.assertEqual(result, "mysql.test_db.public.normal_table_name") + + def test_real_world_scenarios(self): + """Test FQN building for real-world database scenarios""" + mocked_metadata = MagicMock() + mocked_metadata.es_search_from_fqn.return_value = None + + # Snowflake case-sensitive identifier + snowflake_table = '"MixedCase_Table"' + result1 = fqn.build( + metadata=mocked_metadata, + entity_type=Table, + service_name="snowflake", + database_name="ANALYTICS", + schema_name="PUBLIC", + table_name=snowflake_table, + skip_es_search=True, + ) + expected1 = f"snowflake.ANALYTICS.PUBLIC.{RESERVED_QUOTE_KEYWORD}MixedCase_Table{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result1, expected1) + + # PostgreSQL type cast in column + postgres_column = "created_at::timestamp" + result2 = fqn.build( + metadata=mocked_metadata, + entity_type=Column, + service_name="postgres", + database_name="mydb", + schema_name="public", + table_name="events", + column_name=postgres_column, + ) + expected2 = ( + f"postgres.mydb.public.events.created_at{RESERVED_COLON_KEYWORD}timestamp" + ) + self.assertEqual(result2, expected2) + + # BigQuery partition notation + bigquery_table = 'events_2024$"daily"' + result3 = fqn.build( + metadata=mocked_metadata, + entity_type=Table, + service_name="bigquery", + database_name="my-project", + schema_name="dataset", + table_name=bigquery_table, + skip_es_search=True, + ) + expected3 = f"bigquery.my-project.dataset.events_2024${RESERVED_QUOTE_KEYWORD}daily{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result3, expected3) diff --git a/ingestion/tests/unit/utils/test_fqn_special_chars.py b/ingestion/tests/unit/utils/test_fqn_special_chars.py new file 
mode 100644 index 00000000000..1d9e557d0b3 --- /dev/null +++ b/ingestion/tests/unit/utils/test_fqn_special_chars.py @@ -0,0 +1,567 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Comprehensive tests for FQN building with special characters in table and column names. +Tests happy paths, edge cases, error scenarios, and boundaries. +""" + +import unittest +from unittest.mock import Mock, patch + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure +from metadata.generated.schema.entity.data.table import Column, Table +from metadata.ingestion.models.custom_basemodel_validation import ( + RESERVED_ARROW_KEYWORD, + RESERVED_COLON_KEYWORD, + RESERVED_QUOTE_KEYWORD, +) +from metadata.utils import fqn +from metadata.utils.fqn import FQNBuildingException + + +class TestFQNSpecialCharacters(unittest.TestCase): + """Test FQN building with special characters""" + + def setUp(self): + """Set up test fixtures""" + self.mock_metadata = Mock() + + def tearDown(self): + """Clean up after tests""" + # Reset any mocks + self.mock_metadata.reset_mock() + + # ========== HAPPY PATH TESTS ========== + + def test_table_name_with_quotes(self): + """Test table name containing quotes""" + result = fqn.build( + metadata=self.mock_metadata, + 
entity_type=Table, + service_name="mysql", + database_name="test_db", + schema_name="public", + table_name='users "2024"', + skip_es_search=True, + ) + + expected = f"mysql.test_db.public.users {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_table_name_with_colons(self): + """Test table name containing double colons""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="postgres", + database_name="analytics", + schema_name="reporting", + table_name="report::daily_summary", + skip_es_search=True, + ) + + expected = ( + f"postgres.analytics.reporting.report{RESERVED_COLON_KEYWORD}daily_summary" + ) + self.assertEqual(result, expected) + + def test_table_name_with_arrows(self): + """Test table name containing arrow characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="snowflake", + database_name="warehouse", + schema_name="staging", + table_name="stage>production_data", + skip_es_search=True, + ) + + expected = ( + f"snowflake.warehouse.staging.stage{RESERVED_ARROW_KEYWORD}production_data" + ) + self.assertEqual(result, expected) + + def test_column_name_with_quotes(self): + """Test column name containing quotes""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="mysql", + database_name="test_db", + schema_name="public", + table_name="users", + column_name='data "value"', + ) + + expected = f"mysql.test_db.public.users.data {RESERVED_QUOTE_KEYWORD}value{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_column_name_with_multiple_special_chars(self): + """Test column name with combination of special characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="postgres", + database_name="analytics", + schema_name="public", + table_name="metrics", + column_name='metric::type>"category"', + ) + + expected = ( + 
f"postgres.analytics.public.metrics.metric{RESERVED_COLON_KEYWORD}" + f"type{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}category{RESERVED_QUOTE_KEYWORD}" + ) + self.assertEqual(result, expected) + + def test_both_table_and_column_with_special_chars(self): + """Test both table and column names with special characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="mysql", + database_name="test", + schema_name="schema", + table_name='table "2024"', + column_name="column::data>info", + ) + + table_transformed = ( + f"table {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}" + ) + column_transformed = ( + f"column{RESERVED_COLON_KEYWORD}data{RESERVED_ARROW_KEYWORD}info" + ) + expected = f"mysql.test.schema.{table_transformed}.{column_transformed}" + self.assertEqual(result, expected) + + # ========== EDGE CASES ========== + + def test_empty_special_chars_only(self): + """Test names that are only special characters""" + # Just quotes + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="test", + schema_name="public", + table_name='""', + skip_es_search=True, + ) + expected = f"mysql.test.public.{RESERVED_QUOTE_KEYWORD}{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + # Just colons + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="mysql", + database_name="test", + schema_name="public", + table_name="users", + column_name="::", + ) + expected = f"mysql.test.public.users.{RESERVED_COLON_KEYWORD}" + self.assertEqual(result, expected) + + def test_consecutive_special_chars(self): + """Test consecutive special characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="postgres", + database_name="db", + schema_name="schema", + table_name='data::::"">>>>>', + skip_es_search=True, + ) + + # Each special char should be replaced + transformed = ( + 
f"data{RESERVED_COLON_KEYWORD}{RESERVED_COLON_KEYWORD}{RESERVED_QUOTE_KEYWORD}" + f"{RESERVED_QUOTE_KEYWORD}{RESERVED_ARROW_KEYWORD}" + f"{RESERVED_ARROW_KEYWORD}{RESERVED_ARROW_KEYWORD}" + f"{RESERVED_ARROW_KEYWORD}{RESERVED_ARROW_KEYWORD}" + ) + expected = f"postgres.db.schema.{transformed}" + self.assertEqual(result, expected) + + def test_special_chars_at_boundaries(self): + """Test special characters at start and end of names""" + # Special char at start + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name='"table_name', + skip_es_search=True, + ) + expected = f"mysql.db.schema.{RESERVED_QUOTE_KEYWORD}table_name" + self.assertEqual(result, expected) + + # Special char at end + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name="table", + column_name="column_name::", + ) + expected = f"mysql.db.schema.table.column_name{RESERVED_COLON_KEYWORD}" + self.assertEqual(result, expected) + + def test_unicode_with_special_chars(self): + """Test Unicode characters mixed with special characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="test", + schema_name="public", + table_name='測試::table>"数据"', + skip_es_search=True, + ) + + transformed = f"測試{RESERVED_COLON_KEYWORD}table{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}数据{RESERVED_QUOTE_KEYWORD}" + expected = f"mysql.test.public.{transformed}" + self.assertEqual(result, expected) + + def test_emoji_with_special_chars(self): + """Test emojis mixed with special characters""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="postgres", + database_name="fun", + schema_name="emoji", + table_name="data", + column_name='🚀::rocket>"launch"', + ) + + transformed = 
f"🚀{RESERVED_COLON_KEYWORD}rocket{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}launch{RESERVED_QUOTE_KEYWORD}" + expected = f"postgres.fun.emoji.data.{transformed}" + self.assertEqual(result, expected) + + # ========== NULL/NONE HANDLING ========== + + def test_none_table_name(self): + """Test with None table name - should not transform""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Database, + service_name="mysql", + database_name="test_db", + ) + + # Should work without transformation + expected = "mysql.test_db" + self.assertEqual(result, expected) + + def test_none_column_name(self): + """Test with None column name - should raise FQNBuildingException""" + with self.assertRaises(FQNBuildingException): + fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name="table_name", + column_name=None, + skip_es_search=True, + ) + + def test_empty_string_names(self): + """Test with empty string names""" + # Empty table name should still be processed + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name="", + skip_es_search=True, + ) + + # Empty string is valid + expected = "mysql.db.schema." 
+ self.assertEqual(result, expected) + + # ========== OTHER ENTITY TYPES (No Transformation) ========== + + def test_database_name_with_quotes_should_raise_error(self): + """Test that a Database name with quotes is not transformed and raises FQNBuildingException""" + with self.assertRaises(FQNBuildingException): + fqn.build( + metadata=self.mock_metadata, + entity_type=Database, + service_name="mysql", + database_name='db "name"', + ) + + def test_schema_name_with_quotes_should_raise_error(self): + """Test that a DatabaseSchema name with special chars is not transformed and raises FQNBuildingException""" + with self.assertRaises(FQNBuildingException): + fqn.build( + metadata=self.mock_metadata, + entity_type=DatabaseSchema, + service_name="postgres", + database_name="db", + schema_name='schema::"name"', + skip_es_search=True, + ) + + def test_stored_procedure_name_with_quotes_should_not_transform(self): + """Test that a StoredProcedure name with special chars is not transformed and raises FQNBuildingException""" + with self.assertRaises(FQNBuildingException): + fqn.build( + metadata=self.mock_metadata, + entity_type=StoredProcedure, + service_name="mysql", + database_name="db", + schema_name="schema", + procedure_name='proc>"name"', + ) + + # ========== INTEGRATION WITH EXISTING BEHAVIOR ========== + + def test_names_without_special_chars_unchanged(self): + """Test that names without special characters remain unchanged""" + # Table without special chars + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="test_db", + schema_name="public", + table_name="normal_table_name", + skip_es_search=True, + ) + expected = "mysql.test_db.public.normal_table_name" + self.assertEqual(result, expected) + + # Column without special chars + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="postgres", + database_name="db", + schema_name="schema", + table_name="table", + column_name="normal_column_name", + ) + expected = "postgres.db.schema.table.normal_column_name" + self.assertEqual(result, expected) + + def 
test_dots_in_names_still_quoted(self): + """Test that dots in names still trigger quoting""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name="table.with.dots", + skip_es_search=True, + ) + + # Dots should still trigger quoting in quote_name + self.assertIn('"table.with.dots"', result) + + # ========== ERROR SCENARIOS ========== + + def test_invalid_entity_type_still_fails(self): + """Test that invalid entity types still raise exceptions""" + + class InvalidEntity: + pass + + with self.assertRaises(FQNBuildingException) as context: + fqn.build( + metadata=self.mock_metadata, + entity_type=InvalidEntity, + service_name="mysql", + ) + + self.assertIn("Invalid Entity Type", str(context.exception)) + + def test_transformation_with_es_search(self): + """Test transformation works with ES search enabled""" + # Mock ES search to return an empty list (entity not found) + self.mock_metadata.es_search_from_fqn.return_value = [] + + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name='table "name"', + skip_es_search=False, + ) + + # Even with ES search, transformation should happen + expected = f"mysql.db.schema.table {RESERVED_QUOTE_KEYWORD}name{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + # ========== PERFORMANCE AND SCALE ========== + + def test_very_long_names_with_special_chars(self): + """Test very long names with special characters""" + long_name = "a" * 100 + "::" + "b" * 100 + '>"' + "c" * 100 + '"' + + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name=long_name, + skip_es_search=True, + ) + + # Should handle long names + self.assertIn(RESERVED_COLON_KEYWORD, result) + self.assertIn(RESERVED_ARROW_KEYWORD, result) + 
self.assertIn(RESERVED_QUOTE_KEYWORD, result) + self.assertIn("a" * 100, result) + self.assertIn("b" * 100, result) + self.assertIn("c" * 100, result) + + def test_reserved_keywords_in_names(self): + """Test that reserved keywords themselves are handled""" + # What if someone has __reserved__colon__ in their table name? + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name=f"table{RESERVED_COLON_KEYWORD}weird", + skip_es_search=True, + ) + + # Should not double-transform + expected = f"mysql.db.schema.table{RESERVED_COLON_KEYWORD}weird" + self.assertEqual(result, expected) + + # ========== IMPORT ERROR HANDLING ========== + + @patch("metadata.utils.fqn.build") + def test_import_error_handling(self, mock_build): + """Test handling when custom_basemodel_validation import fails""" + + def side_effect(*args, **kwargs): + # Simulate import error + if kwargs.get("table_name") or kwargs.get("column_name"): + raise ImportError("Cannot import custom_basemodel_validation") + return "mysql.db.schema.table" + + mock_build.side_effect = side_effect + + # Should raise the import error + with self.assertRaises(ImportError): + mock_build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="mysql", + database_name="db", + schema_name="schema", + table_name='table "name"', + ) + + +class TestFQNSpecialCharsRealWorldScenarios(unittest.TestCase): + """Test real-world scenarios from actual database systems""" + + def setUp(self): + """Set up test fixtures""" + self.mock_metadata = Mock() + self.mock_metadata.es_search_from_fqn.return_value = [] + + def test_snowflake_quoted_identifiers(self): + """Test Snowflake-style quoted identifiers""" + # Snowflake uses quotes for case-sensitive identifiers + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="snowflake", + database_name="ANALYTICS", + schema_name="PUBLIC", + 
table_name='"MixedCase_Table"', + skip_es_search=True, + ) + + expected = f"snowflake.ANALYTICS.PUBLIC.{RESERVED_QUOTE_KEYWORD}MixedCase_Table{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_postgres_special_schemas(self): + """Test PostgreSQL special schema names""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="postgres", + database_name="mydb", + schema_name="pg_catalog", + table_name="pg_type", + column_name="typname::text", + ) + + expected = ( + f"postgres.mydb.pg_catalog.pg_type.typname{RESERVED_COLON_KEYWORD}text" + ) + self.assertEqual(result, expected) + + def test_bigquery_dataset_table_notation(self): + """Test BigQuery dataset.table notation""" + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Table, + service_name="bigquery", + database_name="my-project", + schema_name="dataset", + table_name='table_2024_01_01$"partition"', + skip_es_search=True, + ) + + # Dollar signs are not transformed, only quotes + expected = f"bigquery.my-project.dataset.table_2024_01_01${RESERVED_QUOTE_KEYWORD}partition{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + def test_mysql_backtick_conversion(self): + """Test MySQL backtick identifiers (already handled by parser)""" + # Assuming backticks are converted to quotes before reaching FQN + result = fqn.build( + metadata=self.mock_metadata, + entity_type=Column, + service_name="mysql", + database_name="test", + schema_name="public", + table_name="orders", + column_name='"order-date"', # Backticks converted to quotes + ) + + expected = f"mysql.test.public.orders.{RESERVED_QUOTE_KEYWORD}order-date{RESERVED_QUOTE_KEYWORD}" + self.assertEqual(result, expected) + + +if __name__ == "__main__": + unittest.main()