feat(ingest): add support for jdbc connector to kafka-connect source (#3076)

This commit is contained in:
Chinmay Bhat 2021-08-11 11:50:38 -04:00 committed by GitHub
parent 1ad1243371
commit 7abcb45f8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 14 deletions

View File

@ -74,7 +74,7 @@ plugins: Dict[str, Set[str]] = {
"airflow": {"apache-airflow >= 1.10.2"},
# Source plugins
"kafka": kafka_common,
"kafka-connect": {"requests"},
"kafka-connect": sql_common | {"requests"},
"sqlalchemy": sql_common,
"athena": sql_common | {"PyAthena[SQLAlchemy]"},
"bigquery": sql_common | {"pybigquery >= 0.6.0"},

View File

@ -16,7 +16,7 @@ This plugin extracts the following:
Current limitations:
- Currently works only for Debezium source connectors.
- Currently works for JDBC and Debezium source connectors only.
## Quickstart recipe
@ -52,7 +52,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. |
| `connector_patterns.deny` | | | List of regex patterns for connectors to exclude from ingestion. |
| `connector_patterns.allow` | | | List of regex patterns for connectors to include in ingestion. |
| `connector_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `connector_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
## Compatibility

View File

@ -5,6 +5,7 @@ from typing import Dict, Iterable, List, Optional
import requests
from pydantic import BaseModel
from sqlalchemy.engine.url import make_url
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
@ -58,6 +59,18 @@ class ConnectorManifest(BaseModel):
url: Optional[str]
def get_jdbc_source_connector_parser(connector_manifest):
    """Return the config parser for a JDBC source connector manifest.

    Looks up the manifest's ``connector.class`` in the table of supported
    JDBC connector classes and returns a dict of extracted settings
    (currently just ``db_connection_url``), or ``None`` when the class is
    not a supported JDBC source connector.
    """
    config = connector_manifest.config
    # https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
    known_parsers = {
        "io.confluent.connect.jdbc.JdbcSourceConnector": {
            "db_connection_url": config.get("connection.url"),
        },
    }
    return known_parsers.get(config.get("connector.class", ""))
def get_debezium_source_connector_parser(connector_manifest):
connector_class = connector_manifest.config.get("connector.class", "")
parser = {
@ -113,6 +126,67 @@ def get_debezium_source_connector_parser(connector_manifest):
return parser.get(connector_class)
def remove_prefix(text: str, prefix: str) -> str:
    """Return *text* with a leading *prefix* stripped, or *text* unchanged
    when it does not start with *prefix*."""
    return text[len(prefix):] if text.startswith(prefix) else text
@dataclass
class JDBCSourceConnectorLineages:
    """Derives table -> Kafka topic lineage for Confluent JDBC source connectors."""

    # Manifest of the connector whose topics we map back to source tables.
    connector_manifest: ConnectorManifest

    def get_lineages(self):
        """Build one KafkaConnectLineage per topic produced by the connector.

        The source platform and database are parsed out of the JDBC
        ``connection.url``; each topic name is mapped back to its source
        table by stripping the configured ``topic.prefix`` (unless a custom
        ``query`` is configured, in which case the topic name is used as-is).
        Returns an empty list when the manifest lists no topics.
        """
        lineages: List[KafkaConnectLineage] = list()
        parser = get_jdbc_source_connector_parser(self.connector_manifest)
        raw_url = parser.get("db_connection_url")
        # Drop the "jdbc:" scheme so sqlalchemy's make_url can parse the rest.
        parsed_url = make_url(remove_prefix(str(raw_url), "jdbc:"))
        source_platform = parsed_url.drivername
        database_name = parsed_url.database
        logging.debug(
            f"Extracting source platform: {source_platform} and database name: {database_name} from connection url "
        )

        config = self.connector_manifest.config
        topic_prefix = str(config.get("topic.prefix")) if config.get("topic.prefix") else None
        query = config.get("query") if config.get("query") else None

        if not self.connector_manifest.topic_names:
            return lineages

        for topic in self.connector_manifest.topic_names:
            # if the connector uses a custom query
            if topic_prefix and not query:
                source_table = remove_prefix(topic, topic_prefix)
            else:
                source_table = topic
            if database_name:
                dataset_name = database_name + "." + source_table
            else:
                dataset_name = source_table
            lineages.append(
                KafkaConnectLineage(
                    source_dataset=dataset_name,
                    source_platform=source_platform,
                    target_dataset=topic,
                    target_platform="kafka",
                )
            )
        return lineages
@dataclass
class DebeziumSourceConnectorLineages:
connector_manifest: ConnectorManifest
@ -203,14 +277,12 @@ class KafkaConnectSource(Source):
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
# Initialize connector lineages
connector_manifest.lineages = list()
connector_manifest.url = connector_url
# Populate Source Connector metadata
if connector_manifest.type == "source":
# connector_config = manifest.get("config", {})
topics_response = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
)
@ -218,13 +290,24 @@ class KafkaConnectSource(Source):
topics = topics_response.json()
connector_manifest.topic_names = topics[c]["topics"]
# Currently we only support Debezium Source Connector lineages
debezium_source_lineages = DebeziumSourceConnectorLineages(
connector_manifest=connector_manifest
)
connector_manifest.lineages.extend(
debezium_source_lineages.get_lineages()
)
# JDBC source connector lineages
if connector_manifest.config.get("connector.class").__eq__(
"io.confluent.connect.jdbc.JdbcSourceConnector"
):
jdbc_source_lineages = JDBCSourceConnectorLineages(
connector_manifest=connector_manifest
)
connector_manifest.lineages.extend(
jdbc_source_lineages.get_lineages()
)
else:
# Debezium Source Connector lineages
debezium_source_lineages = DebeziumSourceConnectorLineages(
connector_manifest=connector_manifest
)
connector_manifest.lineages.extend(
debezium_source_lineages.get_lineages()
)
if connector_manifest.type == "sink":
# TODO: Sink Connector not yet implemented
@ -273,6 +356,7 @@ class KafkaConnectSource(Source):
def construct_job_workunits(
self, connector: ConnectorManifest
) -> Iterable[MetadataWorkUnit]:
connector_name = connector.name
flow_urn = builder.make_data_flow_urn(
"kafka-connect", connector_name, self.config.env
@ -357,9 +441,7 @@ class KafkaConnectSource(Source):
yield wu
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
connectors_manifest = self.get_connectors_manifest()
for connector in connectors_manifest:
name = connector.name
if self.config.connector_patterns.allowed(name):