diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 6bba479202..13ae9c884a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -102,7 +102,7 @@ plugins: Dict[str, Set[str]] = { "acryl-pyhive[hive]>=0.6.11" }, "kafka": kafka_common, - "kafka-connect": sql_common | {"requests"}, + "kafka-connect": sql_common | {"requests","JPype1"}, "ldap": {"python-ldap>=2.4"}, "looker": looker_common, "lookml": looker_common | {"lkml>=1.1.0", "sql-metadata==2.2.2"}, @@ -234,6 +234,7 @@ full_test_dev_requirements = { "mariadb", "snowflake", "redash", + "kafka-connect" ] for dependency in plugins[plugin] ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index ce05df1019..35eadbaede 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -3,13 +3,15 @@ import re from dataclasses import dataclass, field from typing import Dict, Iterable, List, Optional +import jpype +import jpype.imports import requests -from pydantic import BaseModel from sqlalchemy.engine.url import make_url import datahub.emitter.mce_builder as builder import datahub.metadata.schema_classes as models from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -26,7 +28,7 @@ class KafkaConnectSourceConfig(ConfigModel): cluster_name: Optional[str] = "connect-cluster" env: str = builder.DEFAULT_ENV construct_lineage_workunits: bool = True - connector_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) + connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all() @dataclass @@ -41,106 +43,29 @@ class KafkaConnectSourceReport(SourceReport): self.filtered.append(connector) -@dataclass -class DebeziumParser: - source_platform: str - server_name: Optional[str] - database_name: Optional[str] - - -@dataclass -class JdbcParser: - db_connection_url: Optional[str] - - @dataclass class KafkaConnectLineage: - """Class to store Kafka Connect lineage mapping""" + """Class to store Kafka Connect lineage mapping, Each instance is potential DataJob""" - source_dataset: str source_platform: str target_dataset: str target_platform: str + job_property_bag: Optional[Dict[str, str]] = None + source_dataset: Optional[str] = None -class ConnectorManifest(BaseModel): +@dataclass +class ConnectorManifest: + """Each instance is potential DataFlow""" + name: str - config: Dict = {} - lineages: Optional[List[KafkaConnectLineage]] = [] - topic_names: Optional[Iterable[str]] = [] type: str - url: Optional[str] - - -def get_jdbc_source_connector_parser( - connector_manifest: ConnectorManifest, -) -> JdbcParser: - return JdbcParser(connector_manifest.config.get("connection.url")) - - -def get_debezium_source_connector_parser( - connector_manifest: ConnectorManifest, -) -> DebeziumParser: - connector_class = connector_manifest.config.get("connector.class", "") - if connector_class == "io.debezium.connector.mysql.MySqlConnector": - # https://debezium.io/documentation/reference/connectors/mysql.html#mysql-topic-names - parser = DebeziumParser( - source_platform="mysql", - server_name=connector_manifest.config.get("database.server.name"), - 
database_name=None, - ) - elif connector_class == "MySqlConnector": - parser = DebeziumParser( - source_platform="mysql", - server_name=connector_manifest.config.get("database.server.name"), - database_name=None, - ) - elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector": - # https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-topic-names - parser = DebeziumParser( - source_platform="mongodb", - server_name=connector_manifest.config.get("database.server.name"), - database_name=None, - ) - elif connector_class == "io.debezium.connector.postgresql.PostgresConnector": - # https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-topic-names - parser = DebeziumParser( - source_platform="postgres", - server_name=connector_manifest.config.get("database.server.name"), - database_name=connector_manifest.config.get("database.dbname"), - ) - elif connector_class == "io.debezium.connector.oracle.OracleConnector": - # https://debezium.io/documentation/reference/connectors/oracle.html#oracle-topic-names - parser = DebeziumParser( - source_platform="oracle", - server_name=connector_manifest.config.get("database.server.name"), - database_name=connector_manifest.config.get("database.dbname"), - ) - elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector": - # https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-topic-names - parser = DebeziumParser( - source_platform="mssql", - server_name=connector_manifest.config.get("database.server.name"), - database_name=connector_manifest.config.get("database.dbname"), - ) - elif connector_class == "io.debezium.connector.db2.Db2Connector": - # https://debezium.io/documentation/reference/connectors/db2.html#db2-topic-names - parser = DebeziumParser( - source_platform="db2", - server_name=connector_manifest.config.get("database.server.name"), - database_name=connector_manifest.config.get("database.dbname"), - ) - elif connector_class == "io.debezium.connector.vitess.VitessConnector": - # https://debezium.io/documentation/reference/connectors/vitess.html#vitess-topic-names - parser = DebeziumParser( - source_platform="vitess", - server_name=connector_manifest.config.get("database.server.name"), - database_name=connector_manifest.config.get("vitess.keyspace"), - ) - else: - raise ValueError(f"Connector class '{connector_class}' is unknown.") - - return parser + config: Dict + tasks: Dict + url: Optional[str] = None + flow_property_bag: Optional[Dict[str, str]] = None + lineages: List[KafkaConnectLineage] = field(default_factory=list) + topic_names: Iterable[str] = field(default_factory=list) def remove_prefix(text: str, prefix: str) -> str: @@ -150,67 +75,436 @@ def remove_prefix(text: str, prefix: str) -> str: return text -@dataclass -class JDBCSourceConnectorLineages: - connector_manifest: ConnectorManifest +def unquote(string: str, leading_quote: str = '"', trailing_quote: str = None) -> str: + """ + If string starts and ends with a quote, unquote it + """ + trailing_quote = trailing_quote if trailing_quote else leading_quote + if string.startswith(leading_quote) and string.endswith(trailing_quote): + string = string[1:-1] + return string - def get_lineages(self): - lineages: List[KafkaConnectLineage] = list() - parser = get_jdbc_source_connector_parser(self.connector_manifest) - db_connection_url = parser.db_connection_url - url = remove_prefix(str(db_connection_url), "jdbc:") + +@dataclass +class ConfluentJDBCSourceConnector: + connector_manifest: 
ConnectorManifest + report: KafkaConnectSourceReport + + def __init__( + self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport + ) -> None: + self.connector_manifest = connector_manifest + self.report = report + self._extract_lineages() + + REGEXROUTER = "org.apache.kafka.connect.transforms.RegexRouter" + KNOWN_TOPICROUTING_TRANSFORMS = [REGEXROUTER] + # https://kafka.apache.org/documentation/#connect_included_transformation + KAFKA_NONTOPICROUTING_TRANSFORMS = [ + "InsertField", + "InsertField$Key", + "InsertField$Value", + "ReplaceField", + "ReplaceField$Key", + "ReplaceField$Value", + "MaskField", + "MaskField$Key", + "MaskField$Value", + "ValueToKey", + "ValueToKey$Key", + "ValueToKey$Value", + "HoistField", + "HoistField$Key", + "HoistField$Value", + "ExtractField", + "ExtractField$Key", + "ExtractField$Value", + "SetSchemaMetadata", + "SetSchemaMetadata$Key", + "SetSchemaMetadata$Value", + "Flatten", + "Flatten$Key", + "Flatten$Value", + "Cast", + "Cast$Key", + "Cast$Value", + "HeadersFrom", + "HeadersFrom$Key", + "HeadersFrom$Value", + "TimestampConverter", + "Filter", + "InsertHeader", + "DropHeaders", + ] + # https://docs.confluent.io/platform/current/connect/transforms/overview.html + CONFLUENT_NONTOPICROUTING_TRANSFORMS = [ + "Drop", + "Drop$Key", + "Drop$Value", + "Filter", + "Filter$Key", + "Filter$Value", + "TombstoneHandler", + ] + KNOWN_NONTOPICROUTING_TRANSFORMS = ( + KAFKA_NONTOPICROUTING_TRANSFORMS + + [ + "org.apache.kafka.connect.transforms.{}".format(t) + for t in KAFKA_NONTOPICROUTING_TRANSFORMS + ] + + CONFLUENT_NONTOPICROUTING_TRANSFORMS + + [ + "io.confluent.connect.transforms.{}".format(t) + for t in CONFLUENT_NONTOPICROUTING_TRANSFORMS + ] + ) + + @dataclass + class JdbcParser: + db_connection_url: str + source_platform: str + database_name: str + topic_prefix: str + query: str + transforms: list + + def report_warning(self, key: str, reason: str) -> None: + logger.warning(f"{key}: {reason}") + self.report.report_warning(key, reason) + + def get_parser( + self, + connector_manifest: ConnectorManifest, + ) -> JdbcParser: + + url = remove_prefix( + str(connector_manifest.config.get("connection.url")), "jdbc:" + ) url_instance = make_url(url) source_platform = url_instance.drivername database_name = url_instance.database + db_connection_url = f"{url_instance.drivername}://{url_instance.host}:{url_instance.port}/{url_instance.database}" - logging.debug( - f"Extracting source platform: {source_platform} and database name: {database_name} from connection url " + topic_prefix = self.connector_manifest.config.get("topic.prefix", None) + + query = self.connector_manifest.config.get("query", None) + + transform_names = ( + self.connector_manifest.config.get("transforms", "").split(",") + if self.connector_manifest.config.get("transforms") + else [] ) - topic_prefix = ( - str(self.connector_manifest.config.get("topic.prefix")) - if self.connector_manifest.config.get("topic.prefix") - else None + transforms = [] + for name in transform_names: + transform = {"name": name} + transforms.append(transform) + for key in self.connector_manifest.config.keys(): + if key.startswith("transforms.{}.".format(name)): + transform[ + key.replace("transforms.{}.".format(name), "") + ] = self.connector_manifest.config[key] + + return self.JdbcParser( + db_connection_url, + source_platform, + database_name, + topic_prefix, + query, + transforms, ) - query = ( - self.connector_manifest.config.get("query") - if self.connector_manifest.config.get("query") - else None - ) 
- - if not self.connector_manifest.topic_names: - return lineages - - for topic in self.connector_manifest.topic_names: - # if the connector uses a custom query - if topic_prefix and not query: + def default_get_lineages( + self, + topic_prefix, + database_name, + source_platform, + topic_names=None, + include_source_dataset=True, + ): + lineages: List[KafkaConnectLineage] = list() + if not topic_names: + topic_names = self.connector_manifest.topic_names + for topic in topic_names: + # All good for NO_TRANSFORM or (SINGLE_TRANSFORM and KNOWN_NONTOPICROUTING_TRANSFORM) or (not SINGLE_TRANSFORM and all(KNOWN_NONTOPICROUTING_TRANSFORM)) + # default method - as per earlier implementation + if topic_prefix: source_table = remove_prefix(topic, topic_prefix) else: source_table = topic - dataset_name = ( database_name + "." + source_table if database_name else source_table ) - lineage = KafkaConnectLineage( - source_dataset=dataset_name, + source_dataset=dataset_name if include_source_dataset else None, source_platform=source_platform, target_dataset=topic, target_platform="kafka", ) lineages.append(lineage) - return lineages + def get_table_names(self): + if self.connector_manifest.config.get("table.whitelist"): + return self.connector_manifest.config.get("table.whitelist").split(",") # type: ignore + + if self.connector_manifest.tasks: + sep = "." + leading_quote_char = trailing_quote_char = '"' + quote_method = self.connector_manifest.config.get( + "quote.sql.identifiers", "always" + ) + + tableIds = ",".join( + [task["config"].get("tables") for task in self.connector_manifest.tasks] + ) + if quote_method == "always": + leading_quote_char = tableIds[0] + trailing_quote_char = tableIds[-1] + # This will only work for single character quotes + + tables = [ + unquote(tableId.split(sep)[-1], leading_quote_char, trailing_quote_char) + for tableId in tableIds.split(",") + ] + return tables + + return [] + + def _extract_lineages(self): + lineages: List[KafkaConnectLineage] = list() + parser = self.get_parser(self.connector_manifest) + source_platform = parser.source_platform + database_name = parser.database_name + query = parser.query + topic_prefix = parser.topic_prefix + transforms = parser.transforms + self.connector_manifest.flow_property_bag = self.connector_manifest.config + + # Mask/Remove properties that may reveal credentials + self.connector_manifest.flow_property_bag[ + "connection.url" + ] = parser.db_connection_url + if "connection.password" in self.connector_manifest.flow_property_bag: + del self.connector_manifest.flow_property_bag["connection.password"] + if "connection.user" in self.connector_manifest.flow_property_bag: + del self.connector_manifest.flow_property_bag["connection.user"] + + logging.debug( + f"Extracting source platform: {source_platform} and database name: {database_name} from connection url " + ) + + if not self.connector_manifest.topic_names: + self.connector_manifest.lineages = lineages + return + + if query: + # Lineage source_table can be extracted by parsing query + # For now, we use source table as topic (expected to be same as topic prefix) + for topic in self.connector_manifest.topic_names: + # default method - as per earlier implementation + source_table = topic + dataset_name = ( + database_name + "." 
+ source_table + if database_name + else source_table + ) + lineage = KafkaConnectLineage( + source_platform=source_platform, + target_dataset=topic, + target_platform="kafka", + ) + lineages.append(lineage) + self.report_warning( + self.connector_manifest.name, + "could not find input dataset, the connector has query configuration set", + ) + self.connector_manifest.lineages = lineages + return + + SINGLE_TRANSFORM = len(transforms) == 1 + NO_TRANSFORM = len(transforms) == 0 + UNKNOWN_TRANSFORM = any( + [ + transform["type"] + not in self.KNOWN_TOPICROUTING_TRANSFORMS + + self.KNOWN_NONTOPICROUTING_TRANSFORMS + for transform in transforms + ] + ) + ALL_TRANSFORMS_NON_TOPICROUTING = all( + [ + transform["type"] in self.KNOWN_NONTOPICROUTING_TRANSFORMS + for transform in transforms + ] + ) + + if NO_TRANSFORM or ALL_TRANSFORMS_NON_TOPICROUTING: + self.connector_manifest.lineages = self.default_get_lineages( + database_name=database_name, + source_platform=source_platform, + topic_prefix=topic_prefix, + ) + return + + if SINGLE_TRANSFORM and transforms[0]["type"] == self.REGEXROUTER: + + tables = self.get_table_names() + topic_names = list(self.connector_manifest.topic_names) + + from java.util.regex import Pattern + + for source_table in tables: + topic = topic_prefix + source_table if topic_prefix else source_table + + transform_regex = Pattern.compile(transforms[0]["regex"]) + transform_replacement = transforms[0]["replacement"] + + matcher = transform_regex.matcher(topic) + if matcher.matches(): + topic = matcher.replaceFirst(transform_replacement) + + # Additional check to confirm that the topic present + # in connector topics + + if topic in self.connector_manifest.topic_names: + dataset_name = ( + database_name + "." + source_table + if database_name + else source_table + ) + + lineage = KafkaConnectLineage( + source_dataset=dataset_name, + source_platform=source_platform, + target_dataset=topic, + target_platform="kafka", + ) + topic_names.remove(topic) + lineages.append(lineage) + + if topic_names: + lineages.extend( + self.default_get_lineages( + database_name=database_name, + source_platform=source_platform, + topic_prefix=topic_prefix, + topic_names=topic_names, + include_source_dataset=False, + ) + ) + self.report_warning( + self.connector_manifest.name, + f"could not find input dataset, for connector topics {topic_names}", + ) + self.connector_manifest.lineages = lineages + return + else: + include_source_dataset = True + if SINGLE_TRANSFORM and UNKNOWN_TRANSFORM: + self.report_warning( + self.connector_manifest.name, + f"could not find input dataset, connector has unknown transform - {transforms[0]['type']}", + ) + include_source_dataset = False + if not SINGLE_TRANSFORM and UNKNOWN_TRANSFORM: + self.report_warning( + self.connector_manifest.name, + "could not find input dataset, connector has one or more unknown transforms", + ) + include_source_dataset = False + lineages = self.default_get_lineages( + database_name=database_name, + source_platform=source_platform, + topic_prefix=topic_prefix, + include_source_dataset=include_source_dataset, + ) + self.connector_manifest.lineages = lineages + return + @dataclass -class DebeziumSourceConnectorLineages: +class DebeziumSourceConnector: connector_manifest: ConnectorManifest - def get_lineages(self) -> List[KafkaConnectLineage]: + def __init__(self, connector_manifest: ConnectorManifest) -> None: + self.connector_manifest = connector_manifest + self._extract_lineages() + + @dataclass + class DebeziumParser: + source_platform: str 
+ server_name: Optional[str] + database_name: Optional[str] + + def get_parser( + self, + connector_manifest: ConnectorManifest, + ) -> DebeziumParser: + connector_class = connector_manifest.config.get("connector.class", "") + if connector_class == "io.debezium.connector.mysql.MySqlConnector": + # https://debezium.io/documentation/reference/connectors/mysql.html#mysql-topic-names + parser = self.DebeziumParser( + source_platform="mysql", + server_name=connector_manifest.config.get("database.server.name"), + database_name=None, + ) + elif connector_class == "MySqlConnector": + parser = self.DebeziumParser( + source_platform="mysql", + server_name=connector_manifest.config.get("database.server.name"), + database_name=None, + ) + elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector": + # https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-topic-names + parser = self.DebeziumParser( + source_platform="mongodb", + server_name=connector_manifest.config.get("database.server.name"), + database_name=None, + ) + elif connector_class == "io.debezium.connector.postgresql.PostgresConnector": + # https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-topic-names + parser = self.DebeziumParser( + source_platform="postgres", + server_name=connector_manifest.config.get("database.server.name"), + database_name=connector_manifest.config.get("database.dbname"), + ) + elif connector_class == "io.debezium.connector.oracle.OracleConnector": + # https://debezium.io/documentation/reference/connectors/oracle.html#oracle-topic-names + parser = self.DebeziumParser( + source_platform="oracle", + server_name=connector_manifest.config.get("database.server.name"), + database_name=connector_manifest.config.get("database.dbname"), + ) + elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector": + # https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-topic-names + parser = self.DebeziumParser( + source_platform="mssql", + server_name=connector_manifest.config.get("database.server.name"), + database_name=connector_manifest.config.get("database.dbname"), + ) + elif connector_class == "io.debezium.connector.db2.Db2Connector": + # https://debezium.io/documentation/reference/connectors/db2.html#db2-topic-names + parser = self.DebeziumParser( + source_platform="db2", + server_name=connector_manifest.config.get("database.server.name"), + database_name=connector_manifest.config.get("database.dbname"), + ) + elif connector_class == "io.debezium.connector.vitess.VitessConnector": + # https://debezium.io/documentation/reference/connectors/vitess.html#vitess-topic-names + parser = self.DebeziumParser( + source_platform="vitess", + server_name=connector_manifest.config.get("database.server.name"), + database_name=connector_manifest.config.get("vitess.keyspace"), + ) + else: + raise ValueError(f"Connector class '{connector_class}' is unknown.") + + return parser + + def _extract_lineages(self): lineages: List[KafkaConnectLineage] = list() - parser = get_debezium_source_connector_parser(self.connector_manifest) + parser = self.get_parser(self.connector_manifest) source_platform = parser.source_platform server_name = parser.server_name database_name = parser.database_name @@ -236,8 +530,7 @@ class DebeziumSourceConnectorLineages: target_platform="kafka", ) lineages.append(lineage) - - return lineages + self.connector_manifest.lineages = lineages class KafkaConnectSource(Source): @@ -256,7 +549,6 @@ class 
KafkaConnectSource(Source): super().__init__(ctx) self.config = config self.report = KafkaConnectSourceReport() - self.session = requests.Session() self.session.headers.update( { @@ -270,6 +562,9 @@ class KafkaConnectSource(Source): test_response.raise_for_status() logger.info(f"Connection to {self.config.connect_uri} is ok") + if not jpype.isJVMStarted(): + jpype.startJVM() + @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: config = KafkaConnectSourceConfig.parse_obj(config_dict) @@ -300,32 +595,32 @@ class KafkaConnectSource(Source): # Populate Source Connector metadata if connector_manifest.type == "source": - topics_response = self.session.get( + topics = self.session.get( f"{self.config.connect_uri}/connectors/{c}/topics", - ) + ).json() - topics = topics_response.json() connector_manifest.topic_names = topics[c]["topics"] + tasks = self.session.get( + f"{self.config.connect_uri}/connectors/{c}/tasks", + ).json() + + connector_manifest.tasks = tasks + # JDBC source connector lineages if connector_manifest.config.get("connector.class").__eq__( "io.confluent.connect.jdbc.JdbcSourceConnector" ): - jdbc_source_lineages = JDBCSourceConnectorLineages( - connector_manifest=connector_manifest - ) - connector_manifest.lineages.extend( - jdbc_source_lineages.get_lineages() - ) + connector_manifest = ConfluentJDBCSourceConnector( + connector_manifest=connector_manifest, report=self.report + ).connector_manifest else: # Debezium Source Connector lineages try: - debezium_source_lineages = DebeziumSourceConnectorLineages( + connector_manifest = DebeziumSourceConnector( connector_manifest=connector_manifest - ) - connector_manifest.lineages.extend( - debezium_source_lineages.get_lineages() - ) + ).connector_manifest + except ValueError as err: logger.warning( f"Skipping connector {connector_manifest.name} due to error: {err}" @@ -337,7 +632,7 @@ class KafkaConnectSource(Source): # TODO: Sink Connector not yet implemented self.report.report_dropped(connector_manifest.name) logger.warning( - f"Skipping connector {connector_manifest.name}. Sink Connector not yet implemented" + f"Skipping connector {connector_manifest.name}. 
Lineage for Sink Connector not yet implemented" ) pass @@ -351,29 +646,29 @@ class KafkaConnectSource(Source): connector_name = connector.name connector_type = connector.type connector_class = connector.config.get("connector.class") + flow_property_bag = connector.flow_property_bag # connector_url = connector.url # NOTE: this will expose connector credential when used flow_urn = builder.make_data_flow_urn( "kafka-connect", connector_name, self.config.env ) - flow_property_bag: Optional[Dict[str, str]] = None - mce = models.MetadataChangeEventClass( - proposedSnapshot=models.DataFlowSnapshotClass( - urn=flow_urn, - aspects=[ - models.DataFlowInfoClass( - name=connector_name, - description=f"{connector_type.capitalize()} connector using `{connector_class}` plugin.", - customProperties=flow_property_bag, - # externalUrl=connector_url, # NOTE: this will expose connector credential when used - ), - # ownership, - # tags, - ], - ) + + mcp = MetadataChangeProposalWrapper( + entityType="dataFlow", + entityUrn=flow_urn, + changeType=models.ChangeTypeClass.UPSERT, + aspectName="dataFlowInfo", + aspect=models.DataFlowInfoClass( + name=connector_name, + description=f"{connector_type.capitalize()} connector using `{connector_class}` plugin.", + customProperties=flow_property_bag, + # externalUrl=connector_url, # NOTE: this will expose connector credential when used + ), ) - for c in [connector_name]: - wu = MetadataWorkUnit(id=c, mce=mce) + for proposal in [mcp]: + wu = MetadataWorkUnit( + id=f"kafka-connect.{connector_name}.{proposal.aspectName}", mcp=proposal + ) self.report.report_workunit(wu) yield wu @@ -386,8 +681,6 @@ class KafkaConnectSource(Source): "kafka-connect", connector_name, self.config.env ) - job_property_bag: Optional[Dict[str, str]] = None - lineages = connector.lineages if lineages: for lineage in lineages: @@ -395,34 +688,58 @@ class KafkaConnectSource(Source): source_platform = lineage.source_platform target_dataset = lineage.target_dataset target_platform = lineage.target_platform + # job_property_bag = lineage.job_property_bag - job_urn = builder.make_data_job_urn_with_flow(flow_urn, source_dataset) + job_id = ( + source_dataset + if source_dataset + else f"unknown_source.{target_dataset}" + ) + job_urn = builder.make_data_job_urn_with_flow(flow_urn, job_id) - inlets = [builder.make_dataset_urn(source_platform, source_dataset)] + inlets = ( + [builder.make_dataset_urn(source_platform, source_dataset)] + if source_dataset + else [] + ) outlets = [builder.make_dataset_urn(target_platform, target_dataset)] - mce = models.MetadataChangeEventClass( - proposedSnapshot=models.DataJobSnapshotClass( - urn=job_urn, - aspects=[ - models.DataJobInfoClass( - name=f"{connector_name}:{source_dataset}", - type="COMMAND", - description=None, - customProperties=job_property_bag, - # externalUrl=job_url, - ), - models.DataJobInputOutputClass( - inputDatasets=inlets or [], - outputDatasets=outlets or [], - ), - # ownership, - # tags, - ], - ) + mcp = MetadataChangeProposalWrapper( + entityType="dataJob", + entityUrn=job_urn, + changeType=models.ChangeTypeClass.UPSERT, + aspectName="dataJobInfo", + aspect=models.DataJobInfoClass( + name=f"{connector_name}:{job_id}", + type="COMMAND", + description=None, + # customProperties=job_property_bag + # externalUrl=job_url, + ), ) - wu = MetadataWorkUnit(id=source_dataset, mce=mce) + wu = MetadataWorkUnit( + id=f"kafka-connect.{connector_name}.{job_id}.{mcp.aspectName}", + mcp=mcp, + ) + self.report.report_workunit(wu) + yield wu + + mcp = 
MetadataChangeProposalWrapper( + entityType="dataJob", + entityUrn=job_urn, + changeType=models.ChangeTypeClass.UPSERT, + aspectName="dataJobInputOutput", + aspect=models.DataJobInputOutputClass( + inputDatasets=inlets, + outputDatasets=outlets, + ), + ) + + wu = MetadataWorkUnit( + id=f"kafka-connect.{connector_name}.{job_id}.{mcp.aspectName}", + mcp=mcp, + ) self.report.report_workunit(wu) yield wu @@ -438,31 +755,35 @@ class KafkaConnectSource(Source): target_dataset = lineage.target_dataset target_platform = lineage.target_platform - mce = models.MetadataChangeEventClass( - proposedSnapshot=models.DatasetSnapshotClass( - urn=builder.make_dataset_urn( - target_platform, target_dataset, self.config.env - ), - aspects=[ - models.UpstreamLineageClass( - upstreams=[ - models.UpstreamClass( - dataset=builder.make_dataset_urn( - source_platform, - source_dataset, - self.config.env, - ), - type=models.DatasetLineageTypeClass.TRANSFORMED, - ) - ] - ) - ], - ) + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=builder.make_dataset_urn( + target_platform, target_dataset, self.config.env + ), + changeType=models.ChangeTypeClass.UPSERT, + aspectName="dataPlatformInstance", + aspect=models.DataPlatformInstanceClass(platform=target_platform), ) - wu = MetadataWorkUnit(id=source_dataset, mce=mce) + wu = MetadataWorkUnit(id=target_dataset, mcp=mcp) self.report.report_workunit(wu) yield wu + if source_dataset: + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=builder.make_dataset_urn( + source_platform, source_dataset, self.config.env + ), + changeType=models.ChangeTypeClass.UPSERT, + aspectName="dataPlatformInstance", + aspect=models.DataPlatformInstanceClass( + platform=source_platform + ), + ) + + wu = MetadataWorkUnit(id=source_dataset, mcp=mcp) + self.report.report_workunit(wu) + yield wu def get_workunits(self) -> Iterable[MetadataWorkUnit]: connectors_manifest = self.get_connectors_manifest() diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml new file mode 100644 index 0000000000..ae17e7c9ae --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml @@ -0,0 +1,57 @@ +--- +version: '3.8' +services: + + connect: + image: confluentinc/cp-kafka-connect:6.2.1 + env_file: ./../kafka-connect/setup/connect.env + container_name: test_connect + hostname: test_connect + depends_on: + - zookeeper + - broker + - mysqldb + ports: + - "58083:58083" +# volumes: +# - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5 +# - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1 +# - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0 + command: + - bash + - -c + - | + echo "Installing Connector" + # + confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.5 + # + confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1 + # + confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0 + # + curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \ + | tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \ + --strip-components=1 
mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar + # + echo "Launching Kafka Connect worker" + # + /etc/confluent/docker/run & + # + sleep infinity + mysqldb: + image: mysql:5.7 + environment: + MYSQL_ROOT_PASSWORD: rootpwd + MYSQL_USER: foo + MYSQL_PASSWORD: datahub + MYSQL_DATABASE: librarydb + container_name: test_mysql + hostname: test_mysql + ports: + - "53306:3306" + volumes: + - ./../kafka-connect/setup/conf/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf + - ./../kafka-connect/setup/mysql-setup.sql:/docker-entrypoint-initdb.d/mysql-setup.sql +volumes: + test_zkdata: + diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json new file mode 100644 index 0000000000..6cf123a1e7 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json @@ -0,0 +1,379 @@ +[ + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_sink,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_sink\", \"description\": \"Sink connector using `io.confluent.connect.jdbc.JdbcSinkConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector\", \"description\": \"Source connector using `io.debezium.connector.mysql.MySqlConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.topics.librarydb.member,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"debezium-mysql-connector:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD),librarydb.book)", + 
"entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.topics.librarydb.book,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source3,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source3,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source3:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source3,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,my-new-topic,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source2,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source2\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*(book)\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"transforms.TotalReplacement.replacement\": \"my-new-topic-$1\"}, \"name\": \"mysql_source2\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + 
"value": "{\"customProperties\": {}, \"name\": \"mysql_source2:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,my-new-topic-book,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source2:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source2,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,member,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source1,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc-\", \"tasks.max\": \"1\", \"name\": \"mysql_source1\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source1\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source1:librarydb.book\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.book)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.book,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,test-mysql-jdbc-book,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source1:librarydb.member\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.member)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,test-mysql-jdbc-member,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source5,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book3)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book3\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book3)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book3,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book1)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book1\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book1)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [], 
\"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book1,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book2)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source5:unknown_source.Book2\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source5,PROD),unknown_source.Book2)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,Book2,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,mysql_source4,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"query-topic\", \"tasks.max\": \"1\", \"query\": \"select * from member\", \"name\": \"mysql_source4\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source4\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"mysql_source4:unknown_source.query-topic\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,query-topic,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml new file mode 100644 index 0000000000..4974b20bae --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml @@ -0,0 +1,15 @@ +--- +run_id: kafka-connect-run + +# see https://datahubproject.io/docs/metadata-ingestion/source_docs/kafka-connect for complete documentation +source: + type: "kafka-connect" + config: + connect_uri: "http://localhost:58083" + construct_lineage_workunits: False + +# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation +sink: + type: file + config: + 
filename: "./kafka_connect_mces.json" diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mysqld.cnf b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mysqld.cnf new file mode 100644 index 0000000000..a21b512941 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mysqld.cnf @@ -0,0 +1,43 @@ +# Copyright (c) 2014, 2021, Oracle and/or its affiliates. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License, version 2.0, +# as published by the Free Software Foundation. +# +# This program is also distributed with certain software (including +# but not limited to OpenSSL) that is licensed under separate terms, +# as designated in a particular file or component or in included license +# documentation. The authors of MySQL hereby grant you an additional +# permission to link the program and your derivative works with the +# separately licensed software that they have included with MySQL. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License, version 2.0, for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +# +# The MySQL Server configuration file. +# +# For explanations see +# http://dev.mysql.com/doc/mysql/en/server-system-variables.html + +[mysqld] +pid-file = /var/run/mysqld/mysqld.pid +socket = /var/run/mysqld/mysqld.sock +datadir = /var/lib/mysql +#log-error = /var/log/mysql/error.log +# By default we only accept connections from localhost +#bind-address = 127.0.0.1 +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +server-id = 223344 +log_bin = mysql-bin +binlog_format = ROW +binlog_row_image = FULL +expire_logs_days = 10 \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env new file mode 100644 index 0000000000..c1088c2fe5 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env @@ -0,0 +1,18 @@ +CONNECT_BOOTSTRAP_SERVERS=test_broker:9092 +CONNECT_REST_PORT=58083 +CONNECT_GROUP_ID=kafka-connect +CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs +CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets +CONNECT_STATUS_STORAGE_TOPIC=_connect-status +CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter +CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter +CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter +CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter +# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' +CONNECT_REST_ADVERTISED_HOST_NAME=test_connect +CONNECT_LOG4J_ROOT_LOGLEVEL=INFO +# CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n +CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 +CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 +CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 +CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins \ No newline at end of file diff --git 
a/metadata-ingestion/tests/integration/kafka-connect/setup/mysql-setup.sql b/metadata-ingestion/tests/integration/kafka-connect/setup/mysql-setup.sql new file mode 100644 index 0000000000..ae7dea1b8c --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/setup/mysql-setup.sql @@ -0,0 +1,23 @@ +CREATE TABLE book ( + id INTEGER NOT NULL, + name VARCHAR ( 50 ) NOT NULL, + author VARCHAR ( 50 ), + publisher VARCHAR (50), + tags JSON, + genre_ids INTEGER, + PRIMARY KEY (id) +); + +CREATE TABLE member ( + id INTEGER NOT NULL, + name VARCHAR ( 50 ) NOT NULL, + PRIMARY KEY (id) +); + + +INSERT INTO book (id, name, author) VALUES (1, 'Book1', 'ABC'); +INSERT INTO book (id, name, author) VALUES (2, 'Book2', 'PQR'); +INSERT INTO book (id, name, author) VALUES (3, 'Book3', 'XYZ'); + +INSERT INTO member(id, name) VALUES (1, 'Member1'); +INSERT INTO member(id, name) VALUES (2, 'Member2'); diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py new file mode 100644 index 0000000000..91e3356317 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -0,0 +1,213 @@ +import time + +import pytest +import requests +from click.testing import CliRunner +from freezegun import freeze_time + +from datahub.entrypoints import datahub +from tests.test_helpers import fs_helpers, mce_helpers +from tests.test_helpers.click_helpers import assert_result_ok +from tests.test_helpers.docker_helpers import wait_for_port + +FROZEN_TIME = "2021-10-25 13:00:00" + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect" + test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka" + + # Share Compose configurations between files and projects + # https://docs.docker.com/compose/extends/ + docker_compose_file = [ + str(test_resources_dir_kafka / "docker-compose.yml"), + str(test_resources_dir / "docker-compose.override.yml"), + ] + with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services: + wait_for_port(docker_services, "test_broker", 59092, timeout=120) + wait_for_port(docker_services, "test_connect", 58083, timeout=120) + docker_services.wait_until_responsive( + timeout=30, + pause=1, + check=lambda: requests.get( + "http://localhost:58083/connectors", + ).status_code + == 200, + ) + # Creating MySQL source with no transformations , only topic prefix + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_source1", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "topic.prefix": "test-mysql-jdbc-", + "connection.password": "datahub", + "connection.user": "foo", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + } + } + """, + ) + assert r.status_code == 201 # Created + # Creating MySQL source with regex router transformations , only topic prefix + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_source2", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + 
"connection.password": "datahub", + "connection.user": "foo", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb", + "transforms": "TotalReplacement", + "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TotalReplacement.regex": ".*(book)", + "transforms.TotalReplacement.replacement": "my-new-topic-$1" + } + } + """, + ) + assert r.status_code == 201 # Created + # Creating MySQL source with regex router transformations , no topic prefix, table whitelist + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_source3", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "table.whitelist": "book", + "connection.password": "datahub", + "connection.user": "foo", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb", + "transforms": "TotalReplacement", + "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TotalReplacement.regex": ".*", + "transforms.TotalReplacement.replacement": "my-new-topic" + } + } + """, + ) + assert r.status_code == 201 # Created + # Creating MySQL source with query , topic prefix + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_source4", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "query": "select * from member", + "topic.prefix": "query-topic", + "connection.password": "datahub", + "connection.user": "foo", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + } + } + """, + ) + assert r.status_code == 201 # Created + # Creating MySQL source with ExtractTopic router transformations - source dataset not added + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_source5", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "connection.password": "datahub", + "connection.user": "foo", + "table.whitelist": "book", + "topic.prefix": "test-mysql-jdbc2-", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb", + "transforms": "changetopic", + "transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value", + "transforms.changetopic.field": "name" + } + } + """, + ) + assert r.status_code == 201 # Created + # Creating MySQL sink connector - not added + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "mysql_sink", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", + "insert.mode": "insert", + "auto.create": true, + "topics": "my-topic", + "connection.password": "datahub", + "connection.user": "foo", + "tasks.max": "1", + "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + } + } + """, + ) + assert r.status_code == 201 # Created + + # Creating Debezium MySQL source connector + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data="""{ + "name": "debezium-mysql-connector", + "config": { + "name": "debezium-mysql-connector", + 
"connector.class": "io.debezium.connector.mysql.MySqlConnector", + "database.hostname": "test_mysql", + "database.port": "3306", + "database.user": "root", + "database.password": "rootpwd", + "database.server.name": "debezium.topics", + "database.history.kafka.bootstrap.servers": "test_broker:9092", + "database.history.kafka.topic": "dbhistory.debeziummysql", + "include.schema.changes": "false" + } + } + """, + ) + assert r.status_code == 201 # Created + + # Give time for connectors to process the table data + time.sleep(60) + + # Run the metadata ingestion pipeline. + runner = CliRunner() + with fs_helpers.isolated_filesystem(tmp_path): + print(tmp_path) + config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve() + result = runner.invoke(datahub, ["ingest", "-c", f"{config_file}"]) + # import pdb;pdb.set_trace(); + assert_result_ok(result) + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "kafka_connect_mces.json", + golden_path=test_resources_dir / "kafka_connect_mces_golden.json", + ignore_paths=[], + ) diff --git a/metadata-ingestion/tests/test_helpers/docker_helpers.py b/metadata-ingestion/tests/test_helpers/docker_helpers.py index 8eab49405e..8f280b613b 100644 --- a/metadata-ingestion/tests/test_helpers/docker_helpers.py +++ b/metadata-ingestion/tests/test_helpers/docker_helpers.py @@ -1,6 +1,6 @@ import contextlib import subprocess -from typing import Optional +from typing import Optional, Union import pytest import pytest_docker.plugin @@ -45,9 +45,11 @@ def wait_for_port( @pytest.fixture def docker_compose_runner(docker_compose_project_name, docker_cleanup): @contextlib.contextmanager - def run(compose_file_path: str, key: str) -> pytest_docker.plugin.Services: + def run( + compose_file_path: Union[str, list], key: str + ) -> pytest_docker.plugin.Services: with pytest_docker.plugin.get_docker_services( - str(compose_file_path), + compose_file_path, f"{docker_compose_project_name}-{key}", docker_cleanup, ) as docker_services: