class RegexRouterTransform:
    """Parses RegexRouter transforms out of a connector config and applies them.

    Kafka Connect connectors may declare a chain of Single Message Transforms
    via the ``transforms`` property; only transforms of type
    ``org.apache.kafka.connect.transforms.RegexRouter`` rename topics, so only
    those are retained here and replayed against topic names.
    """

    def __init__(self, config: Dict[str, str]) -> None:
        # Pre-filter the config once; apply_transforms then only iterates
        # RegexRouter entries, in the order they were declared.
        self.transforms = self._parse_transforms(config)

    def _parse_transforms(self, config: Dict[str, str]) -> List[Dict[str, str]]:
        """Return the settings of each declared RegexRouter transform, in order."""
        declared: str = config.get("transforms", "")
        if not declared:
            return []

        routers: List[Dict[str, str]] = []
        for raw_name in declared.split(","):
            name = raw_name.strip()
            if not name:
                continue
            prefix = f"transforms.{name}."
            # Collect every "transforms.<name>.<key>" entry, keyed by <key>.
            settings: Dict[str, str] = {
                key[len(prefix) :]: value
                for key, value in config.items()
                if key.startswith(prefix)
            }
            # Keep only RegexRouter transforms; other SMT types don't rename topics.
            if (
                settings.get("type")
                == "org.apache.kafka.connect.transforms.RegexRouter"
            ):
                settings["name"] = name
                routers.append(settings)
        return routers

    def apply_transforms(self, topic_name: str) -> str:
        """Apply each RegexRouter to *topic_name* in sequence and return the result.

        Uses Java's regex engine (via JPype) so that pattern syntax and
        ``$1``-style replacement references behave exactly as they do inside
        Kafka Connect itself. Invalid patterns are logged and skipped.
        """
        result = topic_name
        for transform in self.transforms:
            regex = transform.get("regex")
            if not regex:
                continue
            replacement = transform.get("replacement", "")
            try:
                # Use Java Pattern and Matcher for exact Kafka Connect compatibility
                from java.util.regex import Pattern

                matcher = Pattern.compile(regex).matcher(result)
                if matcher.find():
                    # replaceFirst scans from the start, so rewind the matcher first.
                    matcher.reset()
                    result = matcher.replaceFirst(replacement)
                    logging.getLogger(__name__).debug(
                        f"Applied transform {transform['name']}: {topic_name} -> {result}"
                    )
            except Exception as e:
                logging.getLogger(__name__).warning(
                    f"Invalid regex pattern in transform {transform['name']}: {e}"
                )
        # Java strings leak through replaceFirst; normalize to a Python str.
        return str(result)
https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage - topics_dir = connector_manifest.config.get("topics.dir", "topics") + topics_dir: str = connector_manifest.config.get("topics.dir", "topics") + + # Create RegexRouterTransform instance + regex_router: RegexRouterTransform = RegexRouterTransform( + connector_manifest.config + ) return self.S3SinkParser( target_platform="s3", bucket=bucket, topics_dir=topics_dir, topics=connector_manifest.topic_names, + regex_router=regex_router, ) def extract_flow_property_bag(self) -> Dict[str, str]: # Mask/Remove properties that may reveal credentials - flow_property_bag = { + flow_property_bag: Dict[str, str] = { k: v for k, v in self.connector_manifest.config.items() if k @@ -54,11 +137,17 @@ class ConfluentS3SinkConnector(BaseConnector): def extract_lineages(self) -> List[KafkaConnectLineage]: try: - parser = self._get_parser(self.connector_manifest) + parser: ConfluentS3SinkConnector.S3SinkParser = self._get_parser( + self.connector_manifest + ) lineages: List[KafkaConnectLineage] = list() for topic in parser.topics: - target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}" + # Apply RegexRouter transformations using the RegexRouterTransform class + transformed_topic: str = parser.regex_router.apply_transforms(topic) + target_dataset: str = ( + f"{parser.bucket}/{parser.topics_dir}/{transformed_topic}" + ) lineages.append( KafkaConnectLineage( @@ -86,6 +175,7 @@ class SnowflakeSinkConnector(BaseConnector): database_name: str schema_name: str topics_to_tables: Dict[str, str] + regex_router: RegexRouterTransform def get_table_name_from_topic_name(self, topic_name: str) -> str: """ @@ -93,7 +183,7 @@ class SnowflakeSinkConnector(BaseConnector): Refer below link for more info https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics """ - table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name) + table_name: str = re.sub("[^a-zA-Z0-9_]", "_", 
topic_name) if re.match("^[^a-zA-Z_].*", table_name): table_name = "_" + table_name # Connector may append original topic's hash code as suffix for conflict resolution @@ -106,8 +196,13 @@ class SnowflakeSinkConnector(BaseConnector): self, connector_manifest: ConnectorManifest, ) -> SnowflakeParser: - database_name = connector_manifest.config["snowflake.database.name"] - schema_name = connector_manifest.config["snowflake.schema.name"] + database_name: str = connector_manifest.config["snowflake.database.name"] + schema_name: str = connector_manifest.config["snowflake.schema.name"] + + # Create RegexRouterTransform instance + regex_router: RegexRouterTransform = RegexRouterTransform( + connector_manifest.config + ) # Fetch user provided topic to table map provided_topics_to_tables: Dict[str, str] = {} @@ -121,24 +216,30 @@ class SnowflakeSinkConnector(BaseConnector): topics_to_tables: Dict[str, str] = {} # Extract lineage for only those topics whose data ingestion started for topic in connector_manifest.topic_names: + # Apply transforms first to get the transformed topic name + transformed_topic: str = regex_router.apply_transforms(topic) + if topic in provided_topics_to_tables: # If user provided which table to get mapped with this topic topics_to_tables[topic] = provided_topics_to_tables[topic] else: - # Else connector converts topic name to a valid Snowflake table name. 
- topics_to_tables[topic] = self.get_table_name_from_topic_name(topic) + # Use the transformed topic name to generate table name + topics_to_tables[topic] = self.get_table_name_from_topic_name( + transformed_topic + ) return self.SnowflakeParser( database_name=database_name, schema_name=schema_name, topics_to_tables=topics_to_tables, + regex_router=regex_router, ) def extract_flow_property_bag(self) -> Dict[str, str]: # For all snowflake sink connector properties, refer below link # https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector # remove private keys, secrets from properties - flow_property_bag = { + flow_property_bag: Dict[str, str] = { k: v for k, v in self.connector_manifest.config.items() if k @@ -153,10 +254,12 @@ class SnowflakeSinkConnector(BaseConnector): def extract_lineages(self) -> List[KafkaConnectLineage]: lineages: List[KafkaConnectLineage] = list() - parser = self.get_parser(self.connector_manifest) + parser: SnowflakeSinkConnector.SnowflakeParser = self.get_parser( + self.connector_manifest + ) for topic, table in parser.topics_to_tables.items(): - target_dataset = f"{parser.database_name}.{parser.schema_name}.{table}" + target_dataset: str = f"{parser.database_name}.{parser.schema_name}.{table}" lineages.append( KafkaConnectLineage( source_dataset=topic, @@ -176,7 +279,8 @@ class BigQuerySinkConnector(BaseConnector): project: str target_platform: str sanitizeTopics: bool - transforms: list + transforms: List[Dict[str, str]] + regex_router: RegexRouterTransform topicsToTables: Optional[str] = None datasets: Optional[str] = None defaultDataset: Optional[str] = None @@ -186,16 +290,18 @@ class BigQuerySinkConnector(BaseConnector): self, connector_manifest: ConnectorManifest, ) -> BQParser: - project = connector_manifest.config["project"] - sanitizeTopics = connector_manifest.config.get("sanitizeTopics") or "false" - transform_names = ( + project: str = connector_manifest.config["project"] + 
sanitizeTopics: str = connector_manifest.config.get("sanitizeTopics") or "false" + + # Parse ALL transforms (original BigQuery logic) + transform_names: List[str] = ( self.connector_manifest.config.get("transforms", "").split(",") if self.connector_manifest.config.get("transforms") else [] ) - transforms = [] + transforms: List[Dict[str, str]] = [] for name in transform_names: - transform = {"name": name} + transform: Dict[str, str] = {"name": name} transforms.append(transform) for key in self.connector_manifest.config: if key.startswith(f"transforms.{name}."): @@ -203,8 +309,13 @@ class BigQuerySinkConnector(BaseConnector): self.connector_manifest.config[key] ) + # Create RegexRouterTransform instance for RegexRouter-specific handling + regex_router: RegexRouterTransform = RegexRouterTransform( + connector_manifest.config + ) + if "defaultDataset" in connector_manifest.config: - defaultDataset = connector_manifest.config["defaultDataset"] + defaultDataset: str = connector_manifest.config["defaultDataset"] return self.BQParser( project=project, defaultDataset=defaultDataset, @@ -212,11 +323,14 @@ class BigQuerySinkConnector(BaseConnector): sanitizeTopics=sanitizeTopics.lower() == "true", version="v2", transforms=transforms, + regex_router=regex_router, ) else: # version 1.6.x and similar configs supported - datasets = connector_manifest.config["datasets"] - topicsToTables = connector_manifest.config.get("topicsToTables") + datasets: str = connector_manifest.config["datasets"] + topicsToTables: Optional[str] = connector_manifest.config.get( + "topicsToTables" + ) return self.BQParser( project=project, @@ -225,10 +339,11 @@ class BigQuerySinkConnector(BaseConnector): target_platform="bigquery", sanitizeTopics=sanitizeTopics.lower() == "true", transforms=transforms, + regex_router=regex_router, ) def get_list(self, property: str) -> Iterable[Tuple[str, str]]: - entries = property.split(",") + entries: List[str] = property.split(",") for entry in entries: key, val = 
entry.rsplit("=") yield (key.strip(), val.strip()) @@ -243,7 +358,7 @@ class BigQuerySinkConnector(BaseConnector): return dataset return None - def sanitize_table_name(self, table_name): + def sanitize_table_name(self, table_name: str) -> str: table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name) if re.match("^[^a-zA-Z_].*", table_name): table_name = "_" + table_name @@ -254,8 +369,8 @@ class BigQuerySinkConnector(BaseConnector): self, topic: str, parser: BQParser ) -> Optional[str]: if parser.version == "v2": - dataset = parser.defaultDataset - parts = topic.split(":") + dataset: Optional[str] = parser.defaultDataset + parts: List[str] = topic.split(":") if len(parts) == 2: dataset = parts[0] table = parts[1] @@ -283,21 +398,9 @@ class BigQuerySinkConnector(BaseConnector): table = self.sanitize_table_name(table) return f"{dataset}.{table}" - def apply_transformations( - self, topic: str, transforms: List[Dict[str, str]] - ) -> str: - for transform in transforms: - if transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter": - regex = transform["regex"] - replacement = transform["replacement"] - pattern = re.compile(regex) - if pattern.match(topic): - topic = pattern.sub(replacement, topic, count=1) - return topic - def extract_flow_property_bag(self) -> Dict[str, str]: # Mask/Remove properties that may reveal credentials - flow_property_bag = { + flow_property_bag: Dict[str, str] = { k: v for k, v in self.connector_manifest.config.items() if k not in ["keyfile"] @@ -307,27 +410,33 @@ class BigQuerySinkConnector(BaseConnector): def extract_lineages(self) -> List[KafkaConnectLineage]: lineages: List[KafkaConnectLineage] = list() - parser = self.get_parser(self.connector_manifest) + parser: BigQuerySinkConnector.BQParser = self.get_parser( + self.connector_manifest + ) if not parser: return lineages - target_platform = parser.target_platform - project = parser.project - transforms = parser.transforms + target_platform: str = parser.target_platform + 
project: str = parser.project for topic in self.connector_manifest.topic_names: - transformed_topic = self.apply_transformations(topic, transforms) - dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser) + # Apply RegexRouter transformations using the RegexRouterTransform class + transformed_topic: str = parser.regex_router.apply_transforms(topic) + + # Use the transformed topic to determine dataset/table + dataset_table: Optional[str] = self.get_dataset_table_for_topic( + transformed_topic, parser + ) if dataset_table is None: self.report.warning( "Could not find target dataset for topic, please check your connector configuration" f"{self.connector_manifest.name} : {transformed_topic} ", ) continue - target_dataset = f"{project}.{dataset_table}" + target_dataset: str = f"{project}.{dataset_table}" lineages.append( KafkaConnectLineage( - source_dataset=transformed_topic, + source_dataset=topic, # Keep original topic as source source_platform=KAFKA, target_dataset=target_dataset, target_platform=target_platform, diff --git a/metadata-ingestion/tests/unit/test_kafka_connect.py b/metadata-ingestion/tests/unit/test_kafka_connect.py new file mode 100644 index 0000000000..9f799659df --- /dev/null +++ b/metadata-ingestion/tests/unit/test_kafka_connect.py @@ -0,0 +1,547 @@ +import logging +from typing import Dict, List, Tuple +from unittest.mock import Mock, patch + +import jpype +import jpype.imports + +# Import the classes we're testing +from datahub.ingestion.source.kafka_connect.common import ( + ConnectorManifest, + KafkaConnectSourceConfig, + KafkaConnectSourceReport, +) +from datahub.ingestion.source.kafka_connect.sink_connectors import ( + BigQuerySinkConnector, + ConfluentS3SinkConnector, + RegexRouterTransform, + SnowflakeSinkConnector, +) +from datahub.ingestion.source.kafka_connect.source_connectors import ( + ConfluentJDBCSourceConnector, +) + +logger = logging.getLogger(__name__) + +if not jpype.isJVMStarted(): + jpype.startJVM() + + 
class TestRegexRouterTransform:
    """Unit tests for RegexRouterTransform topic-name rewriting."""

    @staticmethod
    def _route(config: Dict[str, str], topic: str) -> str:
        """Build a RegexRouterTransform from *config* and apply it to *topic*."""
        return RegexRouterTransform(config).apply_transforms(topic)

    def test_no_transforms_configured(self) -> None:
        """Test when no transforms are configured."""
        assert self._route({"connector.class": "some.connector"}, "test-topic") == "test-topic"

    def test_non_regex_router_transforms(self) -> None:
        """Test when transforms exist but none are RegexRouter."""
        config = {
            "transforms": "MyTransform",
            "transforms.MyTransform.type": "org.apache.kafka.connect.transforms.InsertField",
            "transforms.MyTransform.field": "timestamp",
        }
        assert self._route(config, "test-topic") == "test-topic"

    def test_single_regex_router_transform(self) -> None:
        """Test single RegexRouter transformation."""
        config = {
            "transforms": "TableNameTransformation",
            "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.TableNameTransformation.regex": ".*",
            "transforms.TableNameTransformation.replacement": "my_sink_table",
        }
        assert self._route(config, "source-topic") == "my_sink_table"

    def test_multiple_regex_router_transforms(self) -> None:
        """Test multiple RegexRouter transformations applied in sequence."""
        config = {
            "transforms": "First,Second",
            "transforms.First.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.First.regex": "user-(.*)",
            "transforms.First.replacement": "customer_$1",
            "transforms.Second.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Second.regex": "customer_(.*)",
            "transforms.Second.replacement": "final_$1",
        }
        assert self._route(config, "user-events") == "final_events"

    def test_mysql_source_config_example(self) -> None:
        """Test the specific MySQL source configuration from the example."""
        config = {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "mode": "incrementing",
            "incrementing.column.name": "id",
            "tasks.max": "1",
            "connection.url": "${env:MYSQL_CONNECTION_URL}",
            "transforms": "TotalReplacement",
            "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.TotalReplacement.regex": ".*(book)",
            "transforms.TotalReplacement.replacement": "my-new-topic-$1",
        }
        # Matching topic is rewritten; non-matching topic passes through.
        assert self._route(config, "library-book") == "my-new-topic-book"
        assert self._route(config, "user-data") == "user-data"

    def test_mixed_transforms(self) -> None:
        """Test mix of RegexRouter and other transforms."""
        config = {
            "transforms": "NonRouter,Router,AnotherNonRouter",
            "transforms.NonRouter.type": "org.apache.kafka.connect.transforms.InsertField",
            "transforms.NonRouter.field": "timestamp",
            "transforms.Router.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Router.regex": "events-(.*)",
            "transforms.Router.replacement": "processed_$1",
            "transforms.AnotherNonRouter.type": "org.apache.kafka.connect.transforms.MaskField",
            "transforms.AnotherNonRouter.fields": "sensitive",
        }
        assert self._route(config, "events-user") == "processed_user"

    def test_invalid_regex_pattern(self) -> None:
        """Test handling of invalid regex patterns (must not crash)."""
        config = {
            "transforms": "BadRegex",
            "transforms.BadRegex.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.BadRegex.regex": "[invalid",  # Invalid regex
            "transforms.BadRegex.replacement": "fixed",
        }
        # Original topic name is returned unchanged.
        assert self._route(config, "test-topic") == "test-topic"

    def test_empty_replacement(self) -> None:
        """Test with empty replacement string."""
        config = {
            "transforms": "EmptyReplace",
            "transforms.EmptyReplace.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.EmptyReplace.regex": "prefix-(.*)",
            "transforms.EmptyReplace.replacement": "",
        }
        assert self._route(config, "prefix-suffix") == ""

    def test_whitespace_in_transform_names(self) -> None:
        """Test handling of whitespace in transform names."""
        config = {
            "transforms": " Transform1 , Transform2 ",
            "transforms.Transform1.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Transform1.regex": "test-(.*)",
            "transforms.Transform1.replacement": "result_$1",
        }
        assert self._route(config, "test-data") == "result_data"
Mock(spec=KafkaConnectSourceReport) + return config, report + + def test_bigquery_with_regex_router(self) -> None: + """Test BigQuery connector with RegexRouter transformation.""" + connector_config: Dict[str, str] = { + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "project": "my-gcp-project", + "defaultDataset": "ingest", + "transforms": "TableNameTransformation", + "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TableNameTransformation.regex": ".*", + "transforms.TableNameTransformation.replacement": "my_sink_table", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config, report = self.create_mock_dependencies() + + connector: BigQuerySinkConnector = BigQuerySinkConnector( + manifest, config, report + ) + lineages: List = connector.extract_lineages() + + assert len(lineages) == 1 + lineage = lineages[0] + assert lineage.source_dataset == "source-topic" + assert lineage.source_platform == "kafka" + assert lineage.target_dataset == "my-gcp-project.ingest.my_sink_table" + assert lineage.target_platform == "bigquery" + + def test_bigquery_with_complex_regex(self) -> None: + """Test BigQuery with complex regex pattern.""" + connector_config: Dict[str, str] = { + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "project": "analytics", + "defaultDataset": "raw", + "transforms": "TopicTransform", + "transforms.TopicTransform.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TopicTransform.regex": "app_(.*)_events", + "transforms.TopicTransform.replacement": "$1_processed", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + manifest.topic_names = ["app_user_events", "app_order_events"] + config, report = self.create_mock_dependencies() + + connector: BigQuerySinkConnector = BigQuerySinkConnector( + manifest, config, report + ) + lineages: List = 
connector.extract_lineages() + + assert len(lineages) == 2 + + # Check first lineage + user_lineage = next( + line for line in lineages if line.source_dataset == "app_user_events" + ) + assert user_lineage.target_dataset == "analytics.raw.user_processed" + + # Check second lineage + order_lineage = next( + line for line in lineages if line.source_dataset == "app_order_events" + ) + assert order_lineage.target_dataset == "analytics.raw.order_processed" + + def test_bigquery_no_transforms(self) -> None: + """Test BigQuery connector without transforms.""" + connector_config: Dict[str, str] = { + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "project": "my-project", + "defaultDataset": "dataset", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config, report = self.create_mock_dependencies() + + connector: BigQuerySinkConnector = BigQuerySinkConnector( + manifest, config, report + ) + lineages: List = connector.extract_lineages() + + assert len(lineages) == 1 + lineage = lineages[0] + assert lineage.target_dataset == "my-project.dataset.source-topic" + + +class TestS3SinkConnector: + """Test S3 sink connector with RegexRouter support.""" + + def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest: + """Helper to create a mock connector manifest.""" + return ConnectorManifest( + name="test-s3-connector", + type="sink", + config=config, + tasks={}, + topic_names=["user-events"], + ) + + def test_s3_with_regex_router(self) -> None: + """Test S3 connector with RegexRouter transformation.""" + connector_config: Dict[str, str] = { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "s3.bucket.name": "my-data-lake", + "topics.dir": "kafka-data", + "transforms": "PathTransform", + "transforms.PathTransform.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.PathTransform.regex": "user-(.*)", + "transforms.PathTransform.replacement": "processed-$1", + } + + 
manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config: Mock = Mock(spec=KafkaConnectSourceConfig) + report: Mock = Mock(spec=KafkaConnectSourceReport) + + connector: ConfluentS3SinkConnector = ConfluentS3SinkConnector( + manifest, config, report + ) + lineages: List = connector.extract_lineages() + + assert len(lineages) == 1 + lineage = lineages[0] + assert lineage.source_dataset == "user-events" + assert lineage.source_platform == "kafka" + assert lineage.target_dataset == "my-data-lake/kafka-data/processed-events" + assert lineage.target_platform == "s3" + + +class TestBigquerySinkConnector: + """Test BigQuery sink connector with RegexRouter support.""" + + def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest: + """Helper to create a mock connector manifest.""" + return ConnectorManifest( + name="test-bigquery-connector", + type="sink", + config=config, + tasks={}, + topic_names=["my-source-kafka-topic"], + ) + + def test_bigquery_with_regex_router(self) -> None: + """Test BigQuery connector with RegexRouter transformation.""" + connector_config: Dict[str, str] = { + "autoCreateTables": "true", + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "defaultDataset": "ingest", + "project": "my-gcp-project", + "sanitizeTopics": "true", + "schemaRegistryLocation": "http://schema-registry", + "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever", + "tasks.max": "1", + "topics": "my-source-kafka-topic", + "transforms": "TableNameTransformation", + "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TableNameTransformation.regex": ".*", + "transforms.TableNameTransformation.replacement": "my_sink_bigquery_table", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config: Mock = Mock(spec=KafkaConnectSourceConfig) + report: Mock = Mock(spec=KafkaConnectSourceReport) + + 
connector: BigQuerySinkConnector = BigQuerySinkConnector( + manifest, config, report + ) + lineages: List = connector.extract_lineages() + + # Assert that lineage was created + assert len(lineages) == 1 + + # Verify the lineage details + lineage = lineages[0] + assert lineage.source_platform == "kafka" + assert lineage.source_dataset == "my-source-kafka-topic" + assert lineage.target_platform == "bigquery" + assert lineage.target_dataset == "my-gcp-project.ingest.my_sink_bigquery_table" + + +class TestSnowflakeSinkConnector: + """Test Snowflake sink connector with RegexRouter support.""" + + def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest: + """Helper to create a mock connector manifest.""" + return ConnectorManifest( + name="test-snowflake-connector", + type="sink", + config=config, + tasks={}, + topic_names=["app_logs"], + ) + + def test_snowflake_with_regex_router(self) -> None: + """Test Snowflake connector with RegexRouter transformation.""" + connector_config: Dict[str, str] = { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "snowflake.database.name": "ANALYTICS", + "snowflake.schema.name": "RAW", + "transforms": "TableTransform", + "transforms.TableTransform.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TableTransform.regex": "app_(.*)", + "transforms.TableTransform.replacement": "APPLICATION_$1", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config: Mock = Mock(spec=KafkaConnectSourceConfig) + report: Mock = Mock(spec=KafkaConnectSourceReport) + + connector: SnowflakeSinkConnector = SnowflakeSinkConnector( + manifest, config, report + ) + lineages: List = connector.extract_lineages() + + assert len(lineages) == 1 + lineage = lineages[0] + assert lineage.source_dataset == "app_logs" + assert lineage.source_platform == "kafka" + assert lineage.target_dataset == "ANALYTICS.RAW.APPLICATION_logs" + assert lineage.target_platform == 
"snowflake" + + +class TestJDBCSourceConnector: + """Test JDBC source connector with RegexRouter support.""" + + def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest: + """Helper to create a mock connector manifest.""" + return ConnectorManifest( + name="mysql_source2", + type="source", + config=config, + tasks={"config": {"tables": "library.book"}}, + topic_names=["library-book"], + ) + + @patch("datahub.ingestion.source.kafka_connect.source_connectors.make_url") + @patch( + "datahub.ingestion.source.kafka_connect.source_connectors.get_platform_from_sqlalchemy_uri" + ) + def test_mysql_source_with_regex_router( + self, mock_platform: Mock, mock_url: Mock + ) -> None: + """Test the specific MySQL source configuration example.""" + # Mock the database connection parsing + mock_url_obj: Mock = Mock() + mock_url_obj.drivername = "mysql+pymysql" + mock_url_obj.host = "localhost" + mock_url_obj.port = 3306 + mock_url_obj.database = "library" + mock_url.return_value = mock_url_obj + mock_platform.return_value = "mysql" + + connector_config: Dict[str, str] = { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "tasks.max": "1", + "connection.url": "jdbc:mysql://localhost:3306/library", + "transforms": "TotalReplacement", + "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TotalReplacement.regex": ".*(book)", + "transforms.TotalReplacement.replacement": "my-new-topic-$1", + } + + manifest: ConnectorManifest = self.create_mock_manifest(connector_config) + config: Mock = Mock(spec=KafkaConnectSourceConfig) + report: Mock = Mock(spec=KafkaConnectSourceReport) + + connector: ConfluentJDBCSourceConnector = ConfluentJDBCSourceConnector( + manifest, config, report + ) + + # Test the transform parsing + parser = connector.get_parser(manifest) + assert len(parser.transforms) == 1 + transform: Dict[str, str] = 
class TestIntegration:
    """Integration tests for the complete RegexRouter functionality."""

    @staticmethod
    def _bigquery_lineages(
        config: Dict[str, str], topics: List[str], name: str
    ) -> List:
        """Run extract_lineages on a BigQuery sink connector built from *config*."""
        manifest = ConnectorManifest(
            name=name,
            type="sink",
            config=config,
            tasks={},
            topic_names=topics,
        )
        source_config = Mock(spec=KafkaConnectSourceConfig)
        report = Mock(spec=KafkaConnectSourceReport)
        connector = BigQuerySinkConnector(manifest, source_config, report)
        return connector.extract_lineages()

    def test_end_to_end_bigquery_transformation(self) -> None:
        """Test complete end-to-end BigQuery transformation."""
        # Two chained RegexRouter transforms plus one non-matching topic.
        config = {
            "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
            "project": "data-warehouse",
            "defaultDataset": "staging",
            "transforms": "Standardize,Prefix",
            "transforms.Standardize.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Standardize.regex": "raw_(.*)_data",
            "transforms.Standardize.replacement": "$1_cleaned",
            "transforms.Prefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Prefix.regex": "(.*)_cleaned",
            "transforms.Prefix.replacement": "final_$1",
        }

        lineages = self._bigquery_lineages(
            config,
            ["raw_users_data", "raw_orders_data", "other_topic"],
            "multi-transform-connector",
        )

        assert len(lineages) == 3

        # Map each source topic to its target dataset and verify the chain.
        targets = {line.source_dataset: line.target_dataset for line in lineages}
        assert targets["raw_users_data"] == "data-warehouse.staging.final_users"
        assert targets["raw_orders_data"] == "data-warehouse.staging.final_orders"
        # Non-matching topic should remain unchanged.
        assert targets["other_topic"] == "data-warehouse.staging.other_topic"

    def test_regex_router_error_handling(self) -> None:
        """Test that invalid regex patterns don't crash the system."""
        config = {
            "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
            "project": "test-project",
            "defaultDataset": "test",
            "transforms": "BadRegex",
            "transforms.BadRegex.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.BadRegex.regex": "[invalid-regex",  # Invalid regex
            "transforms.BadRegex.replacement": "fixed",
        }

        # Should not raise; lineage falls back to the original topic name.
        lineages = self._bigquery_lineages(config, ["test-topic"], "error-test-connector")

        assert len(lineages) == 1
        assert lineages[0].target_dataset == "test-project.test.test-topic"