diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index dbcc4a12229..25ebf1e3302 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -290,16 +290,33 @@ def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str: @fqn_build_registry.add(Topic) def _( - _: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building + metadata: Optional[OpenMetadata], *, service_name: str, topic_name: str, -) -> str: - if not service_name or not topic_name: + skip_es_search: bool = True, +) -> Optional[str]: + entity: Optional[Topic] = None + + if not skip_es_search: + entity = search_topic_from_es( + metadata=metadata, service_name=service_name, topic_name=topic_name + ) + + # if entity not found in ES proceed to build FQN with database_name and schema_name + if not entity and service_name and topic_name: + fqn = _build(service_name, topic_name) + return fqn + + if entity: + return str(entity.fullyQualifiedName.root) + + if not all([service_name, topic_name]): raise FQNBuildingException( f"Args should be informed, but got service=`{service_name}`, topic=`{topic_name}``" ) - return _build(service_name, topic_name) + + return None @fqn_build_registry.add(Container) @@ -681,6 +698,34 @@ def search_database_from_es( ) +def search_topic_from_es( + metadata: OpenMetadata, + topic_name: str, + service_name: Optional[str], + fields: Optional[str] = None, +): + """ + Search Topic entity from ES + """ + + if not topic_name: + raise FQNBuildingException( + f"Topic Name should be informed, but got topic=`{topic_name}`" + ) + + fqn_search_string = _build(service_name or "*", topic_name) + + es_result = metadata.es_search_from_fqn( + entity_type=Topic, + fqn_search_string=fqn_search_string, + fields=fields, + ) + + return get_entity_from_es_result( + entity_list=es_result, fetch_multiple_entities=False + ) + + def get_query_checksum(query: str) -> str: """ Prepare the query checksum from its string representation.