From ce3a9bd654b529d87be97a5c69e53f1cf47b58da Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 11 Oct 2025 13:26:14 -0700 Subject: [PATCH] Kafka connect improvements (#23845) * Kafka Connect Lineage Improvements * Remove specific Kafka topic example from docstring Removed example from the documentation regarding the earnin.bank.dev topic. * fix: update comment to reflect accurate example for database server name handling * fix: improve expected FQN display in warning messages for missing Kafka topics * fix: update table entity retrieval method in KafkaconnectSource * fix: enhance lineage information checks and improve logging for missing configurations in KafkaconnectSource * Kafka Connect Lineage Improvements * address comments; work without the table.include.list --------- Co-authored-by: Ayush Shah --- .../source/pipeline/kafkaconnect/client.py | 70 +- .../source/pipeline/kafkaconnect/metadata.py | 1221 +++++++++++++++-- .../source/pipeline/kafkaconnect/models.py | 3 + .../topology/pipeline/test_kafkaconnect.py | 30 + .../test_kafkaconnect_service_discovery.py | 428 ++++++ 5 files changed, 1624 insertions(+), 128 deletions(-) create mode 100644 ingestion/tests/unit/topology/pipeline/test_kafkaconnect_service_discovery.py diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py index 35a55b3351d..ccd74141a7e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py @@ -43,7 +43,6 @@ def parse_cdc_topic_name(topic_name: str, database_server_name: str = None) -> d - Examples: - MysqlKafkaV2.ecommerce.orders -> database=ecommerce, table=orders - PostgresKafkaCDC.public.orders -> database=public, table=orders - - ecommerce.customers -> database=ecommerce, table=customers (if server-name matches) Args: topic_name: The Kafka topic name @@ -59,30 +58,47 @@ def parse_cdc_topic_name(topic_name: str, database_server_name: str = None) -> d if topic_name.startswith(("_", "dbhistory.", "__")): return {} + # If database_server_name is provided, check if topic starts with it + # This handles server names with dots like "collate.ecommerce.dev" + if database_server_name: + # Check if topic starts with the server name prefix + server_prefix = database_server_name + "." 
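+        # e.g. with server name "myapp.payments.prod", the topic
+        # "myapp.payments.prod.transactions.orders" is stripped to "transactions.orders",
+        # which is split below into database "transactions" and table "orders"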
+ if topic_name.startswith(server_prefix): + # Strip the server name prefix to get schema.table or just table + remaining = topic_name[len(server_prefix) :] + remaining_parts = remaining.split(".") + + if len(remaining_parts) == 2: + # Pattern: {server-name}.{schema}.{table} + database, table = remaining_parts + return {"database": database, "table": table} + elif len(remaining_parts) == 1: + # Pattern: {server-name}.{table} (no explicit schema) + return {"database": database_server_name, "table": remaining_parts[0]} + + # Check if topic exactly matches server name (edge case) + if topic_name.lower() == database_server_name.lower(): + return {} + + # Fallback: try to parse without server name parts = topic_name.split(".") # Pattern: {prefix}.{database}.{table} (3 parts) if len(parts) == 3: prefix, database, table = parts - # Verify prefix matches server name if provided - if database_server_name and prefix.lower() != database_server_name.lower(): - # Might be schema.database.table for some connectors - pass return {"database": database, "table": table} # Pattern: {database}.{table} (2 parts) elif len(parts) == 2: database, table = parts - # Only accept if server name matches or not provided - if database_server_name and database.lower() == database_server_name.lower(): - # This is server_name.table, so database is the server name - return {"database": database, "table": table} - # Or accept as database.table return {"database": database, "table": table} - # Pattern: just {table} (1 part) - use server name as database - elif len(parts) == 1 and database_server_name: - return {"database": database_server_name, "table": topic_name} + # Pattern: just {table} (1 part) + elif len(parts) == 1: + if database_server_name: + return {"database": database_server_name, "table": topic_name} + # Without server name, we can't determine the database + return {} return {} @@ -139,6 +155,34 @@ SUPPORTED_DATASETS = { "container_name": ConnectorConfigKeys.CONTAINER_KEYS, } +# Map Kafka Connect connector class names to OpenMetadata service types +CONNECTOR_CLASS_TO_SERVICE_TYPE = { + "MySqlCdcSource": "Mysql", + "MySqlCdcSourceV2": "Mysql", + "PostgresCdcSource": "Postgres", + "PostgresSourceConnector": "Postgres", + "SqlServerCdcSource": "Mssql", + "MongoDbCdcSource": "MongoDB", + "OracleCdcSource": "Oracle", + "Db2CdcSource": "Db2", +} + +# Map service types to hostname config keys +SERVICE_TYPE_HOSTNAME_KEYS = { + "Mysql": ["database.hostname", "connection.host"], + "Postgres": ["database.hostname", "connection.host"], + "Mssql": ["database.hostname"], + "MongoDB": ["mongodb.connection.uri", "connection.uri"], + "Oracle": ["database.hostname"], +} + +# Map service types to broker/endpoint config keys for messaging services +MESSAGING_ENDPOINT_KEYS = [ + "kafka.endpoint", + "bootstrap.servers", + "kafka.bootstrap.servers", +] + class KafkaConnectClient: """ diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py index c3a2654d372..3d4569732ff 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py @@ -30,9 +30,11 @@ from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import ( KafkaConnectConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from 
metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) +from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -54,7 +56,12 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata, T -from metadata.ingestion.source.pipeline.kafkaconnect.client import parse_cdc_topic_name +from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + CONNECTOR_CLASS_TO_SERVICE_TYPE, + MESSAGING_ENDPOINT_KEYS, + SERVICE_TYPE_HOSTNAME_KEYS, + parse_cdc_topic_name, +) from metadata.ingestion.source.pipeline.kafkaconnect.models import ( ConnectorType, KafkaConnectPipelineDetails, @@ -98,6 +105,14 @@ class KafkaconnectSource(PipelineServiceSource): Pipeline metadata from Kafka Connect """ + def __init__(self, config: WorkflowSource, metadata: OpenMetadata): + super().__init__(config, metadata) + # Track lineage results for summary reporting + self.lineage_results = [] + # Cache services for hostname matching (lazy loaded) + self._database_services_cache = None + self._messaging_services_cache = None + @classmethod def create( cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None @@ -110,6 +125,257 @@ class KafkaconnectSource(PipelineServiceSource): ) return cls(config, metadata) + @property + def database_services(self) -> List[DatabaseService]: + """Lazily load and cache database services for hostname matching""" + if self._database_services_cache is None: + self._database_services_cache = list( + self.metadata.list_all_entities(entity=DatabaseService, limit=100) + ) + logger.debug( + f"Cached {len(self._database_services_cache)} database services for hostname matching" + ) + return self._database_services_cache + + @property + def messaging_services(self) -> List[MessagingService]: + """Lazily load and cache messaging services for broker matching""" + if self._messaging_services_cache is None: + self._messaging_services_cache = list( + self.metadata.list_all_entities(entity=MessagingService, limit=100) + ) + logger.debug( + f"Cached {len(self._messaging_services_cache)} messaging services for broker matching" + ) + return self._messaging_services_cache + + def _extract_hostname(self, host_string: str) -> str: + """ + Extract just the hostname from a connection string by removing protocol and port. + + Args: + host_string: Connection string (e.g., "SASL_SSL://host:9092", "host:3306", "host") + + Returns: + Just the hostname part (e.g., "host") + """ + if not host_string: + return "" + + # Remove protocol prefix (e.g., "SASL_SSL://", "http://", "jdbc:mysql://") + if "://" in host_string: + host_string = host_string.split("://", 1)[1] + + # Remove port suffix (e.g., ":9092", ":3306") + if ":" in host_string: + host_string = host_string.split(":")[0] + + return host_string.strip() + + def find_database_service_by_hostname( + self, service_type: str, hostname: str + ) -> Optional[str]: + """ + Find database service by matching serviceType and hostname. 
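+
+        Protocol and port are stripped before comparing, so a connector hostname like
+        "jdbc:mysql://mysql.example.com:3306/db" matches a Mysql service whose
+        hostPort is "mysql.example.com:3306".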
+ + Args: + service_type: OpenMetadata service type (e.g., "Mysql", "Postgres") + hostname: Hostname from Kafka Connect config (e.g., "localhost:3306", "db.example.com") + + Returns: + Service name if found, None otherwise + """ + try: + # Use cached database services + all_services = self.database_services + + # Filter by serviceType first to reduce the search space + filtered_services = [ + svc + for svc in all_services + if svc.serviceType and svc.serviceType.value == service_type + ] + + logger.debug( + f"Found {len(filtered_services)} services with serviceType={service_type} " + f"out of {len(all_services)} total database services" + ) + + # Extract just the hostname (no protocol, no port) + connector_host = self._extract_hostname(hostname).lower() + + # Match by hostname in service connection config + for service in filtered_services: + if not service.connection or not service.connection.config: + continue + + service_config = service.connection.config + + # Extract hostPort from service config + # Different services use different field names + host_port = None + if hasattr(service_config, "hostPort") and service_config.hostPort: + host_port = service_config.hostPort + elif hasattr(service_config, "host") and service_config.host: + host_port = service_config.host + + if host_port: + # Extract just the hostname (no protocol, no port) + service_host = self._extract_hostname(host_port).lower() + + # Match hostname (case-insensitive) + if service_host == connector_host: + logger.info( + f"Matched database service: {service.name} " + f"(type={service_type}, hostname={connector_host})" + ) + return str( + service.name.root + if hasattr(service.name, "root") + else service.name + ) + + logger.debug( + f"No database service found matching serviceType={service_type}, hostname={connector_host}" + ) + return None + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to find database service by hostname: {exc}") + return None + + def find_messaging_service_by_brokers(self, brokers: str) -> Optional[str]: + """ + Find messaging service by matching broker endpoints. 
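+
+        Matching ignores protocol and port, so "SASL_SSL://broker1.example.com:9092"
+        matches a service whose bootstrapServers include "broker1.example.com:9092";
+        any single overlapping broker hostname is enough.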
+ + Args: + brokers: Comma-separated broker list from Kafka Connect config + (e.g., "SASL_SSL://broker1:9092,broker2:9092") + + Returns: + Service name if found, None otherwise + """ + try: + # Use cached messaging services + all_services = self.messaging_services + + logger.debug(f"Searching for messaging service matching brokers: {brokers}") + + # Parse connector brokers into a set of hostnames (no protocol, no port) + connector_brokers = set( + self._extract_hostname(broker.strip()).lower() + for broker in brokers.split(",") + ) + + # Match by brokers in service connection config + for service in all_services: + if not service.connection or not service.connection.config: + continue + + service_config = service.connection.config + + # Extract bootstrapServers from Kafka connection + if ( + hasattr(service_config, "bootstrapServers") + and service_config.bootstrapServers + ): + # Parse service brokers into hostnames (no protocol, no port) + service_brokers = set( + self._extract_hostname(broker.strip()).lower() + for broker in service_config.bootstrapServers.split(",") + ) + + # Check if any broker hostname matches + matched_brokers = ( + connector_brokers & service_brokers + ) # Set intersection + if matched_brokers: + logger.info( + f"Matched messaging service: {service.name} " + f"(matched broker hostnames: {matched_brokers})" + ) + return str( + service.name.root + if hasattr(service.name, "root") + else service.name + ) + + logger.debug( + f"No messaging service found matching broker hostnames: {connector_brokers}" + ) + return None + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to find messaging service by brokers: {exc}") + return None + + def get_service_from_connector_config( + self, pipeline_details: KafkaConnectPipelineDetails + ) -> tuple[Optional[str], Optional[str]]: + """ + Extract and match database and messaging service names from connector configuration. 
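+
+        The connector.class is mapped to a service type via CONNECTOR_CLASS_TO_SERVICE_TYPE
+        (e.g. "MySqlCdcSourceV2" -> "Mysql"), hostnames are read from the keys in
+        SERVICE_TYPE_HOSTNAME_KEYS, and broker endpoints from MESSAGING_ENDPOINT_KEYS.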
+ + Args: + pipeline_details: Kafka Connect pipeline details with config + + Returns: + Tuple of (database_service_name, messaging_service_name) + Either or both can be None if not found + """ + db_service_name = None + messaging_service_name = None + + if not pipeline_details.config: + return db_service_name, messaging_service_name + + try: + # Extract connector class to determine service type + connector_class = pipeline_details.config.get("connector.class", "") + + # Get the class name without package (e.g., "MySqlCdcSource" from "io.debezium.connector.mysql.MySqlCdcSource") + if connector_class: + class_name = connector_class.split(".")[-1] + service_type = CONNECTOR_CLASS_TO_SERVICE_TYPE.get(class_name) + + if service_type: + # Extract hostname from connector config + hostname_keys = SERVICE_TYPE_HOSTNAME_KEYS.get(service_type, []) + for key in hostname_keys: + hostname = pipeline_details.config.get(key) + if hostname: + logger.debug( + f"Found hostname '{hostname}' for service type '{service_type}' " + f"from config key '{key}'" + ) + # Match database service + db_service_name = self.find_database_service_by_hostname( + service_type=service_type, hostname=hostname + ) + if db_service_name: + break + + # Extract broker endpoints for messaging service + for key in MESSAGING_ENDPOINT_KEYS: + brokers = pipeline_details.config.get(key) + if brokers: + logger.debug(f"Found brokers '{brokers}' from config key '{key}'") + messaging_service_name = self.find_messaging_service_by_brokers( + brokers=brokers + ) + if messaging_service_name: + break + + return db_service_name, messaging_service_name + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Unable to extract service names from connector config: {exc}" + ) + return None, None + def yield_pipeline( self, pipeline_details: KafkaConnectPipelineDetails ) -> Iterable[Either[CreatePipelineRequest]]: @@ -156,8 +422,38 @@ class KafkaconnectSource(PipelineServiceSource): dataset_details = pipeline_details.dataset if dataset_details: if dataset_details.dataset_type == Table: - # If dbServiceNames is configured, use it to build FQN directly - if self.source_config.lineageInformation.dbServiceNames: + # Try to match database service from connector config first + db_service_name, _ = self.get_service_from_connector_config( + pipeline_details + ) + + # Priority 1: Use matched service from connector config + if db_service_name: + logger.info( + f"Using matched database service '{db_service_name}' from connector config" + ) + dataset_entity = self.metadata.get_by_name( + entity=dataset_details.dataset_type, + fqn=fqn.build( + metadata=self.metadata, + entity_type=dataset_details.dataset_type, + table_name=dataset_details.table, + database_name=None, + schema_name=dataset_details.database, + service_name=db_service_name, + ), + ) + if dataset_entity: + return dataset_entity + + # Priority 2: Use configured dbServiceNames + if ( + hasattr(self.source_config, "lineageInformation") + and hasattr( + self.source_config.lineageInformation, "dbServiceNames" + ) + and self.source_config.lineageInformation.dbServiceNames + ): for ( dbservicename ) in self.source_config.lineageInformation.dbServiceNames: @@ -175,30 +471,36 @@ class KafkaconnectSource(PipelineServiceSource): if dataset_entity: return dataset_entity - else: - # Search across all database services - logger.info( - f"dbServiceNames not configured - searching all database services for table {dataset_details.table}" + + # Priority 3: Fallback to search across all 
database services + logger.info( + f"No service match found - searching all database services for table {dataset_details.table}" + ) + # Build search string: schema.table format (with proper quoting for special chars) + search_string = ( + f"{fqn.quote_name(dataset_details.database)}.{fqn.quote_name(dataset_details.table)}" + if dataset_details.database + else fqn.quote_name(dataset_details.table) + ) + dataset_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=search_string, + ) + if dataset_entity: + logger.debug( + f"Found table {dataset_details.table} via search in service {dataset_entity.service.name if dataset_entity.service else 'unknown'}" ) - # Build search string: schema.table format - search_string = ( - f"{fqn.quote_name(dataset_details.database)}.{fqn.quote_name(dataset_details.table)}" - if dataset_details.database - else dataset_details.table - ) - dataset_entity = self.metadata.search_in_any_service( - entity_type=Table, - fqn_search_string=search_string, - ) - if dataset_entity: - logger.debug( - f"Found table {dataset_details.table} via search in service {dataset_entity.service.name if dataset_entity.service else 'unknown'}" - ) - return dataset_entity + return dataset_entity if dataset_details.dataset_type == Container: # If storageServiceNames is configured, use it to build FQN directly - if self.source_config.lineageInformation.storageServiceNames: + if ( + hasattr(self.source_config, "lineageInformation") + and hasattr( + self.source_config.lineageInformation, "storageServiceNames" + ) + and self.source_config.lineageInformation.storageServiceNames + ): for ( storageservicename ) in self.source_config.lineageInformation.storageServiceNames: @@ -222,7 +524,9 @@ class KafkaconnectSource(PipelineServiceSource): ) storage_entity = self.metadata.search_in_any_service( entity_type=Container, - fqn_search_string=dataset_details.container_name, + fqn_search_string=fqn.quote_name( + dataset_details.container_name + ), ) if storage_entity: logger.debug( @@ -612,64 +916,149 @@ class KafkaconnectSource(PipelineServiceSource): return None - def _query_cdc_topics_from_messaging_service( - self, database_server_name: str + def _search_topics_by_prefix( + self, database_server_name: str, messaging_service_name: Optional[str] = None ) -> List[KafkaConnectTopics]: """ - Query topics from messaging service and filter by CDC naming pattern. + Search for topics in the messaging service that match the database.server.name prefix. - Used for CDC connectors without explicit topic lists - discovers topics - by matching against database.server.name prefix. + This is a fallback when table.include.list is not configured in the connector. + It relies on topics being already ingested in the messaging service. 
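+
+        Example: with database_server_name "myserver", already-ingested topics
+        "myserver.public.users" and "myserver.public.orders" are returned with their
+        OpenMetadata FQNs, while unrelated topics are skipped.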
Args: - database_server_name: The database.server.name or topic.prefix from connector config + database_server_name: The database.server.name prefix to search for + messaging_service_name: Optional messaging service name to narrow search Returns: - List of matching CDC topics + List of KafkaConnectTopics that match the prefix """ topics_found = [] try: - logger.debug( - f"CDC connector without topics list - querying messaging service " - f"for pattern: {database_server_name}.*" + if not database_server_name: + return topics_found + + logger.info( + f"Searching messaging service for topics with prefix: {database_server_name}" ) - # Build params for topic query - params = {} - if self.service_connection.messagingServiceName: - # Filter by specific service if configured - params["service"] = self.service_connection.messagingServiceName - logger.debug( - f"Searching topics in service: {self.service_connection.messagingServiceName}" + # Search for topics matching the prefix + # Use wildcard pattern: .".*" + search_pattern = f"{database_server_name}.*" + + if messaging_service_name: + # Search in specific messaging service + from metadata.utils import fqn as fqn_utils + + search_fqn = f"{fqn_utils.quote_name(messaging_service_name)}.{fqn_utils.quote_name(search_pattern)}" + logger.debug(f"Searching for topics with FQN pattern: {search_fqn}") + + # Get all topics from the messaging service + from metadata.generated.schema.entity.data.topic import Topic + + topics = list( + self.metadata.list_all_entities( + entity=Topic, + params={"service": messaging_service_name}, + ) + ) + + # Filter topics that start with the database_server_name prefix + for topic in topics: + topic_name = str( + topic.name.root if hasattr(topic.name, "root") else topic.name + ) + if topic_name.startswith(database_server_name + "."): + # Build full FQN for this topic + topic_fqn = ( + topic.fullyQualifiedName.root + if hasattr(topic.fullyQualifiedName, "root") + else topic.fullyQualifiedName + ) + topics_found.append( + KafkaConnectTopics(name=topic_name, fqn=topic_fqn) + ) + logger.debug( + f"Found matching topic: {topic_name} (FQN: {topic_fqn})" + ) + + if topics_found: + logger.info( + f"Found {len(topics_found)} topics matching prefix '{database_server_name}' " + f"in messaging service" ) else: - # Search all messaging services - logger.debug("Searching topics across all messaging services") - - # List topics from messaging service(s) - topics_list = self.metadata.list_entities( - entity=Topic, - fields=["name", "fullyQualifiedName", "service"], - params=params if params else None, - ).entities - - # Filter topics that match the CDC naming pattern - for topic_entity in topics_list or []: - topic_name = ( - topic_entity.name.root - if hasattr(topic_entity.name, "root") - else str(topic_entity.name) + logger.warning( + f"No topics found matching prefix '{database_server_name}'. " + f"Ensure the messaging service has ingested topics before running Kafka Connect ingestion." 
) - # Parse the topic to see if it's a CDC topic related to this connector - topic_info = parse_cdc_topic_name(topic_name, database_server_name) - if topic_info: - topics_found.append(KafkaConnectTopics(name=topic_name)) - logger.debug(f"Matched CDC topic: {topic_name} -> {topic_info}") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to search topics by prefix: {exc}") + + return topics_found + + def _parse_cdc_topics_from_config( + self, pipeline_details: KafkaConnectPipelineDetails, database_server_name: str + ) -> List[KafkaConnectTopics]: + """ + Parse CDC topic names from connector config using table.include.list. + + For CDC connectors, topics follow pattern: {database.server.name}.{schema}.{table} + Extracts table list from config and constructs expected topic names. + + Args: + pipeline_details: Kafka Connect pipeline details with config + database_server_name: The database.server.name from connector config + + Returns: + List of KafkaConnectTopics with topic names + """ + topics_found = [] + + try: + if not pipeline_details.config: + return topics_found + + # Get table include list from connector config + table_include_list = None + for key in ["table.include.list", "table.whitelist"]: + if pipeline_details.config.get(key): + table_include_list = pipeline_details.config.get(key) + logger.debug( + f"Found table list from config key '{key}': {table_include_list}" + ) + break + + if not table_include_list: + logger.warning( + f"⚠️ CDC connector '{pipeline_details.name}' is missing table.include.list or table.whitelist.\n" + f" Without this configuration, lineage cannot be created automatically.\n" + f' Add to connector config: "table.include.list": "schema1.table1,schema2.table2"\n' + ) + return topics_found + + # Parse table list (format: "schema1.table1,schema2.table2") + for table_entry in table_include_list.split(","): + table_entry = table_entry.strip() + if not table_entry: + continue + + # Construct CDC topic name: {database.server.name}.{schema}.{table} + # table_entry is already "schema.table" format + topic_name = f"{database_server_name}.{table_entry}" + + topics_found.append(KafkaConnectTopics(name=topic_name)) + logger.debug(f"Parsed CDC topic from config: {topic_name}") + + logger.info( + f"Parsed {len(topics_found)} CDC topics from table.include.list" + ) except Exception as exc: - logger.debug(f"Unable to query topics from messaging service: {exc}") + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to parse CDC topics from connector config: {exc}") return topics_found @@ -680,9 +1069,26 @@ class KafkaconnectSource(PipelineServiceSource): Get lineage between pipeline and data sources """ try: - if not self.service_connection.messagingServiceName: + # Try to match messaging service from connector config + _, messaging_service_name = self.get_service_from_connector_config( + pipeline_details + ) + + # Use matched service if found, otherwise fall back to configured name + effective_messaging_service = messaging_service_name or ( + self.service_connection.messagingServiceName + if hasattr(self.service_connection, "messagingServiceName") + else None + ) + + if effective_messaging_service: logger.info( - "messagingServiceName not configured - will search all messaging services for topics" + f"Using messaging service '{effective_messaging_service}' " + f"({'matched from config' if messaging_service_name else 'from configuration'})" + ) + else: + logger.info( + "No messaging service specified - will search all 
messaging services for topics" ) pipeline_fqn = fqn.build( @@ -706,8 +1112,8 @@ class KafkaconnectSource(PipelineServiceSource): "database.server.name" ) or pipeline_details.config.get("topic.prefix") - # For CDC connectors without explicit topics, query topics from messaging service - # and filter by CDC naming pattern + # For CDC connectors without explicit topics, parse topics from connector config + # using table.include.list and database.server.name # Only do this for Debezium CDC connectors (identified by database.server.name or topic.prefix) topics_to_process = pipeline_details.topics or [] if ( @@ -715,77 +1121,184 @@ class KafkaconnectSource(PipelineServiceSource): and database_server_name and pipeline_details.conn_type == ConnectorType.SOURCE.value ): - topics_to_process = self._query_cdc_topics_from_messaging_service( - database_server_name + # Try to parse topics from table.include.list first + topics_to_process = self._parse_cdc_topics_from_config( + pipeline_details=pipeline_details, + database_server_name=database_server_name, ) + # If table.include.list is not available, fallback to searching topics by prefix + # This requires topics to be already ingested in the messaging service + if not topics_to_process and effective_messaging_service: + logger.info( + f"Falling back to searching topics by prefix in messaging service '{effective_messaging_service}'" + ) + topics_to_process = self._search_topics_by_prefix( + database_server_name=database_server_name, + messaging_service_name=effective_messaging_service, + ) + for topic in topics_to_process: topic_entity = None - # If messagingServiceName is configured, use it to build FQN directly - if self.service_connection.messagingServiceName: + logger.info(f"Processing topic: {topic.name}") + + # If we have FQN from CDC topic discovery, use it directly + if topic.fqn: + logger.info(f"Searching for topic using pre-built FQN: {topic.fqn}") + topic_entity = self.metadata.get_by_name( + entity=Topic, fqn=topic.fqn + ) + if topic_entity: + logger.info(f"✓ Found topic using stored FQN: {topic.fqn}") + else: + logger.warning(f"✗ Topic NOT found using FQN: {topic.fqn}") + # If messaging service is known (matched or configured), use it to build FQN + elif effective_messaging_service: + # fqn.build() already quotes each component (service_name and topic_name) topic_fqn = fqn.build( metadata=self.metadata, entity_type=Topic, - service_name=self.service_connection.messagingServiceName, + service_name=effective_messaging_service, topic_name=str(topic.name), ) + logger.info( + f"Built topic FQN: {topic_fqn} " + f"(service={effective_messaging_service}, topic_name={topic.name})" + ) topic_entity = self.metadata.get_by_name( entity=Topic, fqn=topic_fqn ) + if topic_entity: + logger.info(f"✓ Found topic using built FQN: {topic_fqn}") + else: + logger.warning( + f"✗ Topic NOT found using FQN: {topic_fqn} " + f"(service={effective_messaging_service}, topic_name={topic.name})" + ) else: - # Search across all messaging services + # Fallback: Search across all messaging services + search_string = f"*.{fqn.quote_name(str(topic.name))}" + logger.info( + f"Searching for topic across all services using pattern: {search_string}" + ) topic_entity = self.metadata.search_in_any_service( entity_type=Topic, - fqn_search_string=str(topic.name), + fqn_search_string=search_string, ) if topic_entity: - logger.debug( - f"Found topic {topic.name} via search in service {topic_entity.service.name if topic_entity.service else 'unknown'}" + logger.info( + f"✓ Found topic 
via search: {topic.name} in service " + f"{topic_entity.service.name if topic_entity.service else 'unknown'}" ) + else: + logger.warning(f"✗ Topic NOT found via search: {search_string}") + # If topic not found, we'll still try to create table → pipeline lineage if topic_entity is None: - logger.debug(f"Topic {topic.name} not found in OpenMetadata") - continue + logger.warning( + f"Topic {topic.name} not found in OpenMetadata - will create direct table → pipeline lineage" + ) + else: + logger.info(f"✓ Successfully found topic entity: {topic.name}") # If no dataset entity from config, try to parse table info from CDC topic name current_dataset_entity = dataset_entity + if current_dataset_entity: + logger.info( + f"Using dataset entity from config: {current_dataset_entity.fullyQualifiedName.root if hasattr(current_dataset_entity.fullyQualifiedName, 'root') else current_dataset_entity.fullyQualifiedName}" + ) + if ( current_dataset_entity is None and pipeline_details.conn_type == ConnectorType.SOURCE.value ): # Parse CDC topic name to extract table information + logger.info( + f"Parsing CDC topic name to extract table info: {topic.name}" + ) topic_info = parse_cdc_topic_name( str(topic.name), database_server_name ) if topic_info.get("database") and topic_info.get("table"): - logger.debug( - f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}" + logger.info( + f"Parsed CDC topic {topic.name}: schema={topic_info['database']}, table={topic_info['table']}" ) + + # Get matched database service name + db_service_name, _ = self.get_service_from_connector_config( + pipeline_details + ) + # Try to find the table entity - if self.source_config.lineageInformation.dbServiceNames: - # Use specific database services if configured + # Use wildcard search pattern since we don't know the database name + # Pattern: service.*.schema.table + if db_service_name: + # Use matched database service first with wildcard search + logger.info( + f"Using matched database service: {db_service_name}" + ) + # Build wildcard FQN: service.*.schema.table + search_pattern = f"{fqn.quote_name(db_service_name)}.*.{fqn.quote_name(topic_info['database'])}.{fqn.quote_name(topic_info['table'])}" + logger.info( + f"Searching for table with pattern: {search_pattern} " + f"(service={db_service_name}, schema={topic_info['database']}, table={topic_info['table']})" + ) + current_dataset_entity = self.metadata.get_by_name( + entity=Table, fqn=search_pattern + entity_type=Table, fqn_search_string=search_pattern + ) + ) + if current_dataset_entity: + logger.info( + f"✓ Found table using matched service pattern: {search_pattern}" + ) + else: + logger.warning( + f"✗ Table NOT found using matched service pattern: {search_pattern}" + ) + + if ( + not current_dataset_entity + and hasattr(self.source_config, "lineageInformation") + and hasattr( + self.source_config.lineageInformation, "dbServiceNames" + ) + and self.source_config.lineageInformation.dbServiceNames + ): + # Try configured database services with wildcard search + logger.info( + f"Trying configured database services: {self.source_config.lineageInformation.dbServiceNames}" + ) for ( dbservicename ) in self.source_config.lineageInformation.dbServiceNames: - table_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - table_name=topic_info["table"], - database_name=None, - schema_name=topic_info["database"], - service_name=dbservicename, + # Build wildcard FQN: service.*.schema.table + search_pattern = 
f"{fqn.quote_name(dbservicename)}.*.{fqn.quote_name(topic_info['database'])}.{fqn.quote_name(topic_info['table'])}" + logger.info( + f"Searching for table with pattern: {search_pattern}" ) current_dataset_entity = self.metadata.get_by_name( - entity=Table, fqn=table_fqn + entity=Table, fqn=search_pattern + entity_type=Table, + fqn_search_string=search_pattern, + ) ) if current_dataset_entity: - logger.debug(f"Found table entity: {table_fqn}") + logger.info( + f"✓ Found table in service {dbservicename}: {search_pattern}" + ) break - else: + else: + logger.debug( + f"✗ Table NOT found in service {dbservicename}" + ) + + if not current_dataset_entity: # Search across all database services - search_string = ( - f"{topic_info['database']}.{topic_info['table']}" + search_string = f"{fqn.quote_name(topic_info['database'])}.{fqn.quote_name(topic_info['table'])}" + logger.info( + f"Searching for table across all services using pattern: {search_string}" ) current_dataset_entity = ( self.metadata.search_in_any_service( @@ -794,18 +1307,362 @@ class KafkaconnectSource(PipelineServiceSource): ) ) if current_dataset_entity: - logger.debug( - f"Found table {search_string} via search in service {current_dataset_entity.service.name if current_dataset_entity.service else 'unknown'}" + logger.info( + f"✓ Found table via search in service " + f"{current_dataset_entity.service.name if current_dataset_entity.service else 'unknown'}: " + f"{current_dataset_entity.fullyQualifiedName.root if hasattr(current_dataset_entity.fullyQualifiedName, 'root') else current_dataset_entity.fullyQualifiedName}" ) + else: + logger.warning( + f"✗ Table NOT found via search: {search_string}" + ) + else: + logger.warning( + f"Failed to parse table info from CDC topic name: {topic.name}" + ) - if current_dataset_entity is None: - # No table entity found, skip this topic + # Lineage must always be between data assets (Table ↔ Topic) + # The pipeline is referenced in lineageDetails, not as a node in the graph + # Skip lineage if we don't have BOTH table and topic + if current_dataset_entity is None or topic_entity is None: + # Get table FQN for tracking + if current_dataset_entity: + table_fqn_str = ( + current_dataset_entity.fullyQualifiedName.root + if hasattr( + current_dataset_entity.fullyQualifiedName, "root" + ) + else str(current_dataset_entity.fullyQualifiedName) + ) + else: + # Table not found - construct debug message with search details + table_fqn_str = "NOT FOUND" + + # Get matched database service name and hostname + ( + db_service_name_for_debug, + _, + ) = self.get_service_from_connector_config(pipeline_details) + + # Extract hostname from connector config + db_hostname_for_debug = "NOT SET" + if pipeline_details.config: + db_hostname_for_debug = ( + pipeline_details.config.get("database.hostname") + or pipeline_details.config.get("database.server") + or pipeline_details.config.get("connection.host") + or "NOT SET" + ) + + # Build debug message with what we searched for + if ( + "topic_info" in locals() + and topic_info.get("database") + and topic_info.get("table") + ): + search_details = ( + f"{topic_info['database']}.{topic_info['table']}" + ) + if db_service_name_for_debug: + table_fqn_str = f"NOT FOUND (service: {db_service_name_for_debug}, searched: {search_details})" + else: + table_fqn_str = f"NOT FOUND (searched: {search_details}, hostname: {db_hostname_for_debug}, no service matched)" + else: + table_fqn_str = f"NOT FOUND (hostname: {db_hostname_for_debug}, no CDC topic info)" + + # Get topic FQN for tracking 
(show expected FQN even if not found) + if topic_entity: + # Topic exists - use actual FQN + topic_fqn_str = ( + topic_entity.fullyQualifiedName.root + if hasattr(topic_entity.fullyQualifiedName, "root") + else str(topic_entity.fullyQualifiedName) + ) + else: + # Topic not found - construct expected FQN with service name and quoting + if effective_messaging_service and topic: + # Build expected FQN: service."topic.name" + topic_fqn_str = fqn.build( + metadata=self.metadata, + entity_type=Topic, + service_name=effective_messaging_service, + topic_name=str(topic.name), + ) + elif topic: + # No service configured - show quoted topic name + topic_fqn_str = f'"{fqn.quote_name(str(topic.name))}"' + else: + topic_fqn_str = "NOT FOUND" + + # Track failure reason + if current_dataset_entity is None and topic_entity is None: + failure_reason = "Missing both table and topic" + elif current_dataset_entity is None: + failure_reason = "Missing table" + else: + failure_reason = "Missing topic" + + self.lineage_results.append( + { + "connector": pipeline_details.name, + "table_fqn": table_fqn_str, + "topic_fqn": topic_fqn_str, + "status": "FAILED", + "reason": failure_reason, + } + ) + logger.warning("=" * 80) + logger.warning( + f"⚠️ SKIPPING LINEAGE for connector: {pipeline_details.name}" + ) + logger.warning("=" * 80) + + # Log connector configuration for debugging FQN construction + logger.debug( + "\n📋 CONNECTOR CONFIGURATION (used for FQN construction):" + ) + if pipeline_details.config: + # Extract key config values used for FQN building + connector_class = pipeline_details.config.get( + "connector.class", "NOT SET" + ) + db_hostname = ( + pipeline_details.config.get("database.hostname") + or pipeline_details.config.get("database.server") + or pipeline_details.config.get("connection.host") + or "NOT SET" + ) + bootstrap_servers = ( + pipeline_details.config.get("kafka.bootstrap.servers") + or pipeline_details.config.get("bootstrap.servers") + or "NOT SET" + ) + table_include_list = ( + pipeline_details.config.get("table.include.list") + or pipeline_details.config.get("table.whitelist") + or "NOT SET" + ) + + logger.debug( + f" • connector.class: {connector_class}\n" + f" • database.server.name: {database_server_name or 'NOT SET'}\n" + f" • database.hostname: {db_hostname}\n" + f" • table.include.list: {table_include_list}\n" + f" • bootstrap.servers: {bootstrap_servers}\n" + f" • Connector type: {pipeline_details.conn_type}" + ) + else: + logger.debug(" NO CONFIG AVAILABLE") + + # Build expected topic FQN with proper quoting + expected_topic_fqn = None + topic_fqn_params = {} + if effective_messaging_service: + topic_fqn_params = { + "service_name": effective_messaging_service, + "topic_name": str(topic.name), + } + expected_topic_fqn = fqn.build( + metadata=self.metadata, + entity_type=Topic, + **topic_fqn_params, + ) + + # Build expected table FQN if we parsed CDC topic info + expected_table_fqn = None + table_fqn_params = {} + if ( + "topic_info" in locals() + and topic_info.get("database") + and topic_info.get("table") + ): + # Get matched database service name + ( + db_service_name_for_log, + _, + ) = self.get_service_from_connector_config(pipeline_details) + if db_service_name_for_log: + # Use wildcard pattern since we don't know the database name + # Pattern: service.*.schema.table + table_fqn_params = { + "service_name": db_service_name_for_log, + "schema_name": topic_info["database"], + "table_name": topic_info["table"], + "database_name": "* (wildcard - database name unknown)", + } + 
expected_table_fqn = f"{fqn.quote_name(db_service_name_for_log)}.*.{fqn.quote_name(topic_info['database'])}.{fqn.quote_name(topic_info['table'])}" + + # Log FQN construction details + logger.debug("\n🔧 FQN CONSTRUCTION DETAILS:") + if expected_topic_fqn: + logger.debug(f" Topic FQN built with: {topic_fqn_params}") + logger.debug(f" → Result: {expected_topic_fqn}") + else: + logger.debug( + f" Topic FQN: NOT BUILT (messaging service not configured)" + ) + + if expected_table_fqn: + logger.debug(f" Table FQN built with: {table_fqn_params}") + logger.debug(f" → Result: {expected_table_fqn}") + elif "topic_info" in locals() and topic_info: + logger.debug( + f" Table FQN: NOT BUILT (parsed topic_info: {topic_info}, but no db service matched)" + ) + else: + logger.debug( + f" Table FQN: NOT BUILT (no CDC topic info parsed)" + ) + + # Get bootstrap servers from config + bootstrap_servers = "NOT SET" + if pipeline_details.config: + bootstrap_servers = ( + pipeline_details.config.get("kafka.bootstrap.servers") + or pipeline_details.config.get("bootstrap.servers") + or "NOT SET" + ) + + if current_dataset_entity is None and topic_entity is None: + expected_fqn_display = ( + expected_topic_fqn + or f'."{topic.name}" (messaging service not configured)' + ) + + logger.warning( + f"❌ MISSING BOTH SOURCE AND SINK:\n" + f" • Table: NOT FOUND (searched for table related to topic '{topic.name}')\n" + f" • Topic: NOT FOUND (searched for topic '{topic.name}')\n" + f"\n" + f"💡 ACTION REQUIRED:\n" + f" 1. Ensure the topic is ingested in OpenMetadata:\n" + f" - Topic name: {topic.name}\n" + f" - Expected FQN: {expected_fqn_display}\n" + f" - Messaging service: {effective_messaging_service or 'NOT CONFIGURED - will search all services'}\n" + f" - Run messaging service metadata ingestion if needed\n" + f" 2. Ensure the source table exists in OpenMetadata:\n" + f" - Verify database service is connected and metadata is ingested\n" + f"\n" + f"⚠️ Lineage requires BOTH table and topic to be present in OpenMetadata" + ) + elif current_dataset_entity is None: + # Extract topic details + topic_service = ( + topic_entity.service.name + if hasattr(topic_entity, "service") and topic_entity.service + else "UNKNOWN" + ) + topic_fqn_full = ( + topic_entity.fullyQualifiedName.root + if hasattr(topic_entity.fullyQualifiedName, "root") + else topic_entity.fullyQualifiedName + ) + + logger.warning( + f"❌ MISSING SOURCE (Table):\n" + f" • Table: NOT FOUND\n" + f" • Topic: FOUND ✓\n" + f" - FQN: {topic_fqn_full}\n" + f" - Service: {topic_service}\n" + f" - Topic name: {topic.name}\n" + f"\n" + f"💡 ACTION REQUIRED:\n" + f" 1. Ensure the source table is ingested in OpenMetadata:\n" + f" - For CDC connectors: Check table from schema '{topic_info.get('database') if 'topic_info' in locals() else 'UNKNOWN'}'\n" + f" - Table name: {topic_info.get('table') if 'topic_info' in locals() else 'UNKNOWN'}\n" + f" - Verify the table exists in database service\n" + f" 2. 
Check connector configuration:\n" + f" - Connector type: {pipeline_details.conn_type}\n" + f" - Database server: {database_server_name or 'NOT SET'}\n" + f" - Table include list: {pipeline_details.config.get('table.include.list', 'NOT SET') if pipeline_details.config else 'NO CONFIG'}\n" + f"\n" + f"⚠️ Cannot create lineage without both source (table) and sink (topic)" + ) + else: + # Extract table details - split FQN to show components + table_fqn_full = ( + current_dataset_entity.fullyQualifiedName.root + if hasattr( + current_dataset_entity.fullyQualifiedName, "root" + ) + else current_dataset_entity.fullyQualifiedName + ) + table_service = ( + current_dataset_entity.service.name + if hasattr(current_dataset_entity, "service") + and current_dataset_entity.service + else "UNKNOWN" + ) + + # Parse table FQN: service.database.schema.table + table_fqn_parts = str(table_fqn_full).split(".") + if len(table_fqn_parts) >= 4: + table_db_service = table_fqn_parts[0] + table_database = table_fqn_parts[1] + table_schema = table_fqn_parts[2] + table_name = ".".join( + table_fqn_parts[3:] + ) # Handle quoted names with dots + else: + table_db_service = table_service + table_database = "UNKNOWN" + table_schema = "UNKNOWN" + table_name = "UNKNOWN" + + expected_topic_fqn_display = ( + expected_topic_fqn or f'."{topic.name}"' + ) + + logger.warning( + f"❌ MISSING SINK (Topic):\n" + f" • Table: FOUND ✓\n" + f" - FQN: {table_fqn_full}\n" + f" - Service: {table_db_service}\n" + f" - Database: {table_database}\n" + f" - Schema: {table_schema}\n" + f" - Table: {table_name}\n" + f" • Topic: NOT FOUND\n" + f" - Searched for: {topic.name}\n" + f" - Expected FQN: {expected_topic_fqn_display}\n" + f"\n" + f"💡 ACTION REQUIRED:\n" + f" 1. Ensure the topic is ingested in OpenMetadata:\n" + f" - Topic name in Kafka: {topic.name}\n" + f" - Expected FQN in OM: {expected_topic_fqn_display}\n" + f" - Messaging service: {effective_messaging_service or 'NOT CONFIGURED - will search all services'}\n" + f" - Note: Topics with dots (.) in the name are quoted in FQN\n" + f" 2. Run messaging service metadata ingestion:\n" + f" - Ingest topics from messaging service '{effective_messaging_service or 'your-kafka-service'}'\n" + f" - Verify topic '{topic.name}' exists in Kafka cluster\n" + f" 3. 
Check Kafka Connect configuration:\n" + f" - Connector type: {pipeline_details.conn_type}\n" + f" - Bootstrap servers: {bootstrap_servers}\n" + f" - Database server (CDC): {database_server_name or 'NOT SET'}\n" + f"\n" + f"⚠️ Cannot create lineage without both source (table) and sink (topic)" + ) + + logger.warning("=" * 80) continue + # We have both table and topic - create lineage between them + logger.info(f"✓ Found both table and topic entities for lineage") + + # Determine lineage direction based on connector type if pipeline_details.conn_type == ConnectorType.SINK.value: + # SINK: topic → table from_entity, to_entity = topic_entity, current_dataset_entity + logger.info( + f"Creating SINK lineage: {topic_entity.fullyQualifiedName.root if hasattr(topic_entity.fullyQualifiedName, 'root') else topic_entity.fullyQualifiedName} " + f"→ {current_dataset_entity.fullyQualifiedName.root if hasattr(current_dataset_entity.fullyQualifiedName, 'root') else current_dataset_entity.fullyQualifiedName}" + ) else: + # SOURCE: table → topic from_entity, to_entity = current_dataset_entity, topic_entity + logger.info( + f"Creating SOURCE lineage: {current_dataset_entity.fullyQualifiedName.root if hasattr(current_dataset_entity.fullyQualifiedName, 'root') else current_dataset_entity.fullyQualifiedName} " + f"→ {topic_entity.fullyQualifiedName.root if hasattr(topic_entity.fullyQualifiedName, 'root') else topic_entity.fullyQualifiedName}" + ) # Build column-level lineage (best effort - don't fail entity-level lineage) column_lineage = None @@ -823,6 +1680,34 @@ class KafkaconnectSource(PipelineServiceSource): ) logger.debug(traceback.format_exc()) + # Log entity details before creating lineage request + from_entity_type = type(from_entity).__name__ + to_entity_type = type(to_entity).__name__ + from_entity_id = ( + from_entity.id.root + if hasattr(from_entity.id, "root") + else from_entity.id + ) + to_entity_id = ( + to_entity.id.root if hasattr(to_entity.id, "root") else to_entity.id + ) + from_entity_fqn = ( + from_entity.fullyQualifiedName.root + if hasattr(from_entity.fullyQualifiedName, "root") + else from_entity.fullyQualifiedName + ) + to_entity_fqn = ( + to_entity.fullyQualifiedName.root + if hasattr(to_entity.fullyQualifiedName, "root") + else to_entity.fullyQualifiedName + ) + + logger.info( + f"Creating lineage edge:\n" + f" FROM: {from_entity_type} | ID={from_entity_id} | FQN={from_entity_fqn}\n" + f" TO: {to_entity_type} | ID={to_entity_id} | FQN={to_entity_fqn}" + ) + lineage_details = LineageDetails( pipeline=EntityReference( id=pipeline_entity.id.root, type="pipeline" @@ -831,25 +1716,80 @@ class KafkaconnectSource(PipelineServiceSource): columnsLineage=column_lineage, ) - yield Either( - right=AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id, - type=ENTITY_REFERENCE_TYPE_MAP[ - type(from_entity).__name__ - ], - ), - toEntity=EntityReference( - id=to_entity.id, - type=ENTITY_REFERENCE_TYPE_MAP[ - type(to_entity).__name__ - ], - ), - lineageDetails=lineage_details, - ) + lineage_request = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[type(from_entity).__name__], + ), + toEntity=EntityReference( + id=to_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[type(to_entity).__name__], + ), + lineageDetails=lineage_details, ) ) + + # Track successful lineage creation + table_fqn_str = ( + current_dataset_entity.fullyQualifiedName.root + if hasattr(current_dataset_entity.fullyQualifiedName, 
"root") + else str(current_dataset_entity.fullyQualifiedName) + ) + topic_fqn_str = ( + topic_entity.fullyQualifiedName.root + if hasattr(topic_entity.fullyQualifiedName, "root") + else str(topic_entity.fullyQualifiedName) + ) + self.lineage_results.append( + { + "connector": pipeline_details.name, + "table_fqn": table_fqn_str, + "topic_fqn": topic_fqn_str, + "status": "SUCCESS", + "reason": f"{from_entity_type} → {to_entity_type}", + } + ) + + # Log successful lineage creation (debug level - details in summary table) + logger.debug("=" * 80) + logger.debug( + f"✅ LINEAGE CREATED SUCCESSFULLY for connector: {pipeline_details.name}" + ) + logger.debug("=" * 80) + + # Extract service names for logging + from_service = "UNKNOWN" + to_service = "UNKNOWN" + if hasattr(from_entity, "service") and from_entity.service: + from_service = ( + from_entity.service.name + if hasattr(from_entity.service.name, "root") + else from_entity.service.name + ) + if hasattr(to_entity, "service") and to_entity.service: + to_service = ( + to_entity.service.name + if hasattr(to_entity.service.name, "root") + else to_entity.service.name + ) + + logger.debug( + f"📊 LINEAGE DETAILS:\n" + f" • FROM: {from_entity_type}\n" + f" - FQN: {from_entity_fqn}\n" + f" - Service: {from_service}\n" + f" • TO: {to_entity_type}\n" + f" - FQN: {to_entity_fqn}\n" + f" - Service: {to_service}\n" + f" • PIPELINE: {pipeline_details.name}\n" + f" - Type: {pipeline_details.conn_type}\n" + f" - Pipeline FQN: {pipeline_fqn}\n" + f" • COLUMN LINEAGE: {len(column_lineage) if column_lineage else 0} column mappings\n" + ) + logger.debug("=" * 80) + + yield Either(right=lineage_request) except Exception as exc: yield Either( left=StackTraceError( @@ -927,3 +1867,54 @@ class KafkaconnectSource(PipelineServiceSource): stackTrace=traceback.format_exc(), ) ) + + def print_lineage_summary(self): + """ + Print a summary table of lineage creation results + """ + if not self.lineage_results: + return + + logger.info("\n" + "=" * 180) + logger.info("LINEAGE CREATION SUMMARY") + logger.info("=" * 180) + + # Count successes and failures + successes = [r for r in self.lineage_results if r["status"] == "SUCCESS"] + failures = [r for r in self.lineage_results if r["status"] == "FAILED"] + + # Print header + logger.info( + f"{'Connector':<35} | {'Table FQN':<50} | {'Topic FQN':<50} | {'Status':<10} | {'Details':<20}" + ) + logger.info("-" * 180) + + # Print all results + for result in self.lineage_results: + status_icon = "✅" if result["status"] == "SUCCESS" else "❌" + logger.info( + f"{result['connector']:<35} | " + f"{result['table_fqn']:<50} | " + f"{result['topic_fqn']:<50} | " + f"{status_icon} {result['status']:<8} | " + f"{result['reason']:<20}" + ) + + # Print summary stats + logger.info("=" * 180) + total = len(self.lineage_results) + success_count = len(successes) + failure_count = len(failures) + success_pct = (success_count / total * 100) if total > 0 else 0 + + logger.info( + f"Total: {total} | Success: {success_count} ({success_pct:.1f}%) | Failed: {failure_count}" + ) + logger.info("=" * 180 + "\n") + + def close(self): + """ + Called at the end of the ingestion workflow to cleanup and print summary + """ + self.print_lineage_summary() + super().close() diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py index 44bad0f66d2..b7edbcdd1e8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py +++ 
b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py @@ -42,6 +42,9 @@ class KafkaConnectTasks(BaseModel): class KafkaConnectTopics(BaseModel): name: str = Field(..., description="Name of the topic (e.g., random-source-avro)") + fqn: Optional[str] = Field( + default=None, description="Fully qualified name of the topic in OpenMetadata" + ) class KafkaConnectColumnMapping(BaseModel): diff --git a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py index 4d7a87216cd..67c589ff313 100644 --- a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py +++ b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py @@ -952,6 +952,36 @@ class TestCDCTopicParsing(TestCase): result = parse_cdc_topic_name("MongoCDC.mydb.users", "MongoCDC") self.assertEqual(result, {"database": "mydb", "table": "users"}) + def test_parse_cdc_topic_server_name_with_dots(self): + """Test parsing CDC topics when server name contains dots""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + # Server name with dots: myapp.payments.prod + # Full topic: myapp.payments.prod.transactions.orders + # Expected: database=transactions, table=orders + result = parse_cdc_topic_name( + "myapp.payments.prod.transactions.orders", "myapp.payments.prod" + ) + self.assertEqual(result, {"database": "transactions", "table": "orders"}) + + # Server name with dots and only table (no schema) + # Full topic: myapp.payments.prod.users + # Expected: database=myapp.payments.prod, table=users + result = parse_cdc_topic_name( + "myapp.payments.prod.users", "myapp.payments.prod" + ) + self.assertEqual(result, {"database": "myapp.payments.prod", "table": "users"}) + + # Multiple level server name + # Server: app.service.env.region + # Topic: app.service.env.region.schema1.table1 + result = parse_cdc_topic_name( + "app.service.env.region.schema1.table1", "app.service.env.region" + ) + self.assertEqual(result, {"database": "schema1", "table": "table1"}) + class TestKafkaConnectCDCColumnExtraction(TestCase): """Test CDC column extraction from Debezium schema""" diff --git a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect_service_discovery.py b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect_service_discovery.py new file mode 100644 index 00000000000..c93ed97ed5d --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect_service_discovery.py @@ -0,0 +1,428 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Test KafkaConnect service discovery and caching functionality +""" +from unittest import TestCase +from unittest.mock import Mock, patch + +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.ingestion.source.pipeline.kafkaconnect.metadata import KafkaconnectSource +from metadata.ingestion.source.pipeline.kafkaconnect.models import ( + KafkaConnectPipelineDetails, +) + + +class TestServiceCaching(TestCase): + """Test service caching functionality""" + + def _create_mock_service(self, name, service_type, host_port=None): + """Helper to create a mock database service""" + service = Mock(spec=DatabaseService) + service.name = Mock() + service.name.root = name + service.serviceType = Mock() + service.serviceType.value = service_type + + if host_port: + service.connection = Mock() + service.connection.config = Mock() + service.connection.config.hostPort = host_port + else: + service.connection = None + + return service + + def _create_mock_messaging_service(self, name, bootstrap_servers=None): + """Helper to create a mock messaging service""" + service = Mock(spec=MessagingService) + service.name = Mock() + service.name.root = name + + if bootstrap_servers: + service.connection = Mock() + service.connection.config = Mock() + service.connection.config.bootstrapServers = bootstrap_servers + else: + service.connection = None + + return service + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_database_services_property_caches_results(self, mock_parent_init): + """Test that database_services property caches results""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + mock_config.serviceConnection.root.config.verifySSL = True + + mock_metadata = Mock() + mock_db_services = [ + self._create_mock_service("mysql-prod", "Mysql", "localhost:3306"), + self._create_mock_service("postgres-prod", "Postgres", "localhost:5432"), + ] + mock_metadata.list_all_entities.return_value = iter(mock_db_services) + + source = KafkaconnectSource(mock_config, mock_metadata) + source.metadata = ( + mock_metadata # Set metadata manually since parent __init__ is mocked + ) + + # First access - should call list_all_entities + services1 = source.database_services + self.assertEqual(len(services1), 2) + self.assertEqual(mock_metadata.list_all_entities.call_count, 1) + + # Second access - should use cache (no additional call) + services2 = source.database_services + self.assertEqual(len(services2), 2) + self.assertEqual(mock_metadata.list_all_entities.call_count, 1) + + # Verify same object is returned (cached) + self.assertIs(services1, services2) + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_messaging_services_property_caches_results(self, mock_parent_init): + """Test that messaging_services property caches results""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + mock_msg_services = [ + self._create_mock_messaging_service( + "kafka-prod", "broker1:9092,broker2:9092" + ), + self._create_mock_messaging_service("kafka-dev", "localhost:9092"), + ] + mock_metadata.list_all_entities.return_value = iter(mock_msg_services) + + source = 
KafkaconnectSource(mock_config, mock_metadata) + source.metadata = ( + mock_metadata # Set metadata manually since parent __init__ is mocked + ) + + # First access - should call list_all_entities + services1 = source.messaging_services + self.assertEqual(len(services1), 2) + self.assertEqual(mock_metadata.list_all_entities.call_count, 1) + + # Second access - should use cache + services2 = source.messaging_services + self.assertEqual(len(services2), 2) + self.assertEqual(mock_metadata.list_all_entities.call_count, 1) + + # Verify same object is returned (cached) + self.assertIs(services1, services2) + + +class TestServiceDiscovery(TestCase): + """Test database and messaging service discovery""" + + def _create_mock_db_service(self, name, service_type, host_port): + """Helper to create a mock database service""" + service = Mock(spec=DatabaseService) + service.name = Mock() + service.name.root = name + service.serviceType = Mock() + service.serviceType.value = service_type + service.connection = Mock() + service.connection.config = Mock() + service.connection.config.hostPort = host_port + return service + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_find_database_service_by_hostname_matches_correctly( + self, mock_parent_init + ): + """Test finding database service by hostname with port stripping""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + mock_db_services = [ + self._create_mock_db_service( + "mysql-prod", "Mysql", "mysql.example.com:3306" + ), + self._create_mock_db_service( + "postgres-prod", "Postgres", "postgres.example.com:5432" + ), + ] + mock_metadata.list_all_entities.return_value = iter(mock_db_services) + + source = KafkaconnectSource(mock_config, mock_metadata) + source.metadata = ( + mock_metadata # Set metadata manually since parent __init__ is mocked + ) + + # Test matching MySQL service + result = source.find_database_service_by_hostname( + "Mysql", "mysql.example.com:3306" + ) + self.assertEqual(result, "mysql-prod") + + # Test matching with protocol prefix + result = source.find_database_service_by_hostname( + "Mysql", "jdbc:mysql://mysql.example.com:3306/db" + ) + self.assertEqual(result, "mysql-prod") + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_find_messaging_service_by_brokers_matches_correctly( + self, mock_parent_init + ): + """Test finding messaging service by broker endpoints""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + + kafka_service = Mock(spec=MessagingService) + kafka_service.name = Mock() + kafka_service.name.root = "kafka-prod" + kafka_service.connection = Mock() + kafka_service.connection.config = Mock() + kafka_service.connection.config.bootstrapServers = ( + "broker1.example.com:9092,broker2.example.com:9092" + ) + + mock_metadata.list_all_entities.return_value = iter([kafka_service]) + + source = KafkaconnectSource(mock_config, mock_metadata) + source.metadata = ( + mock_metadata # Set metadata manually since parent __init__ is mocked + ) + + # Test matching with protocol prefix + result = source.find_messaging_service_by_brokers( + "SASL_SSL://broker1.example.com:9092,SASL_SSL://broker2.example.com:9092" + ) + self.assertEqual(result, "kafka-prod") + + # Test 
matching with partial overlap + result = source.find_messaging_service_by_brokers("broker1.example.com:9092") + self.assertEqual(result, "kafka-prod") + + +class TestTopicSearchByPrefix(TestCase): + """Test topic search by prefix fallback mechanism""" + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_search_topics_by_prefix_finds_matching_topics(self, mock_parent_init): + """Test searching for topics by database.server.name prefix""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + + # Create mock topics + topic1 = Mock() + topic1.name = Mock() + topic1.name.root = "myserver.public.users" + topic1.fullyQualifiedName = Mock() + topic1.fullyQualifiedName.root = 'kafka-prod."myserver.public.users"' + + topic2 = Mock() + topic2.name = Mock() + topic2.name.root = "myserver.public.orders" + topic2.fullyQualifiedName = Mock() + topic2.fullyQualifiedName.root = 'kafka-prod."myserver.public.orders"' + + topic3 = Mock() + topic3.name = Mock() + topic3.name.root = "other.topic" + topic3.fullyQualifiedName = Mock() + topic3.fullyQualifiedName.root = "kafka-prod.other.topic" + + mock_metadata.list_all_entities.return_value = iter([topic1, topic2, topic3]) + + source = KafkaconnectSource(mock_config, mock_metadata) + source.metadata = ( + mock_metadata # Set metadata manually since parent __init__ is mocked + ) + + # Search for topics with prefix "myserver" + result = source._search_topics_by_prefix("myserver", "kafka-prod") + + # Should find only topics starting with "myserver." + self.assertEqual(len(result), 2) + self.assertEqual(result[0].name, "myserver.public.users") + self.assertEqual(result[1].name, "myserver.public.orders") + + # Verify FQNs are populated + self.assertEqual(result[0].fqn, 'kafka-prod."myserver.public.users"') + self.assertEqual(result[1].fqn, 'kafka-prod."myserver.public.orders"') + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_search_topics_by_prefix_returns_empty_when_none_match( + self, mock_parent_init + ): + """Test that search returns empty list when no topics match""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + + topic = Mock() + topic.name = Mock() + topic.name.root = "other.topic" + + mock_metadata.list_all_entities.return_value = iter([topic]) + + source = KafkaconnectSource(mock_config, mock_metadata) + + # Search for topics with prefix that doesn't exist + result = source._search_topics_by_prefix("nonexistent", "kafka-prod") + + self.assertEqual(len(result), 0) + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_search_topics_by_prefix_handles_no_messaging_service( + self, mock_parent_init + ): + """Test that search handles None messaging service gracefully""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + + source = KafkaconnectSource(mock_config, mock_metadata) + + # Search without messaging service name + result = source._search_topics_by_prefix("myserver", None) + + # Should return empty list + self.assertEqual(len(result), 0) + + +class TestCDCTopicFallback(TestCase): + """Test CDC topic parsing with 
table.include.list fallback""" + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_parse_cdc_topics_from_config_with_table_include_list( + self, mock_parent_init + ): + """Test parsing topics from table.include.list""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + source = KafkaconnectSource(mock_config, mock_metadata) + + pipeline_details = KafkaConnectPipelineDetails( + name="test-connector", + config={ + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "database.server.name": "myserver", + "table.include.list": "public.users,public.orders,inventory.products", + }, + ) + + result = source._parse_cdc_topics_from_config(pipeline_details, "myserver") + + # Should create topics for each table + self.assertEqual(len(result), 3) + self.assertEqual(result[0].name, "myserver.public.users") + self.assertEqual(result[1].name, "myserver.public.orders") + self.assertEqual(result[2].name, "myserver.inventory.products") + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_parse_cdc_topics_returns_empty_without_table_include_list( + self, mock_parent_init + ): + """Test that parsing returns empty when table.include.list is missing""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + source = KafkaconnectSource(mock_config, mock_metadata) + + pipeline_details = KafkaConnectPipelineDetails( + name="test-connector", + config={ + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "database.server.name": "myserver", + # No table.include.list + }, + ) + + with self.assertLogs(level="WARNING") as log: + result = source._parse_cdc_topics_from_config(pipeline_details, "myserver") + + # Should return empty list + self.assertEqual(len(result), 0) + + # Should log warning about missing table.include.list + self.assertTrue(any("table.include.list" in message for message in log.output)) + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.__init__" + ) + def test_parse_cdc_topics_supports_table_whitelist_legacy(self, mock_parent_init): + """Test that table.whitelist (legacy key) is also supported""" + mock_parent_init.return_value = None + + mock_config = Mock() + mock_config.serviceConnection.root.config.hostPort = "http://localhost:8083" + + mock_metadata = Mock() + source = KafkaconnectSource(mock_config, mock_metadata) + + pipeline_details = KafkaConnectPipelineDetails( + name="test-connector", + config={ + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "database.server.name": "myserver", + "table.whitelist": "public.users", # Legacy key + }, + ) + + result = source._parse_cdc_topics_from_config(pipeline_details, "myserver") + + # Should parse from legacy key + self.assertEqual(len(result), 1) + self.assertEqual(result[0].name, "myserver.public.users")