diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py
index 114ef23deae..2ff2af8407d 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py
@@ -156,42 +156,78 @@ class KafkaconnectSource(PipelineServiceSource):
             dataset_details = pipeline_details.dataset
             if dataset_details:
                 if dataset_details.dataset_type == Table:
-                    for (
-                        dbservicename
-                    ) in self.source_config.lineageInformation.dbServiceNames or ["*"]:
-                        dataset_entity = self.metadata.get_by_name(
-                            entity=dataset_details.dataset_type,
-                            fqn=fqn.build(
-                                metadata=self.metadata,
-                                entity_type=dataset_details.dataset_type,
-                                table_name=dataset_details.table,
-                                database_name=None,
-                                schema_name=dataset_details.database,
-                                service_name=dbservicename,
-                            ),
-                        )
+                    # If dbServiceNames is configured, use it to build FQN directly
+                    if self.source_config.lineageInformation.dbServiceNames:
+                        for (
+                            dbservicename
+                        ) in self.source_config.lineageInformation.dbServiceNames:
+                            dataset_entity = self.metadata.get_by_name(
+                                entity=dataset_details.dataset_type,
+                                fqn=fqn.build(
+                                    metadata=self.metadata,
+                                    entity_type=dataset_details.dataset_type,
+                                    table_name=dataset_details.table,
+                                    database_name=None,
+                                    schema_name=dataset_details.database,
+                                    service_name=dbservicename,
+                                ),
+                            )
+                            if dataset_entity:
+                                return dataset_entity
+                    else:
+                        # Search across all database services
+                        logger.info(
+                            f"dbServiceNames not configured - searching all database services for table {dataset_details.table}"
+                        )
+                        # Build search string: schema.table format
+                        search_string = (
+                            f"{dataset_details.database}.{dataset_details.table}"
+                            if dataset_details.database
+                            else dataset_details.table
+                        )
+                        dataset_entity = self.metadata.search_in_any_service(
+                            entity_type=Table,
+                            fqn_search_string=search_string,
+                        )
                         if dataset_entity:
+                            logger.debug(
+                                f"Found table {dataset_details.table} via search in service {dataset_entity.service.name if dataset_entity.service else 'unknown'}"
+                            )
                             return dataset_entity
 
                 if dataset_details.dataset_type == Container:
-                    for (
-                        storageservicename
-                    ) in self.source_config.lineageInformation.storageServiceNames or [
-                        "*"
-                    ]:
-                        storage_entity = self.metadata.get_by_name(
-                            entity=dataset_details.dataset_type,
-                            fqn=fqn.build(
-                                metadata=self.metadata,
-                                entity_type=dataset_details.dataset_type,
-                                container_name=dataset_details.container_name,
-                                service_name=storageservicename,
-                                parent_container=None,
-                            ),
-                        )
+                    # If storageServiceNames is configured, use it to build FQN directly
+                    if self.source_config.lineageInformation.storageServiceNames:
+                        for (
+                            storageservicename
+                        ) in self.source_config.lineageInformation.storageServiceNames:
+                            storage_entity = self.metadata.get_by_name(
+                                entity=dataset_details.dataset_type,
+                                fqn=fqn.build(
+                                    metadata=self.metadata,
+                                    entity_type=dataset_details.dataset_type,
+                                    container_name=dataset_details.container_name,
+                                    service_name=storageservicename,
+                                    parent_container=None,
+                                ),
+                            )
+                            if storage_entity:
+                                return storage_entity
+                    else:
+                        # Search across all storage services
+                        logger.info(
+                            f"storageServiceNames not configured - searching all storage services for container {dataset_details.container_name}"
+                        )
+                        storage_entity = self.metadata.search_in_any_service(
+                            entity_type=Container,
+                            fqn_search_string=dataset_details.container_name,
+                        )
                         if storage_entity:
+                            logger.debug(
+                                f"Found container {dataset_details.container_name} via search in service {storage_entity.service.name if storage_entity.service else 'unknown'}"
+                            )
                             return storage_entity
 
         except Exception as exc:
@@ -599,11 +635,23 @@ class KafkaconnectSource(PipelineServiceSource):
             f"for pattern: {database_server_name}.*"
         )
 
-        # List topics from the configured messaging service only
+        # Build params for topic query
+        params = {}
+        if self.service_connection.messagingServiceName:
+            # Filter by specific service if configured
+            params["service"] = self.service_connection.messagingServiceName
+            logger.debug(
+                f"Searching topics in service: {self.service_connection.messagingServiceName}"
+            )
+        else:
+            # Search all messaging services
+            logger.debug("Searching topics across all messaging services")
+
+        # List topics from messaging service(s)
         topics_list = self.metadata.list_entities(
             entity=Topic,
             fields=["name", "fullyQualifiedName", "service"],
-            params={"service": self.service_connection.messagingServiceName},
+            params=params if params else None,
        ).entities
 
         # Filter topics that match the CDC naming pattern
@@ -633,8 +681,9 @@ class KafkaconnectSource(PipelineServiceSource):
         """
         try:
             if not self.service_connection.messagingServiceName:
-                logger.debug("Kafka messagingServiceName not found")
-                return
+                logger.info(
+                    "messagingServiceName not configured - will search all messaging services for topics"
+                )
 
             pipeline_fqn = fqn.build(
                 metadata=self.metadata,
@@ -671,16 +720,32 @@ class KafkaconnectSource(PipelineServiceSource):
             )
 
             for topic in topics_to_process:
-                topic_fqn = fqn.build(
-                    metadata=self.metadata,
-                    entity_type=Topic,
-                    service_name=self.service_connection.messagingServiceName,
-                    topic_name=str(topic.name),
-                )
+                topic_entity = None
 
-                topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)
+                # If messagingServiceName is configured, use it to build FQN directly
+                if self.service_connection.messagingServiceName:
+                    topic_fqn = fqn.build(
+                        metadata=self.metadata,
+                        entity_type=Topic,
+                        service_name=self.service_connection.messagingServiceName,
+                        topic_name=str(topic.name),
+                    )
+                    topic_entity = self.metadata.get_by_name(
+                        entity=Topic, fqn=topic_fqn
+                    )
+                else:
+                    # Search across all messaging services
+                    topic_entity = self.metadata.search_in_any_service(
+                        entity_type=Topic,
+                        fqn_search_string=str(topic.name),
+                    )
+                    if topic_entity:
+                        logger.debug(
+                            f"Found topic {topic.name} via search in service {topic_entity.service.name if topic_entity.service else 'unknown'}"
+                        )
 
                 if topic_entity is None:
+                    logger.debug(f"Topic {topic.name} not found in OpenMetadata")
                     continue
 
                 # If no dataset entity from config, try to parse table info from CDC topic name
@@ -698,25 +763,40 @@ class KafkaconnectSource(PipelineServiceSource):
                             f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}"
                         )
                         # Try to find the table entity
-                        for (
-                            dbservicename
-                        ) in self.source_config.lineageInformation.dbServiceNames or [
-                            "*"
-                        ]:
-                            table_fqn = fqn.build(
-                                metadata=self.metadata,
-                                entity_type=Table,
-                                table_name=topic_info["table"],
-                                database_name=None,
-                                schema_name=topic_info["database"],
-                                service_name=dbservicename,
+                        if self.source_config.lineageInformation.dbServiceNames:
+                            # Use specific database services if configured
+                            for (
+                                dbservicename
+                            ) in self.source_config.lineageInformation.dbServiceNames:
+                                table_fqn = fqn.build(
+                                    metadata=self.metadata,
+                                    entity_type=Table,
+                                    table_name=topic_info["table"],
+                                    database_name=None,
+                                    schema_name=topic_info["database"],
+                                    service_name=dbservicename,
+                                )
+                                current_dataset_entity = self.metadata.get_by_name(
+                                    entity=Table, fqn=table_fqn
+                                )
+                                if current_dataset_entity:
+                                    logger.debug(f"Found table entity: {table_fqn}")
+                                    break
+                        else:
+                            # Search across all database services
+                            search_string = (
+                                f"{topic_info['database']}.{topic_info['table']}"
                             )
-                            current_dataset_entity = self.metadata.get_by_name(
-                                entity=Table, fqn=table_fqn
+                            current_dataset_entity = (
+                                self.metadata.search_in_any_service(
+                                    entity_type=Table,
+                                    fqn_search_string=search_string,
+                                )
                             )
                             if current_dataset_entity:
-                                logger.debug(f"Found table entity: {table_fqn}")
-                                break
+                                logger.debug(
+                                    f"Found table {search_string} via search in service {current_dataset_entity.service.name if current_dataset_entity.service else 'unknown'}"
                                )
 
                        if current_dataset_entity is None:
                            # No table entity found, skip this topic
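Reviewer note (illustration only, not part of the patch): each hunk applies the same lookup order - when explicit service names are configured, every configured service is tried with a direct fqn.build + get_by_name lookup, and only when none are configured does the code fall back to search_in_any_service. The sketch below restates that pattern for the table case; metadata, fqn_module, table_cls, and the argument names are stand-ins for the real KafkaconnectSource members, not the actual method.

from typing import Optional


def find_table_entity(
    metadata, fqn_module, table_cls, table, schema, db_service_names
) -> Optional[object]:
    """Sketch of the lookup fallback used in the hunks above (illustrative only)."""
    if db_service_names:
        # Explicit services configured: try a direct FQN lookup per service.
        for service_name in db_service_names:
            entity = metadata.get_by_name(
                entity=table_cls,
                fqn=fqn_module.build(
                    metadata=metadata,
                    entity_type=table_cls,
                    table_name=table,
                    database_name=None,
                    schema_name=schema,
                    service_name=service_name,
                ),
            )
            if entity:
                return entity
        return None
    # No services configured: fall back to a cross-service search on schema.table.
    search_string = f"{schema}.{table}" if schema else table
    return metadata.search_in_any_service(
        entity_type=table_cls,
        fqn_search_string=search_string,
    )

The messaging-service hunks follow the same shape, with Topic and the topic name in place of the table FQN.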