feat(ingest/kafka-connect): support generic connectors (#6173)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
BZ 2022-10-25 13:31:31 -04:00 committed by GitHub
parent 18df38e504
commit 64c28a2316
4 changed files with 64 additions and 9 deletions


@@ -37,6 +37,12 @@ class ProvidedConfig(ConfigModel):
     value: str
 
 
+class GenericConnectorConfig(ConfigModel):
+    connector_name: str
+    source_dataset: str
+    source_platform: str
+
+
 class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
     # See the Connect REST Interface for details
     # https://docs.confluent.io/platform/current/connect/references/restapi.html#
@@ -67,6 +73,10 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
         default=None,
         description='Platform instance mapping to use when constructing URNs. e.g. `platform_instance_map: { "hive": "warehouse" }`',
     )
+    generic_connectors: List[GenericConnectorConfig] = Field(
+        default=[],
+        description="Provide lineage graph for source connectors other than Confluent JDBC Source Connector or Debezium Source Connector",
+    )
 
 
 @dataclass
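Since `ConfigModel` is pydantic-based, a recipe's `generic_connectors` block validates directly into these models. A minimal self-contained sketch of that parsing, using a plain pydantic `BaseModel` as a stand-in for DataHub's `ConfigModel` (the "Sketch" class name is hypothetical, not part of this commit):

# Stand-in models for illustration only; the real classes live in the kafka-connect source module.
from typing import List

from pydantic import BaseModel, Field


class GenericConnectorConfig(BaseModel):
    connector_name: str
    source_dataset: str
    source_platform: str


class KafkaConnectSourceConfigSketch(BaseModel):
    # Only the new field is modeled here; the real config has many more options.
    generic_connectors: List[GenericConnectorConfig] = Field(default=[])


recipe_fragment = {
    "generic_connectors": [
        {
            "connector_name": "generic-connector",
            "source_dataset": "generic-dataset",
            "source_platform": "generic-platform",
        }
    ]
}

config = KafkaConnectSourceConfigSketch.parse_obj(recipe_fragment)
assert config.generic_connectors[0].source_platform == "generic-platform"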
@@ -836,6 +846,7 @@ class KafkaConnectSource(Source):
 
     - works only for
         - JDBC and Debezium source connectors
+        - Generic connectors with user-defined lineage graph
         - BigQuery sink connector
     """
@@ -923,20 +934,35 @@ class KafkaConnectSource(Source):
                     config=self.config,
                     report=self.report,
                 ).connector_manifest
+            elif connector_manifest.config.get("connector.class", "").startswith(
+                "io.debezium.connector"
+            ):
+                connector_manifest = DebeziumSourceConnector(
+                    connector_manifest=connector_manifest, config=self.config
+                ).connector_manifest
             else:
-                # Debezium Source Connector lineages
-                try:
-                    connector_manifest = DebeziumSourceConnector(
-                        connector_manifest=connector_manifest, config=self.config
-                    ).connector_manifest
-                except ValueError as err:
+                # Find the target connector object in the list, or log an error if unknown.
+                target_connector = None
+                for connector in self.config.generic_connectors:
+                    if connector.connector_name == connector_manifest.name:
+                        target_connector = connector
+                        break
+
+                if not target_connector:
                     logger.warning(
-                        f"Skipping connector {connector_manifest.name} due to error: {err}"
+                        f"Detected undefined connector {connector_manifest.name}, which is not in the customized connector list. Please refer to Kafka Connect ingestion recipe to define this customized connector."
                     )
-                    self.report.report_failure(connector_manifest.name, str(err))
                     continue
+
+                for topic in topics:
+                    lineage = KafkaConnectLineage(
+                        source_dataset=target_connector.source_dataset,
+                        source_platform=target_connector.source_platform,
+                        target_dataset=topic,
+                        target_platform="kafka",
+                    )
+                    connector_manifest.lineages.append(lineage)
 
         if connector_manifest.type == "sink":
             if connector_manifest.config.get("connector.class").__eq__(
                 "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
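The new `else` branch boils down to a name lookup plus one lineage edge per topic. A standalone sketch of that fallback logic, with simplified stand-in dataclasses (`Lineage` and `GenericConnector` here are hypothetical; the real code uses `KafkaConnectLineage` and the config models above):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class GenericConnector:
    connector_name: str
    source_dataset: str
    source_platform: str


@dataclass
class Lineage:
    source_dataset: str
    source_platform: str
    target_dataset: str
    target_platform: str


def generic_lineages(
    connector_name: str,
    topics: List[str],
    generic_connectors: List[GenericConnector],
) -> List[Lineage]:
    # Look up the user-declared connector by name; unknown connectors yield nothing.
    target: Optional[GenericConnector] = next(
        (c for c in generic_connectors if c.connector_name == connector_name), None
    )
    if target is None:
        return []
    # One lineage edge per emitted topic: declared source dataset -> Kafka topic.
    return [
        Lineage(target.source_dataset, target.source_platform, topic, "kafka")
        for topic in topics
    ]


assert generic_lineages(
    "generic-connector",
    ["my-topic"],
    [GenericConnector("generic-connector", "generic-dataset", "generic-platform")],
)[0].target_dataset == "my-topic"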


@@ -28,6 +28,8 @@ services:
       #
       confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1
       #
+      confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
+      #
       confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
       #
       #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8


@@ -19,6 +19,10 @@ source:
     connect_to_platform_map: # optional
       postgres_source: # optional - Connector name
         postgres: postgres1 # optional - Platform to instance map
+    generic_connectors: # optional - Generic connector lineage info
+      - connector_name: generic-connector
+        source_dataset: generic-dataset
+        source_platform: generic-platform
 
 # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
 sink:
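Given this recipe, a connector named `generic-connector` that emits topic `my-topic` yields one lineage edge from `generic-dataset` on `generic-platform` to the Kafka topic. A small sketch of reading the fragment as the ingestion framework would, via PyYAML:

import yaml

recipe_fragment = yaml.safe_load(
    """
generic_connectors:
  - connector_name: generic-connector
    source_dataset: generic-dataset
    source_platform: generic-platform
"""
)
entry = recipe_fragment["generic_connectors"][0]
# Expected lineage edge: (generic-platform, generic-dataset) -> (kafka, my-topic)
assert entry["source_platform"] == "generic-platform"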


@@ -215,6 +215,29 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
         }""",
     )
     assert r.status_code == 201  # Created
+
+    # Creating Generic source
+    r = requests.post(
+        "http://localhost:58083/connectors",
+        headers={"Content-Type": "application/json"},
+        data="""{
+            "name": "generic_source",
+            "config": {
+                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
+                "kafka.topic": "my-topic",
+                "quickstart": "product",
+                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter.schemas.enable": "false",
+                "max.interval": 1000,
+                "iterations": 10000000,
+                "tasks.max": "1"
+            }
+        }""",
+    )
+    r.raise_for_status()
+    assert r.status_code == 201  # Created
+
     # Give time for connectors to process the table data
     time.sleep(60)
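The fixed `time.sleep(60)` gives all connectors time to emit records before ingestion runs. Where flakiness matters, polling the standard Connect REST status endpoint (`GET /connectors/<name>/status`, per the REST Interface docs linked in the source module) is a sturdier alternative; a sketch, not part of this commit:

import time

import requests


def wait_until_running(base_url: str, name: str, timeout_s: float = 60) -> None:
    """Poll the Connect REST API until the named connector reports RUNNING."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = requests.get(f"{base_url}/connectors/{name}/status").json()
        if status.get("connector", {}).get("state") == "RUNNING":
            return
        time.sleep(2)
    raise TimeoutError(f"Connector {name} not RUNNING within {timeout_s}s")


# e.g. wait_until_running("http://localhost:58083", "generic_source")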