# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test FQN build behavior
"""
from unittest import TestCase
from unittest.mock import MagicMock

import pytest

from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.models.custom_basemodel_validation import (
    RESERVED_ARROW_KEYWORD,
    RESERVED_COLON_KEYWORD,
    RESERVED_QUOTE_KEYWORD,
)
from metadata.ingestion.ometa.utils import quote
from metadata.utils import fqn


class TestFqn(TestCase):
    """
    Validate FQN building
    """

    def test_split(self):
        """
        Splitting an FQN string and building one from raw parts must agree
        """
        # Each case pairs the raw (unquoted) parts with the FQN string they
        # are expected to build.
        cases = [
            (["a", "b", "c", "d"], "a.b.c.d"),
            (["a.1", "b", "c", "d"], '"a.1".b.c.d'),
            (["a", "b.2", "c", "d"], 'a."b.2".c.d'),
            (["a", "b", "c.3", "d"], 'a.b."c.3".d'),
            (["a", "b", "c", "d.4"], 'a.b.c."d.4"'),
            (["a.1", "b.2", "c", "d"], '"a.1"."b.2".c.d'),
            (["a.1", "b.2", "c.3", "d"], '"a.1"."b.2"."c.3".d'),
            (["a.1", "b.2", "c.3", "d.4"], '"a.1"."b.2"."c.3"."d.4"'),
            (["fqn", "test.test.test"], 'fqn."test.test.test"'),
            (["fqn", "testtesttest"], "fqn.testtesttest"),
            (["fqn", "testtes ttest"], "fqn.testtes ttest"),
        ]

        for parts, expected_fqn in cases:
            actual_parts = fqn.split(expected_fqn)
            actual_fqn = fqn._build(*parts)

            self.assertEqual(expected_fqn, actual_fqn)
            # Checked before zipping so a length mismatch cannot be hidden
            # by zip() stopping at the shorter sequence.
            self.assertEqual(len(parts), len(actual_parts))
            for part, actual_part in zip(parts, actual_parts):
                # The split output is compared against the quoted form for
                # parts containing a dot, and against the raw part otherwise.
                expected_part = fqn.quote_name(part) if "." in part else part
                self.assertEqual(expected_part, actual_part)

    def test_quote_name(self):
        """
        Make sure that fqns are properly quoted
        """
        # Unquoted name remains unquoted
        self.assertEqual("a", fqn.quote_name("a"))
        # Add quotes when "." exists in the name
        self.assertEqual('"a.b"', fqn.quote_name("a.b"))
        # Leave existing valid quotes
        self.assertEqual('"a.b"', fqn.quote_name('"a.b"'))
        # Remove quotes when not needed
        self.assertEqual("a", fqn.quote_name('"a"'))

        # Unbalanced or embedded quotes are rejected, and the error message
        # echoes the offending name.
        for invalid_name in ('"a', 'a"', 'a"b'):
            with self.assertRaises(Exception) as context:
                fqn.quote_name(invalid_name)
            self.assertEqual(f"Invalid name {invalid_name}", str(context.exception))

    def test_invalid(self):
        """
        Splitting a malformed FQN raises
        """
        with self.assertRaises(Exception):
            fqn.split('a.."')

    def test_build_table(self):
        """
        Validate Table FQN building
        """
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        # (database name, expected FQN): plain name, name containing a dot
        # (expected quoted), name containing a space (expected unquoted).
        cases = (
            ("db", "service.db.schema.table"),
            ("data.base", 'service."data.base".schema.table'),
            ("data base", "service.data base.schema.table"),
        )
        for database_name, expected_fqn in cases:
            table_fqn = fqn.build(
                metadata=mocked_metadata,
                entity_type=Table,
                service_name="service",
                database_name=database_name,
                schema_name="schema",
                table_name="table",
            )
            self.assertEqual(table_fqn, expected_fqn)

    def test_split_test_case_fqn(self):
        """test for split test case"""
        # Column-level test case: six parts, the fifth one is the column
        split_fqn = fqn.split_test_case_fqn(
            "local_redshift.dev.dbt_jaffle.customers.customer_id.expect_column_max_to_be_between"
        )

        assert split_fqn.service == "local_redshift"
        assert split_fqn.database == "dev"
        assert split_fqn.schema_ == "dbt_jaffle"
        assert split_fqn.table == "customers"
        assert split_fqn.column == "customer_id"
        assert split_fqn.test_case == "expect_column_max_to_be_between"

        # Table-level test case: five parts, no column
        split_fqn = fqn.split_test_case_fqn(
            "local_redshift.dev.dbt_jaffle.customers.expect_table_column_to_be_between"
        )

        assert not split_fqn.column
        assert split_fqn.test_case == "expect_table_column_to_be_between"

        # A bare table FQN carries no test case name and is rejected
        with pytest.raises(ValueError):
            fqn.split_test_case_fqn("local_redshift.dev.dbt_jaffle.customers")

    def test_quote_fqns(self):
        """We can properly quote FQNs for URL usage"""
        assert quote(FullyQualifiedEntityName("a.b.c")) == "a.b.c"
        # Works with strings directly
        assert quote("a.b.c") == "a.b.c"
        assert quote(FullyQualifiedEntityName('"foo.bar".baz')) == "%22foo.bar%22.baz"
        assert quote('"foo.bar/baz".hello') == "%22foo.bar%2Fbaz%22.hello"

    def test_table_with_quotes(self):
        """Test FQN building for table names containing quotes"""
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        table_name = 'users "2024"'
        result = fqn.build(
            metadata=mocked_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test_db",
            schema_name="public",
            table_name=table_name,
            skip_es_search=True,
        )

        expected = f"mysql.test_db.public.users {RESERVED_QUOTE_KEYWORD}2024{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_column_with_special_chars(self):
        """Test FQN building for column names with multiple special characters"""
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        column_name = 'data::type>"info"'
        result = fqn.build(
            metadata=mocked_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="analytics",
            schema_name="reporting",
            table_name="metrics",
            column_name=column_name,
        )

        expected = f"postgres.analytics.reporting.metrics.data{RESERVED_COLON_KEYWORD}type{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}info{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_both_table_and_column_special_chars(self):
        """Test FQN building when both table and column have special characters"""
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        table_name = "report::daily"
        column_name = 'value>"USD"'

        result = fqn.build(
            metadata=mocked_metadata,
            entity_type=Column,
            service_name="snowflake",
            database_name="warehouse",
            schema_name="analytics",
            table_name=table_name,
            column_name=column_name,
        )

        expected = f"snowflake.warehouse.analytics.report{RESERVED_COLON_KEYWORD}daily.value{RESERVED_ARROW_KEYWORD}{RESERVED_QUOTE_KEYWORD}USD{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result, expected)

    def test_no_transformation_needed(self):
        """Test FQN building for names without special characters"""
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        result = fqn.build(
            metadata=mocked_metadata,
            entity_type=Table,
            service_name="mysql",
            database_name="test_db",
            schema_name="public",
            table_name="normal_table_name",
            skip_es_search=True,
        )

        self.assertEqual(result, "mysql.test_db.public.normal_table_name")

    def test_real_world_scenarios(self):
        """Test FQN building for real-world database scenarios"""
        mocked_metadata = MagicMock()
        mocked_metadata.es_search_from_fqn.return_value = None

        # Snowflake case-sensitive identifier
        snowflake_table = '"MixedCase_Table"'
        result1 = fqn.build(
            metadata=mocked_metadata,
            entity_type=Table,
            service_name="snowflake",
            database_name="ANALYTICS",
            schema_name="PUBLIC",
            table_name=snowflake_table,
            skip_es_search=True,
        )
        expected1 = f"snowflake.ANALYTICS.PUBLIC.{RESERVED_QUOTE_KEYWORD}MixedCase_Table{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result1, expected1)

        # PostgreSQL type cast in column
        postgres_column = "created_at::timestamp"
        result2 = fqn.build(
            metadata=mocked_metadata,
            entity_type=Column,
            service_name="postgres",
            database_name="mydb",
            schema_name="public",
            table_name="events",
            column_name=postgres_column,
        )
        expected2 = (
            f"postgres.mydb.public.events.created_at{RESERVED_COLON_KEYWORD}timestamp"
        )
        self.assertEqual(result2, expected2)

        # BigQuery partition notation
        bigquery_table = 'events_2024$"daily"'
        result3 = fqn.build(
            metadata=mocked_metadata,
            entity_type=Table,
            service_name="bigquery",
            database_name="my-project",
            schema_name="dataset",
            table_name=bigquery_table,
            skip_es_search=True,
        )
        expected3 = f"bigquery.my-project.dataset.events_2024${RESERVED_QUOTE_KEYWORD}daily{RESERVED_QUOTE_KEYWORD}"
        self.assertEqual(result3, expected3)

    def test_prefix_entity_for_wildcard_search(self):
        """Test wildcard search prefix generation for all supported entity types"""
        # (entity type, input FQN, expected search pattern). Missing leading
        # parts are expected to be filled with "*" up to the entity's number
        # of slots; a full FQN is expected back unchanged.
        cases = [
            # Table (4 slots: service.database.schema.table)
            # Full FQN - no wildcards needed
            (
                Table,
                "my_service.my_db.my_schema.my_table",
                "my_service.my_db.my_schema.my_table",
            ),
            # Table with partial FQN - needs wildcards
            (Table, "my_schema.my_table", "*.*.my_schema.my_table"),
            # Table with just table name - needs all wildcards
            (Table, "my_table", "*.*.*.my_table"),
            # Table with quoted parts
            (Table, '"my.schema".my_table', '*.*."my.schema".my_table'),
            # DatabaseSchema (3 slots: service.database.schema)
            (DatabaseSchema, "public", "*.*.public"),
            (
                DatabaseSchema,
                "postgres_service.analytics_db.public",
                "postgres_service.analytics_db.public",
            ),
            # Database (2 slots: service.database)
            (Database, "production_db", "*.production_db"),
            (Database, "mysql_service.production_db", "mysql_service.production_db"),
            # Dashboard (2 slots: service.dashboard)
            (Dashboard, "sales_dashboard", "*.sales_dashboard"),
            # APICollection (2 slots: service.collection)
            (APICollection, "users_api", "*.users_api"),
            # Chart (2 slots: service.chart)
            (Chart, "revenue_chart", "*.revenue_chart"),
            # MlModel (2 slots: service.model)
            (MlModel, "fraud_detection_model", "*.fraud_detection_model"),
            # Topic (2 slots: service.topic)
            (Topic, "potato", "*.potato"),
            (Topic, "kafka.user_events", "kafka.user_events"),
            # SearchIndex (2 slots: service.index)
            (SearchIndex, "product_index", "*.product_index"),
            # Tag (2 slots: classification.tag)
            (Tag, "Sensitive", "*.Sensitive"),
            (Tag, "PII.Sensitive", "PII.Sensitive"),
            # DataModel (2 slots: service.model)
            (DataModel, "customer_model", "*.customer_model"),
            # StoredProcedure (4 slots: service.database.schema.procedure)
            (StoredProcedure, "calculate_revenue", "*.*.*.calculate_revenue"),
            (
                StoredProcedure,
                "public.calculate_revenue",
                "*.*.public.calculate_revenue",
            ),
            (
                StoredProcedure,
                "oracle.sales_db.public.calculate_revenue",
                "oracle.sales_db.public.calculate_revenue",
            ),
            # Pipeline (2 slots: service.pipeline)
            (Pipeline, "daily_ingestion", "*.daily_ingestion"),
        ]

        for entity_type, entity_fqn, expected in cases:
            result = fqn.prefix_entity_for_wildcard_search(entity_type, entity_fqn)
            # The message identifies the failing row of the table.
            self.assertEqual(
                result, expected, msg=f"{entity_type.__name__}: {entity_fqn}"
            )

        # Test error cases
        # FQN with too many parts
        with pytest.raises(fqn.FQNBuildingException) as exc:
            fqn.prefix_entity_for_wildcard_search(
                Table, "service.db.schema.table.extra"
            )
        assert "has too many parts" in str(exc.value)

        # Test unsupported entity type (Column doesn't have slots defined)
        with pytest.raises(fqn.FQNBuildingException) as exc:
            fqn.prefix_entity_for_wildcard_search(Column, "column")
        assert "not supported for wildcard search" in str(exc.value)