diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 919a05f5df9..a66a0925853 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -39,6 +39,7 @@ from metadata.ingestion.models.custom_pydantic import BaseModel from metadata.ingestion.ometa.client import REST, APIError from metadata.ingestion.ometa.utils import quote from metadata.ingestion.source.models import TableView +from metadata.utils import fqn from metadata.utils.elasticsearch import ES_INDEX_MAP, get_entity_from_es_result from metadata.utils.execution_time_tracker import calculate_execution_time_generator from metadata.utils.logger import ometa_logger @@ -521,10 +522,13 @@ class ESMixin(Generic[T]): fetch table from es when with/without `db_service_name` """ try: + prepended_fqn = fqn.prefix_entity_for_wildcard_search( + entity_type=entity_type, fqn=fqn_search_string + ) entity_result = get_entity_from_es_result( entity_list=self.es_search_from_fqn( entity_type=entity_type, - fqn_search_string=fqn_search_string, + fqn_search_string=prepended_fqn, ), fetch_multiple_entities=fetch_multiple_entities, ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py index d5e8ecc847c..35a55b3351d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py @@ -110,16 +110,16 @@ class ConnectorConfigKeys: "db.name", "snowflake.database.name", "database.include.list", - "database.hostname", - "connection.url", + # "database.hostname", + # "connection.url", "database.dbname", "topic.prefix", - "database.server.name", # Debezium V1 + # "database.server.name", # This maps the server name, not the actual database "databases.include", "database.names", 
"snowflake.database", - "connection.host", - "database.exclude.list", + # "connection.host", + # "database.exclude.list", ] CONTAINER_KEYS = [ diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py index 2ff2af8407d..c3a2654d372 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py @@ -182,7 +182,7 @@ class KafkaconnectSource(PipelineServiceSource): ) # Build search string: schema.table format search_string = ( - f"{dataset_details.database}.{dataset_details.table}" + f"{fqn.quote_name(dataset_details.database)}.{fqn.quote_name(dataset_details.table)}" if dataset_details.database else dataset_details.table ) diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index c41c8a15214..7a55c3d9165 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -13,10 +13,12 @@ Handle FQN building and splitting logic. 
Filter information has been taken from the ES indexes definitions """ +from __future__ import annotations + import hashlib import re import traceback -from typing import Dict, List, Optional, Type, TypeVar, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Type, TypeVar, Union from antlr4.CommonTokenStream import CommonTokenStream from antlr4.error.ErrorStrategy import BailErrorStrategy @@ -51,11 +53,13 @@ from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.dispatch import class_register from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.logger import utils_logger +if TYPE_CHECKING: + from metadata.ingestion.ometa.ometa_api import OpenMetadata + logger = utils_logger() T = TypeVar("T", bound=BaseModel) @@ -866,3 +870,56 @@ def get_query_checksum(query: str) -> str: The checksum is used as the query's name. """ return hashlib.md5(query.encode()).hexdigest() + + +# Not adding container since children can have recursive slots: service.container1.container2... +FQN_ENTITY_SLOTS = { + Table.__name__: 4, + DatabaseSchema.__name__: 3, + Database.__name__: 2, + Dashboard.__name__: 2, + APICollection.__name__: 2, + Chart.__name__: 2, + MlModel.__name__: 2, + Topic.__name__: 2, + SearchIndex.__name__: 2, + Tag.__name__: 2, + DataModel.__name__: 2, + StoredProcedure.__name__: 4, + Pipeline.__name__: 2, +} + + +def prefix_entity_for_wildcard_search(entity_type: Type[T], fqn: str) -> str: + """ + Given an entity type and an FQN, return the FQN prefixed with wildcards + to match any parent hierarchy leading to that entity. + + For example, for a Topic with FQN "potato", return "*.potato" to match + the topic in any service. 
For a Table with FQN "schema.table", return + "*.*.schema.table" to match the table in any service and database. + + Args: + entity_type: The entity type to match. + fqn: The FQN to prefix. + + Returns: + The prefixed FQN with wildcards for missing parent levels. + """ + slots = FQN_ENTITY_SLOTS.get(entity_type.__name__) + if not slots: + raise FQNBuildingException( + f"Entity type {entity_type.__name__} not supported for wildcard search" + ) + + parts = split(fqn) + if len(parts) > slots: + raise FQNBuildingException( + f"FQN {fqn} has too many parts ({len(parts)})" + f" for entity type {entity_type.__name__} (expected {slots} or fewer)" + ) + + # Add wildcards for missing parent levels + wildcards_needed = slots - len(parts) + prefixed_parts = ["*"] * wildcards_needed + parts + return _build(*prefixed_parts, quote=True) diff --git a/ingestion/tests/unit/test_fqn.py b/ingestion/tests/unit/test_fqn.py index b4b3b608146..d8cbd12ecb5 100644 --- a/ingestion/tests/unit/test_fqn.py +++ b/ingestion/tests/unit/test_fqn.py @@ -16,7 +16,18 @@ from unittest.mock import MagicMock import pytest -from metadata.generated.schema.entity.data.table import Column, Table +from metadata.generated.schema.entity.classification.tag import Tag +from metadata.generated.schema.entity.data.apiCollection import APICollection +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.mlmodel import MlModel +from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.searchIndex import SearchIndex +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure +from metadata.generated.schema.entity.data.table import Column, DataModel, Table +from 
metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.ingestion.models.custom_basemodel_validation import ( RESERVED_ARROW_KEYWORD, @@ -288,3 +299,130 @@ class TestFqn(TestCase): ) expected3 = f"bigquery.my-project.dataset.events_2024${RESERVED_QUOTE_KEYWORD}daily{RESERVED_QUOTE_KEYWORD}" self.assertEqual(result3, expected3) + + def test_prefix_entity_for_wildcard_search(self): + """Test wildcard search prefix generation for all supported entity types""" + + # Table (4 slots: service.database.schema.table) + # Full FQN - no wildcards needed + table_fqn = "my_service.my_db.my_schema.my_table" + result = fqn.prefix_entity_for_wildcard_search(Table, table_fqn) + self.assertEqual(result, "my_service.my_db.my_schema.my_table") + + # Table with partial FQN - needs wildcards + table_fqn_partial = "my_schema.my_table" + result = fqn.prefix_entity_for_wildcard_search(Table, table_fqn_partial) + self.assertEqual(result, "*.*.my_schema.my_table") + + # Table with just table name - needs all wildcards + table_fqn_minimal = "my_table" + result = fqn.prefix_entity_for_wildcard_search(Table, table_fqn_minimal) + self.assertEqual(result, "*.*.*.my_table") + + # Table with quoted parts + table_fqn_quoted = '"my.schema".my_table' + result_quoted = fqn.prefix_entity_for_wildcard_search(Table, table_fqn_quoted) + self.assertEqual(result_quoted, '*.*."my.schema".my_table') + + # DatabaseSchema (3 slots: service.database.schema) + schema_fqn = "public" + result = fqn.prefix_entity_for_wildcard_search(DatabaseSchema, schema_fqn) + self.assertEqual(result, "*.*.public") + + schema_fqn_full = "postgres_service.analytics_db.public" + result = fqn.prefix_entity_for_wildcard_search(DatabaseSchema, schema_fqn_full) + self.assertEqual(result, "postgres_service.analytics_db.public") + + # Database (2 slots: service.database) + database_fqn = "production_db" + result = 
fqn.prefix_entity_for_wildcard_search(Database, database_fqn) + self.assertEqual(result, "*.production_db") + + database_fqn_full = "mysql_service.production_db" + result = fqn.prefix_entity_for_wildcard_search(Database, database_fqn_full) + self.assertEqual(result, "mysql_service.production_db") + + # Dashboard (2 slots: service.dashboard) + dashboard_fqn = "sales_dashboard" + result = fqn.prefix_entity_for_wildcard_search(Dashboard, dashboard_fqn) + self.assertEqual(result, "*.sales_dashboard") + + # APICollection (2 slots: service.collection) + api_collection_fqn = "users_api" + result = fqn.prefix_entity_for_wildcard_search( + APICollection, api_collection_fqn + ) + self.assertEqual(result, "*.users_api") + + # Chart (2 slots: service.chart) + chart_fqn = "revenue_chart" + result = fqn.prefix_entity_for_wildcard_search(Chart, chart_fqn) + self.assertEqual(result, "*.revenue_chart") + + # MlModel (2 slots: service.model) + mlmodel_fqn = "fraud_detection_model" + result = fqn.prefix_entity_for_wildcard_search(MlModel, mlmodel_fqn) + self.assertEqual(result, "*.fraud_detection_model") + + # Topic (2 slots: service.topic) + topic_fqn = "potato" + result = fqn.prefix_entity_for_wildcard_search(Topic, topic_fqn) + self.assertEqual(result, "*.potato") + + topic_fqn_full = "kafka.user_events" + result = fqn.prefix_entity_for_wildcard_search(Topic, topic_fqn_full) + self.assertEqual(result, "kafka.user_events") + + # SearchIndex (2 slots: service.index) + search_index_fqn = "product_index" + result = fqn.prefix_entity_for_wildcard_search(SearchIndex, search_index_fqn) + self.assertEqual(result, "*.product_index") + + # Tag (2 slots: classification.tag) + tag_fqn = "Sensitive" + result = fqn.prefix_entity_for_wildcard_search(Tag, tag_fqn) + self.assertEqual(result, "*.Sensitive") + + tag_fqn_full = "PII.Sensitive" + result = fqn.prefix_entity_for_wildcard_search(Tag, tag_fqn_full) + self.assertEqual(result, "PII.Sensitive") + + # DataModel (2 slots: service.model) + 
data_model_fqn = "customer_model" + result = fqn.prefix_entity_for_wildcard_search(DataModel, data_model_fqn) + self.assertEqual(result, "*.customer_model") + + # StoredProcedure (4 slots: service.database.schema.procedure) + stored_proc_fqn = "calculate_revenue" + result = fqn.prefix_entity_for_wildcard_search(StoredProcedure, stored_proc_fqn) + self.assertEqual(result, "*.*.*.calculate_revenue") + + stored_proc_fqn_partial = "public.calculate_revenue" + result = fqn.prefix_entity_for_wildcard_search( + StoredProcedure, stored_proc_fqn_partial + ) + self.assertEqual(result, "*.*.public.calculate_revenue") + + stored_proc_fqn_full = "oracle.sales_db.public.calculate_revenue" + result = fqn.prefix_entity_for_wildcard_search( + StoredProcedure, stored_proc_fqn_full + ) + self.assertEqual(result, "oracle.sales_db.public.calculate_revenue") + + # Pipeline (2 slots: service.pipeline) + pipeline_fqn = "daily_ingestion" + result = fqn.prefix_entity_for_wildcard_search(Pipeline, pipeline_fqn) + self.assertEqual(result, "*.daily_ingestion") + + # Test error cases + # FQN with too many parts + with pytest.raises(fqn.FQNBuildingException) as exc: + fqn.prefix_entity_for_wildcard_search( + Table, "service.db.schema.table.extra" + ) + assert "has too many parts" in str(exc.value) + + # Test unsupported entity type (Column doesn't have slots defined) + with pytest.raises(fqn.FQNBuildingException) as exc: + fqn.prefix_entity_for_wildcard_search(Column, "column") + assert "not supported for wildcard search" in str(exc.value)