Kafka Connect: Support Confluent Cloud connectors (#23780)

Sriharsha Chintalapani authored 2025-10-08 15:58:27 -04:00 · committed by GitHub
parent da8c50d2a0
commit 454d7367b0
9 changed files with 2141 additions and 55 deletions


@ -0,0 +1,70 @@
source:
type: kafkaconnect
serviceName: confluent_cdc_mysql_postgres
serviceConnection:
config:
type: KafkaConnect
hostPort: http://localhost:8083
# For Kafka Connect, choose one of the following authentication methods:
# Option 1: Username/Password Authentication (for self-hosted Kafka Connect)
KafkaConnectConfig:
username: admin
password: admin_password
# Option 2: API Key Authentication (for Confluent Cloud)
# KafkaConnectConfig:
# username: YOUR_CONFLUENT_CLOUD_API_KEY
# password: YOUR_CONFLUENT_CLOUD_API_SECRET
# Option 3: No Authentication
# KafkaConnectConfig: null
verifySSL: true
messagingServiceName: KafkaProd
# Optional: Filter CDC connectors/pipelines using regex patterns
# pipelineFilterPattern:
# includes:
# - "mysql-cdc-.*"
# - "postgres-sink-.*"
# excludes:
# - ".*test.*"
# - ".*dev.*"
sourceConfig:
config:
type: PipelineMetadata
lineageInformation:
dbServiceNames:
- "MysqlProd"
- "PostgresProd"
storageServiceNames: []
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: <your-jwt-token-here>
# To run this workflow:
# metadata ingest -c /path/to/confluent_cdc.yaml
# Prerequisites:
# 1. OpenMetadata server running at http://localhost:8585
# 2. MySQL database service already ingested into OpenMetadata
# 3. Postgres database service already ingested into OpenMetadata
# 4. Kafka messaging service already ingested into OpenMetadata
# 5. Kafka Connect connectors configured and running
#
# The connector will create:
# - Pipeline entities for each CDC connector
# - Table → Topic lineage with column-level mapping
# - Topic → Table lineage with column-level mapping
# - Complete end-to-end lineage: MySQL columns → Kafka topic fields → Postgres columns


@ -0,0 +1,86 @@
# Example: Confluent Cloud CDC Configuration
# This example shows how to connect to Confluent Cloud CDC connectors
# that replicate data from MySQL to PostgreSQL via Kafka
source:
type: kafkaconnect
serviceName: confluent_cloud_cdc_production
serviceConnection:
config:
type: KafkaConnect
# Confluent Cloud Kafka Connect REST API endpoint
hostPort: https://pkc-xxxxx.us-east-1.aws.confluent.cloud:443
# Confluent Cloud API Key authentication
KafkaConnectConfig:
username: YOUR_CONFLUENT_CLOUD_API_KEY
password: YOUR_CONFLUENT_CLOUD_API_SECRET
verifySSL: true
messagingServiceName: KafkaCloudProd
# Filter to include only production CDC connectors
pipelineFilterPattern:
includes:
- "prod-mysql-cdc-.*"
- "prod-postgres-sink-.*"
excludes:
- ".*staging.*"
- ".*test.*"
sourceConfig:
config:
type: PipelineMetadata
lineageInformation:
dbServiceNames:
- "MysqlProd"
- "PostgresProd"
storageServiceNames: []
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: INFO
openMetadataServerConfig:
hostPort: https://your-openmetadata-server.com/api
authProvider: openmetadata
securityConfig:
jwtToken: <your-jwt-token>
# Example CDC Connector Configuration that this workflow will discover:
#
# MySQL CDC Source Connector:
# {
# "name": "prod-mysql-cdc-customers",
# "config": {
# "connector.class": "io.debezium.connector.mysql.MySqlConnector",
# "database.hostname": "mysql.example.com",
# "database.name": "ecommerce",
# "table.include.list": "ecommerce.customers,ecommerce.orders",
# "database.server.name": "prod-mysql",
# "topics": "prod-mysql.ecommerce.customers"
# }
# }
#
# Postgres Sink Connector:
# {
# "name": "prod-postgres-sink-customers",
# "config": {
# "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
# "connection.url": "jdbc:postgresql://postgres.example.com:5432/analytics",
# "topics": "prod-mysql.ecommerce.customers",
# "table.name.format": "customers",
# "schema": "public"
# }
# }
#
# This will create lineage:
# MySQL: ecommerce.customers (id, name, email, created_at)
# ↓
# Kafka: prod-mysql.ecommerce.customers (id, name, email, created_at) [topic schema fields]
# ↓
# Postgres: analytics.public.customers (id, name, email, created_at)
#
# With column-level lineage showing exact field mappings at each hop!
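#
# Note on topic schemas: a Debezium change event wraps the row in an envelope whose
# "before"/"after" fields carry the actual table structure. A simplified, illustrative
# sketch of such an event value (field values are made up):
#
# {
#   "before": null,
#   "after": {"id": 1, "name": "Jane", "email": "jane@example.com", "created_at": "2025-01-01T00:00:00Z"},
#   "source": {"db": "ecommerce", "table": "customers"},
#   "op": "c",
#   "ts_ms": 1696780800123
# }
#
# The workflow detects this envelope and maps the columns inside "after" (or "before")
# for column-level lineage, not the envelope fields themselves.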


@ -0,0 +1,120 @@
# Example: Local Confluent CDC Setup for Development
# This example shows a typical local development setup with:
# - Local Kafka Connect cluster
# - MySQL source database
# - PostgreSQL target database
# - Local Kafka broker
source:
type: kafkaconnect
serviceName: local_cdc_dev
serviceConnection:
config:
type: KafkaConnect
# Local Kafka Connect REST API
hostPort: http://localhost:8083
# No authentication for local development
# KafkaConnectConfig: null
# Or use basic auth if configured
KafkaConnectConfig:
username: connect-user
password: connect-password
verifySSL: false # Typically disabled for local development
messagingServiceName: KafkaLocal
sourceConfig:
config:
type: PipelineMetadata
lineageInformation:
dbServiceNames:
- "MysqlLocal"
- "PostgresLocal"
storageServiceNames: []
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG # Use DEBUG for development to see detailed logs
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
# ============================================================================
# Local Development Setup Guide
# ============================================================================
#
# 1. Start local infrastructure:
# docker-compose up -d # Start MySQL, Postgres, Kafka, Kafka Connect
#
# 2. Ingest database metadata into OpenMetadata:
# # MySQL
# metadata ingest -c mysql_local.yaml
#
# # PostgreSQL
# metadata ingest -c postgres_local.yaml
#
# # Kafka
# metadata ingest -c kafka_local.yaml
#
# 3. Configure CDC connectors in Kafka Connect:
#
# # MySQL CDC Source Connector (Debezium)
# curl -X POST http://localhost:8083/connectors \
# -H "Content-Type: application/json" \
# -d '{
# "name": "mysql-source-customers",
# "config": {
# "connector.class": "io.debezium.connector.mysql.MySqlConnector",
# "database.hostname": "mysql",
# "database.port": "3306",
# "database.user": "debezium",
# "database.password": "dbz",
# "database.server.id": "184054",
# "database.server.name": "local-mysql",
# "database.name": "testdb",
# "table.include.list": "testdb.customers,testdb.orders",
# "database.history.kafka.bootstrap.servers": "kafka:9092",
# "database.history.kafka.topic": "schema-changes.testdb"
# }
# }'
#
# # PostgreSQL Sink Connector
# curl -X POST http://localhost:8083/connectors \
# -H "Content-Type: application/json" \
# -d '{
# "name": "postgres-sink-customers",
# "config": {
# "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
# "connection.url": "jdbc:postgresql://postgres:5432/warehouse",
# "connection.user": "postgres",
# "connection.password": "postgres",
# "topics": "local-mysql.testdb.customers",
# "table.name.format": "customers",
# "auto.create": "true",
# "auto.evolve": "true",
# "insert.mode": "upsert",
# "pk.mode": "record_key",
# "delete.enabled": "true"
# }
# }'
#
# 4. Run this CDC workflow:
# metadata ingest -c confluent_cdc_local.yaml
#
# 5. View lineage in OpenMetadata UI:
# http://localhost:8585
# Navigate to: Pipeline Services → local_cdc_dev → Pipelines
# Click on a pipeline to see column-level lineage
#
# Expected Lineage:
# testdb.customers.id → local-mysql.testdb.customers.id → warehouse.public.customers.id
# testdb.customers.name → local-mysql.testdb.customers.name → warehouse.public.customers.name
# testdb.customers.email → local-mysql.testdb.customers.email → warehouse.public.customers.email
# testdb.customers.created → local-mysql.testdb.customers.created → warehouse.public.customers.created
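#
# 6. Optional troubleshooting (assuming the connector names used above):
#
#    # Check that the connectors are registered and running
#    curl http://localhost:8083/connectors
#    curl http://localhost:8083/connectors/mysql-source-customers/status
#    curl http://localhost:8083/connectors/postgres-sink-customers/status
#
#    # If lineage does not appear in the UI, re-run the workflow with DEBUG
#    # logging (already set above) and review the ingestion logs.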


@ -0,0 +1,49 @@
source:
type: kafkaconnect
serviceName: confluent_cloud_cdc
serviceConnection:
config:
type: KafkaConnect
# IMPORTANT: Use the Kafka Connect API endpoint, NOT the Kafka broker endpoint
# Format: https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}
# Example: https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789
hostPort: https://api.confluent.cloud/connect/v1/environments/YOUR_ENV_ID/clusters/YOUR_CONNECT_CLUSTER_ID
# Use Kafka Connect API Key (NOT Kafka broker API key)
KafkaConnectConfig:
username: YOUR_KAFKA_CONNECT_API_KEY
password: YOUR_KAFKA_CONNECT_API_SECRET
verifySSL: true
# REQUIRED for Confluent Cloud: Specify your Kafka messaging service
# This should match the service name you used when ingesting Kafka topics
messagingServiceName: confluent_kafka_cloud
# Optional: Filter specific connectors
# pipelineFilterPattern:
# includes:
# - "mysql-cdc-.*"
# - "postgres-sink-.*"
sourceConfig:
config:
type: PipelineMetadata
# Specify the database services where your tables are ingested
lineageInformation:
dbServiceNames:
- local_mysql # Replace with your MySQL service name
- local_postgres # Replace with your Postgres service name
storageServiceNames: [] # Add S3/GCS service names if using sink connectors to storage
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG # Use DEBUG to see detailed connection info
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
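# ============================================================================
# Optional: verify the endpoint and API key before running the workflow
# ============================================================================
# A minimal connectivity check; the environment and cluster IDs below are the
# same placeholders used above, not real values. A successful response is a
# JSON array of connector names:
#
#   curl -u "YOUR_KAFKA_CONNECT_API_KEY:YOUR_KAFKA_CONNECT_API_SECRET" \
#     "https://api.confluent.cloud/connect/v1/environments/YOUR_ENV_ID/clusters/YOUR_CONNECT_CLUSTER_ID/connectors"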


@ -14,6 +14,7 @@ Client to interact with Kafka Connect REST APIs
import traceback
from typing import List, Optional
from urllib.parse import urlparse
from kafka_connect import KafkaConnect
@ -21,6 +22,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnect
KafkaConnectConnection,
)
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
KafkaConnectColumnMapping,
KafkaConnectDatasetDetails,
KafkaConnectPipelineDetails,
KafkaConnectTopics,
@ -30,16 +32,111 @@ from metadata.utils.logger import ometa_logger
logger = ometa_logger()
SUPPORTED_DATASETS = {
"table": [
def parse_cdc_topic_name(topic_name: str, database_server_name: str = None) -> dict:
"""
Parse CDC topic names to extract database and table information.
Common CDC topic naming patterns:
- Debezium: {server-name}.{database}.{table}
- Debezium V2: {topic-prefix}.{database}.{table}
- Examples:
- MysqlKafkaV2.ecommerce.orders -> database=ecommerce, table=orders
- PostgresKafkaCDC.public.orders -> database=public, table=orders
- ecommerce.customers -> database=ecommerce, table=customers (if server-name matches)
Args:
topic_name: The Kafka topic name
database_server_name: The database.server.name or topic.prefix from connector config
Returns:
dict with 'database' and 'table' keys, or empty dict if pattern doesn't match
"""
if not topic_name:
return {}
# Skip internal/system topics
if topic_name.startswith(("_", "dbhistory.", "__")):
return {}
parts = topic_name.split(".")
# Pattern: {prefix}.{database}.{table} (3 parts)
if len(parts) == 3:
prefix, database, table = parts
# Verify prefix matches server name if provided
if database_server_name and prefix.lower() != database_server_name.lower():
# Might be schema.database.table for some connectors
pass
return {"database": database, "table": table}
# Pattern: {database}.{table} (2 parts)
elif len(parts) == 2:
database, table = parts
# Only accept if server name matches or not provided
if database_server_name and database.lower() == database_server_name.lower():
# This is server_name.table, so database is the server name
return {"database": database, "table": table}
# Or accept as database.table
return {"database": database, "table": table}
# Pattern: just {table} (1 part) - use server name as database
elif len(parts) == 1 and database_server_name:
return {"database": database_server_name, "table": topic_name}
return {}
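# Illustrative behaviour, derived from the patterns handled above (topic names
# are examples only):
#   parse_cdc_topic_name("MysqlKafkaV2.ecommerce.orders", "MysqlKafkaV2")
#       -> {"database": "ecommerce", "table": "orders"}
#   parse_cdc_topic_name("ecommerce.customers", "ecommerce")
#       -> {"database": "ecommerce", "table": "customers"}
#   parse_cdc_topic_name("customers", "prod-mysql")
#       -> {"database": "prod-mysql", "table": "customers"}
#   parse_cdc_topic_name("__consumer_offsets")
#       -> {}  # internal/system topics are skipped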
class ConnectorConfigKeys:
"""Configuration keys for various Kafka Connect connectors"""
TABLE_KEYS = [
"table",
"collection",
"snowflake.schema.name",
"table.whitelist",
"fields.whitelist",
],
"database": ["database", "db.name", "snowflake.database.name"],
"container_name": ["s3.bucket.name"],
"table.include.list",
"table.name.format",
"tables.include",
"table.exclude.list",
"snowflake.schema",
"snowflake.topic2table.map",
"fields.included",
]
DATABASE_KEYS = [
"database",
"db.name",
"snowflake.database.name",
"database.include.list",
"database.hostname",
"connection.url",
"database.dbname",
"topic.prefix",
"database.server.name", # Debezium V1
"databases.include",
"database.names",
"snowflake.database",
"connection.host",
"database.exclude.list",
]
CONTAINER_KEYS = [
"s3.bucket.name",
"s3.bucket",
"gcs.bucket.name",
"azure.container.name",
"topics.dir", # Directory path within storage container for sink connectors
]
TOPIC_KEYS = ["kafka.topic", "topics", "topic"]
SUPPORTED_DATASETS = {
"table": ConnectorConfigKeys.TABLE_KEYS,
"database": ConnectorConfigKeys.DATABASE_KEYS,
"container_name": ConnectorConfigKeys.CONTAINER_KEYS,
}
@ -56,6 +153,41 @@ class KafkaConnectClient:
auth = f"{config.KafkaConnectConfig.username}:{config.KafkaConnectConfig.password.get_secret_value()}"
self.client = KafkaConnect(url=url, auth=auth, ssl_verify=ssl_verify)
# Detect if this is Confluent Cloud (managed connectors)
parsed_url = urlparse(url)
self.is_confluent_cloud = parsed_url.hostname == "api.confluent.cloud"
def _infer_cdc_topics_from_server_name(
self, database_server_name: str
) -> Optional[List[KafkaConnectTopics]]:
"""
For CDC connectors, infer topic names based on database.server.name or topic.prefix.
CDC connectors create topics with pattern: {server-name}.{database}.{table}
This is a workaround for Confluent Cloud which doesn't expose topic lists.
We look for topics that start with the server name prefix.
Args:
database_server_name: The database.server.name or topic.prefix from config
Returns:
List of inferred KafkaConnectTopics, or None
"""
if not database_server_name or not self.is_confluent_cloud:
return None
try:
# Get all connectors and check their topics
# Note: This is a best-effort approach for Confluent Cloud
# In practice, the messaging service should already have ingested these topics
logger.debug(
f"CDC connector detected with server name: {database_server_name}"
)
return None # Topics will be matched via messaging service during lineage
except Exception as exc:
logger.debug(f"Unable to infer CDC topics: {exc}")
return None
def _enrich_connector_details(
self, connector_details: KafkaConnectPipelineDetails, connector_name: str
) -> None:
@ -70,10 +202,49 @@ class KafkaConnectClient:
connector_details.config
)
# For CDC connectors without explicit topics, try to infer from server name
if (
not connector_details.topics
and connector_details.conn_type.lower() == "source"
):
database_server_name = connector_details.config.get(
"database.server.name"
) or connector_details.config.get("topic.prefix")
if database_server_name:
inferred_topics = self._infer_cdc_topics_from_server_name(
database_server_name
)
if inferred_topics:
connector_details.topics = inferred_topics
def get_cluster_info(self) -> Optional[dict]:
"""
Get the version and other details of the Kafka Connect cluster.
For Confluent Cloud, the root endpoint is not supported, so we use
the /connectors endpoint to verify authentication and connectivity.
"""
if self.is_confluent_cloud:
# Confluent Cloud doesn't support the root endpoint (/)
# Use /connectors to test authentication and connectivity
logger.info(
"Confluent Cloud detected - testing connection via connectors list endpoint"
)
try:
connectors = self.client.list_connectors()
# Connection successful - return a valid response
logger.info(
f"Confluent Cloud connection successful - found {len(connectors) if connectors else 0} connectors"
)
return {
"version": "confluent-cloud",
"commit": "managed",
"kafka_cluster_id": "confluent-managed",
}
except Exception as exc:
logger.error(f"Failed to connect to Confluent Cloud: {exc}")
raise
return self.client.get_cluster_info()
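# For reference, a self-hosted Kafka Connect root endpoint (GET /) returns a
# payload shaped like the dict fabricated above for Confluent Cloud, e.g.
# (values are illustrative):
#   {"version": "7.5.0", "commit": "<git sha>", "kafka_cluster_id": "<cluster id>"}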
def get_connectors_list(
@ -122,19 +293,97 @@ class KafkaConnectClient:
def get_connector_config(self, connector: str) -> Optional[dict]:
"""
Get the details of a single connector.
For Confluent Cloud, the API returns configs as an array of {config, value} objects.
For self-hosted Kafka Connect, it returns a flat config dictionary.
Args:
connector (str): The name of the connector.
"""
try:
result = self.client.get_connector(connector=connector)
if result:
return result.get("config")
if not result:
return None
# Check if this is Confluent Cloud format (array of {config, value})
if self.is_confluent_cloud and "configs" in result:
# Transform Confluent Cloud format: [{config: "key", value: "val"}] -> {key: val}
configs_array = result.get("configs", [])
if isinstance(configs_array, list):
config_dict = {
item["config"]: item["value"]
for item in configs_array
if isinstance(item, dict)
and "config" in item
and "value" in item
}
return config_dict or None
# Standard self-hosted Kafka Connect format
return result.get("config")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get connector configuration details {exc}")
return None
def extract_column_mappings(
self, connector_config: dict
) -> Optional[List[KafkaConnectColumnMapping]]:
"""
Extract column mappings from connector configuration.
For Debezium and JDBC connectors, columns are typically mapped 1:1
unless transforms are applied.
Args:
connector_config: The connector configuration dictionary
Returns:
List of KafkaConnectColumnMapping objects if mappings can be inferred
"""
if not connector_config or not isinstance(connector_config, dict):
logger.debug("Invalid connector_config: expected dict")
return None
try:
column_mappings = []
# Check for SMT (Single Message Transform) configurations
transforms = connector_config.get("transforms", "")
if not transforms:
return None
transform_list = [t.strip() for t in transforms.split(",")]
for transform in transform_list:
transform_type = connector_config.get(
f"transforms.{transform}.type", ""
)
# ReplaceField transform can rename columns
if "ReplaceField" in transform_type:
renames = connector_config.get(
f"transforms.{transform}.renames", ""
)
if renames:
for rename in renames.split(","):
if ":" in rename:
source_col, target_col = rename.split(":", 1)
column_mappings.append(
KafkaConnectColumnMapping(
source_column=source_col.strip(),
target_column=target_col.strip(),
)
)
return column_mappings if column_mappings else None
except (KeyError, AttributeError, ValueError) as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to extract column mappings: {exc}")
return None
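# Illustrative connector config fragment and the mappings it would yield
# (the transform name "rename" is an example only):
#
#   {
#       "transforms": "rename",
#       "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
#       "transforms.rename.renames": "id:user_id,name:full_name",
#   }
#   -> [KafkaConnectColumnMapping(source_column="id", target_column="user_id"),
#       KafkaConnectColumnMapping(source_column="name", target_column="full_name")]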
def get_connector_dataset_info(
self, connector_config: dict
) -> Optional[KafkaConnectDatasetDetails]:
@ -143,27 +392,36 @@ class KafkaConnectClient:
Checks the connector configuration for dataset-related fields and
returns the dataset details if any are found.
Args:
connector (str): The name of the connector.
connector_config: The connector configuration dictionary
Returns:
Optional[Dict]: A dictionary containing dataset information
(type, table, database, or bucket_name)
if a dataset is found, or None if the connector
is not found, has no dataset, or an error occurs.
Optional[KafkaConnectDatasetDetails]: Dataset information including
table, database, or container_name if found, or None otherwise
"""
if not connector_config or not isinstance(connector_config, dict):
logger.debug("Invalid connector_config: expected dict")
return None
try:
if not connector_config:
return None
result = {}
for dataset in SUPPORTED_DATASETS or []:
for key in SUPPORTED_DATASETS[dataset] or []:
for dataset_type, config_keys in SUPPORTED_DATASETS.items():
for key in config_keys:
if connector_config.get(key):
result[dataset] = connector_config[key]
return KafkaConnectDatasetDetails(**result)
result[dataset_type] = connector_config[key]
except Exception as exc:
# Only create dataset details if we have meaningful dataset information
# For CDC connectors, database.server.name/topic.prefix are captured
# but don't represent actual table names, so skip dataset creation
# We need either: table name OR container name
if result and (result.get("table") or result.get("container_name")):
dataset_details = KafkaConnectDatasetDetails(**result)
dataset_details.column_mappings = (
self.extract_column_mappings(connector_config) or []
)
return dataset_details
except (KeyError, ValueError, TypeError) as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get connector dataset details {exc}")
logger.warning(f"Unable to get connector dataset details: {exc}")
return None
@ -173,6 +431,9 @@ class KafkaConnectClient:
"""
Get the list of topics for a connector.
For Confluent Cloud, the /topics endpoint is not supported, so we extract
topics from the connector configuration instead.
Args:
connector (str): The name of the connector.
@ -183,15 +444,42 @@ class KafkaConnectClient:
or an error occurs.
"""
try:
result = self.client.list_connector_topics(connector=connector).get(
connector
)
if result:
topics = [
KafkaConnectTopics(name=topic)
for topic in result.get("topics") or []
]
return topics
if self.is_confluent_cloud:
# Confluent Cloud doesn't support /connectors/{name}/topics endpoint
# Extract topics from connector config instead
config = self.get_connector_config(connector=connector)
if config:
topics = []
# Check common topic configuration keys
for key in ConnectorConfigKeys.TOPIC_KEYS:
if key in config:
topic_value = config[key]
# Handle single topic or comma-separated list
if isinstance(topic_value, str):
topic_list = [t.strip() for t in topic_value.split(",")]
topics.extend(
[
KafkaConnectTopics(name=topic)
for topic in topic_list
]
)
if topics:
logger.info(
f"Extracted {len(topics)} topics from Confluent Cloud connector config"
)
return topics
else:
# Self-hosted Kafka Connect supports /topics endpoint
result = self.client.list_connector_topics(connector=connector).get(
connector
)
if result:
topics = [
KafkaConnectTopics(name=topic)
for topic in result.get("topics") or []
]
return topics
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get connector Topics {exc}")


@ -13,7 +13,7 @@ KafkaConnect source to extract metadata from OM UI
"""
import traceback
from datetime import datetime
from typing import Iterable, Optional
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -42,15 +42,23 @@ from metadata.generated.schema.type.basic import (
SourceUrl,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata, T
from metadata.ingestion.source.pipeline.kafkaconnect.client import parse_cdc_topic_name
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
ConnectorType,
KafkaConnectPipelineDetails,
KafkaConnectTopics,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
@ -67,6 +75,22 @@ STATUS_MAP = {
"UNASSIGNED": StatusType.Pending.value,
}
# CDC envelope field names used for Debezium detection and parsing
CDC_ENVELOPE_FIELDS = {"after", "before", "op"}
def get_field_name(field_name: Any) -> str:
"""
Extract string name from FieldName object or string.
Args:
field_name: FieldName object with .root attribute, or plain string
Returns:
String representation of the field name
"""
return field_name.root if hasattr(field_name, "root") else str(field_name)
class KafkaconnectSource(PipelineServiceSource):
"""
@ -105,9 +129,11 @@ class KafkaconnectSource(PipelineServiceSource):
for task in pipeline_details.tasks or []
],
service=self.context.get().pipeline_service,
description=Markdown(pipeline_details.description)
if pipeline_details.description
else None,
description=(
Markdown(pipeline_details.description)
if pipeline_details.description
else None
),
)
yield Either(right=pipeline_request)
self.register_record(pipeline_request=pipeline_request)
@ -174,6 +200,431 @@ class KafkaconnectSource(PipelineServiceSource):
return None
def _get_entity_column_fqn(self, entity: T, column_name: str) -> Optional[str]:
"""
Get column FQN for any supported entity type.
Dispatch based on entity type.
Args:
entity: Table or Topic entity
column_name: Column/field name
Returns:
Fully qualified column name or None
"""
if isinstance(entity, Topic):
return self._get_topic_field_fqn(entity, column_name)
elif isinstance(entity, Table):
return get_column_fqn(table_entity=entity, column=column_name)
else:
logger.warning(
f"Unsupported entity type for column FQN: {type(entity).__name__}"
)
return None
def _parse_cdc_schema_columns(self, schema_text: str) -> List[str]:
"""
Parse Debezium CDC schema JSON to extract table column names.
Looks for columns in 'after' or 'before' fields within the schema,
handling nullable oneOf structures.
Args:
schema_text: Raw JSON schema string from topic
Returns:
List of column names, or empty list if parsing fails
"""
try:
import json
schema_dict = json.loads(schema_text)
# Look for 'after' or 'before' field in the schema
for field_name in ["after", "before"]:
if field_name not in schema_dict.get("properties", {}):
continue
field_def = schema_dict["properties"][field_name]
# Handle oneOf (nullable types)
if "oneOf" not in field_def:
continue
for option in field_def["oneOf"]:
if isinstance(option, dict) and option.get("type") == "object":
columns = list(option.get("properties", {}).keys())
logger.debug(
f"Parsed {len(columns)} columns from CDC '{field_name}' field"
)
return columns
except Exception as exc:
logger.debug(f"Unable to parse CDC schema text: {exc}")
return []
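# Illustrative schemaText payload this parser handles (simplified from a
# Debezium JSON schema; property names are examples only). For this input the
# method returns ["id", "name", "email"]:
#
#   {
#       "properties": {
#           "before": {"oneOf": [{"type": "null"},
#                                {"type": "object",
#                                 "properties": {"id": {}, "name": {}, "email": {}}}]},
#           "after":  {"oneOf": [{"type": "null"},
#                                {"type": "object",
#                                 "properties": {"id": {}, "name": {}, "email": {}}}]},
#           "op": {"type": "string"}
#       }
#   }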
def _extract_columns_from_entity(self, entity: T) -> List[str]:
"""
Extract column/field names from Table or Topic entity.
For Debezium CDC topics, extracts columns from the 'after' or 'before' field
which contains the actual table structure, not the CDC envelope fields.
Args:
entity: Table or Topic entity
Returns:
List of column/field names
"""
if isinstance(entity, Table):
return [col.name.root for col in entity.columns or []]
if hasattr(entity, "messageSchema") and entity.messageSchema:
schema_fields = entity.messageSchema.schemaFields or []
# Check if this is a Debezium CDC envelope structure
# Can be either flat (top-level: op, before, after) or nested (Envelope -> op, before, after)
field_names = {get_field_name(f.name) for f in schema_fields}
is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset(field_names)
# Fallback: Check schemaText for CDC structure if schemaFields doesn't indicate CDC
if not is_debezium_cdc and entity.messageSchema.schemaText:
try:
import json
schema_dict = json.loads(entity.messageSchema.schemaText)
schema_props = schema_dict.get("properties", {})
# Check if schemaText has CDC envelope fields
is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset(
set(schema_props.keys())
)
except Exception:
pass
logger.debug(
f"Topic {get_field_name(entity.name) if hasattr(entity, 'name') else 'unknown'}: field_names={field_names}, is_debezium_cdc={is_debezium_cdc}"
)
# Check for nested Debezium CDC structure (single Envelope field with CDC children)
if not is_debezium_cdc and len(schema_fields) == 1:
envelope_field = schema_fields[0]
if envelope_field.children:
envelope_child_names = {
get_field_name(c.name) for c in envelope_field.children
}
is_debezium_cdc = CDC_ENVELOPE_FIELDS.issubset(envelope_child_names)
if is_debezium_cdc:
logger.debug(
f"Nested Debezium CDC envelope detected: {get_field_name(envelope_field.name)}"
)
schema_fields = (
envelope_field.children
) # Use envelope children as schema fields
if is_debezium_cdc:
# For Debezium CDC, extract columns from the 'after' field (or 'before' as fallback)
# The 'after' field contains the complete record structure after the change
for field in schema_fields:
field_name_str = get_field_name(field.name)
# Prefer 'after' for source connectors (contains new/updated record state)
if field_name_str == "after" and field.children:
columns = [
get_field_name(child.name) for child in field.children
]
logger.debug(
f"Debezium CDC: extracted {len(columns)} columns from 'after' field"
)
return columns
# Fallback to 'before' if 'after' has no children
for field in schema_fields:
field_name_str = get_field_name(field.name)
if field_name_str == "before" and field.children:
columns = [
get_field_name(child.name) for child in field.children
]
logger.debug(
f"Debezium CDC: extracted {len(columns)} columns from 'before' field"
)
return columns
# Final fallback: Parse schemaText if after/before don't have children
if entity.messageSchema.schemaText:
columns = self._parse_cdc_schema_columns(
entity.messageSchema.schemaText
)
if columns:
logger.debug(
f"Debezium CDC: extracted {len(columns)} columns from schemaText"
)
return columns
logger.debug(
"Debezium CDC detected but unable to extract columns from after/before fields"
)
return []
# Non-CDC topic: extract all fields
columns = []
for field in schema_fields:
if field.children:
columns.extend(
[get_field_name(child.name) for child in field.children]
)
else:
columns.append(get_field_name(field.name))
return columns
return []
def _get_topic_field_fqn(
self, topic_entity: Topic, field_name: str
) -> Optional[str]:
"""
Get the fully qualified name for a field in a Topic's schema.
Handles nested structures where fields may be children of a parent RECORD.
For Debezium CDC topics, searches for fields inside after/before envelope children.
"""
if (
not topic_entity.messageSchema
or not topic_entity.messageSchema.schemaFields
):
logger.debug(
f"Topic {get_field_name(topic_entity.name)} has no message schema"
)
return None
# Search for the field in the schema (including nested fields)
for field in topic_entity.messageSchema.schemaFields:
field_name_str = get_field_name(field.name)
# Check if it's a direct field
if field_name_str == field_name:
return (
field.fullyQualifiedName.root if field.fullyQualifiedName else None
)
# Check if it's a child field (nested - one level deep)
if field.children:
# For Debezium CDC, prioritize 'after' over 'before' when searching for grandchildren
children_to_search = field.children
after_child = None
before_child = None
for child in field.children:
child_name = get_field_name(child.name)
if child_name == "after":
after_child = child
elif child_name == "before":
before_child = child
# Check direct child match
if child_name == field_name:
return (
child.fullyQualifiedName.root
if child.fullyQualifiedName
else None
)
# Search grandchildren - prefer 'after' over 'before' for CDC topics
for cdc_child in [after_child, before_child]:
if cdc_child and cdc_child.children:
for grandchild in cdc_child.children:
if get_field_name(grandchild.name) == field_name:
return (
grandchild.fullyQualifiedName.root
if grandchild.fullyQualifiedName
else None
)
# Search other grandchildren (non-CDC fields)
for child in field.children:
if child not in [after_child, before_child] and child.children:
for grandchild in child.children:
if get_field_name(grandchild.name) == field_name:
return (
grandchild.fullyQualifiedName.root
if grandchild.fullyQualifiedName
else None
)
# For Debezium CDC topics, columns might only exist in schemaText (not as field objects)
# Manually construct FQN: topicFQN.Envelope.columnName
for field in topic_entity.messageSchema.schemaFields:
field_name_str = get_field_name(field.name)
# Check if this is a CDC envelope field
if "Envelope" in field_name_str and field.fullyQualifiedName:
# Construct FQN manually for CDC column
envelope_fqn = field.fullyQualifiedName.root
return f"{envelope_fqn}.{field_name}"
logger.debug(
f"Field {field_name} not found in topic {get_field_name(topic_entity.name)} schema"
)
return None
def build_column_lineage(
self,
from_entity: T,
to_entity: T,
topic_entity: Topic,
pipeline_details: KafkaConnectPipelineDetails,
) -> Optional[List[ColumnLineage]]:
"""
Build column-level lineage between source table, topic, and target table.
For source connectors: Table columns -> Topic schema fields
For sink connectors: Topic schema fields -> Table columns
"""
try:
column_lineages = []
# Get column mappings from connector config if available
if pipeline_details.dataset and pipeline_details.dataset.column_mappings:
# Use explicit column mappings from connector config
for mapping in pipeline_details.dataset.column_mappings:
if pipeline_details.conn_type == ConnectorType.SINK.value:
from_col = get_column_fqn(
table_entity=topic_entity, column=mapping.source_column
)
to_col = get_column_fqn(
table_entity=to_entity, column=mapping.target_column
)
else:
from_col = get_column_fqn(
table_entity=from_entity, column=mapping.source_column
)
to_col = get_column_fqn(
table_entity=topic_entity, column=mapping.target_column
)
if from_col and to_col:
column_lineages.append(
ColumnLineage(
fromColumns=[from_col],
toColumn=to_col,
function=None,
)
)
else:
# Infer 1:1 column mappings based on matching column names
if pipeline_details.conn_type == ConnectorType.SINK.value:
source_entity = topic_entity
target_entity = to_entity
else:
source_entity = from_entity
target_entity = topic_entity
# Extract columns from both entities
source_columns = self._extract_columns_from_entity(source_entity)
target_columns = self._extract_columns_from_entity(target_entity)
logger.debug(
f"Column matching for {pipeline_details.name}: "
f"source={len(source_columns)} cols from {source_entity.__class__.__name__}, "
f"target={len(target_columns)} cols from {target_entity.__class__.__name__}"
)
logger.debug(f"Source columns: {source_columns[:5]}") # First 5
logger.debug(f"Target columns: {target_columns}")
# Create lookup dictionary for O(n) performance instead of O(n²)
target_cols_map = {str(col).lower(): col for col in target_columns}
# Match columns by name (case-insensitive)
for source_col_name in source_columns:
source_key = str(source_col_name).lower()
if source_key in target_cols_map:
target_col_name = target_cols_map[source_key]
logger.debug(
f"Matched column: {source_col_name} -> {target_col_name}"
)
try:
# Get fully qualified names for source and target columns
from_col = self._get_entity_column_fqn(
source_entity, source_col_name
)
to_col = self._get_entity_column_fqn(
target_entity, target_col_name
)
logger.debug(f"FQNs: from_col={from_col}, to_col={to_col}")
if from_col and to_col:
column_lineages.append(
ColumnLineage(
fromColumns=[from_col],
toColumn=to_col,
function=None,
)
)
logger.debug(
f"Added column lineage: {from_col} -> {to_col}"
)
except (KeyError, AttributeError) as exc:
logger.debug(
f"Error creating column lineage for {source_col_name} -> {target_col_name}: {exc}"
)
if column_lineages:
logger.debug(
f"Created {len(column_lineages)} column lineages for {pipeline_details.name}"
)
return column_lineages if column_lineages else None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to build column lineage: {exc}")
return None
def _query_cdc_topics_from_messaging_service(
self, database_server_name: str
) -> List[KafkaConnectTopics]:
"""
Query topics from messaging service and filter by CDC naming pattern.
Used for CDC connectors without explicit topic lists - discovers topics
by matching against database.server.name prefix.
Args:
database_server_name: The database.server.name or topic.prefix from connector config
Returns:
List of matching CDC topics
"""
topics_found = []
try:
logger.debug(
f"CDC connector without topics list - querying messaging service "
f"for pattern: {database_server_name}.*"
)
# List topics from the configured messaging service only
topics_list = self.metadata.list_entities(
entity=Topic,
fields=["name", "fullyQualifiedName", "service"],
params={"service": self.service_connection.messagingServiceName},
).entities
# Filter topics that match the CDC naming pattern
for topic_entity in topics_list or []:
topic_name = (
topic_entity.name.root
if hasattr(topic_entity.name, "root")
else str(topic_entity.name)
)
# Parse the topic to see if it's a CDC topic related to this connector
topic_info = parse_cdc_topic_name(topic_name, database_server_name)
if topic_info:
topics_found.append(KafkaConnectTopics(name=topic_name))
logger.debug(f"Matched CDC topic: {topic_name} -> {topic_info}")
except Exception as exc:
logger.debug(f"Unable to query topics from messaging service: {exc}")
return topics_found
def yield_pipeline_lineage_details(
self, pipeline_details: KafkaConnectPipelineDetails
) -> Iterable[Either[AddLineageRequest]]:
@ -196,14 +647,30 @@ class KafkaconnectSource(PipelineServiceSource):
entity=Pipeline, fqn=pipeline_fqn
)
lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"),
source=LineageSource.PipelineLineage,
)
dataset_entity = self.get_dataset_entity(pipeline_details=pipeline_details)
for topic in pipeline_details.topics or []:
# Get database.server.name or topic.prefix for CDC topic parsing
# These are ONLY set by Debezium CDC connectors
database_server_name = None
if pipeline_details.config:
database_server_name = pipeline_details.config.get(
"database.server.name"
) or pipeline_details.config.get("topic.prefix")
# For CDC connectors without explicit topics, query topics from messaging service
# and filter by CDC naming pattern
# Only do this for Debezium CDC connectors (identified by database.server.name or topic.prefix)
topics_to_process = pipeline_details.topics or []
if (
not topics_to_process
and database_server_name
and pipeline_details.conn_type == ConnectorType.SOURCE.value
):
topics_to_process = self._query_cdc_topics_from_messaging_service(
database_server_name
)
for topic in topics_to_process:
topic_fqn = fqn.build(
metadata=self.metadata,
entity_type=Topic,
@ -213,13 +680,76 @@ class KafkaconnectSource(PipelineServiceSource):
topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)
if topic_entity is None or dataset_entity is None:
if topic_entity is None:
continue
if pipeline_details.conn_type.lower() == "sink":
from_entity, to_entity = topic_entity, dataset_entity
# If no dataset entity from config, try to parse table info from CDC topic name
current_dataset_entity = dataset_entity
if (
current_dataset_entity is None
and pipeline_details.conn_type == ConnectorType.SOURCE.value
):
# Parse CDC topic name to extract table information
topic_info = parse_cdc_topic_name(
str(topic.name), database_server_name
)
if topic_info.get("database") and topic_info.get("table"):
logger.debug(
f"Parsed CDC topic {topic.name}: database={topic_info['database']}, table={topic_info['table']}"
)
# Try to find the table entity
for (
dbservicename
) in self.source_config.lineageInformation.dbServiceNames or [
"*"
]:
table_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=topic_info["table"],
database_name=None,
schema_name=topic_info["database"],
service_name=dbservicename,
)
current_dataset_entity = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if current_dataset_entity:
logger.debug(f"Found table entity: {table_fqn}")
break
if current_dataset_entity is None:
# No table entity found, skip this topic
continue
if pipeline_details.conn_type == ConnectorType.SINK.value:
from_entity, to_entity = topic_entity, current_dataset_entity
else:
from_entity, to_entity = dataset_entity, topic_entity
from_entity, to_entity = current_dataset_entity, topic_entity
# Build column-level lineage (best effort - don't fail entity-level lineage)
column_lineage = None
try:
column_lineage = self.build_column_lineage(
from_entity=from_entity,
to_entity=to_entity,
topic_entity=topic_entity,
pipeline_details=pipeline_details,
)
except Exception as exc:
logger.warning(
f"Failed to build column-level lineage for {pipeline_details.name}: {exc}. "
"Entity-level lineage will still be created."
)
logger.debug(traceback.format_exc())
lineage_details = LineageDetails(
pipeline=EntityReference(
id=pipeline_entity.id.root, type="pipeline"
),
source=LineageSource.PipelineLineage,
columnsLineage=column_lineage,
)
yield Either(
right=AddLineageRequest(
@ -291,7 +821,7 @@ class KafkaconnectSource(PipelineServiceSource):
pipeline_details.status, StatusType.Pending
),
taskStatus=task_status,
timestamp=Timestamp(datetime_to_ts(datetime.now()))
timestamp=Timestamp(datetime_to_ts(datetime.now())),
# Kafka connect doesn't provide any details with exec time
)


@ -13,14 +13,23 @@
KafkaConnect Source Model module
"""
from enum import Enum
from typing import List, Optional, Type, Union
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table
class ConnectorType(str, Enum):
"""Kafka Connect connector types"""
SOURCE = "source"
SINK = "sink"
UNKNOWN = "UNKNOWN"
class KafkaConnectTasks(BaseModel):
id: int = Field(..., description="ID of the task")
state: Optional[str] = Field(
@ -35,10 +44,20 @@ class KafkaConnectTopics(BaseModel):
name: str = Field(..., description="Name of the topic (e.g., random-source-avro)")
class KafkaConnectColumnMapping(BaseModel):
"""Model for column-level mapping between source and target"""
source_column: str = Field(..., description="Source column name")
target_column: str = Field(..., description="Target column/field name")
class KafkaConnectDatasetDetails(BaseModel):
table: Optional[str] = None
database: Optional[str] = None
container_name: Optional[str] = None
column_mappings: List[KafkaConnectColumnMapping] = Field(
default_factory=list, description="Column-level mappings if available"
)
@property
def dataset_type(self) -> Optional[Type[Union[Table, Container]]]:
@ -63,3 +82,15 @@ class KafkaConnectPipelineDetails(BaseModel):
description: Optional[str] = None
dataset: Optional[KafkaConnectDatasetDetails] = None
config: Optional[dict] = Field(default_factory=dict)
@field_validator("conn_type", mode="before")
@classmethod
def normalize_connector_type(cls, value: str) -> str:
"""Normalize connector type to enum value"""
if value:
value_lower = value.lower()
if value_lower == "source":
return ConnectorType.SOURCE.value
elif value_lower == "sink":
return ConnectorType.SINK.value
return ConnectorType.UNKNOWN.value


@ -68,9 +68,49 @@ def parse_json_schema(
def get_child_models(key, value, field_models, cls: Type[BaseModel] = FieldModel):
"""
Method to parse the child objects in the json schema
Method to parse the child objects in the json schema.
Handles oneOf union types (e.g., Debezium CDC nullable fields).
"""
try:
# Handle oneOf union types (e.g., [{"type": "null"}, {"type": "object", "properties": {...}}])
# Common in Debezium CDC schemas for nullable fields like "after" and "before"
if "oneOf" in value and isinstance(value["oneOf"], list):
# Find the non-null object schema in the union
object_schema = None
for option in value["oneOf"]:
if (
isinstance(option, dict)
and option.get("type") == JsonSchemaDataTypes.RECORD.value
):
object_schema = option
break
if object_schema:
# Use the object schema's properties and metadata
cls_obj = cls(
name=key,
displayName=value.get("title") or object_schema.get("title"),
dataType=JsonSchemaDataTypes.RECORD.name,
description=value.get("description")
or object_schema.get("description"),
)
children = get_json_schema_fields(
object_schema.get("properties", {}), cls=cls
)
cls_obj.children = children
field_models.append(cls_obj)
return
# If no object found in oneOf, treat as UNKNOWN
cls_obj = cls(
name=key,
displayName=value.get("title"),
dataType=JsonSchemaDataTypes.UNKNOWN.name,
description=value.get("description"),
)
field_models.append(cls_obj)
return
# Standard type handling
cls_obj = cls(
name=key,
displayName=value.get("title"),


@ -13,13 +13,14 @@
Test KafkaConnect client and models
"""
from unittest import TestCase
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, Mock, patch
from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
KafkaConnectConnection,
)
from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
KafkaConnectColumnMapping,
KafkaConnectDatasetDetails,
KafkaConnectPipelineDetails,
KafkaConnectTasks,
@ -273,18 +274,15 @@ class TestKafkaConnectClient(TestCase):
client = KafkaConnectClient(self.mock_config)
# Test various supported dataset configurations
# Note: Database-only configs return None (need table or container)
test_configs = [
# Table configurations
# Table configurations (have table name)
({"table": "users", "database": "mydb"}, "table", "users"),
({"collection": "users"}, "table", "users"),
({"snowflake.schema.name": "schema1"}, "table", "schema1"),
({"table.whitelist": "table1,table2"}, "table", "table1,table2"),
({"fields.whitelist": "field1,field2"}, "table", "field1,field2"),
# Database configurations
({"database": "mydb"}, "database", "mydb"),
({"db.name": "testdb"}, "database", "testdb"),
({"snowflake.database.name": "snowdb"}, "database", "snowdb"),
# Container configurations
# Container configurations (have container name)
({"s3.bucket.name": "my-bucket"}, "container_name", "my-bucket"),
]
@ -298,3 +296,877 @@ class TestKafkaConnectClient(TestCase):
expected_value,
f"Expected {expected_field}={expected_value}, got {actual_value}",
)
class TestConfluentCloudSupport(TestCase):
"""Test Confluent Cloud specific functionality"""
def test_confluent_cloud_detection(self):
"""Test that Confluent Cloud URLs are detected correctly"""
confluent_config = Mock(spec=KafkaConnectConnection)
confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456"
confluent_config.verifySSL = True
confluent_config.KafkaConnectConfig = None
client = KafkaConnectClient(confluent_config)
self.assertTrue(client.is_confluent_cloud)
def test_self_hosted_detection(self):
"""Test that self-hosted Kafka Connect is detected correctly"""
self_hosted_config = Mock(spec=KafkaConnectConnection)
self_hosted_config.hostPort = "http://localhost:8083"
self_hosted_config.verifySSL = False
self_hosted_config.KafkaConnectConfig = None
client = KafkaConnectClient(self_hosted_config)
self.assertFalse(client.is_confluent_cloud)
def test_confluent_cloud_get_cluster_info(self):
"""Test that get_cluster_info works for Confluent Cloud"""
confluent_config = Mock(spec=KafkaConnectConnection)
confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456"
confluent_config.verifySSL = True
confluent_config.KafkaConnectConfig = None
client = KafkaConnectClient(confluent_config)
client.client.list_connectors = Mock(return_value=["connector1", "connector2"])
result = client.get_cluster_info()
self.assertIsNotNone(result)
self.assertEqual(result["version"], "confluent-cloud")
self.assertEqual(result["kafka_cluster_id"], "confluent-managed")
def test_confluent_cloud_get_connector_topics_from_config(self):
"""Test extracting topics from Confluent Cloud connector config"""
confluent_config = Mock(spec=KafkaConnectConnection)
confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456"
confluent_config.verifySSL = True
confluent_config.KafkaConnectConfig = None
client = KafkaConnectClient(confluent_config)
client.get_connector_config = Mock(return_value={"kafka.topic": "orders_topic"})
topics = client.get_connector_topics("test-connector")
self.assertIsNotNone(topics)
self.assertEqual(len(topics), 1)
self.assertEqual(topics[0].name, "orders_topic")
def test_confluent_cloud_get_connector_topics_multiple(self):
"""Test extracting multiple topics from Confluent Cloud connector config"""
confluent_config = Mock(spec=KafkaConnectConnection)
confluent_config.hostPort = "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-456"
confluent_config.verifySSL = True
confluent_config.KafkaConnectConfig = None
client = KafkaConnectClient(confluent_config)
client.get_connector_config = Mock(
return_value={"topics": "topic1,topic2,topic3"}
)
topics = client.get_connector_topics("test-connector")
self.assertIsNotNone(topics)
self.assertEqual(len(topics), 3)
self.assertEqual(topics[0].name, "topic1")
self.assertEqual(topics[1].name, "topic2")
self.assertEqual(topics[2].name, "topic3")
def test_confluent_cloud_database_include_list(self):
"""Test that database-only config returns None (needs table name)"""
config = {"database.include.list": "mydb"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_confluent_cloud_table_include_list(self):
"""Test extracting table from Confluent Cloud table.include.list field"""
config = {"table.include.list": "mydb.customers,mydb.orders"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.table, "mydb.customers,mydb.orders")
def test_confluent_cloud_database_hostname(self):
"""Test that database-only config returns None (needs table name)"""
config = {"database.hostname": "mysql.example.com"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough for table lineage
self.assertIsNone(result)
def test_debezium_postgres_database_dbname(self):
"""Test that database-only config returns None (needs table name)"""
config = {"database.dbname": "postgres"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_debezium_topic_prefix(self):
"""Test that database-only config returns None (needs table name)"""
config = {"topic.prefix": "dbserver1"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - tables discovered via topic parsing for CDC
self.assertIsNone(result)
def test_mysql_cdc_databases_include(self):
"""Test that database-only config returns None (needs table name)"""
config = {"databases.include": "mydb1,mydb2"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - CDC uses topic parsing
self.assertIsNone(result)
def test_mysql_cdc_tables_include(self):
"""Test extracting tables from MySQL CDC V2 tables.include field"""
config = {"tables.include": "db1.users,db1.orders"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.table, "db1.users,db1.orders")
def test_snowflake_database_field(self):
"""Test that database-only config returns None (needs table name)"""
config = {"snowflake.database": "ANALYTICS_DB"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_snowflake_schema_field(self):
"""Test extracting table/schema from Snowflake snowflake.schema field"""
config = {"snowflake.schema": "PUBLIC"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.table, "PUBLIC")
def test_sql_server_database_names(self):
"""Test that database-only config returns None (needs table name)"""
config = {"database.names": "AdventureWorks,Northwind"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_s3_bucket_field(self):
"""Test extracting bucket from S3 s3.bucket field"""
config = {"s3.bucket": "my-data-lake"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.container_name, "my-data-lake")
def test_gcs_bucket_field(self):
"""Test extracting bucket from GCS gcs.bucket.name field"""
config = {"gcs.bucket.name": "my-gcs-bucket"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.container_name, "my-gcs-bucket")
def test_postgres_sink_connection_host(self):
"""Test that database-only config returns None (needs table name)"""
config = {"connection.host": "postgres.example.com"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_sink_fields_included(self):
"""Test extracting fields from Sink connector fields.included field"""
config = {"fields.included": "id,name,email,created_at"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.table, "id,name,email,created_at")
def test_debezium_mysql_database_exclude_list(self):
"""Test that database-only config returns None (needs table name)"""
config = {"database.exclude.list": "test,temp"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - database alone is not enough
self.assertIsNone(result)
def test_debezium_v1_database_server_name(self):
"""Test that CDC connectors with only database.server.name return None
CDC connectors don't have explicit table configs - tables are discovered
via topic name parsing instead.
"""
config = {"database.server.name": "mysql-server-1"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None because CDC connectors don't have explicit table names
# Tables are discovered via topic parsing instead
self.assertIsNone(result)
def test_debezium_v2_topic_prefix(self):
"""Test that CDC connectors with only topic.prefix return None
CDC connectors don't have explicit table configs - tables are discovered
via topic name parsing instead.
"""
config = {"topic.prefix": "postgres-server-1"}
client_config = Mock(spec=KafkaConnectConnection)
client_config.hostPort = "http://localhost:8083"
client_config.verifySSL = False
client_config.KafkaConnectConfig = None
client = KafkaConnectClient(client_config)
result = client.get_connector_dataset_info(config)
# Should return None - tables discovered via topic parsing
self.assertIsNone(result)
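# Illustrative sketch (assumed shape, not asserted by the tests above): a
# Debezium source config names only the server/prefix, never individual
# tables, e.g.
#   {
#       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
#       "database.server.name": "mysql-server-1",   # Debezium V1
#       # or, on Debezium V2: "topic.prefix": "mysql-server-1",
#   }
# so get_connector_dataset_info() returns None here, and tables are recovered
# later from topic names such as "mysql-server-1.ecommerce.orders"
# (see TestCDCTopicParsing below).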
class TestKafkaConnectColumnLineage(TestCase):
"""Test KafkaConnect column-level lineage functionality"""
def test_column_mapping_model(self):
"""Test KafkaConnectColumnMapping model creation"""
mapping = KafkaConnectColumnMapping(source_column="id", target_column="user_id")
self.assertEqual(mapping.source_column, "id")
self.assertEqual(mapping.target_column, "user_id")
def test_dataset_details_with_column_mappings(self):
"""Test KafkaConnectDatasetDetails with column mappings"""
mappings = [
KafkaConnectColumnMapping(source_column="id", target_column="user_id"),
KafkaConnectColumnMapping(source_column="name", target_column="full_name"),
]
dataset = KafkaConnectDatasetDetails(
table="users", database="mydb", column_mappings=mappings
)
self.assertEqual(len(dataset.column_mappings), 2)
self.assertEqual(dataset.column_mappings[0].source_column, "id")
self.assertEqual(dataset.column_mappings[0].target_column, "user_id")
def test_dataset_details_column_mappings_default(self):
"""Test KafkaConnectDatasetDetails column_mappings defaults to empty list"""
dataset = KafkaConnectDatasetDetails(table="users")
self.assertEqual(dataset.column_mappings, [])
def test_extract_column_mappings_with_smt_renames(self):
"""Test extract_column_mappings with SMT ReplaceField transform"""
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
client = KafkaConnectClient(mock_config)
config = {
"transforms": "rename",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "id:user_id,name:full_name",
}
result = client.extract_column_mappings(config)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
self.assertEqual(result[0].source_column, "id")
self.assertEqual(result[0].target_column, "user_id")
self.assertEqual(result[1].source_column, "name")
self.assertEqual(result[1].target_column, "full_name")
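# Illustrative sketch of how such a rename transform typically appears in a
# full connector config (connector class and topic below are hypothetical,
# not taken from these tests):
#   {
#       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
#       "topics": "users",
#       "transforms": "rename",
#       "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
#       "transforms.rename.renames": "id:user_id,name:full_name",
#   }
# extract_column_mappings() is exercised above with only the "transforms.*"
# keys, so the surrounding connector settings are shown purely for context;
# each "source:target" pair becomes one KafkaConnectColumnMapping.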
def test_extract_column_mappings_no_transforms(self):
"""Test extract_column_mappings with no transforms"""
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
client = KafkaConnectClient(mock_config)
config = {"some.config": "value"}
result = client.extract_column_mappings(config)
self.assertIsNone(result)
def test_extract_column_mappings_transform_without_renames(self):
"""Test extract_column_mappings with transform but no renames"""
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
client = KafkaConnectClient(mock_config)
config = {
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
}
result = client.extract_column_mappings(config)
self.assertIsNone(result)
def test_extract_column_mappings_multiple_transforms(self):
"""Test extract_column_mappings with multiple transforms"""
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
client = KafkaConnectClient(mock_config)
config = {
"transforms": "rename,mask",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "id:user_id",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
}
result = client.extract_column_mappings(config)
self.assertIsNotNone(result)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].source_column, "id")
self.assertEqual(result[0].target_column, "user_id")
def test_get_connector_dataset_info_includes_column_mappings(self):
"""Test get_connector_dataset_info includes column mappings"""
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
client = KafkaConnectClient(mock_config)
config = {
"table": "users",
"database": "mydb",
"transforms": "rename",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "id:user_id",
}
result = client.get_connector_dataset_info(config)
self.assertIsNotNone(result)
self.assertEqual(result.table, "users")
self.assertIsNotNone(result.column_mappings)
self.assertEqual(len(result.column_mappings), 1)
self.assertEqual(result.column_mappings[0].source_column, "id")
self.assertEqual(result.column_mappings[0].target_column, "user_id")
def test_column_lineage_failure_gracefully_handled(self):
"""Test that column lineage building handles errors gracefully"""
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.ingestion.source.pipeline.kafkaconnect.metadata import (
KafkaconnectSource,
)
with patch(
"metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
):
# Create a minimal source instance
mock_config = MagicMock(spec=KafkaConnectConnection)
mock_config.hostPort = "http://localhost:8083"
mock_config.verifySSL = True
mock_config.KafkaConnectConfig = None
mock_config.messagingServiceName = "test_kafka"
mock_metadata = Mock()
# Create source with minimal setup - we're only testing build_column_lineage
source = Mock(spec=KafkaconnectSource)
source._get_topic_field_fqn = (
KafkaconnectSource._get_topic_field_fqn.__get__(
source, KafkaconnectSource
)
)
source.build_column_lineage = (
KafkaconnectSource.build_column_lineage.__get__(
source, KafkaconnectSource
)
)
# Create mock entities
mock_table_entity = Mock(spec=Table)
mock_table_entity.columns = []
mock_topic_entity = Mock(spec=Topic)
mock_topic_name = Mock()
mock_topic_name.root = "test-topic"
mock_topic_entity.name = mock_topic_name
# Missing messageSchema will cause column lineage to return None
mock_topic_entity.messageSchema = None
pipeline_details = KafkaConnectPipelineDetails(
name="test-connector",
status="RUNNING",
conn_type="source",
)
# Test column lineage build - should return None gracefully without raising
result = source.build_column_lineage(
from_entity=mock_table_entity,
to_entity=mock_topic_entity,
topic_entity=mock_topic_entity,
pipeline_details=pipeline_details,
)
# Should return None when no column lineage can be built
self.assertIsNone(result)
class TestCDCTopicParsing(TestCase):
"""Test CDC topic name parsing functionality"""
def test_parse_cdc_topic_three_parts_standard(self):
"""Test parsing CDC topic with 3 parts: {server}.{database}.{table}"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name("MysqlKafkaV2.ecommerce.orders", "MysqlKafkaV2")
self.assertEqual(result, {"database": "ecommerce", "table": "orders"})
def test_parse_cdc_topic_three_parts_postgres(self):
"""Test parsing PostgreSQL CDC topic with schema.database.table"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name(
"PostgresKafkaCDC.public.orders", "PostgresKafkaCDC"
)
self.assertEqual(result, {"database": "public", "table": "orders"})
def test_parse_cdc_topic_two_parts(self):
"""Test parsing CDC topic with 2 parts: {database}.{table}"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name("ecommerce.customers")
self.assertEqual(result, {"database": "ecommerce", "table": "customers"})
def test_parse_cdc_topic_single_part_with_server_name(self):
"""Test parsing CDC topic with 1 part when server name is provided"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name("orders", "MysqlKafkaV2")
self.assertEqual(result, {"database": "MysqlKafkaV2", "table": "orders"})
def test_parse_cdc_topic_single_part_without_server_name(self):
"""Test parsing CDC topic with 1 part without server name returns empty"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name("orders")
self.assertEqual(result, {})
def test_parse_cdc_topic_skip_internal_topics(self):
"""Test that internal topics are skipped"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
# Kafka and Debezium internal topics should be skipped
self.assertEqual(parse_cdc_topic_name("_schemas"), {})
self.assertEqual(parse_cdc_topic_name("__consumer_offsets"), {})
self.assertEqual(parse_cdc_topic_name("dbhistory.mysql"), {})
def test_parse_cdc_topic_empty_input(self):
"""Test parsing empty topic name returns empty dict"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
result = parse_cdc_topic_name("")
self.assertEqual(result, {})
result = parse_cdc_topic_name(None)
self.assertEqual(result, {})
def test_parse_cdc_topic_case_insensitive_server_match(self):
"""Test that server name matching is case-insensitive"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
# Three part topic with different case
result = parse_cdc_topic_name("mysqlkafkav2.ecommerce.orders", "MysqlKafkaV2")
self.assertEqual(result, {"database": "ecommerce", "table": "orders"})
# Two part topic with different case
result = parse_cdc_topic_name("mysqlkafkav2.orders", "MysqlKafkaV2")
self.assertEqual(result, {"database": "mysqlkafkav2", "table": "orders"})
def test_parse_cdc_topic_sql_server_pattern(self):
"""Test parsing SQL Server CDC topic patterns"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
# SQL Server typically uses dbo schema
result = parse_cdc_topic_name("SqlServerCDC.dbo.users", "SqlServerCDC")
self.assertEqual(result, {"database": "dbo", "table": "users"})
def test_parse_cdc_topic_mongodb_pattern(self):
"""Test parsing MongoDB CDC topic patterns"""
from metadata.ingestion.source.pipeline.kafkaconnect.client import (
parse_cdc_topic_name,
)
# MongoDB CDC topics follow {server}.{database}.{collection}
result = parse_cdc_topic_name("MongoCDC.mydb.users", "MongoCDC")
self.assertEqual(result, {"database": "mydb", "table": "users"})
class TestKafkaConnectCDCColumnExtraction(TestCase):
"""Test CDC column extraction from Debezium schema"""
def _create_mock_source(self):
"""Helper to create a minimal mock source for testing"""
from metadata.ingestion.source.pipeline.kafkaconnect.metadata import (
KafkaconnectSource,
)
# Create a mock source that bypasses __init__
source = object.__new__(KafkaconnectSource)
return source
def setUp(self):
"""Set up test fixtures"""
# Create a mock Debezium CDC topic with nested envelope structure
self.cdc_topic = MagicMock()
self.cdc_topic.name = "MysqlKafkaV2.ecommerce.orders"
self.cdc_topic.fullyQualifiedName.root = (
'KafkaProd."MysqlKafkaV2.ecommerce.orders"'
)
# Mock message schema with CDC structure
self.cdc_topic.messageSchema = MagicMock()
self.cdc_topic.messageSchema.schemaText = '{"type":"object","title":"MysqlKafkaV2.ecommerce.orders.Envelope","properties":{"op":{"type":"string"},"before":{"oneOf":[{"type":"null"},{"type":"object","properties":{"id":{"type":"integer"},"order_number":{"type":"string"},"customer_name":{"type":"string"}}}]},"after":{"oneOf":[{"type":"null"},{"type":"object","properties":{"id":{"type":"integer"},"order_number":{"type":"string"},"customer_name":{"type":"string"},"customer_email":{"type":"string"},"product_name":{"type":"string"}}}]},"source":{"type":"object","properties":{"version":{"type":"string"}}},"ts_ms":{"type":"integer"}}}'
# Mock schema fields - single envelope with CDC children
# Use a helper function to create field names with root attribute
def create_field_name(name_str):
name_obj = MagicMock()
name_obj.root = name_str
return name_obj
envelope_field = MagicMock()
envelope_field.name = create_field_name(
"MysqlKafkaV2.ecommerce.orders.Envelope"
)
envelope_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope'
# CDC envelope children
op_field = MagicMock()
op_field.name = create_field_name("op")
op_field.children = None
op_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.op'
before_field = MagicMock()
before_field.name = create_field_name("before")
before_field.children = None
before_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.before'
after_field = MagicMock()
after_field.name = create_field_name("after")
after_field.children = None
after_field.fullyQualifiedName.root = 'KafkaProd."MysqlKafkaV2.ecommerce.orders".MysqlKafkaV2.ecommerce.orders.Envelope.after'
source_field = MagicMock()
source_field.name = create_field_name("source")
source_field.children = [MagicMock()] # Has children (version field)
ts_field = MagicMock()
ts_field.name = create_field_name("ts_ms")
ts_field.children = None
envelope_field.children = [
op_field,
before_field,
after_field,
source_field,
ts_field,
]
self.cdc_topic.messageSchema.schemaFields = [envelope_field]
def test_extract_columns_from_cdc_topic(self):
"""Test extracting columns from Debezium CDC topic schema text"""
source = self._create_mock_source()
# Extract columns from CDC topic
columns = source._extract_columns_from_entity(self.cdc_topic)
# Should extract columns from 'after' field in schema text
self.assertIsNotNone(columns)
self.assertIn("id", columns)
self.assertIn("order_number", columns)
self.assertIn("customer_name", columns)
self.assertIn("customer_email", columns)
self.assertIn("product_name", columns)
# Should have 5 columns total
self.assertEqual(len(columns), 5)
def test_get_topic_field_fqn_for_cdc(self):
"""Test constructing FQN for CDC topic fields"""
source = self._create_mock_source()
# Get FQN for a CDC column
fqn = source._get_topic_field_fqn(self.cdc_topic, "id")
# Should construct FQN manually for CDC envelope structure
self.assertIsNotNone(fqn)
self.assertIn("MysqlKafkaV2.ecommerce.orders.Envelope", fqn)
self.assertTrue(fqn.endswith(".id"))
def test_cdc_envelope_detection(self):
"""Test that Debezium CDC envelope is correctly detected"""
source = self._create_mock_source()
columns = source._extract_columns_from_entity(self.cdc_topic)
# Should not return CDC envelope fields (op, before, after, source, ts_ms)
self.assertNotIn("op", columns)
self.assertNotIn("before", columns)
self.assertNotIn("after", columns)
self.assertNotIn("source", columns)
self.assertNotIn("ts_ms", columns)
# Should return actual table columns
self.assertIn("id", columns)
self.assertIn("order_number", columns)
def test_non_cdc_topic_column_extraction(self):
"""Test that non-CDC topics still work correctly"""
source = self._create_mock_source()
# Helper to create field names
def create_field_name(name_str):
name_obj = MagicMock()
name_obj.root = name_str
return name_obj
# Create a regular (non-CDC) topic
regular_topic = MagicMock()
regular_topic.name = create_field_name("orders")
regular_topic.fullyQualifiedName.root = "KafkaProd.orders"
# Mock regular fields (not CDC envelope)
id_field = MagicMock()
id_field.name = create_field_name("id")
id_field.children = None
id_field.fullyQualifiedName.root = "KafkaProd.orders.id"
name_field = MagicMock()
name_field.name = create_field_name("customer_name")
name_field.children = None
name_field.fullyQualifiedName.root = "KafkaProd.orders.customer_name"
regular_topic.messageSchema = MagicMock()
regular_topic.messageSchema.schemaFields = [id_field, name_field]
columns = source._extract_columns_from_entity(regular_topic)
# Should extract regular fields
self.assertEqual(len(columns), 2)
self.assertIn("id", columns)
self.assertIn("customer_name", columns)
def test_cdc_schema_text_missing(self):
"""Test handling CDC topic without schema text"""
source = self._create_mock_source()
# Helper to create field names
def create_field_name(name_str):
name_obj = MagicMock()
name_obj.root = name_str
return name_obj
# Create CDC topic without schemaText
cdc_topic_no_text = MagicMock()
cdc_topic_no_text.name = create_field_name("MysqlKafkaV2.ecommerce.orders")
# CDC envelope structure but no schemaText
envelope_field = MagicMock()
envelope_field.name = create_field_name(
"MysqlKafkaV2.ecommerce.orders.Envelope"
)
op_field = MagicMock()
op_field.name = create_field_name("op")
op_field.children = None
before_field = MagicMock()
before_field.name = create_field_name("before")
before_field.children = None
after_field = MagicMock()
after_field.name = create_field_name("after")
after_field.children = None
envelope_field.children = [op_field, before_field, after_field]
cdc_topic_no_text.messageSchema = MagicMock()
cdc_topic_no_text.messageSchema.schemaText = None # No schema text
cdc_topic_no_text.messageSchema.schemaFields = [envelope_field]
columns = source._extract_columns_from_entity(cdc_topic_no_text)
# Should return empty list when schema text is not available
self.assertEqual(columns, [])
def test_cdc_with_before_field(self):
"""Test CDC extraction prefers 'after' but falls back to 'before'"""
source = self._create_mock_source()
# Create CDC topic with only 'before' field having data (after only has null)
cdc_topic_before = MagicMock()
cdc_topic_before.name = "Test.cdc.topic"
cdc_topic_before.fullyQualifiedName.root = "KafkaProd.Test.cdc.topic"
cdc_topic_before.messageSchema = MagicMock()
cdc_topic_before.messageSchema.schemaText = '{"type":"object","properties":{"op":{"type":"string"},"before":{"oneOf":[{"type":"null"},{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}}}]},"after":{"oneOf":[{"type":"null"}]}}}'
cdc_topic_before.messageSchema.schemaFields = []
columns = source._extract_columns_from_entity(cdc_topic_before)
# Should extract from 'before' field when 'after' only has null
self.assertIn("field1", columns)
self.assertIn("field2", columns)