feat(ingest): kafka-connect - support mapping for multiple DB instances (#4501)

Arun Vasudevan 2022-03-29 22:46:07 -05:00 committed by GitHub
parent c69310522b
commit c79c778270
2 changed files with 96 additions and 35 deletions


@@ -43,9 +43,11 @@ source:
        path_key: MYSQL_CONNECTION_URL
        value: jdbc:mysql://test_mysql:3306/librarydb
    # Optional mapping of platform types to instance ids
    platform_instance_map: # optional
      mysql: test_mysql # optional
    connect_to_platform_map: # optional
      postgres-connector-finance-db: # optional - Connector name
        postgres: core_finance_instance # optional - Platform to instance map
    # Credentials
    username: admin
    password: password
@@ -54,23 +56,25 @@ sink:
  # sink configs
```
## Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ----- | -------- | ------- | ----------- |
| `connect_uri` | ✅ | `"http://localhost:8083/"` | URI to connect to. |
| `username` | | | Kafka Connect username. |
| `password` | | | Kafka Connect password. |
| `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. |
| `provided_configs` | | | Provided configurations. |
| `construct_lineage_workunits` | | `True` | Whether to create the input and output Dataset entities. |
| `connector_patterns.deny` | | | List of regex patterns for connectors to exclude from ingestion. |
| `connector_patterns.allow` | | | List of regex patterns for connectors to include in ingestion. |
| `connector_patterns.ignoreCase` | | `True` | Whether to ignore case during pattern matching. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `platform_instance_map` | | | Mapping of platform to instance id, used when constructing URNs. e.g. `platform_instance_map: { "hive": "warehouse" }` |
| `connect_to_platform_map` | | | Per-connector platform-to-instance mapping, for when multiple instances of a platform are available. An entry for a platform should appear in either `platform_instance_map` or `connect_to_platform_map`, not both. e.g. `connect_to_platform_map: { "postgres-connector-finance-db": { "postgres": "core_finance_instance" } }` |
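
The resolution rules can be summarized in a short sketch (illustrative only, not the source's exact code; the map values are the hypothetical ones from the recipe above): a connector-scoped entry wins, and configuring the same platform in both maps is treated as a fatal config error.

```python
# Minimal sketch of the intended lookup rules, using the example values above.
from typing import Dict, Optional

platform_instance_map: Dict[str, str] = {"mysql": "test_mysql"}
connect_to_platform_map: Dict[str, Dict[str, str]] = {
    "postgres-connector-finance-db": {"postgres": "core_finance_instance"}
}

def resolve_instance(connector_name: str, platform: str) -> Optional[str]:
    per_connector = connect_to_platform_map.get(connector_name, {})
    if platform in per_connector:
        if platform in platform_instance_map:
            # The actual source treats this as a fatal config error.
            raise ValueError(f"{platform} configured in both maps")
        return per_connector[platform]
    return platform_instance_map.get(platform)

print(resolve_instance("postgres-connector-finance-db", "postgres"))
# core_finance_instance
print(resolve_instance("some-mysql-connector", "mysql"))
# test_mysql
```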
## Compatibility


@@ -1,5 +1,6 @@
import logging
import re
import sys
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Tuple
@@ -36,6 +37,7 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
    construct_lineage_workunits: bool = True
    connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
    provided_configs: Optional[List[ProvidedConfig]] = None
    connect_to_platform_map: Optional[dict] = None
@dataclass
@@ -92,15 +94,45 @@ def unquote(string: str, leading_quote: str = '"', trailing_quote: str = None) -
    return string
def get_instance_name(
    config: KafkaConnectSourceConfig, kafka_connector_name: str, source_platform: str
) -> Optional[str]:
    instance_name = None
    if config.connect_to_platform_map:
        for connector_name in config.connect_to_platform_map:
            if connector_name == kafka_connector_name:
                instance_name = config.connect_to_platform_map[connector_name][
                    source_platform
                ]
                if config.platform_instance_map and config.platform_instance_map.get(
                    source_platform
                ):
                    logger.error(
                        f"Same source platform {source_platform} configured in both platform_instance_map and connect_to_platform_map"
                    )
                    sys.exit(
                        "Config Error: Same source platform configured in both platform_instance_map and connect_to_platform_map. Fix the config and re-run again."
                    )
                logger.info(
                    f"Instance name assigned is: {instance_name} for Connector Name {connector_name} and source platform {source_platform}"
                )
                break
    return instance_name
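
Illustrative usage of the helper with the docs example values (hypothetical connector and instance names; assumes the config object can be built with just these fields):

```python
config = KafkaConnectSourceConfig(
    connect_uri="http://localhost:8083/",
    connect_to_platform_map={
        "postgres-connector-finance-db": {"postgres": "core_finance_instance"}
    },
)
print(get_instance_name(config, "postgres-connector-finance-db", "postgres"))
# core_finance_instance
print(get_instance_name(config, "unknown-connector", "postgres"))
# None
```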
@dataclass
class ConfluentJDBCSourceConnector:
    connector_manifest: ConnectorManifest
    report: KafkaConnectSourceReport

    def __init__(
        self,
        connector_manifest: ConnectorManifest,
        config: KafkaConnectSourceConfig,
        report: KafkaConnectSourceReport,
    ) -> None:
        self.connector_manifest = connector_manifest
        self.config = config
        self.report = report
        self._extract_lineages()
@@ -278,6 +310,21 @@ class ConfluentJDBCSourceConnector:
        return []
    def get_dataset_name(
        self,
        database_name: Optional[str],
        instance_name: Optional[str],
        source_table: str,
    ) -> str:
        if database_name and instance_name:
            dataset_name = instance_name + "." + database_name + "." + source_table
        elif database_name:
            dataset_name = database_name + "." + source_table
        else:
            dataset_name = source_table

        return dataset_name
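
For reference, the three naming outcomes, shown as a free-standing sketch of the same branching (the method uses no other connector state; the database, instance, and table names are the hypothetical ones from the recipe):

```python
from typing import Optional

def dataset_name(database: Optional[str], instance: Optional[str], table: str) -> str:
    # Same branching as get_dataset_name above.
    if database and instance:
        return instance + "." + database + "." + table
    if database:
        return database + "." + table
    return table

print(dataset_name("librarydb", "test_mysql", "book"))  # test_mysql.librarydb.book
print(dataset_name("librarydb", None, "book"))          # librarydb.book
print(dataset_name(None, None, "book"))                 # book
```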
    def _extract_lineages(self):
        lineages: List[KafkaConnectLineage] = list()
        parser = self.get_parser(self.connector_manifest)
@@ -287,6 +334,9 @@ class ConfluentJDBCSourceConnector:
        topic_prefix = parser.topic_prefix
        transforms = parser.transforms
        self.connector_manifest.flow_property_bag = self.connector_manifest.config

        instance_name = get_instance_name(
            self.config, self.connector_manifest.name, source_platform
        )

        # Mask/Remove properties that may reveal credentials
        self.connector_manifest.flow_property_bag[
@@ -310,12 +360,10 @@ class ConfluentJDBCSourceConnector:
        # For now, we use source table as topic (expected to be same as topic prefix)
        for topic in self.connector_manifest.topic_names:
            # default method - as per earlier implementation
            dataset_name = self.get_dataset_name(
                database_name, instance_name, topic
            )

            lineage = KafkaConnectLineage(
                source_platform=source_platform,
                target_dataset=topic,
@@ -375,10 +423,8 @@ class ConfluentJDBCSourceConnector:
                # in connector topics
                if topic in self.connector_manifest.topic_names:
                    dataset_name = self.get_dataset_name(
                        database_name, instance_name, source_table
                    )

                    lineage = KafkaConnectLineage(
@@ -434,8 +480,11 @@ class ConfluentJDBCSourceConnector:
class DebeziumSourceConnector:
    connector_manifest: ConnectorManifest

    def __init__(
        self, connector_manifest: ConnectorManifest, config: KafkaConnectSourceConfig
    ) -> None:
        self.connector_manifest = connector_manifest
        self.config = config
        self._extract_lineages()
@dataclass
@@ -516,19 +565,26 @@ class DebeziumSourceConnector:
        server_name = parser.server_name
        database_name = parser.database_name
        topic_naming_pattern = r"({0})\.(\w+\.\w+)".format(server_name)

        instance_name = get_instance_name(
            self.config, self.connector_manifest.name, source_platform
        )

        if not self.connector_manifest.topic_names:
            return lineages

        # Get the platform/platform_instance mapping for every database_server from connect_to_platform_map
        for topic in self.connector_manifest.topic_names:
            found = re.search(re.compile(topic_naming_pattern), topic)

            if found:
                if database_name and instance_name:
                    table_name = (
                        instance_name + "." + database_name + "." + found.group(2)
                    )
                elif database_name:
                    table_name = database_name + "." + found.group(2)
                else:
                    table_name = found.group(2)

                lineage = KafkaConnectLineage(
                    source_dataset=table_name,
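
As a sanity check on the topic pattern, here is how a Debezium-style topic parses under the regex above (the server, schema, and table names are made up for illustration):

```python
import re

server_name = "librarydb_server"  # hypothetical database.server.name
topic_naming_pattern = r"({0})\.(\w+\.\w+)".format(server_name)

found = re.search(re.compile(topic_naming_pattern), "librarydb_server.public.book")
if found:
    print(found.group(2))  # public.book -> prefixed with instance/database above
```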
@@ -734,7 +790,6 @@ class KafkaConnectSource(Source):
        test_response = self.session.get(f"{self.config.connect_uri}")
        test_response.raise_for_status()
        logger.info(f"Connection to {self.config.connect_uri} is ok")

        if not jpype.isJVMStarted():
            jpype.startJVM()
@@ -790,13 +845,15 @@ class KafkaConnectSource(Source):
                "io.confluent.connect.jdbc.JdbcSourceConnector"
            ):
                connector_manifest = ConfluentJDBCSourceConnector(
                    connector_manifest=connector_manifest,
                    config=self.config,
                    report=self.report,
                ).connector_manifest

            else:
                # Debezium Source Connector lineages
                try:
                    connector_manifest = DebeziumSourceConnector(
                        connector_manifest=connector_manifest, config=self.config
                    ).connector_manifest
                except ValueError as err: