Fix Kafka Connect for lineage parsing (#23819)

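* Fix Kafka Connect for lineage parsing: when dbServiceNames, storageServiceNames, or messagingServiceName is not configured, look up tables, containers, and topics with search_in_any_service across all services, instead of building an FQN with a "*" service name or returning early when messagingServiceName is missing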

* Fix Kafka Connect for lineage parsing
This commit is contained in:
Sriharsha Chintalapani 2025-10-09 17:01:36 -04:00 committed by GitHub
parent 88115e1218
commit 76020bd0e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -156,42 +156,78 @@ class KafkaconnectSource(PipelineServiceSource):
dataset_details = pipeline_details.dataset dataset_details = pipeline_details.dataset
if dataset_details: if dataset_details:
if dataset_details.dataset_type == Table: if dataset_details.dataset_type == Table:
for ( # If dbServiceNames is configured, use it to build FQN directly
dbservicename if self.source_config.lineageInformation.dbServiceNames:
) in self.source_config.lineageInformation.dbServiceNames or ["*"]: for (
dataset_entity = self.metadata.get_by_name( dbservicename
entity=dataset_details.dataset_type, ) in self.source_config.lineageInformation.dbServiceNames:
fqn=fqn.build( dataset_entity = self.metadata.get_by_name(
metadata=self.metadata, entity=dataset_details.dataset_type,
entity_type=dataset_details.dataset_type, fqn=fqn.build(
table_name=dataset_details.table, metadata=self.metadata,
database_name=None, entity_type=dataset_details.dataset_type,
schema_name=dataset_details.database, table_name=dataset_details.table,
service_name=dbservicename, database_name=None,
), schema_name=dataset_details.database,
) service_name=dbservicename,
),
)
if dataset_entity:
return dataset_entity
else:
# Search across all database services
logger.info(
f"dbServiceNames not configured - searching all database services for table {dataset_details.table}"
)
# Build search string: schema.table format
search_string = (
f"{dataset_details.database}.{dataset_details.table}"
if dataset_details.database
else dataset_details.table
)
dataset_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=search_string,
)
if dataset_entity: if dataset_entity:
logger.debug(
f"Found table {dataset_details.table} via search in service {dataset_entity.service.name if dataset_entity.service else 'unknown'}"
)
return dataset_entity return dataset_entity
if dataset_details.dataset_type == Container: if dataset_details.dataset_type == Container:
for ( # If storageServiceNames is configured, use it to build FQN directly
storageservicename if self.source_config.lineageInformation.storageServiceNames:
) in self.source_config.lineageInformation.storageServiceNames or [ for (
"*" storageservicename
]: ) in self.source_config.lineageInformation.storageServiceNames:
storage_entity = self.metadata.get_by_name( storage_entity = self.metadata.get_by_name(
entity=dataset_details.dataset_type, entity=dataset_details.dataset_type,
fqn=fqn.build( fqn=fqn.build(
metadata=self.metadata, metadata=self.metadata,
entity_type=dataset_details.dataset_type, entity_type=dataset_details.dataset_type,
container_name=dataset_details.container_name, container_name=dataset_details.container_name,
service_name=storageservicename, service_name=storageservicename,
parent_container=None, parent_container=None,
), ),
) )
if storage_entity:
return storage_entity
else:
# Search across all storage services
logger.info(
f"storageServiceNames not configured - searching all storage services for container {dataset_details.container_name}"
)
storage_entity = self.metadata.search_in_any_service(
entity_type=Container,
fqn_search_string=dataset_details.container_name,
)
if storage_entity: if storage_entity:
logger.debug(
f"Found container {dataset_details.container_name} via search in service {storage_entity.service.name if storage_entity.service else 'unknown'}"
)
return storage_entity return storage_entity
except Exception as exc: except Exception as exc:
@ -599,11 +635,23 @@ class KafkaconnectSource(PipelineServiceSource):
f"for pattern: {database_server_name}.*" f"for pattern: {database_server_name}.*"
) )
# List topics from the configured messaging service only # Build params for topic query
params = {}
if self.service_connection.messagingServiceName:
# Filter by specific service if configured
params["service"] = self.service_connection.messagingServiceName
logger.debug(
f"Searching topics in service: {self.service_connection.messagingServiceName}"
)
else:
# Search all messaging services
logger.debug("Searching topics across all messaging services")
# List topics from messaging service(s)
topics_list = self.metadata.list_entities( topics_list = self.metadata.list_entities(
entity=Topic, entity=Topic,
fields=["name", "fullyQualifiedName", "service"], fields=["name", "fullyQualifiedName", "service"],
params={"service": self.service_connection.messagingServiceName}, params=params if params else None,
).entities ).entities
# Filter topics that match the CDC naming pattern # Filter topics that match the CDC naming pattern
@ -633,8 +681,9 @@ class KafkaconnectSource(PipelineServiceSource):
""" """
try: try:
if not self.service_connection.messagingServiceName: if not self.service_connection.messagingServiceName:
logger.debug("Kafka messagingServiceName not found") logger.info(
return "messagingServiceName not configured - will search all messaging services for topics"
)
pipeline_fqn = fqn.build( pipeline_fqn = fqn.build(
metadata=self.metadata, metadata=self.metadata,
@ -671,16 +720,32 @@ class KafkaconnectSource(PipelineServiceSource):
) )
for topic in topics_to_process: for topic in topics_to_process:
topic_fqn = fqn.build( topic_entity = None
metadata=self.metadata,
entity_type=Topic,
service_name=self.service_connection.messagingServiceName,
topic_name=str(topic.name),
)
topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn) # If messagingServiceName is configured, use it to build FQN directly
if self.service_connection.messagingServiceName:
topic_fqn = fqn.build(
metadata=self.metadata,
entity_type=Topic,
service_name=self.service_connection.messagingServiceName,
topic_name=str(topic.name),
)
topic_entity = self.metadata.get_by_name(
entity=Topic, fqn=topic_fqn
)
else:
# Search across all messaging services
topic_entity = self.metadata.search_in_any_service(
entity_type=Topic,
fqn_search_string=str(topic.name),
)
if topic_entity:
logger.debug(
f"Found topic {topic.name} via search in service {topic_entity.service.name if topic_entity.service else 'unknown'}"
)
if topic_entity is None: if topic_entity is None:
logger.debug(f"Topic {topic.name} not found in OpenMetadata")
continue continue
# If no dataset entity from config, try to parse table info from CDC topic name # If no dataset entity from config, try to parse table info from CDC topic name
@ -698,25 +763,40 @@ class KafkaconnectSource(PipelineServiceSource):
f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}" f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}"
) )
# Try to find the table entity # Try to find the table entity
for ( if self.source_config.lineageInformation.dbServiceNames:
dbservicename # Use specific database services if configured
) in self.source_config.lineageInformation.dbServiceNames or [ for (
"*" dbservicename
]: ) in self.source_config.lineageInformation.dbServiceNames:
table_fqn = fqn.build( table_fqn = fqn.build(
metadata=self.metadata, metadata=self.metadata,
entity_type=Table, entity_type=Table,
table_name=topic_info["table"], table_name=topic_info["table"],
database_name=None, database_name=None,
schema_name=topic_info["database"], schema_name=topic_info["database"],
service_name=dbservicename, service_name=dbservicename,
)
current_dataset_entity = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if current_dataset_entity:
logger.debug(f"Found table entity: {table_fqn}")
break
else:
# Search across all database services
search_string = (
f"{topic_info['database']}.{topic_info['table']}"
) )
current_dataset_entity = self.metadata.get_by_name( current_dataset_entity = (
entity=Table, fqn=table_fqn self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=search_string,
)
) )
if current_dataset_entity: if current_dataset_entity:
logger.debug(f"Found table entity: {table_fqn}") logger.debug(
break f"Found table {search_string} via search in service {current_dataset_entity.service.name if current_dataset_entity.service else 'unknown'}"
)
if current_dataset_entity is None: if current_dataset_entity is None:
# No table entity found, skip this topic # No table entity found, skip this topic