feat(ingest): kafka connect source improvements (#3481)

Author: mayurinehate, 2021-11-04 03:33:05 +05:30, committed by GitHub
parent a5ec592f1b
commit 192f0d33a2
10 changed files with 1281 additions and 209 deletions

View File

@@ -102,7 +102,7 @@ plugins: Dict[str, Set[str]] = {
    "acryl-pyhive[hive]>=0.6.11"
},
"kafka": kafka_common,
- "kafka-connect": sql_common | {"requests"},
"kafka-connect": sql_common | {"requests", "JPype1"},
"ldap": {"python-ldap>=2.4"},
"looker": looker_common,
"lookml": looker_common | {"lkml>=1.1.0", "sql-metadata==2.2.2"},
@@ -234,6 +234,7 @@ full_test_dev_requirements = {
"mariadb",
"snowflake",
"redash",
"kafka-connect"
]
for dependency in plugins[plugin]
),

View File

@@ -3,13 +3,15 @@ import re
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional

import jpype
import jpype.imports
import requests
- from pydantic import BaseModel
from sqlalchemy.engine.url import make_url

import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -26,7 +28,7 @@ class KafkaConnectSourceConfig(ConfigModel):
cluster_name: Optional[str] = "connect-cluster"
env: str = builder.DEFAULT_ENV
construct_lineage_workunits: bool = True
- connector_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()

@dataclass
@@ -41,106 +43,29 @@ class KafkaConnectSourceReport(SourceReport):
self.filtered.append(connector)

- @dataclass
- class DebeziumParser:
- source_platform: str
- server_name: Optional[str]
- database_name: Optional[str]
- @dataclass
- class JdbcParser:
- db_connection_url: Optional[str]

@dataclass
class KafkaConnectLineage:
- """Class to store Kafka Connect lineage mapping"""
"""Class to store Kafka Connect lineage mapping, Each instance is potential DataJob"""

- source_dataset: str
source_platform: str
target_dataset: str
target_platform: str
job_property_bag: Optional[Dict[str, str]] = None
source_dataset: Optional[str] = None

- class ConnectorManifest(BaseModel):
@dataclass
class ConnectorManifest:
"""Each instance is potential DataFlow"""

name: str
- config: Dict = {}
- lineages: Optional[List[KafkaConnectLineage]] = []
- topic_names: Optional[Iterable[str]] = []
type: str
- url: Optional[str]
config: Dict
tasks: Dict
url: Optional[str] = None
flow_property_bag: Optional[Dict[str, str]] = None
lineages: List[KafkaConnectLineage] = field(default_factory=list)
topic_names: Iterable[str] = field(default_factory=list)

- def get_jdbc_source_connector_parser(
- connector_manifest: ConnectorManifest,
- ) -> JdbcParser:
- return JdbcParser(connector_manifest.config.get("connection.url"))

- def get_debezium_source_connector_parser(
- connector_manifest: ConnectorManifest,
- ) -> DebeziumParser:
- connector_class = connector_manifest.config.get("connector.class", "")
- if connector_class == "io.debezium.connector.mysql.MySqlConnector":
- # https://debezium.io/documentation/reference/connectors/mysql.html#mysql-topic-names
- parser = DebeziumParser(
- source_platform="mysql",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=None,
- )
- elif connector_class == "MySqlConnector":
- parser = DebeziumParser(
- source_platform="mysql",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=None,
- )
- elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector":
- # https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-topic-names
- parser = DebeziumParser(
- source_platform="mongodb",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=None,
- )
- elif connector_class == "io.debezium.connector.postgresql.PostgresConnector":
- # https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-topic-names
- parser = DebeziumParser(
- source_platform="postgres",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=connector_manifest.config.get("database.dbname"),
- )
- elif connector_class == "io.debezium.connector.oracle.OracleConnector":
- # https://debezium.io/documentation/reference/connectors/oracle.html#oracle-topic-names
- parser = DebeziumParser(
- source_platform="oracle",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=connector_manifest.config.get("database.dbname"),
- )
- elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector":
- # https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-topic-names
- parser = DebeziumParser(
- source_platform="mssql",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=connector_manifest.config.get("database.dbname"),
- )
- elif connector_class == "io.debezium.connector.db2.Db2Connector":
- # https://debezium.io/documentation/reference/connectors/db2.html#db2-topic-names
- parser = DebeziumParser(
- source_platform="db2",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=connector_manifest.config.get("database.dbname"),
- )
- elif connector_class == "io.debezium.connector.vitess.VitessConnector":
- # https://debezium.io/documentation/reference/connectors/vitess.html#vitess-topic-names
- parser = DebeziumParser(
- source_platform="vitess",
- server_name=connector_manifest.config.get("database.server.name"),
- database_name=connector_manifest.config.get("vitess.keyspace"),
- )
- else:
- raise ValueError(f"Connector class '{connector_class}' is unknown.")
- return parser

def remove_prefix(text: str, prefix: str) -> str:
@@ -150,67 +75,436 @@ def remove_prefix(text: str, prefix: str) -> str:
return text

- @dataclass
- class JDBCSourceConnectorLineages:
- connector_manifest: ConnectorManifest
- def get_lineages(self):
- lineages: List[KafkaConnectLineage] = list()
- parser = get_jdbc_source_connector_parser(self.connector_manifest)
- db_connection_url = parser.db_connection_url
- url = remove_prefix(str(db_connection_url), "jdbc:")

def unquote(string: str, leading_quote: str = '"', trailing_quote: str = None) -> str:
"""
If string starts and ends with a quote, unquote it
"""
trailing_quote = trailing_quote if trailing_quote else leading_quote
if string.startswith(leading_quote) and string.endswith(trailing_quote):
string = string[1:-1]
return string

@dataclass
class ConfluentJDBCSourceConnector:
connector_manifest: ConnectorManifest
report: KafkaConnectSourceReport

def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()
REGEXROUTER = "org.apache.kafka.connect.transforms.RegexRouter"
KNOWN_TOPICROUTING_TRANSFORMS = [REGEXROUTER]
# https://kafka.apache.org/documentation/#connect_included_transformation
KAFKA_NONTOPICROUTING_TRANSFORMS = [
"InsertField",
"InsertField$Key",
"InsertField$Value",
"ReplaceField",
"ReplaceField$Key",
"ReplaceField$Value",
"MaskField",
"MaskField$Key",
"MaskField$Value",
"ValueToKey",
"ValueToKey$Key",
"ValueToKey$Value",
"HoistField",
"HoistField$Key",
"HoistField$Value",
"ExtractField",
"ExtractField$Key",
"ExtractField$Value",
"SetSchemaMetadata",
"SetSchemaMetadata$Key",
"SetSchemaMetadata$Value",
"Flatten",
"Flatten$Key",
"Flatten$Value",
"Cast",
"Cast$Key",
"Cast$Value",
"HeadersFrom",
"HeadersFrom$Key",
"HeadersFrom$Value",
"TimestampConverter",
"Filter",
"InsertHeader",
"DropHeaders",
]
# https://docs.confluent.io/platform/current/connect/transforms/overview.html
CONFLUENT_NONTOPICROUTING_TRANSFORMS = [
"Drop",
"Drop$Key",
"Drop$Value",
"Filter",
"Filter$Key",
"Filter$Value",
"TombstoneHandler",
]
KNOWN_NONTOPICROUTING_TRANSFORMS = (
KAFKA_NONTOPICROUTING_TRANSFORMS
+ [
"org.apache.kafka.connect.transforms.{}".format(t)
for t in KAFKA_NONTOPICROUTING_TRANSFORMS
]
+ CONFLUENT_NONTOPICROUTING_TRANSFORMS
+ [
"io.confluent.connect.transforms.{}".format(t)
for t in CONFLUENT_NONTOPICROUTING_TRANSFORMS
]
)
@dataclass
class JdbcParser:
db_connection_url: str
source_platform: str
database_name: str
topic_prefix: str
query: str
transforms: list
def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)
def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> JdbcParser:
url = remove_prefix(
str(connector_manifest.config.get("connection.url")), "jdbc:"
)
url_instance = make_url(url)
source_platform = url_instance.drivername
database_name = url_instance.database
- logging.debug(
- f"Extracting source platform: {source_platform} and database name: {database_name} from connection url "
- )
- topic_prefix = (
- str(self.connector_manifest.config.get("topic.prefix"))
- if self.connector_manifest.config.get("topic.prefix")
- else None
- )
- query = (
- self.connector_manifest.config.get("query")
- if self.connector_manifest.config.get("query")
- else None
- )
- if not self.connector_manifest.topic_names:
- return lineages
- for topic in self.connector_manifest.topic_names:
- # if the connector uses a custom query
- if topic_prefix and not query:
db_connection_url = f"{url_instance.drivername}://{url_instance.host}:{url_instance.port}/{url_instance.database}"
topic_prefix = self.connector_manifest.config.get("topic.prefix", None)
query = self.connector_manifest.config.get("query", None)
transform_names = (
self.connector_manifest.config.get("transforms", "").split(",")
if self.connector_manifest.config.get("transforms")
else []
)
transforms = []
for name in transform_names:
transform = {"name": name}
transforms.append(transform)
for key in self.connector_manifest.config.keys():
if key.startswith("transforms.{}.".format(name)):
transform[
key.replace("transforms.{}.".format(name), "")
] = self.connector_manifest.config[key]
return self.JdbcParser(
db_connection_url,
source_platform,
database_name,
topic_prefix,
query,
transforms,
)
def default_get_lineages(
self,
topic_prefix,
database_name,
source_platform,
topic_names=None,
include_source_dataset=True,
):
lineages: List[KafkaConnectLineage] = list()
if not topic_names:
topic_names = self.connector_manifest.topic_names
for topic in topic_names:
# All good for NO_TRANSFORM or (SINGLE_TRANSFORM and KNOWN_NONTOPICROUTING_TRANSFORM) or (not SINGLE_TRANSFORM and all(KNOWN_NONTOPICROUTING_TRANSFORM))
# default method - as per earlier implementation
if topic_prefix:
source_table = remove_prefix(topic, topic_prefix)
else:
source_table = topic
dataset_name = (
database_name + "." + source_table if database_name else source_table
)
lineage = KafkaConnectLineage(
- source_dataset=dataset_name,
source_dataset=dataset_name if include_source_dataset else None,
source_platform=source_platform,
target_dataset=topic,
target_platform="kafka",
)
lineages.append(lineage)
return lineages
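For illustration, not part of the diff: with the mysql_source1 settings used in the integration test below (topic.prefix "test-mysql-jdbc-", database "librarydb"), this default mapping turns the topic "test-mysql-jdbc-book" into the source dataset "librarydb.book". A minimal, hypothetical sketch of that prefix-stripping step:

def sketch_default_lineage(topic: str, topic_prefix: str, database_name: str) -> dict:
    # Mirrors default_get_lineages above: strip the topic prefix, then qualify
    # the remaining table name with the database parsed from connection.url.
    source_table = (
        topic[len(topic_prefix):] if topic_prefix and topic.startswith(topic_prefix) else topic
    )
    dataset_name = f"{database_name}.{source_table}" if database_name else source_table
    return {"source_dataset": dataset_name, "target_dataset": topic, "target_platform": "kafka"}

print(sketch_default_lineage("test-mysql-jdbc-book", "test-mysql-jdbc-", "librarydb"))
# {'source_dataset': 'librarydb.book', 'target_dataset': 'test-mysql-jdbc-book', 'target_platform': 'kafka'}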
def get_table_names(self):
if self.connector_manifest.config.get("table.whitelist"):
return self.connector_manifest.config.get("table.whitelist").split(",") # type: ignore
if self.connector_manifest.tasks:
sep = "."
leading_quote_char = trailing_quote_char = '"'
quote_method = self.connector_manifest.config.get(
"quote.sql.identifiers", "always"
)
tableIds = ",".join(
[task["config"].get("tables") for task in self.connector_manifest.tasks]
)
if quote_method == "always":
leading_quote_char = tableIds[0]
trailing_quote_char = tableIds[-1]
# This will only work for single character quotes
tables = [
unquote(tableId.split(sep)[-1], leading_quote_char, trailing_quote_char)
for tableId in tableIds.split(",")
]
return tables
return []
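For illustration, not part of the diff: get_table_names above falls back to the per-task "tables" config when no table.whitelist is set, and those identifiers arrive quoted. A rough sketch, assuming a task reports dotted, double-quoted identifiers (the payload below is hypothetical):

def sketch_strip_quotes(identifier: str, quote: str = '"') -> str:
    # Same idea as unquote(): drop a single leading/trailing quote character.
    if identifier.startswith(quote) and identifier.endswith(quote):
        return identifier[1:-1]
    return identifier

table_ids = '"librarydb"."book","librarydb"."member"'  # hypothetical task payload
tables = [sketch_strip_quotes(table_id.split(".")[-1]) for table_id in table_ids.split(",")]
print(tables)  # ['book', 'member']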
def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
source_platform = parser.source_platform
database_name = parser.database_name
query = parser.query
topic_prefix = parser.topic_prefix
transforms = parser.transforms
self.connector_manifest.flow_property_bag = self.connector_manifest.config
# Mask/Remove properties that may reveal credentials
self.connector_manifest.flow_property_bag[
"connection.url"
] = parser.db_connection_url
if "connection.password" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["connection.password"]
if "connection.user" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["connection.user"]
logging.debug(
f"Extracting source platform: {source_platform} and database name: {database_name} from connection url "
)
if not self.connector_manifest.topic_names:
self.connector_manifest.lineages = lineages
return
if query:
# Lineage source_table can be extracted by parsing query
# For now, we use source table as topic (expected to be same as topic prefix)
for topic in self.connector_manifest.topic_names:
# default method - as per earlier implementation
source_table = topic
dataset_name = (
database_name + "." + source_table
if database_name
else source_table
)
lineage = KafkaConnectLineage(
source_platform=source_platform,
target_dataset=topic,
target_platform="kafka",
)
lineages.append(lineage)
self.report_warning(
self.connector_manifest.name,
"could not find input dataset, the connector has query configuration set",
)
self.connector_manifest.lineages = lineages
return
SINGLE_TRANSFORM = len(transforms) == 1
NO_TRANSFORM = len(transforms) == 0
UNKNOWN_TRANSFORM = any(
[
transform["type"]
not in self.KNOWN_TOPICROUTING_TRANSFORMS
+ self.KNOWN_NONTOPICROUTING_TRANSFORMS
for transform in transforms
]
)
ALL_TRANSFORMS_NON_TOPICROUTING = all(
[
transform["type"] in self.KNOWN_NONTOPICROUTING_TRANSFORMS
for transform in transforms
]
)
if NO_TRANSFORM or ALL_TRANSFORMS_NON_TOPICROUTING:
self.connector_manifest.lineages = self.default_get_lineages(
database_name=database_name,
source_platform=source_platform,
topic_prefix=topic_prefix,
)
return
if SINGLE_TRANSFORM and transforms[0]["type"] == self.REGEXROUTER:
tables = self.get_table_names()
topic_names = list(self.connector_manifest.topic_names)
from java.util.regex import Pattern
for source_table in tables:
topic = topic_prefix + source_table if topic_prefix else source_table
transform_regex = Pattern.compile(transforms[0]["regex"])
transform_replacement = transforms[0]["replacement"]
matcher = transform_regex.matcher(topic)
if matcher.matches():
topic = matcher.replaceFirst(transform_replacement)
# Additional check to confirm that the topic present
# in connector topics
if topic in self.connector_manifest.topic_names:
dataset_name = (
database_name + "." + source_table
if database_name
else source_table
)
lineage = KafkaConnectLineage(
source_dataset=dataset_name,
source_platform=source_platform,
target_dataset=topic,
target_platform="kafka",
)
topic_names.remove(topic)
lineages.append(lineage)
if topic_names:
lineages.extend(
self.default_get_lineages(
database_name=database_name,
source_platform=source_platform,
topic_prefix=topic_prefix,
topic_names=topic_names,
include_source_dataset=False,
)
)
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, for connector topics {topic_names}",
)
self.connector_manifest.lineages = lineages
return
else:
include_source_dataset = True
if SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, connector has unknown transform - {transforms[0]['type']}",
)
include_source_dataset = False
if not SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.connector_manifest.name,
"could not find input dataset, connector has one or more unknown transforms",
)
include_source_dataset = False
lineages = self.default_get_lineages(
database_name=database_name,
source_platform=source_platform,
topic_prefix=topic_prefix,
include_source_dataset=include_source_dataset,
)
self.connector_manifest.lineages = lineages
return
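For illustration, not part of the diff: the RegexRouter branch above evaluates the transform with java.util.regex through JPype (hence the new JPype1 dependency in setup.py), since Kafka Connect replacements use Java regex syntax such as $1. A standalone sketch using the mysql_source2 transform from the integration test, assuming a local JVM is available:

import jpype
import jpype.imports

if not jpype.isJVMStarted():
    jpype.startJVM()

from java.util.regex import Pattern

# transforms.TotalReplacement.regex / .replacement from the mysql_source2 connector.
matcher = Pattern.compile(".*(book)").matcher("book")
if matcher.matches():
    print(matcher.replaceFirst("my-new-topic-$1"))  # my-new-topic-book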
@dataclass
- class DebeziumSourceConnectorLineages:
class DebeziumSourceConnector:
connector_manifest: ConnectorManifest

- def get_lineages(self) -> List[KafkaConnectLineage]:
def __init__(self, connector_manifest: ConnectorManifest) -> None:
self.connector_manifest = connector_manifest
self._extract_lineages()
@dataclass
class DebeziumParser:
source_platform: str
server_name: Optional[str]
database_name: Optional[str]
def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> DebeziumParser:
connector_class = connector_manifest.config.get("connector.class", "")
if connector_class == "io.debezium.connector.mysql.MySqlConnector":
# https://debezium.io/documentation/reference/connectors/mysql.html#mysql-topic-names
parser = self.DebeziumParser(
source_platform="mysql",
server_name=connector_manifest.config.get("database.server.name"),
database_name=None,
)
elif connector_class == "MySqlConnector":
parser = self.DebeziumParser(
source_platform="mysql",
server_name=connector_manifest.config.get("database.server.name"),
database_name=None,
)
elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector":
# https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-topic-names
parser = self.DebeziumParser(
source_platform="mongodb",
server_name=connector_manifest.config.get("database.server.name"),
database_name=None,
)
elif connector_class == "io.debezium.connector.postgresql.PostgresConnector":
# https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-topic-names
parser = self.DebeziumParser(
source_platform="postgres",
server_name=connector_manifest.config.get("database.server.name"),
database_name=connector_manifest.config.get("database.dbname"),
)
elif connector_class == "io.debezium.connector.oracle.OracleConnector":
# https://debezium.io/documentation/reference/connectors/oracle.html#oracle-topic-names
parser = self.DebeziumParser(
source_platform="oracle",
server_name=connector_manifest.config.get("database.server.name"),
database_name=connector_manifest.config.get("database.dbname"),
)
elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector":
# https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-topic-names
parser = self.DebeziumParser(
source_platform="mssql",
server_name=connector_manifest.config.get("database.server.name"),
database_name=connector_manifest.config.get("database.dbname"),
)
elif connector_class == "io.debezium.connector.db2.Db2Connector":
# https://debezium.io/documentation/reference/connectors/db2.html#db2-topic-names
parser = self.DebeziumParser(
source_platform="db2",
server_name=connector_manifest.config.get("database.server.name"),
database_name=connector_manifest.config.get("database.dbname"),
)
elif connector_class == "io.debezium.connector.vitess.VitessConnector":
# https://debezium.io/documentation/reference/connectors/vitess.html#vitess-topic-names
parser = self.DebeziumParser(
source_platform="vitess",
server_name=connector_manifest.config.get("database.server.name"),
database_name=connector_manifest.config.get("vitess.keyspace"),
)
else:
raise ValueError(f"Connector class '{connector_class}' is unknown.")
return parser
def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
- parser = get_debezium_source_connector_parser(self.connector_manifest)
parser = self.get_parser(self.connector_manifest)
source_platform = parser.source_platform
server_name = parser.server_name
database_name = parser.database_name
@@ -236,8 +530,7 @@ class DebeziumSourceConnectorLineages:
target_platform="kafka",
)
lineages.append(lineage)
self.connector_manifest.lineages = lineages
- return lineages
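For illustration, not part of the diff: Debezium topics follow the <database.server.name>.<database>.<table> convention referenced in the links above, so lineage can be read straight off the topic name. A rough, hypothetical sketch using the debezium-mysql-connector values from the integration test:

def sketch_debezium_lineage(topic: str, server_name: str, source_platform: str) -> dict:
    # Strip the server name prefix; the remainder is the <database>.<table> dataset.
    prefix = server_name + "."
    source_dataset = topic[len(prefix):] if topic.startswith(prefix) else topic
    return {
        "source_platform": source_platform,
        "source_dataset": source_dataset,
        "target_platform": "kafka",
        "target_dataset": topic,
    }

print(sketch_debezium_lineage("debezium.topics.librarydb.member", "debezium.topics", "mysql"))
# source dataset librarydb.member -> kafka topic debezium.topics.librarydb.member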
class KafkaConnectSource(Source):
@@ -256,7 +549,6 @@ class KafkaConnectSource(Source):
super().__init__(ctx)
self.config = config
self.report = KafkaConnectSourceReport()
self.session = requests.Session()
self.session.headers.update(
{
@@ -270,6 +562,9 @@ class KafkaConnectSource(Source):
test_response.raise_for_status()
logger.info(f"Connection to {self.config.connect_uri} is ok")

if not jpype.isJVMStarted():
jpype.startJVM()

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = KafkaConnectSourceConfig.parse_obj(config_dict)
@@ -300,32 +595,32 @@ class KafkaConnectSource(Source):
# Populate Source Connector metadata
if connector_manifest.type == "source":
- topics_response = self.session.get(
- f"{self.config.connect_uri}/connectors/{c}/topics",
- )
- topics = topics_response.json()
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()
connector_manifest.topic_names = topics[c]["topics"]

tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
).json()
connector_manifest.tasks = tasks

# JDBC source connector lineages
if connector_manifest.config.get("connector.class").__eq__(
"io.confluent.connect.jdbc.JdbcSourceConnector"
):
- jdbc_source_lineages = JDBCSourceConnectorLineages(
- connector_manifest=connector_manifest
- )
- connector_manifest.lineages.extend(
- jdbc_source_lineages.get_lineages()
- )
connector_manifest = ConfluentJDBCSourceConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
# Debezium Source Connector lineages
try:
- debezium_source_lineages = DebeziumSourceConnectorLineages(
- connector_manifest=connector_manifest
- )
- connector_manifest.lineages.extend(
- debezium_source_lineages.get_lineages()
- )
connector_manifest = DebeziumSourceConnector(
connector_manifest=connector_manifest
).connector_manifest
except ValueError as err:
logger.warning(
f"Skipping connector {connector_manifest.name} due to error: {err}"
@@ -337,7 +632,7 @@ class KafkaConnectSource(Source):
# TODO: Sink Connector not yet implemented
self.report.report_dropped(connector_manifest.name)
logger.warning(
- f"Skipping connector {connector_manifest.name}. Sink Connector not yet implemented"
f"Skipping connector {connector_manifest.name}. Lineage for Sink Connector not yet implemented"
)
pass
@@ -351,29 +646,29 @@ class KafkaConnectSource(Source):
connector_name = connector.name
connector_type = connector.type
connector_class = connector.config.get("connector.class")
flow_property_bag = connector.flow_property_bag
# connector_url = connector.url  # NOTE: this will expose connector credential when used
flow_urn = builder.make_data_flow_urn(
"kafka-connect", connector_name, self.config.env
)
- flow_property_bag: Optional[Dict[str, str]] = None
- mce = models.MetadataChangeEventClass(
- proposedSnapshot=models.DataFlowSnapshotClass(
- urn=flow_urn,
- aspects=[
- models.DataFlowInfoClass(
- name=connector_name,
- description=f"{connector_type.capitalize()} connector using `{connector_class}` plugin.",
- customProperties=flow_property_bag,
- # externalUrl=connector_url, # NOTE: this will expose connector credential when used
- ),
- # ownership,
- # tags,
- ],
- )
- )
mcp = MetadataChangeProposalWrapper(
entityType="dataFlow",
entityUrn=flow_urn,
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataFlowInfo",
aspect=models.DataFlowInfoClass(
name=connector_name,
description=f"{connector_type.capitalize()} connector using `{connector_class}` plugin.",
customProperties=flow_property_bag,
# externalUrl=connector_url, # NOTE: this will expose connector credential when used
),
)
- for c in [connector_name]:
- wu = MetadataWorkUnit(id=c, mce=mce)
for proposal in [mcp]:
wu = MetadataWorkUnit(
id=f"kafka-connect.{connector_name}.{proposal.aspectName}", mcp=proposal
)
self.report.report_workunit(wu)
yield wu
@@ -386,8 +681,6 @@ class KafkaConnectSource(Source):
"kafka-connect", connector_name, self.config.env
)
- job_property_bag: Optional[Dict[str, str]] = None
lineages = connector.lineages
if lineages:
for lineage in lineages:
@@ -395,34 +688,58 @@ class KafkaConnectSource(Source):
source_platform = lineage.source_platform
target_dataset = lineage.target_dataset
target_platform = lineage.target_platform
# job_property_bag = lineage.job_property_bag

- job_urn = builder.make_data_job_urn_with_flow(flow_urn, source_dataset)
job_id = (
source_dataset
if source_dataset
else f"unknown_source.{target_dataset}"
)
job_urn = builder.make_data_job_urn_with_flow(flow_urn, job_id)

- inlets = [builder.make_dataset_urn(source_platform, source_dataset)]
inlets = (
[builder.make_dataset_urn(source_platform, source_dataset)]
if source_dataset
else []
)
outlets = [builder.make_dataset_urn(target_platform, target_dataset)]

- mce = models.MetadataChangeEventClass(
- proposedSnapshot=models.DataJobSnapshotClass(
- urn=job_urn,
- aspects=[
- models.DataJobInfoClass(
- name=f"{connector_name}:{source_dataset}",
- type="COMMAND",
- description=None,
- customProperties=job_property_bag,
- # externalUrl=job_url,
- ),
- models.DataJobInputOutputClass(
- inputDatasets=inlets or [],
- outputDatasets=outlets or [],
- ),
- # ownership,
- # tags,
- ],
- )
- )
mcp = MetadataChangeProposalWrapper(
entityType="dataJob",
entityUrn=job_urn,
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataJobInfo",
aspect=models.DataJobInfoClass(
name=f"{connector_name}:{job_id}",
type="COMMAND",
description=None,
# customProperties=job_property_bag
# externalUrl=job_url,
),
)
- wu = MetadataWorkUnit(id=source_dataset, mce=mce)
wu = MetadataWorkUnit(
id=f"kafka-connect.{connector_name}.{job_id}.{mcp.aspectName}",
mcp=mcp,
)
self.report.report_workunit(wu)
yield wu

mcp = MetadataChangeProposalWrapper(
entityType="dataJob",
entityUrn=job_urn,
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataJobInputOutput",
aspect=models.DataJobInputOutputClass(
inputDatasets=inlets,
outputDatasets=outlets,
),
)
wu = MetadataWorkUnit(
id=f"kafka-connect.{connector_name}.{job_id}.{mcp.aspectName}",
mcp=mcp,
)
self.report.report_workunit(wu)
yield wu
@@ -438,31 +755,35 @@ class KafkaConnectSource(Source):
target_dataset = lineage.target_dataset
target_platform = lineage.target_platform

- mce = models.MetadataChangeEventClass(
- proposedSnapshot=models.DatasetSnapshotClass(
- urn=builder.make_dataset_urn(
- target_platform, target_dataset, self.config.env
- ),
- aspects=[
- models.UpstreamLineageClass(
- upstreams=[
- models.UpstreamClass(
- dataset=builder.make_dataset_urn(
- source_platform,
- source_dataset,
- self.config.env,
- ),
- type=models.DatasetLineageTypeClass.TRANSFORMED,
- )
- ]
- )
- ],
- )
- )
- wu = MetadataWorkUnit(id=source_dataset, mce=mce)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=builder.make_dataset_urn(
target_platform, target_dataset, self.config.env
),
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(platform=target_platform),
)
wu = MetadataWorkUnit(id=target_dataset, mcp=mcp)
self.report.report_workunit(wu)
yield wu

if source_dataset:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=builder.make_dataset_urn(
source_platform, source_dataset, self.config.env
),
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(
platform=source_platform
),
)
wu = MetadataWorkUnit(id=source_dataset, mcp=mcp)
self.report.report_workunit(wu)
yield wu
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
connectors_manifest = self.get_connectors_manifest()
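For reference, not part of the diff: the URNs emitted by the MCP-based workunits above can be reproduced with the same builder helpers the source uses; the expected values appear verbatim in the golden file below.

import datahub.emitter.mce_builder as builder

flow_urn = builder.make_data_flow_urn("kafka-connect", "mysql_source1", "PROD")
job_urn = builder.make_data_job_urn_with_flow(flow_urn, "librarydb.book")
topic_urn = builder.make_dataset_urn("kafka", "test-mysql-jdbc-book", "PROD")

print(flow_urn)   # urn:li:dataFlow:(kafka-connect,mysql_source1,PROD)
print(job_urn)    # urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.book)
print(topic_urn)  # urn:li:dataset:(urn:li:dataPlatform:kafka,test-mysql-jdbc-book,PROD)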

View File

@@ -0,0 +1,57 @@
---
version: '3.8'
services:
connect:
image: confluentinc/cp-kafka-connect:6.2.1
env_file: ./../kafka-connect/setup/connect.env
container_name: test_connect
hostname: test_connect
depends_on:
- zookeeper
- broker
- mysqldb
ports:
- "58083:58083"
# volumes:
# - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5
# - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1
# - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0
command:
- bash
- -c
- |
echo "Installing Connector"
#
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.5
#
confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
echo "Launching Kafka Connect worker"
#
/etc/confluent/docker/run &
#
sleep infinity
mysqldb:
image: mysql:5.7
environment:
MYSQL_ROOT_PASSWORD: rootpwd
MYSQL_USER: foo
MYSQL_PASSWORD: datahub
MYSQL_DATABASE: librarydb
container_name: test_mysql
hostname: test_mysql
ports:
- "53306:3306"
volumes:
- ./../kafka-connect/setup/conf/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
- ./../kafka-connect/setup/mysql-setup.sql:/docker-entrypoint-initdb.d/mysql-setup.sql
volumes:
test_zkdata:
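For illustration, not part of the diff: this override is meant to be combined with the base Kafka compose file (the integration test below passes both files to docker compose); once the worker is up, the REST endpoint on the mapped port can be probed directly:

import requests

# Same readiness check the integration test performs before creating connectors.
resp = requests.get("http://localhost:58083/connectors", timeout=10)
resp.raise_for_status()
print(resp.json())  # [] until connectors are registered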

View File

@@ -0,0 +1,379 @@
[
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_sink,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_sink\", \"description\": \"Sink connector using `io.confluent.connect.jdbc.JdbcSinkConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector\", \"description\": \"Source connector using `io.debezium.connector.mysql.MySqlConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.topics.librarydb.member,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.topics.librarydb.book,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source3,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source3,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source3:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source3,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,my-new-topic,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source2,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source2\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*(book)\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"transforms.TotalReplacement.replacement\": \"my-new-topic-$1\"}, \"name\": \"mysql_source2\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source2:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,my-new-topic-book,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source2:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,member,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source1,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc-\", \"tasks.max\": \"1\", \"name\": \"mysql_source1\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source1\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source1:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.book)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,test-mysql-jdbc-book,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source1:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.member)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,test-mysql-jdbc-member,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source5,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book3)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book3\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book3)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book3,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book1)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book1\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book1)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book1,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book2)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book2\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book2)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book2,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source4,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"query-topic\", \"tasks.max\": \"1\", \"query\": \"select * from member\", \"name\": \"mysql_source4\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source4\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"mysql_source4:unknown_source.query-topic\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,query-topic,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
}
]

View File

@@ -0,0 +1,15 @@
---
run_id: kafka-connect-run
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/kafka-connect for complete documentation
source:
type: "kafka-connect"
config:
connect_uri: "http://localhost:58083"
construct_lineage_workunits: False
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: file
config:
filename: "./kafka_connect_mces.json"
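For illustration, not part of the diff: this recipe is driven the same way the integration test drives it, through the datahub CLI entry point (equivalent to running datahub ingest -c kafka_connect_to_file.yml); it assumes the kafka-connect plugin extra is installed and a worker is reachable on localhost:58083.

from click.testing import CliRunner

from datahub.entrypoints import datahub

result = CliRunner().invoke(datahub, ["ingest", "-c", "kafka_connect_to_file.yml"])
assert result.exit_code == 0  # MCEs written to ./kafka_connect_mces.json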

View File

@@ -0,0 +1,43 @@
# Copyright (c) 2014, 2021, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is also distributed with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have included with MySQL.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# The MySQL Server configuration file.
#
# For explanations see
# http://dev.mysql.com/doc/mysql/en/server-system-variables.html
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
#log-error = /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address = 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10

View File

@@ -0,0 +1,18 @@
CONNECT_BOOTSTRAP_SERVERS=test_broker:9092
CONNECT_REST_PORT=58083
CONNECT_GROUP_ID=kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=_connect-status
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_REST_ADVERTISED_HOST_NAME=test_connect
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
# CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins

View File

@@ -0,0 +1,23 @@
CREATE TABLE book (
id INTEGER NOT NULL,
name VARCHAR ( 50 ) NOT NULL,
author VARCHAR ( 50 ),
publisher VARCHAR (50),
tags JSON,
genre_ids INTEGER,
PRIMARY KEY (id)
);
CREATE TABLE member (
id INTEGER NOT NULL,
name VARCHAR ( 50 ) NOT NULL,
PRIMARY KEY (id)
);
INSERT INTO book (id, name, author) VALUES (1, 'Book1', 'ABC');
INSERT INTO book (id, name, author) VALUES (2, 'Book2', 'PQR');
INSERT INTO book (id, name, author) VALUES (3, 'Book3', 'XYZ');
INSERT INTO member(id, name) VALUES (1, 'Member1');
INSERT INTO member(id, name) VALUES (2, 'Member2');

View File

@@ -0,0 +1,213 @@
import time
import pytest
import requests
from click.testing import CliRunner
from freezegun import freeze_time
from datahub.entrypoints import datahub
from tests.test_helpers import fs_helpers, mce_helpers
from tests.test_helpers.click_helpers import assert_result_ok
from tests.test_helpers.docker_helpers import wait_for_port
FROZEN_TIME = "2021-10-25 13:00:00"
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"
# Share Compose configurations between files and projects
# https://docs.docker.com/compose/extends/
docker_compose_file = [
str(test_resources_dir_kafka / "docker-compose.yml"),
str(test_resources_dir / "docker-compose.override.yml"),
]
with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
wait_for_port(docker_services, "test_broker", 59092, timeout=120)
wait_for_port(docker_services, "test_connect", 58083, timeout=120)
docker_services.wait_until_responsive(
timeout=30,
pause=1,
check=lambda: requests.get(
"http://localhost:58083/connectors",
).status_code
== 200,
)
# Creating MySQL source with no transformations , only topic prefix
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-mysql-jdbc-",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with regex router transformations , only topic prefix
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with regex router transformations , no topic prefix, table whitelist
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*",
"transforms.TotalReplacement.replacement": "my-new-topic"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with query , topic prefix
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source4",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"query": "select * from member",
"topic.prefix": "query-topic",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with ExtractTopic router transformations - source dataset not added
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source5",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"connection.password": "datahub",
"connection.user": "foo",
"table.whitelist": "book",
"topic.prefix": "test-mysql-jdbc2-",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb",
"transforms": "changetopic",
"transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.changetopic.field": "name"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL sink connector - not added
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"insert.mode": "insert",
"auto.create": true,
"topics": "my-topic",
"connection.password": "datahub",
"connection.user": "foo",
"tasks.max": "1",
"connection.url": "jdbc:mysql://test_mysql:3306/librarydb"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating Debezium MySQL source connector
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data="""{
"name": "debezium-mysql-connector",
"config": {
"name": "debezium-mysql-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "test_mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpwd",
"database.server.name": "debezium.topics",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.debeziummysql",
"include.schema.changes": "false"
}
}
""",
)
assert r.status_code == 201 # Created
# Give time for connectors to process the table data
time.sleep(60)
# Run the metadata ingestion pipeline.
runner = CliRunner()
with fs_helpers.isolated_filesystem(tmp_path):
print(tmp_path)
config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve()
result = runner.invoke(datahub, ["ingest", "-c", f"{config_file}"])
# import pdb;pdb.set_trace();
assert_result_ok(result)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_mces_golden.json",
ignore_paths=[],
)

View File

@@ -1,6 +1,6 @@
import contextlib
import subprocess
- from typing import Optional
from typing import Optional, Union

import pytest
import pytest_docker.plugin
@@ -45,9 +45,11 @@ def wait_for_port(
@pytest.fixture
def docker_compose_runner(docker_compose_project_name, docker_cleanup):
@contextlib.contextmanager
- def run(compose_file_path: str, key: str) -> pytest_docker.plugin.Services:
def run(
compose_file_path: Union[str, list], key: str
) -> pytest_docker.plugin.Services:
with pytest_docker.plugin.get_docker_services(
- str(compose_file_path),
compose_file_path,
f"{docker_compose_project_name}-{key}",
docker_cleanup,
) as docker_services: