mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-15 10:52:41 +00:00
fix(mongo-source): Add failing test for complex topic name parsing (#15002)
Co-authored-by: Winnie <winnie@wrtn.io>
This commit is contained in:
parent
1934ed45e8
commit
afc6b138b7
@ -412,7 +412,10 @@ class MongoSourceConnector(BaseConnector):
|
|||||||
|
|
||||||
# Escape topic_prefix to handle cases where it contains dots
|
# Escape topic_prefix to handle cases where it contains dots
|
||||||
# Some users configure topic.prefix like "my.mongodb" which breaks the regex
|
# Some users configure topic.prefix like "my.mongodb" which breaks the regex
|
||||||
topic_naming_pattern = rf"{re.escape(topic_prefix)}\.(\w+)\.(\w+)"
|
|
||||||
|
# \w is equivalent to [a-zA-Z0-9_]
|
||||||
|
# So [\w-]+ matches alphanumeric characters, underscores, and hyphens
|
||||||
|
topic_naming_pattern = rf"{re.escape(topic_prefix)}\.([\w-]+)\.([\w-]+)"
|
||||||
|
|
||||||
if not self.connector_manifest.topic_names:
|
if not self.connector_manifest.topic_names:
|
||||||
return lineages
|
return lineages
|
||||||
|
|||||||
@ -19,6 +19,7 @@ from datahub.ingestion.source.kafka_connect.sink_connectors import (
|
|||||||
)
|
)
|
||||||
from datahub.ingestion.source.kafka_connect.source_connectors import (
|
from datahub.ingestion.source.kafka_connect.source_connectors import (
|
||||||
ConfluentJDBCSourceConnector,
|
ConfluentJDBCSourceConnector,
|
||||||
|
MongoSourceConnector,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -545,3 +546,39 @@ class TestIntegration:
|
|||||||
assert len(lineages) == 1
|
assert len(lineages) == 1
|
||||||
lineage = lineages[0]
|
lineage = lineages[0]
|
||||||
assert lineage.target_dataset == "test-project.test.test-topic"
|
assert lineage.target_dataset == "test-project.test.test-topic"
|
||||||
|
|
||||||
|
|
||||||
|
class TestMongoSourceConnector:
|
||||||
|
"""Test Mongo source connector lineage extraction."""
|
||||||
|
|
||||||
|
def create_mock_manifest(self, config: Dict[str, str]) -> ConnectorManifest:
|
||||||
|
"""Helper to create a mock connector manifest."""
|
||||||
|
return ConnectorManifest(
|
||||||
|
name="mongo-source-connector",
|
||||||
|
type="source",
|
||||||
|
config=config,
|
||||||
|
tasks={},
|
||||||
|
topic_names=[
|
||||||
|
"prod.mongo.avro.my-new-database.users",
|
||||||
|
"prod.mongo.avro.-leading-hyphen._leading-underscore",
|
||||||
|
"prod.mongo.avro.!user?<db>=.[]user=logs!",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_mongo_source_lineage_topic_parsing(self) -> None:
|
||||||
|
"""Test MongoDB topic name parsing with various patterns including special characters and filtering of invalid topics."""
|
||||||
|
connector_config: Dict[str, str] = {
|
||||||
|
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
|
||||||
|
"topic.prefix": "prod.mongo.avro",
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest: ConnectorManifest = self.create_mock_manifest(connector_config)
|
||||||
|
config: Mock = Mock(spec=KafkaConnectSourceConfig)
|
||||||
|
report: Mock = Mock(spec=KafkaConnectSourceReport)
|
||||||
|
|
||||||
|
connector: MongoSourceConnector = MongoSourceConnector(manifest, config, report)
|
||||||
|
lineages: List = connector.extract_lineages()
|
||||||
|
|
||||||
|
assert len(lineages) == 2
|
||||||
|
assert lineages[0].source_dataset == "my-new-database.users"
|
||||||
|
assert lineages[1].source_dataset == "-leading-hyphen._leading-underscore"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user