diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index c8a4c7a6ab..b3fa5e3401 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -626,12 +626,17 @@ class MongoSourceConnector: @dataclass class DebeziumSourceConnector: connector_manifest: ConnectorManifest + report: KafkaConnectSourceReport def __init__( - self, connector_manifest: ConnectorManifest, config: KafkaConnectSourceConfig + self, + connector_manifest: ConnectorManifest, + config: KafkaConnectSourceConfig, + report: KafkaConnectSourceReport, ) -> None: self.connector_manifest = connector_manifest self.config = config + self.report = report self._extract_lineages() @dataclass @@ -683,10 +688,19 @@ class DebeziumSourceConnector: database_name=connector_manifest.config.get("database.dbname"), ) elif connector_class == "io.debezium.connector.sqlserver.SqlServerConnector": + database_name = connector_manifest.config.get( + "database.names" + ) or connector_manifest.config.get("database.dbname") + + if "," in str(database_name): + raise Exception( + f"Only one database is supported for Debezium's SQL Server connector. Found: {database_name}" + ) + parser = self.DebeziumParser( source_platform="mssql", server_name=self.get_server_name(connector_manifest), - database_name=connector_manifest.config.get("database.dbname"), + database_name=database_name, ) elif connector_class == "io.debezium.connector.db2.Db2Connector": parser = self.DebeziumParser( @@ -707,29 +721,37 @@ class DebeziumSourceConnector: def _extract_lineages(self): lineages: List[KafkaConnectLineage] = list() - parser = self.get_parser(self.connector_manifest) - source_platform = parser.source_platform - server_name = parser.server_name - database_name = parser.database_name - topic_naming_pattern = r"({0})\.(\w+\.\w+)".format(server_name) - if not self.connector_manifest.topic_names: - return lineages + try: + parser = self.get_parser(self.connector_manifest) + source_platform = parser.source_platform + server_name = parser.server_name + database_name = parser.database_name + topic_naming_pattern = r"({0})\.(\w+\.\w+)".format(server_name) - for topic in self.connector_manifest.topic_names: - found = re.search(re.compile(topic_naming_pattern), topic) + if not self.connector_manifest.topic_names: + return lineages - if found: - table_name = get_dataset_name(database_name, found.group(2)) + for topic in self.connector_manifest.topic_names: + found = re.search(re.compile(topic_naming_pattern), topic) - lineage = KafkaConnectLineage( - source_dataset=table_name, - source_platform=source_platform, - target_dataset=topic, - target_platform=KAFKA, - ) - lineages.append(lineage) - self.connector_manifest.lineages = lineages + if found: + table_name = get_dataset_name(database_name, found.group(2)) + + lineage = KafkaConnectLineage( + source_dataset=table_name, + source_platform=source_platform, + target_dataset=topic, + target_platform=KAFKA, + ) + lineages.append(lineage) + self.connector_manifest.lineages = lineages + except Exception as e: + self.report.report_warning( + self.connector_manifest.name, f"Error resolving lineage: {e}" + ) + + return @dataclass @@ -1061,7 +1083,9 @@ class KafkaConnectSource(StatefulIngestionSourceBase): "io.debezium.connector" ): connector_manifest = DebeziumSourceConnector( - connector_manifest=connector_manifest, config=self.config + connector_manifest=connector_manifest, + config=self.config, + report=self.report, ).connector_manifest elif ( connector_manifest.config.get(CONNECTOR_CLASS, "")