feat(ingest): add bigquery sink connector lineage in kafka connect source (#3590)

This commit is contained in:
mayurinehate 2021-11-18 06:48:37 +05:30 committed by GitHub
parent ad340be327
commit 69036acfca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 233 additions and 44 deletions

View File

@ -16,7 +16,9 @@ This plugin extracts the following:
Current limitations:
- Currently works for JDBC and Debezium source connectors only.
- works only for
- JDBC and Debezium source connectors
- BigQuery sink connector
## Quickstart recipe
@ -31,6 +33,10 @@ source:
# Coordinates
connect_uri: "http://localhost:8083"
cluster_name: "connect-cluster"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
# Credentials
username: admin
@ -46,10 +52,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| -------------------------- | -------- | -------------------------- | ------------------------------------------------------- |
| `connect_uri` | | `"http://localhost:8083/"` | URI to connect to. |
| `connect_uri` | | `"http://localhost:8083/"` | URI to connect to. |
| `username` | | | Kafka Connect username. |
| `password` | | | Kafka Connect password. |
| `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. |
| `provided_configs` | | | Provided Configurations |
| `construct_lineage_workunits` | | `True` | Whether to create the input and output Dataset entities |
| `connector_patterns.deny` | | | List of regex patterns for connectors to include in ingestion. |
| `connector_patterns.allow` | | | List of regex patterns for connectors to exclude from ingestion. |

View File

@ -1,7 +1,7 @@
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Tuple
import jpype
import jpype.imports
@ -19,6 +19,12 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
logger = logging.getLogger(__name__)
class ProvidedConfig(ConfigModel):
provider: str
path_key: str
value: str
class KafkaConnectSourceConfig(ConfigModel):
# See the Connect REST Interface for details
# https://docs.confluent.io/platform/current/connect/references/restapi.html#
@ -29,6 +35,7 @@ class KafkaConnectSourceConfig(ConfigModel):
env: str = builder.DEFAULT_ENV
construct_lineage_workunits: bool = True
connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
provided_configs: Optional[List[ProvidedConfig]] = None
@dataclass
@ -533,6 +540,166 @@ class DebeziumSourceConnector:
self.connector_manifest.lineages = lineages
@dataclass
class BigQuerySinkConnector:
connector_manifest: ConnectorManifest
report: KafkaConnectSourceReport
def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()
@dataclass
class BQParser:
project: str
target_platform: str
sanitizeTopics: str
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
version: str = "v1"
def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)
def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")
if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
return self.BQParser(
project=project,
defaultDataset=defaultDataset,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
)
else:
# version 1.6.x and similar configs supported
datasets = connector_manifest.config["datasets"]
topicsToTables = connector_manifest.config.get("topicsToTables")
return self.BQParser(
project=project,
topicsToTables=topicsToTables,
datasets=datasets,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
)
def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
entries = property.split(",")
for entry in entries:
key, val = entry.rsplit("=")
yield (key.strip(), val.strip())
def get_dataset_for_topic_v1(self, topic: str, parser: BQParser) -> Optional[str]:
topicregex_dataset_map: Dict[str, str] = dict(self.get_list(parser.datasets)) # type: ignore
from java.util.regex import Pattern
for pattern, dataset in topicregex_dataset_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
return dataset
return None
def sanitize_table_name(self, table_name):
table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name
return table_name
def get_dataset_table_for_topic(
self, topic: str, parser: BQParser
) -> Optional[str]:
if parser.version == "v2":
dataset = parser.defaultDataset
parts = topic.split(":")
if len(parts) == 2:
dataset = parts[0]
table = parts[1]
else:
table = parts[0]
else:
dataset = self.get_dataset_for_topic_v1(topic, parser)
if dataset is None:
return None
table = topic
if parser.topicsToTables:
topicregex_table_map: Dict[str, str] = dict(
self.get_list(parser.topicsToTables) # type: ignore
)
from java.util.regex import Pattern
for pattern, tbl in topicregex_table_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
table = tbl
break
if parser.sanitizeTopics:
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"
def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project
self.connector_manifest.flow_property_bag = self.connector_manifest.config
# Mask/Remove properties that may reveal credentials
if "keyfile" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["keyfile"]
for topic in self.connector_manifest.topic_names:
dataset_table = self.get_dataset_table_for_topic(topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {topic}, please check your connector configuration",
)
continue
target_dataset = f"{project}.{dataset_table}"
lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_platform="kafka",
target_dataset=target_dataset,
target_platform=target_platform,
)
)
self.connector_manifest.lineages = lineages
return
def transform_connector_config(
connector_config: Dict, provided_configs: List[ProvidedConfig]
) -> None:
"""This method will update provided configs in connector config values, if any"""
lookupsByProvider = {}
for pconfig in provided_configs:
lookupsByProvider[f"${{{pconfig.provider}:{pconfig.path_key}}}"] = pconfig.value
for k, v in connector_config.items():
for key, value in lookupsByProvider.items():
if key in v:
connector_config[k] = v.replace(key, value)
class KafkaConnectSource(Source):
"""The class for Kafka Connect source.
@ -589,17 +756,22 @@ class KafkaConnectSource(Source):
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
if self.config.provided_configs:
transform_connector_config(
connector_manifest.config, self.config.provided_configs
)
# Initialize connector lineages
connector_manifest.lineages = list()
connector_manifest.url = connector_url
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()
connector_manifest.topic_names = topics[c]["topics"]
# Populate Source Connector metadata
if connector_manifest.type == "source":
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()
connector_manifest.topic_names = topics[c]["topics"]
tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
@ -629,11 +801,17 @@ class KafkaConnectSource(Source):
continue
if connector_manifest.type == "sink":
# TODO: Sink Connector not yet implemented
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for Sink Connector not yet implemented"
)
if connector_manifest.config.get("connector.class").__eq__(
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
):
connector_manifest = BigQuerySinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for Connector not yet implemented"
)
pass
connectors_manifest.append(connector_manifest)
@ -688,7 +866,7 @@ class KafkaConnectSource(Source):
source_platform = lineage.source_platform
target_dataset = lineage.target_dataset
target_platform = lineage.target_platform
# job_property_bag = lineage.job_property_bag
job_property_bag = lineage.job_property_bag
job_id = (
source_dataset
@ -713,7 +891,7 @@ class KafkaConnectSource(Source):
name=f"{connector_name}:{job_id}",
type="COMMAND",
description=None,
# customProperties=job_property_bag
customProperties=job_property_bag
# externalUrl=job_url,
),
)

View File

@ -17,6 +17,7 @@ services:
# - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5
# - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1
# - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0
# - ./../kafka-connect/setup/gcp-bigquery-project-keyfile.json:/usr/local/share/gcp-bigquery-project-keyfile.json
command:
- bash
- -c
@ -29,10 +30,15 @@ services:
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/
#
echo "Launching Kafka Connect worker"
#
/etc/confluent/docker/run &

View File

@ -85,7 +85,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
@ -254,7 +254,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"name\": \"mysql_source5\", \"transforms.changetopic.field\": \"name\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"table.whitelist\": \"book\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null

View File

@ -5,8 +5,12 @@ run_id: kafka-connect-run
source:
type: "kafka-connect"
config:
connect_uri: "http://localhost:58083"
construct_lineage_workunits: False
connect_uri: "http://localhost:58083"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
construct_lineage_workunits: false
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:

View File

@ -4,15 +4,21 @@ CONNECT_GROUP_ID=kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=_connect-status
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_REST_ADVERTISED_HOST_NAME=test_connect
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
# CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins
CONNECT_CONFIG_PROVIDERS=env
CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb

View File

@ -47,10 +47,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-mysql-jdbc-",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
@ -66,10 +64,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
@ -90,10 +86,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*",
@ -115,10 +109,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"incrementing.column.name": "id",
"query": "select * from member",
"topic.prefix": "query-topic",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
@ -134,12 +126,10 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"connection.password": "datahub",
"connection.user": "foo",
"table.whitelist": "book",
"topic.prefix": "test-mysql-jdbc2-",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "changetopic",
"transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.changetopic.field": "name"
@ -159,10 +149,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
"insert.mode": "insert",
"auto.create": true,
"topics": "my-topic",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
@ -193,7 +181,7 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
assert r.status_code == 201 # Created
# Give time for connectors to process the table data
time.sleep(60)
time.sleep(45)
# Run the metadata ingestion pipeline.
runner = CliRunner()