feat(ingest/kafka-connect): Add regexp transformation support to more connectors (#13748)

Tamas Nemeth 2025-07-03 07:57:50 +01:00 committed by GitHub
parent 69ac3147d4
commit eaf2bf6dec
2 changed files with 703 additions and 47 deletions


@@ -1,3 +1,4 @@
import logging
import re
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple
@@ -9,6 +10,81 @@ from datahub.ingestion.source.kafka_connect.common import (
KafkaConnectLineage,
)
logger = logging.getLogger(__name__)
class RegexRouterTransform:
"""Helper class to handle RegexRouter transformations for topic/table names."""
def __init__(self, config: Dict[str, str]) -> None:
self.transforms = self._parse_transforms(config)
def _parse_transforms(self, config: Dict[str, str]) -> List[Dict[str, str]]:
"""Parse transforms configuration from connector config."""
transforms_list: List[Dict[str, str]] = []
# Get the transforms parameter
transforms_param: str = config.get("transforms", "")
if not transforms_param:
return transforms_list
# Parse individual transforms
transform_names: List[str] = [
name.strip() for name in transforms_param.split(",")
]
for transform_name in transform_names:
if not transform_name:
continue
transform_config: Dict[str, str] = {}
transform_prefix: str = f"transforms.{transform_name}."
# Extract transform configuration
for key, value in config.items():
if key.startswith(transform_prefix):
config_key: str = key[len(transform_prefix) :]
transform_config[config_key] = value
# Only process RegexRouter transforms
if (
transform_config.get("type")
== "org.apache.kafka.connect.transforms.RegexRouter"
):
transform_config["name"] = transform_name
transforms_list.append(transform_config)
return transforms_list
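# A minimal sketch of what _parse_transforms keeps, assuming a hypothetical
# connector config that mixes a RegexRouter with an unrelated transform; only
# the RegexRouter entry survives the type filter:
#
#     config = {
#         "transforms": "Route,Mask",
#         "transforms.Route.type": "org.apache.kafka.connect.transforms.RegexRouter",
#         "transforms.Route.regex": "raw-(.*)",
#         "transforms.Route.replacement": "clean-$1",
#         "transforms.Mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
#         "transforms.Mask.fields": "ssn",
#     }
#     router = RegexRouterTransform(config)
#     # router.transforms == [{"type": "org.apache.kafka.connect.transforms.RegexRouter",
#     #                        "regex": "raw-(.*)", "replacement": "clean-$1",
#     #                        "name": "Route"}]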
def apply_transforms(self, topic_name: str) -> str:
"""Apply RegexRouter transforms to the topic name using Java regex."""
result: str = topic_name
for transform in self.transforms:
regex_pattern: Optional[str] = transform.get("regex")
replacement: str = transform.get("replacement", "")
if regex_pattern:
try:
# Use Java Pattern and Matcher for exact Kafka Connect compatibility
from java.util.regex import Pattern
pattern = Pattern.compile(regex_pattern)
matcher = pattern.matcher(result)
if matcher.find():
# Reset matcher to beginning for replaceFirst
matcher.reset()
result = matcher.replaceFirst(replacement)
logger.debug(
f"Applied transform {transform['name']}: {topic_name} -> {result}"
)
except Exception as e:
logger.warning(
f"Invalid regex pattern in transform {transform['name']}: {e}"
)
return str(result)
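# Because replacements use Java's $1-style group references and matching
# follows java.util.regex semantics, apply_transforms goes through JPype rather
# than Python's re module; a JVM must already be running (the tests below call
# jpype.startJVM()). An illustrative call, reusing the hypothetical Route
# transform from the sketch above:
#
#     router.apply_transforms("raw-orders")  # -> "clean-orders"
#     router.apply_transforms("customers")   # no match, returned unchanged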
@dataclass
class ConfluentS3SinkConnector(BaseConnector):
@@ -18,28 +94,35 @@ class ConfluentS3SinkConnector(BaseConnector):
bucket: str
topics_dir: str
topics: Iterable[str]
regex_router: RegexRouterTransform
def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
# https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
bucket = connector_manifest.config.get("s3.bucket.name")
bucket: Optional[str] = connector_manifest.config.get("s3.bucket.name")
if not bucket:
raise ValueError(
"Could not find 's3.bucket.name' in connector configuration"
)
# https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
topics_dir = connector_manifest.config.get("topics.dir", "topics")
topics_dir: str = connector_manifest.config.get("topics.dir", "topics")
# Create RegexRouterTransform instance
regex_router: RegexRouterTransform = RegexRouterTransform(
connector_manifest.config
)
return self.S3SinkParser(
target_platform="s3",
bucket=bucket,
topics_dir=topics_dir,
topics=connector_manifest.topic_names,
regex_router=regex_router,
)
def extract_flow_property_bag(self) -> Dict[str, str]:
# Mask/Remove properties that may reveal credentials
flow_property_bag = {
flow_property_bag: Dict[str, str] = {
k: v
for k, v in self.connector_manifest.config.items()
if k
@@ -54,11 +137,17 @@ class ConfluentS3SinkConnector(BaseConnector):
def extract_lineages(self) -> List[KafkaConnectLineage]:
try:
parser = self._get_parser(self.connector_manifest)
parser: ConfluentS3SinkConnector.S3SinkParser = self._get_parser(
self.connector_manifest
)
lineages: List[KafkaConnectLineage] = list()
for topic in parser.topics:
target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
# Apply RegexRouter transformations using the RegexRouterTransform class
transformed_topic: str = parser.regex_router.apply_transforms(topic)
target_dataset: str = (
f"{parser.bucket}/{parser.topics_dir}/{transformed_topic}"
)
lineages.append(
KafkaConnectLineage(
@@ -86,6 +175,7 @@ class SnowflakeSinkConnector(BaseConnector):
database_name: str
schema_name: str
topics_to_tables: Dict[str, str]
regex_router: RegexRouterTransform
def get_table_name_from_topic_name(self, topic_name: str) -> str:
"""
@@ -93,7 +183,7 @@ class SnowflakeSinkConnector(BaseConnector):
Refer below link for more info
https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics
"""
table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
table_name: str = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name
# Connector may append original topic's hash code as suffix for conflict resolution
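# For reference, the sanitization above maps every character outside
# [a-zA-Z0-9_] to an underscore and prefixes an underscore when the result
# would not start with a letter or underscore; worked examples under those
# same regexes:
#
#     re.sub("[^a-zA-Z0-9_]", "_", "app.logs-v1")  # -> "app_logs_v1"
#     # "2024-metrics" -> "2024_metrics", which starts with a digit,
#     # so it becomes "_2024_metrics"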
@@ -106,8 +196,13 @@ class SnowflakeSinkConnector(BaseConnector):
self,
connector_manifest: ConnectorManifest,
) -> SnowflakeParser:
database_name = connector_manifest.config["snowflake.database.name"]
schema_name = connector_manifest.config["snowflake.schema.name"]
database_name: str = connector_manifest.config["snowflake.database.name"]
schema_name: str = connector_manifest.config["snowflake.schema.name"]
# Create RegexRouterTransform instance
regex_router: RegexRouterTransform = RegexRouterTransform(
connector_manifest.config
)
# Fetch user provided topic to table map
provided_topics_to_tables: Dict[str, str] = {}
@@ -121,24 +216,30 @@ class SnowflakeSinkConnector(BaseConnector):
topics_to_tables: Dict[str, str] = {}
# Extract lineage for only those topics whose data ingestion started
for topic in connector_manifest.topic_names:
# Apply transforms first to get the transformed topic name
transformed_topic: str = regex_router.apply_transforms(topic)
if topic in provided_topics_to_tables:
# If user provided which table to get mapped with this topic
topics_to_tables[topic] = provided_topics_to_tables[topic]
else:
# Else connector converts topic name to a valid Snowflake table name.
topics_to_tables[topic] = self.get_table_name_from_topic_name(topic)
# Use the transformed topic name to generate table name
topics_to_tables[topic] = self.get_table_name_from_topic_name(
transformed_topic
)
return self.SnowflakeParser(
database_name=database_name,
schema_name=schema_name,
topics_to_tables=topics_to_tables,
regex_router=regex_router,
)
def extract_flow_property_bag(self) -> Dict[str, str]:
# For all snowflake sink connector properties, refer below link
# https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector
# remove private keys, secrets from properties
flow_property_bag = {
flow_property_bag: Dict[str, str] = {
k: v
for k, v in self.connector_manifest.config.items()
if k
@@ -153,10 +254,12 @@ class SnowflakeSinkConnector(BaseConnector):
def extract_lineages(self) -> List[KafkaConnectLineage]:
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
parser: SnowflakeSinkConnector.SnowflakeParser = self.get_parser(
self.connector_manifest
)
for topic, table in parser.topics_to_tables.items():
target_dataset = f"{parser.database_name}.{parser.schema_name}.{table}"
target_dataset: str = f"{parser.database_name}.{parser.schema_name}.{table}"
lineages.append(
KafkaConnectLineage(
source_dataset=topic,
@@ -176,7 +279,8 @@ class BigQuerySinkConnector(BaseConnector):
project: str
target_platform: str
sanitizeTopics: bool
transforms: list
transforms: List[Dict[str, str]]
regex_router: RegexRouterTransform
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
@@ -186,16 +290,18 @@ class BigQuerySinkConnector(BaseConnector):
self,
connector_manifest: ConnectorManifest,
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics") or "false"
transform_names = (
project: str = connector_manifest.config["project"]
sanitizeTopics: str = connector_manifest.config.get("sanitizeTopics") or "false"
# Parse ALL transforms (original BigQuery logic)
transform_names: List[str] = (
self.connector_manifest.config.get("transforms", "").split(",")
if self.connector_manifest.config.get("transforms")
else []
)
transforms = []
transforms: List[Dict[str, str]] = []
for name in transform_names:
transform = {"name": name}
transform: Dict[str, str] = {"name": name}
transforms.append(transform)
for key in self.connector_manifest.config:
if key.startswith(f"transforms.{name}."):
@@ -203,8 +309,13 @@ class BigQuerySinkConnector(BaseConnector):
self.connector_manifest.config[key]
)
# Create RegexRouterTransform instance for RegexRouter-specific handling
regex_router: RegexRouterTransform = RegexRouterTransform(
connector_manifest.config
)
if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
defaultDataset: str = connector_manifest.config["defaultDataset"]
return self.BQParser(
project=project,
defaultDataset=defaultDataset,
@@ -212,11 +323,14 @@ class BigQuerySinkConnector(BaseConnector):
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
transforms=transforms,
regex_router=regex_router,
)
else:
# version 1.6.x and similar configs supported
datasets = connector_manifest.config["datasets"]
topicsToTables = connector_manifest.config.get("topicsToTables")
datasets: str = connector_manifest.config["datasets"]
topicsToTables: Optional[str] = connector_manifest.config.get(
"topicsToTables"
)
return self.BQParser(
project=project,
@@ -225,10 +339,11 @@ class BigQuerySinkConnector(BaseConnector):
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
transforms=transforms,
regex_router=regex_router,
)
def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
entries = property.split(",")
entries: List[str] = property.split(",")
for entry in entries:
key, val = entry.rsplit("=")
yield (key.strip(), val.strip())
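# get_list parses the comma-separated key=value strings used by the v1
# "datasets" and "topicsToTables" options; a hypothetical invocation, where
# connector is any BigQuerySinkConnector instance:
#
#     list(connector.get_list("users=dim_users, orders=fact_orders"))
#     # -> [("users", "dim_users"), ("orders", "fact_orders")]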
@@ -243,7 +358,7 @@ class BigQuerySinkConnector(BaseConnector):
return dataset
return None
def sanitize_table_name(self, table_name):
def sanitize_table_name(self, table_name: str) -> str:
table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name
@@ -254,8 +369,8 @@ class BigQuerySinkConnector(BaseConnector):
self, topic: str, parser: BQParser
) -> Optional[str]:
if parser.version == "v2":
dataset = parser.defaultDataset
parts = topic.split(":")
dataset: Optional[str] = parser.defaultDataset
parts: List[str] = topic.split(":")
if len(parts) == 2:
dataset = parts[0]
table = parts[1]
@@ -283,21 +398,9 @@ class BigQuerySinkConnector(BaseConnector):
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"
def apply_transformations(
self, topic: str, transforms: List[Dict[str, str]]
) -> str:
for transform in transforms:
if transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter":
regex = transform["regex"]
replacement = transform["replacement"]
pattern = re.compile(regex)
if pattern.match(topic):
topic = pattern.sub(replacement, topic, count=1)
return topic
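# The removed apply_transformations relied on Python's re module, which
# differs from Kafka Connect's Java regex in two ways that matter here:
# pattern.match anchors at the start of the topic while Java's Matcher.find
# matches anywhere, and "$1" in a Python replacement is literal text rather
# than a group reference. That presumably motivates the JPype-backed
# RegexRouterTransform. A quick comparison with the replacement syntax real
# connector configs use:
#
#     import re
#     re.compile(".*(book)").sub("my-new-topic-$1", "library-book", count=1)
#     # Python result: "my-new-topic-$1" ($1 is not expanded)
#     # Java Pattern/Matcher replaceFirst with the same inputs: "my-new-topic-book"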
def extract_flow_property_bag(self) -> Dict[str, str]:
# Mask/Remove properties that may reveal credentials
flow_property_bag = {
flow_property_bag: Dict[str, str] = {
k: v
for k, v in self.connector_manifest.config.items()
if k not in ["keyfile"]
@@ -307,27 +410,33 @@ class BigQuerySinkConnector(BaseConnector):
def extract_lineages(self) -> List[KafkaConnectLineage]:
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
parser: BigQuerySinkConnector.BQParser = self.get_parser(
self.connector_manifest
)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project
transforms = parser.transforms
target_platform: str = parser.target_platform
project: str = parser.project
for topic in self.connector_manifest.topic_names:
transformed_topic = self.apply_transformations(topic, transforms)
dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
# Apply RegexRouter transformations using the RegexRouterTransform class
transformed_topic: str = parser.regex_router.apply_transforms(topic)
# Use the transformed topic to determine dataset/table
dataset_table: Optional[str] = self.get_dataset_table_for_topic(
transformed_topic, parser
)
if dataset_table is None:
self.report.warning(
"Could not find target dataset for topic, please check your connector configuration"
f"{self.connector_manifest.name} : {transformed_topic} ",
)
continue
target_dataset = f"{project}.{dataset_table}"
target_dataset: str = f"{project}.{dataset_table}"
lineages.append(
KafkaConnectLineage(
source_dataset=transformed_topic,
source_dataset=topic, # Keep original topic as source
source_platform=KAFKA,
target_dataset=target_dataset,
target_platform=target_platform,


@@ -0,0 +1,547 @@
import logging
from typing import Dict, List, Tuple
from unittest.mock import Mock, patch
import jpype
import jpype.imports
# Import the classes we're testing
from datahub.ingestion.source.kafka_connect.common import (
ConnectorManifest,
KafkaConnectSourceConfig,
KafkaConnectSourceReport,
)
from datahub.ingestion.source.kafka_connect.sink_connectors import (
BigQuerySinkConnector,
ConfluentS3SinkConnector,
RegexRouterTransform,
SnowflakeSinkConnector,
)
from datahub.ingestion.source.kafka_connect.source_connectors import (
ConfluentJDBCSourceConnector,
)
logger = logging.getLogger(__name__)
if not jpype.isJVMStarted():
jpype.startJVM()
class TestRegexRouterTransform:
"""Test the RegexRouterTransform class."""
def test_no_transforms_configured(self) -> None:
"""Test when no transforms are configured."""
config: Dict[str, str] = {"connector.class": "some.connector"}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("test-topic")
assert result == "test-topic"
def test_non_regex_router_transforms(self) -> None:
"""Test when transforms exist but none are RegexRouter."""
config: Dict[str, str] = {
"transforms": "MyTransform",
"transforms.MyTransform.type": "org.apache.kafka.connect.transforms.InsertField",
"transforms.MyTransform.field": "timestamp",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("test-topic")
assert result == "test-topic"
def test_single_regex_router_transform(self) -> None:
"""Test single RegexRouter transformation."""
config: Dict[str, str] = {
"transforms": "TableNameTransformation",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.regex": ".*",
"transforms.TableNameTransformation.replacement": "my_sink_table",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("source-topic")
assert result == "my_sink_table"
def test_multiple_regex_router_transforms(self) -> None:
"""Test multiple RegexRouter transformations applied in sequence."""
config: Dict[str, str] = {
"transforms": "First,Second",
"transforms.First.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.First.regex": "user-(.*)",
"transforms.First.replacement": "customer_$1",
"transforms.Second.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Second.regex": "customer_(.*)",
"transforms.Second.replacement": "final_$1",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("user-events")
assert result == "final_events"
def test_mysql_source_config_example(self) -> None:
"""Test the specific MySQL source configuration from the example."""
config: Dict[str, str] = {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
# Test with a topic that matches the pattern
result: str = transform.apply_transforms("library-book")
assert result == "my-new-topic-book"
# Test with a topic that doesn't match
result = transform.apply_transforms("user-data")
assert result == "user-data" # Should remain unchanged
def test_mixed_transforms(self) -> None:
"""Test mix of RegexRouter and other transforms."""
config: Dict[str, str] = {
"transforms": "NonRouter,Router,AnotherNonRouter",
"transforms.NonRouter.type": "org.apache.kafka.connect.transforms.InsertField",
"transforms.NonRouter.field": "timestamp",
"transforms.Router.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Router.regex": "events-(.*)",
"transforms.Router.replacement": "processed_$1",
"transforms.AnotherNonRouter.type": "org.apache.kafka.connect.transforms.MaskField",
"transforms.AnotherNonRouter.fields": "sensitive",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("events-user")
assert result == "processed_user"
def test_invalid_regex_pattern(self) -> None:
"""Test handling of invalid regex patterns."""
config: Dict[str, str] = {
"transforms": "BadRegex",
"transforms.BadRegex.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.BadRegex.regex": "[invalid", # Invalid regex
"transforms.BadRegex.replacement": "fixed",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
# Should not crash and return original topic
result: str = transform.apply_transforms("test-topic")
assert result == "test-topic"
def test_empty_replacement(self) -> None:
"""Test with empty replacement string."""
config: Dict[str, str] = {
"transforms": "EmptyReplace",
"transforms.EmptyReplace.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.EmptyReplace.regex": "prefix-(.*)",
"transforms.EmptyReplace.replacement": "",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("prefix-suffix")
assert result == ""
def test_whitespace_in_transform_names(self) -> None:
"""Test handling of whitespace in transform names."""
config: Dict[str, str] = {
"transforms": " Transform1 , Transform2 ",
"transforms.Transform1.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Transform1.regex": "test-(.*)",
"transforms.Transform1.replacement": "result_$1",
}
transform: RegexRouterTransform = RegexRouterTransform(config)
result: str = transform.apply_transforms("test-data")
assert result == "result_data"
class TestBigQuerySinkConnector:
"""Test BigQuery sink connector with RegexRouter support."""
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
"""Helper to create a mock connector manifest."""
return ConnectorManifest(
name="test-bigquery-connector",
type="sink",
config=config,
tasks={},
topic_names=["source-topic"],
)
def create_mock_dependencies(self) -> Tuple[Mock, Mock]:
"""Helper to create mock dependencies."""
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
return config, report
def test_bigquery_with_regex_router(self) -> None:
"""Test BigQuery connector with RegexRouter transformation."""
connector_config: Dict[str, str] = {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"project": "my-gcp-project",
"defaultDataset": "ingest",
"transforms": "TableNameTransformation",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.regex": ".*",
"transforms.TableNameTransformation.replacement": "my_sink_table",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config, report = self.create_mock_dependencies()
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
assert len(lineages) == 1
lineage = lineages[0]
assert lineage.source_dataset == "source-topic"
assert lineage.source_platform == "kafka"
assert lineage.target_dataset == "my-gcp-project.ingest.my_sink_table"
assert lineage.target_platform == "bigquery"
def test_bigquery_with_complex_regex(self) -> None:
"""Test BigQuery with complex regex pattern."""
connector_config: Dict[str, str] = {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"project": "analytics",
"defaultDataset": "raw",
"transforms": "TopicTransform",
"transforms.TopicTransform.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TopicTransform.regex": "app_(.*)_events",
"transforms.TopicTransform.replacement": "$1_processed",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
manifest.topic_names = ["app_user_events", "app_order_events"]
config, report = self.create_mock_dependencies()
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
assert len(lineages) == 2
# Check first lineage
user_lineage = next(
line for line in lineages if line.source_dataset == "app_user_events"
)
assert user_lineage.target_dataset == "analytics.raw.user_processed"
# Check second lineage
order_lineage = next(
line for line in lineages if line.source_dataset == "app_order_events"
)
assert order_lineage.target_dataset == "analytics.raw.order_processed"
def test_bigquery_no_transforms(self) -> None:
"""Test BigQuery connector without transforms."""
connector_config: Dict[str, str] = {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"project": "my-project",
"defaultDataset": "dataset",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config, report = self.create_mock_dependencies()
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
assert len(lineages) == 1
lineage = lineages[0]
assert lineage.target_dataset == "my-project.dataset.source-topic"
class TestS3SinkConnector:
"""Test S3 sink connector with RegexRouter support."""
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
"""Helper to create a mock connector manifest."""
return ConnectorManifest(
name="test-s3-connector",
type="sink",
config=config,
tasks={},
topic_names=["user-events"],
)
def test_s3_with_regex_router(self) -> None:
"""Test S3 connector with RegexRouter transformation."""
connector_config: Dict[str, str] = {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "my-data-lake",
"topics.dir": "kafka-data",
"transforms": "PathTransform",
"transforms.PathTransform.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.PathTransform.regex": "user-(.*)",
"transforms.PathTransform.replacement": "processed-$1",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
connector: ConfluentS3SinkConnector = ConfluentS3SinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
assert len(lineages) == 1
lineage = lineages[0]
assert lineage.source_dataset == "user-events"
assert lineage.source_platform == "kafka"
assert lineage.target_dataset == "my-data-lake/kafka-data/processed-events"
assert lineage.target_platform == "s3"
class TestBigquerySinkConnector:
"""Test BigQuery sink connector with RegexRouter support."""
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
"""Helper to create a mock connector manifest."""
return ConnectorManifest(
name="test-bigquery-connector",
type="sink",
config=config,
tasks={},
topic_names=["my-source-kafka-topic"],
)
def test_bigquery_with_regex_router(self) -> None:
"""Test BigQuery connector with RegexRouter transformation."""
connector_config: Dict[str, str] = {
"autoCreateTables": "true",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"defaultDataset": "ingest",
"project": "my-gcp-project",
"sanitizeTopics": "true",
"schemaRegistryLocation": "http://schema-registry",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"tasks.max": "1",
"topics": "my-source-kafka-topic",
"transforms": "TableNameTransformation",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.regex": ".*",
"transforms.TableNameTransformation.replacement": "my_sink_bigquery_table",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
# Assert that lineage was created
assert len(lineages) == 1
# Verify the lineage details
lineage = lineages[0]
assert lineage.source_platform == "kafka"
assert lineage.source_dataset == "my-source-kafka-topic"
assert lineage.target_platform == "bigquery"
assert lineage.target_dataset == "my-gcp-project.ingest.my_sink_bigquery_table"
class TestSnowflakeSinkConnector:
"""Test Snowflake sink connector with RegexRouter support."""
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
"""Helper to create a mock connector manifest."""
return ConnectorManifest(
name="test-snowflake-connector",
type="sink",
config=config,
tasks={},
topic_names=["app_logs"],
)
def test_snowflake_with_regex_router(self) -> None:
"""Test Snowflake connector with RegexRouter transformation."""
connector_config: Dict[str, str] = {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"snowflake.database.name": "ANALYTICS",
"snowflake.schema.name": "RAW",
"transforms": "TableTransform",
"transforms.TableTransform.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableTransform.regex": "app_(.*)",
"transforms.TableTransform.replacement": "APPLICATION_$1",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
connector: SnowflakeSinkConnector = SnowflakeSinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
assert len(lineages) == 1
lineage = lineages[0]
assert lineage.source_dataset == "app_logs"
assert lineage.source_platform == "kafka"
assert lineage.target_dataset == "ANALYTICS.RAW.APPLICATION_logs"
assert lineage.target_platform == "snowflake"
class TestJDBCSourceConnector:
"""Test JDBC source connector with RegexRouter support."""
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
"""Helper to create a mock connector manifest."""
return ConnectorManifest(
name="mysql_source2",
type="source",
config=config,
tasks={"config": {"tables": "library.book"}},
topic_names=["library-book"],
)
@patch("datahub.ingestion.source.kafka_connect.source_connectors.make_url")
@patch(
"datahub.ingestion.source.kafka_connect.source_connectors.get_platform_from_sqlalchemy_uri"
)
def test_mysql_source_with_regex_router(
self, mock_platform: Mock, mock_url: Mock
) -> None:
"""Test the specific MySQL source configuration example."""
# Mock the database connection parsing
mock_url_obj: Mock = Mock()
mock_url_obj.drivername = "mysql+pymysql"
mock_url_obj.host = "localhost"
mock_url_obj.port = 3306
mock_url_obj.database = "library"
mock_url.return_value = mock_url_obj
mock_platform.return_value = "mysql"
connector_config: Dict[str, str] = {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/library",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1",
}
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
connector: ConfluentJDBCSourceConnector = ConfluentJDBCSourceConnector(
manifest, config, report
)
# Test the transform parsing
parser = connector.get_parser(manifest)
assert len(parser.transforms) == 1
transform: Dict[str, str] = parser.transforms[0]
assert transform["name"] == "TotalReplacement"
assert transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter"
assert transform["regex"] == ".*(book)"
assert transform["replacement"] == "my-new-topic-$1"
class TestIntegration:
"""Integration tests for the complete RegexRouter functionality."""
def test_end_to_end_bigquery_transformation(self) -> None:
"""Test complete end-to-end BigQuery transformation."""
# Test multiple topics with different transformation patterns
connector_config: Dict[str, str] = {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"project": "data-warehouse",
"defaultDataset": "staging",
"transforms": "Standardize,Prefix",
"transforms.Standardize.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Standardize.regex": "raw_(.*)_data",
"transforms.Standardize.replacement": "$1_cleaned",
"transforms.Prefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Prefix.regex": "(.*)_cleaned",
"transforms.Prefix.replacement": "final_$1",
}
manifest: ConnectorManifest = ConnectorManifest(
name="multi-transform-connector",
type="sink",
config=connector_config,
tasks={},
topic_names=["raw_users_data", "raw_orders_data", "other_topic"],
)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
# Should have 3 lineages
assert len(lineages) == 3
# Check transformed topics
users_lineage = next(
line for line in lineages if line.source_dataset == "raw_users_data"
)
assert users_lineage.target_dataset == "data-warehouse.staging.final_users"
orders_lineage = next(
line for line in lineages if line.source_dataset == "raw_orders_data"
)
assert orders_lineage.target_dataset == "data-warehouse.staging.final_orders"
# Non-matching topic should remain unchanged
other_lineage = next(
line for line in lineages if line.source_dataset == "other_topic"
)
assert other_lineage.target_dataset == "data-warehouse.staging.other_topic"
def test_regex_router_error_handling(self) -> None:
"""Test that invalid regex patterns don't crash the system."""
connector_config: Dict[str, str] = {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"project": "test-project",
"defaultDataset": "test",
"transforms": "BadRegex",
"transforms.BadRegex.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.BadRegex.regex": "[invalid-regex", # Invalid regex
"transforms.BadRegex.replacement": "fixed",
}
manifest: ConnectorManifest = ConnectorManifest(
name="error-test-connector",
type="sink",
config=connector_config,
tasks={},
topic_names=["test-topic"],
)
config: Mock = Mock(spec=KafkaConnectSourceConfig)
report: Mock = Mock(spec=KafkaConnectSourceReport)
# Should not raise an exception
connector: BigQuerySinkConnector = BigQuerySinkConnector(
manifest, config, report
)
lineages: List = connector.extract_lineages()
# Should still create lineage with original topic name
assert len(lineages) == 1
lineage = lineages[0]
assert lineage.target_dataset == "test-project.test.test-topic"