From 454d7367b0d74ada3cacfedde53f96794c66aa54 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 8 Oct 2025 15:58:27 -0400 Subject: [PATCH] Kafka Connect: Support Confluent Cloud connectors (#23780) --- .../examples/workflows/confluent_cdc.yaml | 70 ++ .../workflows/confluent_cdc_cloud.yaml | 86 ++ .../workflows/confluent_cdc_local.yaml | 120 +++ .../workflows/my_confluent_cloud.yaml | 49 + .../source/pipeline/kafkaconnect/client.py | 348 ++++++- .../source/pipeline/kafkaconnect/metadata.py | 562 ++++++++++- .../source/pipeline/kafkaconnect/models.py | 33 +- .../metadata/parsers/json_schema_parser.py | 42 +- .../topology/pipeline/test_kafkaconnect.py | 886 +++++++++++++++++- 9 files changed, 2141 insertions(+), 55 deletions(-) create mode 100644 ingestion/src/metadata/examples/workflows/confluent_cdc.yaml create mode 100644 ingestion/src/metadata/examples/workflows/confluent_cdc_cloud.yaml create mode 100644 ingestion/src/metadata/examples/workflows/confluent_cdc_local.yaml create mode 100644 ingestion/src/metadata/examples/workflows/my_confluent_cloud.yaml diff --git a/ingestion/src/metadata/examples/workflows/confluent_cdc.yaml b/ingestion/src/metadata/examples/workflows/confluent_cdc.yaml new file mode 100644 index 00000000000..0b13381b2b8 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/confluent_cdc.yaml @@ -0,0 +1,70 @@ +source: + type: kafkaconnect + serviceName: confluent_cdc_mysql_postgres + serviceConnection: + config: + type: KafkaConnect + hostPort: http://localhost:8083 + # For Kafka Connect, choose one of the following authentication methods: + + # Option 1: Username/Password Authentication (for self-hosted Kafka Connect) + KafkaConnectConfig: + username: admin + password: admin_password + + # Option 2: API Key Authentication (for Confluent Cloud) + # KafkaConnectConfig: + # username: YOUR_CONFLUENT_CLOUD_API_KEY + # password: YOUR_CONFLUENT_CLOUD_API_SECRET + + # Option 3: No Authentication + # KafkaConnectConfig: null + + verifySSL: true + messagingServiceName: KafkaProd + + # Optional: Filter CDC connectors/pipelines using regex patterns + # pipelineFilterPattern: + # includes: + # - "mysql-cdc-.*" + # - "postgres-sink-.*" + # excludes: + # - ".*test.*" + # - ".*dev.*" + + sourceConfig: + config: + type: PipelineMetadata + lineageInformation: + dbServiceNames: + - "MysqlProd" + - "PostgresProd" + storageServiceNames: [] + +sink: + type: metadata-rest + config: {} + +workflowConfig: + loggerLevel: INFO # DEBUG, INFO, WARN or ERROR + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: + +# To run this workflow: +# metadata ingest -c /path/to/confluent_cdc.yaml + +# Prerequisites: +# 1. OpenMetadata server running at http://localhost:8585 +# 2. MySQL database service already ingested into OpenMetadata +# 3. Postgres database service already ingested into OpenMetadata +# 4. Kafka messaging service already ingested into OpenMetadata +# 5. 
Kafka Connect connectors configured and running +# +# The connector will create: +# - Pipeline entities for each CDC connector +# - Table → Topic lineage with column-level mapping +# - Topic → Table lineage with column-level mapping +# - Complete end-to-end lineage: MySQL columns → Kafka topic fields → Postgres columns diff --git a/ingestion/src/metadata/examples/workflows/confluent_cdc_cloud.yaml b/ingestion/src/metadata/examples/workflows/confluent_cdc_cloud.yaml new file mode 100644 index 00000000000..2c5bd03df0f --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/confluent_cdc_cloud.yaml @@ -0,0 +1,86 @@ +# Example: Confluent Cloud CDC Configuration +# This example shows how to connect to Confluent Cloud CDC connectors +# that replicate data from MySQL to PostgreSQL via Kafka + +source: + type: kafkaconnect + serviceName: confluent_cloud_cdc_production + serviceConnection: + config: + type: KafkaConnect + # Confluent Cloud Kafka Connect REST API endpoint + hostPort: https://pkc-xxxxx.us-east-1.aws.confluent.cloud:443 + + # Confluent Cloud API Key authentication + KafkaConnectConfig: + username: YOUR_CONFLUENT_CLOUD_API_KEY + password: YOUR_CONFLUENT_CLOUD_API_SECRET + + verifySSL: true + messagingServiceName: KafkaCloudProd + + # Filter to include only production CDC connectors + pipelineFilterPattern: + includes: + - "prod-mysql-cdc-.*" + - "prod-postgres-sink-.*" + excludes: + - ".*staging.*" + - ".*test.*" + + sourceConfig: + config: + type: PipelineMetadata + lineageInformation: + dbServiceNames: + - "MysqlProd" + - "PostgresProd" + storageServiceNames: [] + +sink: + type: metadata-rest + config: {} + +workflowConfig: + loggerLevel: INFO + openMetadataServerConfig: + hostPort: https://your-openmetadata-server.com/api + authProvider: openmetadata + securityConfig: + jwtToken: + +# Example CDC Connector Configuration that this workflow will discover: +# +# MySQL CDC Source Connector: +# { +# "name": "prod-mysql-cdc-customers", +# "config": { +# "connector.class": "io.debezium.connector.mysql.MySqlConnector", +# "database.hostname": "mysql.example.com", +# "database.name": "ecommerce", +# "table.include.list": "ecommerce.customers,ecommerce.orders", +# "database.server.name": "prod-mysql", +# "topics": "prod-mysql.ecommerce.customers" +# } +# } +# +# Postgres Sink Connector: +# { +# "name": "prod-postgres-sink-customers", +# "config": { +# "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", +# "connection.url": "jdbc:postgresql://postgres.example.com:5432/analytics", +# "topics": "prod-mysql.ecommerce.customers", +# "table.name.format": "customers", +# "schema": "public" +# } +# } +# +# This will create lineage: +# MySQL: ecommerce.customers (id, name, email, created_at) +# ↓ +# Kafka: prod-mysql.ecommerce.customers (id, name, email, created_at) [topic schema fields] +# ↓ +# Postgres: analytics.public.customers (id, name, email, created_at) +# +# With column-level lineage showing exact field mappings at each hop! 
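#
# Note (a hedged tip, not part of the connector code): the ingestion client in this
# patch only enables its Confluent Cloud handling when the hostPort hostname is
# api.confluent.cloud, i.e. the managed Connect REST API endpoint
# (https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID},
# as shown in my_confluent_cloud.yaml below). Before running this workflow you can
# confirm the API key and cluster IDs by listing connectors directly; the environment
# and cluster IDs here are placeholders:
#
#   curl -u "YOUR_KAFKA_CONNECT_API_KEY:YOUR_KAFKA_CONNECT_API_SECRET" \
#     "https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789/connectors"
#
# This should return a JSON array of connector names; an empty array means the key
# authenticates but no connectors are deployed yet.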
diff --git a/ingestion/src/metadata/examples/workflows/confluent_cdc_local.yaml b/ingestion/src/metadata/examples/workflows/confluent_cdc_local.yaml new file mode 100644 index 00000000000..23db85e0c95 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/confluent_cdc_local.yaml @@ -0,0 +1,120 @@ +# Example: Local Confluent CDC Setup for Development +# This example shows a typical local development setup with: +# - Local Kafka Connect cluster +# - MySQL source database +# - PostgreSQL target database +# - Local Kafka broker + +source: + type: kafkaconnect + serviceName: local_cdc_dev + serviceConnection: + config: + type: KafkaConnect + # Local Kafka Connect REST API + hostPort: http://localhost:8083 + + # No authentication for local development + # KafkaConnectConfig: null + + # Or use basic auth if configured + KafkaConnectConfig: + username: connect-user + password: connect-password + + verifySSL: false # Typically disabled for local development + messagingServiceName: KafkaLocal + + sourceConfig: + config: + type: PipelineMetadata + lineageInformation: + dbServiceNames: + - "MysqlLocal" + - "PostgresLocal" + storageServiceNames: [] + +sink: + type: metadata-rest + config: {} + +workflowConfig: + loggerLevel: DEBUG # Use DEBUG for development to see detailed logs + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg + +# ============================================================================ +# Local Development Setup Guide +# ============================================================================ +# +# 1. Start local infrastructure: +# docker-compose up -d # Start MySQL, Postgres, Kafka, Kafka Connect +# +# 2. Ingest database metadata into OpenMetadata: +# # MySQL +# metadata ingest -c mysql_local.yaml +# +# # PostgreSQL +# metadata ingest -c postgres_local.yaml +# +# # Kafka +# metadata ingest -c kafka_local.yaml +# +# 3. 
Configure CDC connectors in Kafka Connect: +# +# # MySQL CDC Source Connector (Debezium) +# curl -X POST http://localhost:8083/connectors \ +# -H "Content-Type: application/json" \ +# -d '{ +# "name": "mysql-source-customers", +# "config": { +# "connector.class": "io.debezium.connector.mysql.MySqlConnector", +# "database.hostname": "mysql", +# "database.port": "3306", +# "database.user": "debezium", +# "database.password": "dbz", +# "database.server.id": "184054", +# "database.server.name": "local-mysql", +# "database.name": "testdb", +# "table.include.list": "testdb.customers,testdb.orders", +# "database.history.kafka.bootstrap.servers": "kafka:9092", +# "database.history.kafka.topic": "schema-changes.testdb" +# } +# }' +# +# # PostgreSQL Sink Connector +# curl -X POST http://localhost:8083/connectors \ +# -H "Content-Type: application/json" \ +# -d '{ +# "name": "postgres-sink-customers", +# "config": { +# "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", +# "connection.url": "jdbc:postgresql://postgres:5432/warehouse", +# "connection.user": "postgres", +# "connection.password": "postgres", +# "topics": "local-mysql.testdb.customers", +# "table.name.format": "customers", +# "auto.create": "true", +# "auto.evolve": "true", +# "insert.mode": "upsert", +# "pk.mode": "record_key", +# "delete.enabled": "true" +# } +# }' +# +# 4. Run this CDC workflow: +# metadata ingest -c confluent_cdc_local.yaml +# +# 5. View lineage in OpenMetadata UI: +# http://localhost:8585 +# Navigate to: Pipeline Services → local_cdc_dev → Pipelines +# Click on a pipeline to see column-level lineage +# +# Expected Lineage: +# testdb.customers.id → local-mysql.testdb.customers.id → warehouse.public.customers.id +# testdb.customers.name → local-mysql.testdb.customers.name → warehouse.public.customers.name +# testdb.customers.email → local-mysql.testdb.customers.email → warehouse.public.customers.email +# testdb.customers.created → local-mysql.testdb.customers.created → warehouse.public.customers.created diff --git a/ingestion/src/metadata/examples/workflows/my_confluent_cloud.yaml b/ingestion/src/metadata/examples/workflows/my_confluent_cloud.yaml new file mode 100644 index 00000000000..af9110f99a9 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/my_confluent_cloud.yaml @@ -0,0 +1,49 @@ +source: + type: kafkaconnect + serviceName: confluent_cloud_cdc + serviceConnection: + config: + type: KafkaConnect + # IMPORTANT: Use the Kafka Connect API endpoint, NOT the Kafka broker endpoint + # Format: https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID} + # Example: https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789 + hostPort: https://api.confluent.cloud/connect/v1/environments/YOUR_ENV_ID/clusters/YOUR_CONNECT_CLUSTER_ID + + # Use Kafka Connect API Key (NOT Kafka broker API key) + KafkaConnectConfig: + username: YOUR_KAFKA_CONNECT_API_KEY + password: YOUR_KAFKA_CONNECT_API_SECRET + + verifySSL: true + + # REQUIRED for Confluent Cloud: Specify your Kafka messaging service + # This should match the service name you used when ingesting Kafka topics + messagingServiceName: confluent_kafka_cloud + + # Optional: Filter specific connectors + # pipelineFilterPattern: + # includes: + # - "mysql-cdc-.*" + # - "postgres-sink-.*" + + sourceConfig: + config: + type: PipelineMetadata + # Specify the database services where your tables are ingested + lineageInformation: + dbServiceNames: + - local_mysql # Replace with your MySQL service name + - 
local_postgres # Replace with your Postgres service name + storageServiceNames: [] # Add S3/GCS service names if using sink connectors to storage + +sink: + type: metadata-rest + config: {} + +workflowConfig: + loggerLevel: DEBUG # Use DEBUG to see detailed connection info + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py index b760d8e696e..d5e8ecc847c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py @@ -14,6 +14,7 @@ Client to interact with Kafka Connect REST APIs import traceback from typing import List, Optional +from urllib.parse import urlparse from kafka_connect import KafkaConnect @@ -21,6 +22,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnect KafkaConnectConnection, ) from metadata.ingestion.source.pipeline.kafkaconnect.models import ( + KafkaConnectColumnMapping, KafkaConnectDatasetDetails, KafkaConnectPipelineDetails, KafkaConnectTopics, @@ -30,16 +32,111 @@ from metadata.utils.logger import ometa_logger logger = ometa_logger() -SUPPORTED_DATASETS = { - "table": [ + +def parse_cdc_topic_name(topic_name: str, database_server_name: str = None) -> dict: + """ + Parse CDC topic names to extract database and table information. 
+ + Common CDC topic naming patterns: + - Debezium: {server-name}.{database}.{table} + - Debezium V2: {topic-prefix}.{database}.{table} + - Examples: + - MysqlKafkaV2.ecommerce.orders -> database=ecommerce, table=orders + - PostgresKafkaCDC.public.orders -> database=public, table=orders + - ecommerce.customers -> database=ecommerce, table=customers (if server-name matches) + + Args: + topic_name: The Kafka topic name + database_server_name: The database.server.name or topic.prefix from connector config + + Returns: + dict with 'database' and 'table' keys, or empty dict if pattern doesn't match + """ + if not topic_name: + return {} + + # Skip internal/system topics + if topic_name.startswith(("_", "dbhistory.", "__")): + return {} + + parts = topic_name.split(".") + + # Pattern: {prefix}.{database}.{table} (3 parts) + if len(parts) == 3: + prefix, database, table = parts + # Verify prefix matches server name if provided + if database_server_name and prefix.lower() != database_server_name.lower(): + # Might be schema.database.table for some connectors + pass + return {"database": database, "table": table} + + # Pattern: {database}.{table} (2 parts) + elif len(parts) == 2: + database, table = parts + # Only accept if server name matches or not provided + if database_server_name and database.lower() == database_server_name.lower(): + # This is server_name.table, so database is the server name + return {"database": database, "table": table} + # Or accept as database.table + return {"database": database, "table": table} + + # Pattern: just {table} (1 part) - use server name as database + elif len(parts) == 1 and database_server_name: + return {"database": database_server_name, "table": topic_name} + + return {} + + +class ConnectorConfigKeys: + """Configuration keys for various Kafka Connect connectors""" + + TABLE_KEYS = [ "table", "collection", "snowflake.schema.name", "table.whitelist", "fields.whitelist", - ], - "database": ["database", "db.name", "snowflake.database.name"], - "container_name": ["s3.bucket.name"], + "table.include.list", + "table.name.format", + "tables.include", + "table.exclude.list", + "snowflake.schema", + "snowflake.topic2table.map", + "fields.included", + ] + + DATABASE_KEYS = [ + "database", + "db.name", + "snowflake.database.name", + "database.include.list", + "database.hostname", + "connection.url", + "database.dbname", + "topic.prefix", + "database.server.name", # Debezium V1 + "databases.include", + "database.names", + "snowflake.database", + "connection.host", + "database.exclude.list", + ] + + CONTAINER_KEYS = [ + "s3.bucket.name", + "s3.bucket", + "gcs.bucket.name", + "azure.container.name", + "topics.dir", # Directory path within storage container for sink connectors + ] + + TOPIC_KEYS = ["kafka.topic", "topics", "topic"] + + +SUPPORTED_DATASETS = { + "table": ConnectorConfigKeys.TABLE_KEYS, + "database": ConnectorConfigKeys.DATABASE_KEYS, + "container_name": ConnectorConfigKeys.CONTAINER_KEYS, } @@ -56,6 +153,41 @@ class KafkaConnectClient: auth = f"{config.KafkaConnectConfig.username}:{config.KafkaConnectConfig.password.get_secret_value()}" self.client = KafkaConnect(url=url, auth=auth, ssl_verify=ssl_verify) + # Detect if this is Confluent Cloud (managed connectors) + parsed_url = urlparse(url) + self.is_confluent_cloud = parsed_url.hostname == "api.confluent.cloud" + + def _infer_cdc_topics_from_server_name( + self, database_server_name: str + ) -> Optional[List[KafkaConnectTopics]]: + """ + For CDC connectors, infer topic names based on 
database.server.name or topic.prefix. + CDC connectors create topics with pattern: {server-name}.{database}.{table} + + This is a workaround for Confluent Cloud which doesn't expose topic lists. + We look for topics that start with the server name prefix. + + Args: + database_server_name: The database.server.name or topic.prefix from config + + Returns: + List of inferred KafkaConnectTopics, or None + """ + if not database_server_name or not self.is_confluent_cloud: + return None + + try: + # Get all connectors and check their topics + # Note: This is a best-effort approach for Confluent Cloud + # In practice, the messaging service should already have ingested these topics + logger.debug( + f"CDC connector detected with server name: {database_server_name}" + ) + return None # Topics will be matched via messaging service during lineage + except Exception as exc: + logger.debug(f"Unable to infer CDC topics: {exc}") + return None + def _enrich_connector_details( self, connector_details: KafkaConnectPipelineDetails, connector_name: str ) -> None: @@ -70,10 +202,49 @@ class KafkaConnectClient: connector_details.config ) + # For CDC connectors without explicit topics, try to infer from server name + if ( + not connector_details.topics + and connector_details.conn_type.lower() == "source" + ): + database_server_name = connector_details.config.get( + "database.server.name" + ) or connector_details.config.get("topic.prefix") + if database_server_name: + inferred_topics = self._infer_cdc_topics_from_server_name( + database_server_name + ) + if inferred_topics: + connector_details.topics = inferred_topics + def get_cluster_info(self) -> Optional[dict]: """ Get the version and other details of the Kafka Connect cluster. + + For Confluent Cloud, the root endpoint is not supported, so we use + the /connectors endpoint to verify authentication and connectivity. """ + if self.is_confluent_cloud: + # Confluent Cloud doesn't support the root endpoint (/) + # Use /connectors to test authentication and connectivity + logger.info( + "Confluent Cloud detected - testing connection via connectors list endpoint" + ) + try: + connectors = self.client.list_connectors() + # Connection successful - return a valid response + logger.info( + f"Confluent Cloud connection successful - found {len(connectors) if connectors else 0} connectors" + ) + return { + "version": "confluent-cloud", + "commit": "managed", + "kafka_cluster_id": "confluent-managed", + } + except Exception as exc: + logger.error(f"Failed to connect to Confluent Cloud: {exc}") + raise + return self.client.get_cluster_info() def get_connectors_list( @@ -122,19 +293,97 @@ class KafkaConnectClient: def get_connector_config(self, connector: str) -> Optional[dict]: """ Get the details of a single connector. + + For Confluent Cloud, the API returns configs as an array of {config, value} objects. + For self-hosted Kafka Connect, it returns a flat config dictionary. + Args: connector (str): The name of the connector. 
""" try: result = self.client.get_connector(connector=connector) - if result: - return result.get("config") + if not result: + return None + + # Check if this is Confluent Cloud format (array of {config, value}) + if self.is_confluent_cloud and "configs" in result: + # Transform Confluent Cloud format: [{config: "key", value: "val"}] -> {key: val} + configs_array = result.get("configs", []) + if isinstance(configs_array, list): + config_dict = { + item["config"]: item["value"] + for item in configs_array + if isinstance(item, dict) + and "config" in item + and "value" in item + } + return config_dict or None + + # Standard self-hosted Kafka Connect format + return result.get("config") + except Exception as exc: logger.debug(traceback.format_exc()) logger.error(f"Unable to get connector configuration details {exc}") return None + def extract_column_mappings( + self, connector_config: dict + ) -> Optional[List[KafkaConnectColumnMapping]]: + """ + Extract column mappings from connector configuration. + For Debezium and JDBC connectors, columns are typically mapped 1:1 + unless transforms are applied. + + Args: + connector_config: The connector configuration dictionary + + Returns: + List of KafkaConnectColumnMapping objects if mappings can be inferred + """ + if not connector_config or not isinstance(connector_config, dict): + logger.debug("Invalid connector_config: expected dict") + return None + + try: + column_mappings = [] + + # Check for SMT (Single Message Transform) configurations + transforms = connector_config.get("transforms", "") + if not transforms: + return None + + transform_list = [t.strip() for t in transforms.split(",")] + for transform in transform_list: + transform_type = connector_config.get( + f"transforms.{transform}.type", "" + ) + + # ReplaceField transform can rename columns + if "ReplaceField" in transform_type: + renames = connector_config.get( + f"transforms.{transform}.renames", "" + ) + if renames: + for rename in renames.split(","): + if ":" in rename: + source_col, target_col = rename.split(":", 1) + column_mappings.append( + KafkaConnectColumnMapping( + source_column=source_col.strip(), + target_column=target_col.strip(), + ) + ) + + return column_mappings if column_mappings else None + + except (KeyError, AttributeError, ValueError) as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to extract column mappings: {exc}") + + return None + def get_connector_dataset_info( self, connector_config: dict ) -> Optional[KafkaConnectDatasetDetails]: @@ -143,27 +392,36 @@ class KafkaConnectClient: Checks in the connector configurations for dataset fields if any related field is found returns the result Args: - connector (str): The name of the connector. + connector_config: The connector configuration dictionary Returns: - Optional[Dict]: A dictionary containing dataset information - (type, table, database, or bucket_name) - if a dataset is found, or None if the connector - is not found, has no dataset, or an error occurs. 
+ Optional[KafkaConnectDatasetDetails]: Dataset information including + table, database, or container_name if found, or None otherwise """ + if not connector_config or not isinstance(connector_config, dict): + logger.debug("Invalid connector_config: expected dict") + return None + try: - if not connector_config: - return None - result = {} - for dataset in SUPPORTED_DATASETS or []: - for key in SUPPORTED_DATASETS[dataset] or []: + for dataset_type, config_keys in SUPPORTED_DATASETS.items(): + for key in config_keys: if connector_config.get(key): - result[dataset] = connector_config[key] - return KafkaConnectDatasetDetails(**result) + result[dataset_type] = connector_config[key] - except Exception as exc: + # Only create dataset details if we have meaningful dataset information + # For CDC connectors, database.server.name/topic.prefix are captured + # but don't represent actual table names, so skip dataset creation + # We need either: table name OR container name + if result and (result.get("table") or result.get("container_name")): + dataset_details = KafkaConnectDatasetDetails(**result) + dataset_details.column_mappings = ( + self.extract_column_mappings(connector_config) or [] + ) + return dataset_details + + except (KeyError, ValueError, TypeError) as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Unable to get connector dataset details {exc}") + logger.warning(f"Unable to get connector dataset details: {exc}") return None @@ -173,6 +431,9 @@ class KafkaConnectClient: """ Get the list of topics for a connector. + For Confluent Cloud, the /topics endpoint is not supported, so we extract + topics from the connector configuration instead. + Args: connector (str): The name of the connector. @@ -183,15 +444,42 @@ class KafkaConnectClient: or an error occurs. 
""" try: - result = self.client.list_connector_topics(connector=connector).get( - connector - ) - if result: - topics = [ - KafkaConnectTopics(name=topic) - for topic in result.get("topics") or [] - ] - return topics + if self.is_confluent_cloud: + # Confluent Cloud doesn't support /connectors/{name}/topics endpoint + # Extract topics from connector config instead + config = self.get_connector_config(connector=connector) + if config: + topics = [] + # Check common topic configuration keys + for key in ConnectorConfigKeys.TOPIC_KEYS: + if key in config: + topic_value = config[key] + # Handle single topic or comma-separated list + if isinstance(topic_value, str): + topic_list = [t.strip() for t in topic_value.split(",")] + topics.extend( + [ + KafkaConnectTopics(name=topic) + for topic in topic_list + ] + ) + + if topics: + logger.info( + f"Extracted {len(topics)} topics from Confluent Cloud connector config" + ) + return topics + else: + # Self-hosted Kafka Connect supports /topics endpoint + result = self.client.list_connector_topics(connector=connector).get( + connector + ) + if result: + topics = [ + KafkaConnectTopics(name=topic) + for topic in result.get("topics") or [] + ] + return topics except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Unable to get connector Topics {exc}") diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py index be4cc6751e5..114ef23deae 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py @@ -13,7 +13,7 @@ KafkaConnect source to extract metadata from OM UI """ import traceback from datetime import datetime -from typing import Iterable, Optional +from typing import Any, Iterable, List, Optional from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -42,15 +42,23 @@ from metadata.generated.schema.type.basic import ( SourceUrl, Timestamp, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, +) from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata, T +from metadata.ingestion.source.pipeline.kafkaconnect.client import parse_cdc_topic_name from metadata.ingestion.source.pipeline.kafkaconnect.models import ( + ConnectorType, KafkaConnectPipelineDetails, + KafkaConnectTopics, ) from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn @@ -67,6 +75,22 @@ STATUS_MAP = { "UNASSIGNED": StatusType.Pending.value, } +# CDC envelope field names used for Debezium detection and parsing +CDC_ENVELOPE_FIELDS = {"after", "before", "op"} + + +def get_field_name(field_name: Any) -> str: + """ + Extract string name from FieldName object or string. 
+ + Args: + field_name: FieldName object with .root attribute, or plain string + + Returns: + String representation of the field name + """ + return field_name.root if hasattr(field_name, "root") else str(field_name) + class KafkaconnectSource(PipelineServiceSource): """ @@ -105,9 +129,11 @@ class KafkaconnectSource(PipelineServiceSource): for task in pipeline_details.tasks or [] ], service=self.context.get().pipeline_service, - description=Markdown(pipeline_details.description) - if pipeline_details.description - else None, + description=( + Markdown(pipeline_details.description) + if pipeline_details.description + else None + ), ) yield Either(right=pipeline_request) self.register_record(pipeline_request=pipeline_request) @@ -174,6 +200,431 @@ class KafkaconnectSource(PipelineServiceSource): return None + def _get_entity_column_fqn(self, entity: T, column_name: str) -> Optional[str]: + """ + Get column FQN for any supported entity type. + Dispatch based on entity type. + + Args: + entity: Table or Topic entity + column_name: Column/field name + + Returns: + Fully qualified column name or None + """ + if isinstance(entity, Topic): + return self._get_topic_field_fqn(entity, column_name) + elif isinstance(entity, Table): + return get_column_fqn(table_entity=entity, column=column_name) + else: + logger.warning( + f"Unsupported entity type for column FQN: {type(entity).__name__}" + ) + return None + + def _parse_cdc_schema_columns(self, schema_text: str) -> List[str]: + """ + Parse Debezium CDC schema JSON to extract table column names. + + Looks for columns in 'after' or 'before' fields within the schema, + handling nullable oneOf structures. + + Args: + schema_text: Raw JSON schema string from topic + + Returns: + List of column names, or empty list if parsing fails + """ + try: + import json + + schema_dict = json.loads(schema_text) + + # Look for 'after' or 'before' field in the schema + for field_name in ["after", "before"]: + if field_name not in schema_dict.get("properties", {}): + continue + + field_def = schema_dict["properties"][field_name] + + # Handle oneOf (nullable types) + if "oneOf" not in field_def: + continue + + for option in field_def["oneOf"]: + if isinstance(option, dict) and option.get("type") == "object": + columns = list(option.get("properties", {}).keys()) + logger.debug( + f"Parsed {len(columns)} columns from CDC '{field_name}' field" + ) + return columns + + except Exception as exc: + logger.debug(f"Unable to parse CDC schema text: {exc}") + + return [] + + def _extract_columns_from_entity(self, entity: T) -> List[str]: + """ + Extract column/field names from Table or Topic entity. + + For Debezium CDC topics, extracts columns from the 'after' or 'before' field + which contains the actual table structure, not the CDC envelope fields. 
+ + Args: + entity: Table or Topic entity + + Returns: + List of column/field names + """ + if isinstance(entity, Table): + return [col.name.root for col in entity.columns or []] + + if hasattr(entity, "messageSchema") and entity.messageSchema: + schema_fields = entity.messageSchema.schemaFields or [] + + # Check if this is a Debezium CDC envelope structure + # Can be either flat (top-level: op, before, after) or nested (Envelope -> op, before, after) + field_names = {get_field_name(f.name) for f in schema_fields} + is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset(field_names) + + # Fallback: Check schemaText for CDC structure if schemaFields doesn't indicate CDC + if not is_debezium_cdc and entity.messageSchema.schemaText: + try: + import json + + schema_dict = json.loads(entity.messageSchema.schemaText) + schema_props = schema_dict.get("properties", {}) + # Check if schemaText has CDC envelope fields + is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset( + set(schema_props.keys()) + ) + except Exception: + pass + + logger.debug( + f"Topic {get_field_name(entity.name) if hasattr(entity, 'name') else 'unknown'}: field_names={field_names}, is_debezium_cdc={is_debezium_cdc}" + ) + + # Check for nested Debezium CDC structure (single Envelope field with CDC children) + if not is_debezium_cdc and len(schema_fields) == 1: + envelope_field = schema_fields[0] + if envelope_field.children: + envelope_child_names = { + get_field_name(c.name) for c in envelope_field.children + } + is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset(envelope_child_names) + if is_debezium_cdc: + logger.debug( + f"Nested Debezium CDC envelope detected: {get_field_name(envelope_field.name)}" + ) + schema_fields = ( + envelope_field.children + ) # Use envelope children as schema fields + + if is_debezium_cdc: + # For Debezium CDC, extract columns from the 'after' field (or 'before' as fallback) + # The 'after' field contains the complete record structure after the change + for field in schema_fields: + field_name_str = get_field_name(field.name) + # Prefer 'after' for source connectors (contains new/updated record state) + if field_name_str == "after" and field.children: + columns = [ + get_field_name(child.name) for child in field.children + ] + logger.debug( + f"Debezium CDC: extracted {len(columns)} columns from 'after' field" + ) + return columns + + # Fallback to 'before' if 'after' has no children + for field in schema_fields: + field_name_str = get_field_name(field.name) + if field_name_str == "before" and field.children: + columns = [ + get_field_name(child.name) for child in field.children + ] + logger.debug( + f"Debezium CDC: extracted {len(columns)} columns from 'before' field" + ) + return columns + + # Final fallback: Parse schemaText if after/before don't have children + if entity.messageSchema.schemaText: + columns = self._parse_cdc_schema_columns( + entity.messageSchema.schemaText + ) + if columns: + logger.debug( + f"Debezium CDC: extracted {len(columns)} columns from schemaText" + ) + return columns + + logger.debug( + "Debezium CDC detected but unable to extract columns from after/before fields" + ) + return [] + + # Non-CDC topic: extract all fields + columns = [] + for field in schema_fields: + if field.children: + columns.extend( + [get_field_name(child.name) for child in field.children] + ) + else: + columns.append(get_field_name(field.name)) + return columns + + return [] + + def _get_topic_field_fqn( + self, topic_entity: Topic, field_name: str + ) -> Optional[str]: + """ + Get the fully qualified name for a 
field in a Topic's schema. + Handles nested structures where fields may be children of a parent RECORD. + For Debezium CDC topics, searches for fields inside after/before envelope children. + """ + if ( + not topic_entity.messageSchema + or not topic_entity.messageSchema.schemaFields + ): + logger.debug( + f"Topic {get_field_name(topic_entity.name)} has no message schema" + ) + return None + + # Search for the field in the schema (including nested fields) + for field in topic_entity.messageSchema.schemaFields: + field_name_str = get_field_name(field.name) + + # Check if it's a direct field + if field_name_str == field_name: + return ( + field.fullyQualifiedName.root if field.fullyQualifiedName else None + ) + + # Check if it's a child field (nested - one level deep) + if field.children: + # For Debezium CDC, prioritize 'after' over 'before' when searching for grandchildren + children_to_search = field.children + after_child = None + before_child = None + + for child in field.children: + child_name = get_field_name(child.name) + if child_name == "after": + after_child = child + elif child_name == "before": + before_child = child + # Check direct child match + if child_name == field_name: + return ( + child.fullyQualifiedName.root + if child.fullyQualifiedName + else None + ) + + # Search grandchildren - prefer 'after' over 'before' for CDC topics + for cdc_child in [after_child, before_child]: + if cdc_child and cdc_child.children: + for grandchild in cdc_child.children: + if get_field_name(grandchild.name) == field_name: + return ( + grandchild.fullyQualifiedName.root + if grandchild.fullyQualifiedName + else None + ) + + # Search other grandchildren (non-CDC fields) + for child in field.children: + if child not in [after_child, before_child] and child.children: + for grandchild in child.children: + if get_field_name(grandchild.name) == field_name: + return ( + grandchild.fullyQualifiedName.root + if grandchild.fullyQualifiedName + else None + ) + + # For Debezium CDC topics, columns might only exist in schemaText (not as field objects) + # Manually construct FQN: topicFQN.Envelope.columnName + for field in topic_entity.messageSchema.schemaFields: + field_name_str = get_field_name(field.name) + # Check if this is a CDC envelope field + if "Envelope" in field_name_str and field.fullyQualifiedName: + # Construct FQN manually for CDC column + envelope_fqn = field.fullyQualifiedName.root + return f"{envelope_fqn}.{field_name}" + + logger.debug( + f"Field {field_name} not found in topic {get_field_name(topic_entity.name)} schema" + ) + return None + + def build_column_lineage( + self, + from_entity: T, + to_entity: T, + topic_entity: Topic, + pipeline_details: KafkaConnectPipelineDetails, + ) -> Optional[List[ColumnLineage]]: + """ + Build column-level lineage between source table, topic, and target table. 
+ For source connectors: Table columns -> Topic schema fields + For sink connectors: Topic schema fields -> Table columns + """ + try: + column_lineages = [] + + # Get column mappings from connector config if available + if pipeline_details.dataset and pipeline_details.dataset.column_mappings: + # Use explicit column mappings from connector config + for mapping in pipeline_details.dataset.column_mappings: + if pipeline_details.conn_type == ConnectorType.SINK.value: + from_col = get_column_fqn( + table_entity=topic_entity, column=mapping.source_column + ) + to_col = get_column_fqn( + table_entity=to_entity, column=mapping.target_column + ) + else: + from_col = get_column_fqn( + table_entity=from_entity, column=mapping.source_column + ) + to_col = get_column_fqn( + table_entity=topic_entity, column=mapping.target_column + ) + + if from_col and to_col: + column_lineages.append( + ColumnLineage( + fromColumns=[from_col], + toColumn=to_col, + function=None, + ) + ) + else: + # Infer 1:1 column mappings based on matching column names + if pipeline_details.conn_type == ConnectorType.SINK.value: + source_entity = topic_entity + target_entity = to_entity + else: + source_entity = from_entity + target_entity = topic_entity + + # Extract columns from both entities + source_columns = self._extract_columns_from_entity(source_entity) + target_columns = self._extract_columns_from_entity(target_entity) + + logger.debug( + f"Column matching for {pipeline_details.name}: " + f"source={len(source_columns)} cols from {source_entity.__class__.__name__}, " + f"target={len(target_columns)} cols from {target_entity.__class__.__name__}" + ) + logger.debug(f"Source columns: {source_columns[:5]}") # First 5 + logger.debug(f"Target columns: {target_columns}") + + # Create lookup dictionary for O(n) performance instead of O(n²) + target_cols_map = {str(col).lower(): col for col in target_columns} + + # Match columns by name (case-insensitive) + for source_col_name in source_columns: + source_key = str(source_col_name).lower() + if source_key in target_cols_map: + target_col_name = target_cols_map[source_key] + logger.debug( + f"Matched column: {source_col_name} -> {target_col_name}" + ) + try: + # Get fully qualified names for source and target columns + from_col = self._get_entity_column_fqn( + source_entity, source_col_name + ) + to_col = self._get_entity_column_fqn( + target_entity, target_col_name + ) + + logger.debug(f"FQNs: from_col={from_col}, to_col={to_col}") + + if from_col and to_col: + column_lineages.append( + ColumnLineage( + fromColumns=[from_col], + toColumn=to_col, + function=None, + ) + ) + logger.debug( + f"Added column lineage: {from_col} -> {to_col}" + ) + except (KeyError, AttributeError) as exc: + logger.debug( + f"Error creating column lineage for {source_col_name} -> {target_col_name}: {exc}" + ) + + if column_lineages: + logger.debug( + f"Created {len(column_lineages)} column lineages for {pipeline_details.name}" + ) + return column_lineages if column_lineages else None + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to build column lineage: {exc}") + + return None + + def _query_cdc_topics_from_messaging_service( + self, database_server_name: str + ) -> List[KafkaConnectTopics]: + """ + Query topics from messaging service and filter by CDC naming pattern. + + Used for CDC connectors without explicit topic lists - discovers topics + by matching against database.server.name prefix. 
+ + Args: + database_server_name: The database.server.name or topic.prefix from connector config + + Returns: + List of matching CDC topics + """ + topics_found = [] + + try: + logger.debug( + f"CDC connector without topics list - querying messaging service " + f"for pattern: {database_server_name}.*" + ) + + # List topics from the configured messaging service only + topics_list = self.metadata.list_entities( + entity=Topic, + fields=["name", "fullyQualifiedName", "service"], + params={"service": self.service_connection.messagingServiceName}, + ).entities + + # Filter topics that match the CDC naming pattern + for topic_entity in topics_list or []: + topic_name = ( + topic_entity.name.root + if hasattr(topic_entity.name, "root") + else str(topic_entity.name) + ) + + # Parse the topic to see if it's a CDC topic related to this connector + topic_info = parse_cdc_topic_name(topic_name, database_server_name) + if topic_info: + topics_found.append(KafkaConnectTopics(name=topic_name)) + logger.debug(f"Matched CDC topic: {topic_name} -> {topic_info}") + + except Exception as exc: + logger.debug(f"Unable to query topics from messaging service: {exc}") + + return topics_found + def yield_pipeline_lineage_details( self, pipeline_details: KafkaConnectPipelineDetails ) -> Iterable[Either[AddLineageRequest]]: @@ -196,14 +647,30 @@ class KafkaconnectSource(PipelineServiceSource): entity=Pipeline, fqn=pipeline_fqn ) - lineage_details = LineageDetails( - pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"), - source=LineageSource.PipelineLineage, - ) - dataset_entity = self.get_dataset_entity(pipeline_details=pipeline_details) - for topic in pipeline_details.topics or []: + # Get database.server.name or topic.prefix for CDC topic parsing + # These are ONLY set by Debezium CDC connectors + database_server_name = None + if pipeline_details.config: + database_server_name = pipeline_details.config.get( + "database.server.name" + ) or pipeline_details.config.get("topic.prefix") + + # For CDC connectors without explicit topics, query topics from messaging service + # and filter by CDC naming pattern + # Only do this for Debezium CDC connectors (identified by database.server.name or topic.prefix) + topics_to_process = pipeline_details.topics or [] + if ( + not topics_to_process + and database_server_name + and pipeline_details.conn_type == ConnectorType.SOURCE.value + ): + topics_to_process = self._query_cdc_topics_from_messaging_service( + database_server_name + ) + + for topic in topics_to_process: topic_fqn = fqn.build( metadata=self.metadata, entity_type=Topic, @@ -213,13 +680,76 @@ class KafkaconnectSource(PipelineServiceSource): topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn) - if topic_entity is None or dataset_entity is None: + if topic_entity is None: continue - if pipeline_details.conn_type.lower() == "sink": - from_entity, to_entity = topic_entity, dataset_entity + # If no dataset entity from config, try to parse table info from CDC topic name + current_dataset_entity = dataset_entity + if ( + current_dataset_entity is None + and pipeline_details.conn_type == ConnectorType.SOURCE.value + ): + # Parse CDC topic name to extract table information + topic_info = parse_cdc_topic_name( + str(topic.name), database_server_name + ) + if topic_info.get("database") and topic_info.get("table"): + logger.debug( + f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}" + ) + # Try to find the table entity + for ( + dbservicename + ) in 
self.source_config.lineageInformation.dbServiceNames or [ + "*" + ]: + table_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + table_name=topic_info["table"], + database_name=None, + schema_name=topic_info["database"], + service_name=dbservicename, + ) + current_dataset_entity = self.metadata.get_by_name( + entity=Table, fqn=table_fqn + ) + if current_dataset_entity: + logger.debug(f"Found table entity: {table_fqn}") + break + + if current_dataset_entity is None: + # No table entity found, skip this topic + continue + + if pipeline_details.conn_type == ConnectorType.SINK.value: + from_entity, to_entity = topic_entity, current_dataset_entity else: - from_entity, to_entity = dataset_entity, topic_entity + from_entity, to_entity = current_dataset_entity, topic_entity + + # Build column-level lineage (best effort - don't fail entity-level lineage) + column_lineage = None + try: + column_lineage = self.build_column_lineage( + from_entity=from_entity, + to_entity=to_entity, + topic_entity=topic_entity, + pipeline_details=pipeline_details, + ) + except Exception as exc: + logger.warning( + f"Failed to build column-level lineage for {pipeline_details.name}: {exc}. " + "Entity-level lineage will still be created." + ) + logger.debug(traceback.format_exc()) + + lineage_details = LineageDetails( + pipeline=EntityReference( + id=pipeline_entity.id.root, type="pipeline" + ), + source=LineageSource.PipelineLineage, + columnsLineage=column_lineage, + ) yield Either( right=AddLineageRequest( @@ -291,7 +821,7 @@ class KafkaconnectSource(PipelineServiceSource): pipeline_details.status, StatusType.Pending ), taskStatus=task_status, - timestamp=Timestamp(datetime_to_ts(datetime.now())) + timestamp=Timestamp(datetime_to_ts(datetime.now())), # Kafka connect doesn't provide any details with exec time ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py index 6d03d59deae..44bad0f66d2 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py @@ -13,14 +13,23 @@ KafkaConnect Source Model module """ +from enum import Enum from typing import List, Optional, Type, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator from metadata.generated.schema.entity.data.container import Container from metadata.generated.schema.entity.data.table import Table +class ConnectorType(str, Enum): + """Kafka Connect connector types""" + + SOURCE = "source" + SINK = "sink" + UNKNOWN = "UNKNOWN" + + class KafkaConnectTasks(BaseModel): id: int = Field(..., description="ID of the task") state: Optional[str] = Field( @@ -35,10 +44,20 @@ class KafkaConnectTopics(BaseModel): name: str = Field(..., description="Name of the topic (e.g., random-source-avro)") +class KafkaConnectColumnMapping(BaseModel): + """Model for column-level mapping between source and target""" + + source_column: str = Field(..., description="Source column name") + target_column: str = Field(..., description="Target column/field name") + + class KafkaConnectDatasetDetails(BaseModel): table: Optional[str] = None database: Optional[str] = None container_name: Optional[str] = None + column_mappings: List[KafkaConnectColumnMapping] = Field( + default_factory=list, description="Column-level mappings if available" + ) @property def dataset_type(self) -> Optional[Type[Union[Table, Container]]]: @@ -63,3 +82,15 
@@ class KafkaConnectPipelineDetails(BaseModel): description: Optional[str] = None dataset: Optional[KafkaConnectDatasetDetails] = None config: Optional[dict] = Field(default_factory=dict) + + @field_validator("conn_type", mode="before") + @classmethod + def normalize_connector_type(cls, value: str) -> str: + """Normalize connector type to enum value""" + if value: + value_lower = value.lower() + if value_lower == "source": + return ConnectorType.SOURCE.value + elif value_lower == "sink": + return ConnectorType.SINK.value + return ConnectorType.UNKNOWN.value diff --git a/ingestion/src/metadata/parsers/json_schema_parser.py b/ingestion/src/metadata/parsers/json_schema_parser.py index 3313bf21a8a..23fa686cb67 100644 --- a/ingestion/src/metadata/parsers/json_schema_parser.py +++ b/ingestion/src/metadata/parsers/json_schema_parser.py @@ -68,9 +68,49 @@ def parse_json_schema( def get_child_models(key, value, field_models, cls: Type[BaseModel] = FieldModel): """ - Method to parse the child objects in the json schema + Method to parse the child objects in the json schema. + Handles oneOf union types (e.g., Debezium CDC nullable fields). """ try: + # Handle oneOf union types (e.g., [{"type": "null"}, {"type": "object", "properties": {...}}]) + # Common in Debezium CDC schemas for nullable fields like "after" and "before" + if "oneOf" in value and isinstance(value["oneOf"], list): + # Find the non-null object schema in the union + object_schema = None + for option in value["oneOf"]: + if ( + isinstance(option, dict) + and option.get("type") == JsonSchemaDataTypes.RECORD.value + ): + object_schema = option + break + + if object_schema: + # Use the object schema's properties and metadata + cls_obj = cls( + name=key, + displayName=value.get("title") or object_schema.get("title"), + dataType=JsonSchemaDataTypes.RECORD.name, + description=value.get("description") + or object_schema.get("description"), + ) + children = get_json_schema_fields( + object_schema.get("properties", {}), cls=cls + ) + cls_obj.children = children + field_models.append(cls_obj) + return + # If no object found in oneOf, treat as UNKNOWN + cls_obj = cls( + name=key, + displayName=value.get("title"), + dataType=JsonSchemaDataTypes.UNKNOWN.name, + description=value.get("description"), + ) + field_models.append(cls_obj) + return + + # Standard type handling cls_obj = cls( name=key, displayName=value.get("title"), diff --git a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py index 233283e0388..4d7a87216cd 100644 --- a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py +++ b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py @@ -13,13 +13,14 @@ Test KafkaConnect client and models """ from unittest import TestCase -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import ( KafkaConnectConnection, ) from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient from metadata.ingestion.source.pipeline.kafkaconnect.models import ( + KafkaConnectColumnMapping, KafkaConnectDatasetDetails, KafkaConnectPipelineDetails, KafkaConnectTasks, @@ -273,18 +274,15 @@ class TestKafkaConnectClient(TestCase): client = KafkaConnectClient(self.mock_config) # Test various supported dataset configurations + # Note: Database-only configs return None (need table or container) test_configs = [ - # Table configurations 
+ # Table configurations (have table name) ({"table": "users", "database": "mydb"}, "table", "users"), ({"collection": "users"}, "table", "users"), ({"snowflake.schema.name": "schema1"}, "table", "schema1"), ({"table.whitelist": "table1,table2"}, "table", "table1,table2"), ({"fields.whitelist": "field1,field2"}, "table", "field1,field2"), - # Database configurations - ({"database": "mydb"}, "database", "mydb"), - ({"db.name": "testdb"}, "database", "testdb"), - ({"snowflake.database.name": "snowdb"}, "database", "snowdb"), - # Container configurations + # Container configurations (have container name) ({"s3.bucket.name": "my-bucket"}, "container_name", "my-bucket"), ] @@ -298,3 +296,877 @@ class TestKafkaConnectClient(TestCase): expected_value, f"Expected {expected_field}={expected_value}, got {actual_value}", ) + + +class TestConfluentCloudSupport(TestCase): + """Test Confluent Cloud specific functionality""" + + def test_confluent_cloud_detection(self): + """Test that Confluent Cloud URLs are detected correctly""" + confluent_config = Mock(spec=KafkaConnectConnection) + confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456" + confluent_config.verifySSL = True + confluent_config.KafkaConnectConfig = None + + client = KafkaConnectClient(confluent_config) + self.assertTrue(client.is_confluent_cloud) + + def test_self_hosted_detection(self): + """Test that self-hosted Kafka Connect is detected correctly""" + self_hosted_config = Mock(spec=KafkaConnectConnection) + self_hosted_config.hostPort = "http://localhost:8083" + self_hosted_config.verifySSL = False + self_hosted_config.KafkaConnectConfig = None + + client = KafkaConnectClient(self_hosted_config) + self.assertFalse(client.is_confluent_cloud) + + def test_confluent_cloud_get_cluster_info(self): + """Test that get_cluster_info works for Confluent Cloud""" + confluent_config = Mock(spec=KafkaConnectConnection) + confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456" + confluent_config.verifySSL = True + confluent_config.KafkaConnectConfig = None + + client = KafkaConnectClient(confluent_config) + client.client.list_connectors = Mock(return_value=["connector1", "connector2"]) + + result = client.get_cluster_info() + self.assertIsNotNone(result) + self.assertEqual(result["version"], "confluent-cloud") + self.assertEqual(result["kafka_cluster_id"], "confluent-managed") + + def test_confluent_cloud_get_connector_topics_from_config(self): + """Test extracting topics from Confluent Cloud connector config""" + confluent_config = Mock(spec=KafkaConnectConnection) + confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456" + confluent_config.verifySSL = True + confluent_config.KafkaConnectConfig = None + + client = KafkaConnectClient(confluent_config) + client.get_connector_config = Mock(return_value={"kafka.topic": "orders_topic"}) + + topics = client.get_connector_topics("test-connector") + self.assertIsNotNone(topics) + self.assertEqual(len(topics), 1) + self.assertEqual(topics[0].name, "orders_topic") + + def test_confluent_cloud_get_connector_topics_multiple(self): + """Test extracting multiple topics from Confluent Cloud connector config""" + confluent_config = Mock(spec=KafkaConnectConnection) + confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456" + confluent_config.verifySSL = True + confluent_config.KafkaConnectConfig = None + + client 
= KafkaConnectClient(confluent_config) + client.get_connector_config = Mock( + return_value={"topics": "topic1,topic2,topic3"} + ) + + topics = client.get_connector_topics("test-connector") + self.assertIsNotNone(topics) + self.assertEqual(len(topics), 3) + self.assertEqual(topics[0].name, "topic1") + self.assertEqual(topics[1].name, "topic2") + self.assertEqual(topics[2].name, "topic3") + + def test_confluent_cloud_database_include_list(self): + """Test that database-only config returns None (needs table name)""" + config = {"database.include.list": "mydb"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_confluent_cloud_table_include_list(self): + """Test extracting table from Confluent Cloud table.include.list field""" + config = {"table.include.list": "mydb.customers,mydb.orders"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.table, "mydb.customers,mydb.orders") + + def test_confluent_cloud_database_hostname(self): + """Test that database-only config returns None (needs table name)""" + config = {"database.hostname": "mysql.example.com"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough for table lineage + self.assertIsNone(result) + + def test_debezium_postgres_database_dbname(self): + """Test that database-only config returns None (needs table name)""" + config = {"database.dbname": "postgres"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_debezium_topic_prefix(self): + """Test that database-only config returns None (needs table name)""" + config = {"topic.prefix": "dbserver1"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - tables discovered via topic parsing for CDC + self.assertIsNone(result) + + def test_mysql_cdc_databases_include(self): + """Test that database-only config returns None (needs table name)""" + config = {"databases.include": "mydb1,mydb2"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = 
client.get_connector_dataset_info(config) + + # Should return None - CDC uses topic parsing + self.assertIsNone(result) + + def test_mysql_cdc_tables_include(self): + """Test extracting tables from MySQL CDC V2 tables.include field""" + config = {"tables.include": "db1.users,db1.orders"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.table, "db1.users,db1.orders") + + def test_snowflake_database_field(self): + """Test that database-only config returns None (needs table name)""" + config = {"snowflake.database": "ANALYTICS_DB"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_snowflake_schema_field(self): + """Test extracting table/schema from Snowflake snowflake.schema field""" + config = {"snowflake.schema": "PUBLIC"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.table, "PUBLIC") + + def test_sql_server_database_names(self): + """Test that database-only config returns None (needs table name)""" + config = {"database.names": "AdventureWorks,Northwind"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_s3_bucket_field(self): + """Test extracting bucket from S3 s3.bucket field""" + config = {"s3.bucket": "my-data-lake"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.container_name, "my-data-lake") + + def test_gcs_bucket_field(self): + """Test extracting bucket from GCS gcs.bucket.name field""" + config = {"gcs.bucket.name": "my-gcs-bucket"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.container_name, "my-gcs-bucket") + + def test_postgres_sink_connection_host(self): + """Test that database-only config returns None (needs table name)""" + config = {"connection.host": "postgres.example.com"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + 
client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_sink_fields_included(self): + """Test extracting fields from Sink connector fields.included field""" + config = {"fields.included": "id,name,email,created_at"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.table, "id,name,email,created_at") + + def test_debezium_mysql_database_exclude_list(self): + """Test that database-only config returns None (needs table name)""" + config = {"database.exclude.list": "test,temp"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - database alone is not enough + self.assertIsNone(result) + + def test_debezium_v1_database_server_name(self): + """Test that CDC connectors with only database.server.name return None + + CDC connectors don't have explicit table configs - tables are discovered + via topic name parsing instead. + """ + config = {"database.server.name": "mysql-server-1"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None because CDC connectors don't have explicit table names + # Tables are discovered via topic parsing instead + self.assertIsNone(result) + + def test_debezium_v2_topic_prefix(self): + """Test that CDC connectors with only topic.prefix return None + + CDC connectors don't have explicit table configs - tables are discovered + via topic name parsing instead. 
+ """ + config = {"topic.prefix": "postgres-server-1"} + + client_config = Mock(spec=KafkaConnectConnection) + client_config.hostPort = "http://localhost:8083" + client_config.verifySSL = False + client_config.KafkaConnectConfig = None + + client = KafkaConnectClient(client_config) + result = client.get_connector_dataset_info(config) + + # Should return None - tables discovered via topic parsing + self.assertIsNone(result) + + +class TestKafkaConnectColumnLineage(TestCase): + """Test KafkaConnect column-level lineage functionality""" + + def test_column_mapping_model(self): + """Test KafkaConnectColumnMapping model creation""" + mapping = KafkaConnectColumnMapping(source_column="id", target_column="user_id") + self.assertEqual(mapping.source_column, "id") + self.assertEqual(mapping.target_column, "user_id") + + def test_dataset_details_with_column_mappings(self): + """Test KafkaConnectDatasetDetails with column mappings""" + mappings = [ + KafkaConnectColumnMapping(source_column="id", target_column="user_id"), + KafkaConnectColumnMapping(source_column="name", target_column="full_name"), + ] + dataset = KafkaConnectDatasetDetails( + table="users", database="mydb", column_mappings=mappings + ) + + self.assertEqual(len(dataset.column_mappings), 2) + self.assertEqual(dataset.column_mappings[0].source_column, "id") + self.assertEqual(dataset.column_mappings[0].target_column, "user_id") + + def test_dataset_details_column_mappings_default(self): + """Test KafkaConnectDatasetDetails column_mappings defaults to empty list""" + dataset = KafkaConnectDatasetDetails(table="users") + self.assertEqual(dataset.column_mappings, []) + + def test_extract_column_mappings_with_smt_renames(self): + """Test extract_column_mappings with SMT ReplaceField transform""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = None + + client = KafkaConnectClient(mock_config) + + config = { + "transforms": "rename", + "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.rename.renames": "id:user_id,name:full_name", + } + + result = client.extract_column_mappings(config) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].source_column, "id") + self.assertEqual(result[0].target_column, "user_id") + self.assertEqual(result[1].source_column, "name") + self.assertEqual(result[1].target_column, "full_name") + + def test_extract_column_mappings_no_transforms(self): + """Test extract_column_mappings with no transforms""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = None + + client = KafkaConnectClient(mock_config) + config = {"some.config": "value"} + + result = client.extract_column_mappings(config) + + self.assertIsNone(result) + + def test_extract_column_mappings_transform_without_renames(self): + """Test extract_column_mappings with transform but no renames""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = 
None + + client = KafkaConnectClient(mock_config) + + config = { + "transforms": "mask", + "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value", + } + + result = client.extract_column_mappings(config) + + self.assertIsNone(result) + + def test_extract_column_mappings_multiple_transforms(self): + """Test extract_column_mappings with multiple transforms""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = None + + client = KafkaConnectClient(mock_config) + + config = { + "transforms": "rename,mask", + "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.rename.renames": "id:user_id", + "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value", + } + + result = client.extract_column_mappings(config) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].source_column, "id") + self.assertEqual(result[0].target_column, "user_id") + + def test_get_connector_dataset_info_includes_column_mappings(self): + """Test get_connector_dataset_info includes column mappings""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = None + + client = KafkaConnectClient(mock_config) + + config = { + "table": "users", + "database": "mydb", + "transforms": "rename", + "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.rename.renames": "id:user_id", + } + + result = client.get_connector_dataset_info(config) + + self.assertIsNotNone(result) + self.assertEqual(result.table, "users") + self.assertIsNotNone(result.column_mappings) + self.assertEqual(len(result.column_mappings), 1) + self.assertEqual(result.column_mappings[0].source_column, "id") + self.assertEqual(result.column_mappings[0].target_column, "user_id") + + def test_column_lineage_failure_gracefully_handled(self): + """Test that column lineage building handles errors gracefully""" + from metadata.generated.schema.entity.data.table import Table + from metadata.generated.schema.entity.data.topic import Topic + from metadata.ingestion.source.pipeline.kafkaconnect.metadata import ( + KafkaconnectSource, + ) + + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + # Create a minimal source instance + mock_config = MagicMock(spec=KafkaConnectConnection) + mock_config.hostPort = "http://localhost:8083" + mock_config.verifySSL = True + mock_config.KafkaConnectConfig = None + mock_config.messagingServiceName = "test_kafka" + + mock_metadata = Mock() + + # Create source with minimal setup - we're only testing build_column_lineage + source = Mock(spec=KafkaconnectSource) + source._get_topic_field_fqn = ( + KafkaconnectSource._get_topic_field_fqn.__get__( + source, KafkaconnectSource + ) + ) + source.build_column_lineage = ( + KafkaconnectSource.build_column_lineage.__get__( + source, KafkaconnectSource + ) + ) + + # Create mock entities + mock_table_entity = Mock(spec=Table) + mock_table_entity.columns = [] + + mock_topic_entity = Mock(spec=Topic) + mock_topic_name = Mock() + mock_topic_name.root = "test-topic" + mock_topic_entity.name = 
mock_topic_name + # Missing messageSchema will cause column lineage to return None + mock_topic_entity.messageSchema = None + + pipeline_details = KafkaConnectPipelineDetails( + name="test-connector", + status="RUNNING", + conn_type="source", + ) + + # Test column lineage build - should return None gracefully without raising + result = source.build_column_lineage( + from_entity=mock_table_entity, + to_entity=mock_topic_entity, + topic_entity=mock_topic_entity, + pipeline_details=pipeline_details, + ) + + # Should return None when no column lineage can be built + self.assertIsNone(result) + + +class TestCDCTopicParsing(TestCase): + """Test CDC topic name parsing functionality""" + + def test_parse_cdc_topic_three_parts_standard(self): + """Test parsing CDC topic with 3 parts: {server}.{database}.{table}""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name("MysqlKafkaV2.ecommerce.orders", "MysqlKafkaV2") + self.assertEqual(result, {"database": "ecommerce", "table": "orders"}) + + def test_parse_cdc_topic_three_parts_postgres(self): + """Test parsing PostgreSQL CDC topic with schema.database.table""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name( + "PostgresKafkaCDC.public.orders", "PostgresKafkaCDC" + ) + self.assertEqual(result, {"database": "public", "table": "orders"}) + + def test_parse_cdc_topic_two_parts(self): + """Test parsing CDC topic with 2 parts: {database}.{table}""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name("ecommerce.customers") + self.assertEqual(result, {"database": "ecommerce", "table": "customers"}) + + def test_parse_cdc_topic_single_part_with_server_name(self): + """Test parsing CDC topic with 1 part when server name is provided""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name("orders", "MysqlKafkaV2") + self.assertEqual(result, {"database": "MysqlKafkaV2", "table": "orders"}) + + def test_parse_cdc_topic_single_part_without_server_name(self): + """Test parsing CDC topic with 1 part without server name returns empty""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name("orders") + self.assertEqual(result, {}) + + def test_parse_cdc_topic_skip_internal_topics(self): + """Test that internal topics are skipped""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + # Kafka internal topics + self.assertEqual(parse_cdc_topic_name("_schemas"), {}) + self.assertEqual(parse_cdc_topic_name("__consumer_offsets"), {}) + self.assertEqual(parse_cdc_topic_name("dbhistory.mysql"), {}) + + def test_parse_cdc_topic_empty_input(self): + """Test parsing empty topic name returns empty dict""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + result = parse_cdc_topic_name("") + self.assertEqual(result, {}) + + result = parse_cdc_topic_name(None) + self.assertEqual(result, {}) + + def test_parse_cdc_topic_case_insensitive_server_match(self): + """Test that server name matching is case-insensitive""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + # Three part topic with different case + result = 
parse_cdc_topic_name("mysqlkafkav2.ecommerce.orders", "MysqlKafkaV2") + self.assertEqual(result, {"database": "ecommerce", "table": "orders"}) + + # Two part topic with different case + result = parse_cdc_topic_name("mysqlkafkav2.orders", "MysqlKafkaV2") + self.assertEqual(result, {"database": "mysqlkafkav2", "table": "orders"}) + + def test_parse_cdc_topic_sql_server_pattern(self): + """Test parsing SQL Server CDC topic patterns""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + # SQL Server typically uses dbo schema + result = parse_cdc_topic_name("SqlServerCDC.dbo.users", "SqlServerCDC") + self.assertEqual(result, {"database": "dbo", "table": "users"}) + + def test_parse_cdc_topic_mongodb_pattern(self): + """Test parsing MongoDB CDC topic patterns""" + from metadata.ingestion.source.pipeline.kafkaconnect.client import ( + parse_cdc_topic_name, + ) + + # MongoDB uses database.collection + result = parse_cdc_topic_name("MongoCDC.mydb.users", "MongoCDC") + self.assertEqual(result, {"database": "mydb", "table": "users"}) + + +class TestKafkaConnectCDCColumnExtraction(TestCase): + """Test CDC column extraction from Debezium schema""" + + def _create_mock_source(self): + """Helper to create a minimal mock source for testing""" + from metadata.ingestion.source.pipeline.kafkaconnect.metadata import ( + KafkaconnectSource, + ) + + # Create a mock source that bypasses __init__ + source = object.__new__(KafkaconnectSource) + return source + + def setUp(self): + """Set up test fixtures""" + # Create a mock Debezium CDC topic with nested envelope structure + self.cdc_topic = MagicMock() + self.cdc_topic.name = "MysqlKafkaV2.ecommerce.orders" + self.cdc_topic.fullyQualifiedName.root = ( + 'KafkaProd."MysqlKafkaV2.ecommerce.orders"' + ) + + # Mock message schema with CDC structure + self.cdc_topic.messageSchema = MagicMock() + self.cdc_topic.messageSchema.schemaText = '{"type":"object","title":"MysqlKafkaV2.ecommerce.orders.Envelope","properties":{"op":{"type":"string"},"before":{"oneOf":[{"type":"null"},{"type":"object","properties":{"id":{"type":"integer"},"order_number":{"type":"string"},"customer_name":{"type":"string"}}}]},"after":{"oneOf":[{"type":"null"},{"type":"object","properties":{"id":{"type":"integer"},"order_number":{"type":"string"},"customer_name":{"type":"string"},"customer_email":{"type":"string"},"product_name":{"type":"string"}}}]},"source":{"type":"object","properties":{"version":{"type":"string"}}},"ts_ms":{"type":"integer"}}}' + + # Mock schema fields - single envelope with CDC children + # Use a helper function to create field names with root attribute + def create_field_name(name_str): + name_obj = MagicMock() + name_obj.root = name_str + return name_obj + + envelope_field = MagicMock() + envelope_field.name = create_field_name( + "MysqlKafkaV2.ecommerce.orders.Envelope" + ) + envelope_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope' + + # CDC envelope children + op_field = MagicMock() + op_field.name = create_field_name("op") + op_field.children = None + op_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.op' + + before_field = MagicMock() + before_field.name = create_field_name("before") + before_field.children = None + before_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.before' + + after_field = MagicMock() + 
after_field.name = create_field_name("after") + after_field.children = None + after_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.after' + + source_field = MagicMock() + source_field.name = create_field_name("source") + source_field.children = [MagicMock()] # Has children (version field) + + ts_field = MagicMock() + ts_field.name = create_field_name("ts_ms") + ts_field.children = None + + envelope_field.children = [ + op_field, + before_field, + after_field, + source_field, + ts_field, + ] + + self.cdc_topic.messageSchema.schemaFields = [envelope_field] + + def test_extract_columns_from_cdc_topic(self): + """Test extracting columns from Debezium CDC topic schema text""" + source = self._create_mock_source() + + # Extract columns from CDC topic + columns = source._extract_columns_from_entity(self.cdc_topic) + + # Should extract columns from 'after' field in schema text + self.assertIsNotNone(columns) + self.assertIn("id", columns) + self.assertIn("order_number", columns) + self.assertIn("customer_name", columns) + self.assertIn("customer_email", columns) + self.assertIn("product_name", columns) + + # Should have 5 columns total + self.assertEqual(len(columns), 5) + + def test_get_topic_field_fqn_for_cdc(self): + """Test constructing FQN for CDC topic fields""" + source = self._create_mock_source() + + # Get FQN for a CDC column + fqn = source._get_topic_field_fqn(self.cdc_topic, "id") + + # Should construct FQN manually for CDC envelope structure + self.assertIsNotNone(fqn) + self.assertIn("MysqlKafkaV2.ecommerce.orders.Envelope", fqn) + self.assertTrue(fqn.endswith(".id")) + + def test_cdc_envelope_detection(self): + """Test that Debezium CDC envelope is correctly detected""" + source = self._create_mock_source() + + columns = source._extract_columns_from_entity(self.cdc_topic) + + # Should not return CDC envelope fields (op, before, after, source, ts_ms) + self.assertNotIn("op", columns) + self.assertNotIn("before", columns) + self.assertNotIn("after", columns) + self.assertNotIn("source", columns) + self.assertNotIn("ts_ms", columns) + + # Should return actual table columns + self.assertIn("id", columns) + self.assertIn("order_number", columns) + + def test_non_cdc_topic_column_extraction(self): + """Test that non-CDC topics still work correctly""" + source = self._create_mock_source() + + # Helper to create field names + def create_field_name(name_str): + name_obj = MagicMock() + name_obj.root = name_str + return name_obj + + # Create a regular (non-CDC) topic + regular_topic = MagicMock() + regular_topic.name = create_field_name("orders") + regular_topic.fullyQualifiedName.root = "KafkaProd.orders" + + # Mock regular fields (not CDC envelope) + id_field = MagicMock() + id_field.name = create_field_name("id") + id_field.children = None + id_field.fullyQualifiedName.root = "KafkaProd.orders.id" + + name_field = MagicMock() + name_field.name = create_field_name("customer_name") + name_field.children = None + name_field.fullyQualifiedName.root = "KafkaProd.orders.customer_name" + + regular_topic.messageSchema = MagicMock() + regular_topic.messageSchema.schemaFields = [id_field, name_field] + + columns = source._extract_columns_from_entity(regular_topic) + + # Should extract regular fields + self.assertEqual(len(columns), 2) + self.assertIn("id", columns) + self.assertIn("customer_name", columns) + + def test_cdc_schema_text_missing(self): + """Test handling CDC topic without schema text""" + source = 
self._create_mock_source() + + # Helper to create field names + def create_field_name(name_str): + name_obj = MagicMock() + name_obj.root = name_str + return name_obj + + # Create CDC topic without schemaText + cdc_topic_no_text = MagicMock() + cdc_topic_no_text.name = create_field_name("MysqlKafkaV2.ecommerce.orders") + + # CDC envelope structure but no schemaText + envelope_field = MagicMock() + envelope_field.name = create_field_name( + "MysqlKafkaV2.ecommerce.orders.Envelope" + ) + + op_field = MagicMock() + op_field.name = create_field_name("op") + op_field.children = None + + before_field = MagicMock() + before_field.name = create_field_name("before") + before_field.children = None + + after_field = MagicMock() + after_field.name = create_field_name("after") + after_field.children = None + + envelope_field.children = [op_field, before_field, after_field] + + cdc_topic_no_text.messageSchema = MagicMock() + cdc_topic_no_text.messageSchema.schemaText = None # No schema text + cdc_topic_no_text.messageSchema.schemaFields = [envelope_field] + + columns = source._extract_columns_from_entity(cdc_topic_no_text) + + # Should return empty list when schema text is not available + self.assertEqual(columns, []) + + def test_cdc_with_before_field(self): + """Test CDC extraction prefers 'after' but falls back to 'before'""" + source = self._create_mock_source() + + # Create CDC topic with only 'before' field having data (after only has null) + cdc_topic_before = MagicMock() + cdc_topic_before.name = "Test.cdc.topic" + cdc_topic_before.fullyQualifiedName.root = "KafkaProd.Test.cdc.topic" + + cdc_topic_before.messageSchema = MagicMock() + cdc_topic_before.messageSchema.schemaText = '{"type":"object","properties":{"op":{"type":"string"},"before":{"oneOf":[{"type":"null"},{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}}}]},"after":{"oneOf":[{"type":"null"}]}}}' + cdc_topic_before.messageSchema.schemaFields = [] + + columns = source._extract_columns_from_entity(cdc_topic_before) + + # Should extract from 'before' field when 'after' only has null + self.assertIn("field1", columns) + self.assertIn("field2", columns)
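
Note: the TestCDCTopicParsing cases above pin down the contract expected from parse_cdc_topic_name (imported in these tests from metadata/ingestion/source/pipeline/kafkaconnect/client.py). As a reading aid only, here is a minimal sketch that is consistent with those assertions; the function name suffix, parameter name, and internal-topic check are assumptions for illustration, and the shipped helper may handle additional edge cases.

from typing import Dict, Optional

def parse_cdc_topic_name_sketch(
    topic_name: Optional[str], server_name: Optional[str] = None
) -> Dict[str, str]:
    """Illustrative sketch of the CDC topic parsing behaviour the tests assert (not the shipped code)."""
    if not topic_name:
        return {}
    # Skip Kafka internal topics (_schemas, __consumer_offsets) and Debezium history topics (dbhistory.*)
    if topic_name.startswith("_") or topic_name.startswith("dbhistory."):
        return {}
    parts = topic_name.split(".")
    if len(parts) >= 3:
        # {server}.{database-or-schema}.{table}: drop the leading server/topic-prefix segment
        # (per the tests, the real helper matches that segment case-insensitively)
        return {"database": parts[1], "table": parts[2]}
    if len(parts) == 2:
        # {database}.{table}
        return {"database": parts[0], "table": parts[1]}
    # Single segment: only resolvable when the connector's server name is known
    if server_name:
        return {"database": server_name, "table": parts[0]}
    return {}

For example, parse_cdc_topic_name_sketch("MysqlKafkaV2.ecommerce.orders", "MysqlKafkaV2") yields {"database": "ecommerce", "table": "orders"}, matching test_parse_cdc_topic_three_parts_standard.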