fix(ingest): kafka-connect - support newer versions of Debezium (#6943)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Co-authored-by: John Joyce <john@acryl.io>
This commit is contained in:
서재권(Data Platform) 2023-01-10 16:13:00 +09:00 committed by GitHub
parent 54153eaafd
commit 9578e418c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -635,64 +635,64 @@ class DebeziumSourceConnector:
server_name: Optional[str] server_name: Optional[str]
database_name: Optional[str] database_name: Optional[str]
def get_server_name(self, connector_manifest: "ConnectorManifest") -> str:
    """Return the Debezium logical server name for a connector.

    Newer Debezium releases configure the topic namespace with
    ``topic.prefix``, while older releases use ``database.server.name``.
    The newer key wins whenever it is present in the connector config
    (even if its value is empty); otherwise the legacy key is used.

    Args:
        connector_manifest: Manifest whose ``config`` mapping holds the
            connector's configuration properties.

    Returns:
        The value of ``topic.prefix`` if that key exists, else the value of
        ``database.server.name``, else an empty string.
    """
    config = connector_manifest.config
    # Single lookup with a fallback instead of a membership test followed
    # by indexing; key presence (not truthiness) decides which one is used.
    return config.get(
        "topic.prefix",
        config.get("database.server.name", ""),
    )
def get_parser( def get_parser(
self, self,
connector_manifest: ConnectorManifest, connector_manifest: ConnectorManifest,
) -> DebeziumParser: ) -> DebeziumParser:
connector_class = connector_manifest.config.get("connector.class", "") connector_class = connector_manifest.config.get("connector.class", "")
if connector_class == "io.debezium.connector.mysql.MySqlConnector": if connector_class == "io.debezium.connector.mysql.MySqlConnector":
# https://debezium.io/documentation/reference/connectors/mysql.html#mysql-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="mysql", source_platform="mysql",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=None, database_name=None,
) )
elif connector_class == "MySqlConnector": elif connector_class == "MySqlConnector":
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="mysql", source_platform="mysql",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=None, database_name=None,
) )
elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector": elif connector_class == "io.debezium.connector.mongodb.MongoDbConnector":
# https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="mongodb", source_platform="mongodb",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=None, database_name=None,
) )
elif connector_class == "io.debezium.connector.postgresql.PostgresConnector": elif connector_class == "io.debezium.connector.postgresql.PostgresConnector":
# https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="postgres", source_platform="postgres",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=connector_manifest.config.get("database.dbname"), database_name=connector_manifest.config.get("database.dbname"),
) )
elif connector_class == "io.debezium.connector.oracle.OracleConnector": elif connector_class == "io.debezium.connector.oracle.OracleConnector":
# https://debezium.io/documentation/reference/connectors/oracle.html#oracle-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="oracle", source_platform="oracle",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=connector_manifest.config.get("database.dbname"), database_name=connector_manifest.config.get("database.dbname"),
) )
elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector": elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector":
# https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="mssql", source_platform="mssql",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=connector_manifest.config.get("database.dbname"), database_name=connector_manifest.config.get("database.dbname"),
) )
elif connector_class == "io.debezium.connector.db2.Db2Connector": elif connector_class == "io.debezium.connector.db2.Db2Connector":
# https://debezium.io/documentation/reference/connectors/db2.html#db2-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="db2", source_platform="db2",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=connector_manifest.config.get("database.dbname"), database_name=connector_manifest.config.get("database.dbname"),
) )
elif connector_class == "io.debezium.connector.vitess.VitessConnector": elif connector_class == "io.debezium.connector.vitess.VitessConnector":
# https://debezium.io/documentation/reference/connectors/vitess.html#vitess-topic-names
parser = self.DebeziumParser( parser = self.DebeziumParser(
source_platform="vitess", source_platform="vitess",
server_name=connector_manifest.config.get("database.server.name"), server_name=self.get_server_name(connector_manifest),
database_name=connector_manifest.config.get("vitess.keyspace"), database_name=connector_manifest.config.get("vitess.keyspace"),
) )
else: else:
@ -900,13 +900,10 @@ def transform_connector_config(
class KafkaConnectSource(Source): class KafkaConnectSource(Source):
""" """
This plugin extracts the following: This plugin extracts the following:
- Kafka Connect connector as individual `DataFlowSnapshotClass` entity - Kafka Connect connector as individual `DataFlowSnapshotClass` entity
- Creating individual `DataJobSnapshotClass` entity using `{connector_name}:{source_dataset}` naming - Creating individual `DataJobSnapshotClass` entity using `{connector_name}:{source_dataset}` naming
- Lineage information between source database to Kafka topic - Lineage information between source database to Kafka topic
Current limitations: Current limitations:
- works only for - works only for
- JDBC, Debezium, and Mongo source connectors - JDBC, Debezium, and Mongo source connectors
- Generic connectors with user-defined lineage graph - Generic connectors with user-defined lineage graph
@ -948,7 +945,6 @@ class KafkaConnectSource(Source):
def get_connectors_manifest(self) -> List[ConnectorManifest]: def get_connectors_manifest(self) -> List[ConnectorManifest]:
"""Get Kafka Connect connectors manifest using REST API. """Get Kafka Connect connectors manifest using REST API.
Enrich with lineages metadata. Enrich with lineages metadata.
""" """
connectors_manifest = list() connectors_manifest = list()