diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index 5377cf16d3..0dc00f955f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -37,6 +37,12 @@ class ProvidedConfig(ConfigModel):
     value: str
 
 
+class GenericConnectorConfig(ConfigModel):
+    connector_name: str
+    source_dataset: str
+    source_platform: str
+
+
 class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
     # See the Connect REST Interface for details
     # https://docs.confluent.io/platform/current/connect/references/restapi.html#
@@ -67,6 +73,10 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
         default=None,
         description='Platform instance mapping to use when constructing URNs. e.g.`platform_instance_map: { "hive": "warehouse" }`',
     )
+    generic_connectors: List[GenericConnectorConfig] = Field(
+        default=[],
+        description="Provide lineage graphs for source connectors other than the Confluent JDBC Source Connector or the Debezium Source Connector",
+    )
 
 
 @dataclass
@@ -836,6 +846,7 @@ class KafkaConnectSource(Source):
 
     - works only for
         - JDBC and Debezium source connectors
+        - Generic connectors with user-defined lineage graph
        - BigQuery sink connector
     """
 
@@ -923,20 +934,35 @@ class KafkaConnectSource(Source):
                     config=self.config,
                     report=self.report,
                 ).connector_manifest
+            elif connector_manifest.config.get("connector.class", "").startswith(
+                "io.debezium.connector"
+            ):
+                connector_manifest = DebeziumSourceConnector(
+                    connector_manifest=connector_manifest, config=self.config
+                ).connector_manifest
             else:
-                # Debezium Source Connector lineages
-                try:
-                    connector_manifest = DebeziumSourceConnector(
-                        connector_manifest=connector_manifest, config=self.config
-                    ).connector_manifest
-
-                except ValueError as err:
+                # Find the connector in the user-defined generic connector list; warn and skip if it is undefined.
+                target_connector = None
+                for connector in self.config.generic_connectors:
+                    if connector.connector_name == connector_manifest.name:
+                        target_connector = connector
+                        break
+                if not target_connector:
                         logger.warning(
-                        f"Skipping connector {connector_manifest.name} due to error: {err}"
+                        f"Detected undefined connector {connector_manifest.name}, which is not in the customized connector list. Please refer to the Kafka Connect ingestion recipe to define this customized connector."
                     )
-                    self.report.report_failure(connector_manifest.name, str(err))
                     continue
+                for topic in topics:
+                    lineage = KafkaConnectLineage(
+                        source_dataset=target_connector.source_dataset,
+                        source_platform=target_connector.source_platform,
+                        target_dataset=topic,
+                        target_platform="kafka",
+                    )
+
+                    connector_manifest.lineages.append(lineage)
+
         if connector_manifest.type == "sink":
             if connector_manifest.config.get("connector.class").__eq__(
                 "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
index cf455601be..77128adcca 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
@@ -28,6 +28,8 @@ services:
         #
         confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1
         #
+        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
+        #
         confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
         #
         #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
index f74b834451..5adc0202d2 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
@@ -19,6 +19,10 @@ source:
     connect_to_platform_map: # optional
       postgres_source: # optional - Connector name
         postgres: postgres1 # optional - Platform to instance map
+    generic_connectors: # optional - Generic connector lineage info
+      - connector_name: generic-connector
+        source_dataset: generic-dataset
+        source_platform: generic-platform
 
 # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
 sink:
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index 56ea383943..0547d354e1 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -215,6 +215,29 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
         }""",
     )
     assert r.status_code == 201  # Created
+
+    # Creating generic (Datagen) source connector
+    r = requests.post(
+        "http://localhost:58083/connectors",
+        headers={"Content-Type": "application/json"},
+        data="""{
+            "name": "generic_source",
+            "config": {
+                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
+                "kafka.topic": "my-topic",
+                "quickstart": "product",
+                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter.schemas.enable": "false",
+                "max.interval": 1000,
+                "iterations": 10000000,
+                "tasks.max": "1"
+            }
+        }""",
+    )
+    r.raise_for_status()
+    assert r.status_code == 201  # Created
+
     # Give time for connectors to process the table data
     time.sleep(60)
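
For reference, the fallback branch this patch adds can be sketched standalone as below. This is a minimal illustration only: GenericConnectorConfig and KafkaConnectLineage here are simplified dataclass stand-ins for the DataHub classes touched by the diff, and build_generic_lineages is a hypothetical helper invented for this sketch, not a function from the codebase.

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class GenericConnectorConfig:
    # Stand-in for the new config model: maps a connector name to its source.
    connector_name: str
    source_dataset: str
    source_platform: str


@dataclass
class KafkaConnectLineage:
    # Stand-in for DataHub's lineage record (one edge per source/topic pair).
    source_dataset: str
    source_platform: str
    target_dataset: str
    target_platform: str


def build_generic_lineages(
    connector_name: str,
    topics: List[str],
    generic_connectors: List[GenericConnectorConfig],
) -> List[KafkaConnectLineage]:
    # Match the connector by name; the real code warns and skips the
    # connector when no generic_connectors entry is found.
    target: Optional[GenericConnectorConfig] = next(
        (c for c in generic_connectors if c.connector_name == connector_name),
        None,
    )
    if target is None:
        return []
    # Fan out one lineage edge per topic the connector writes to.
    return [
        KafkaConnectLineage(
            source_dataset=target.source_dataset,
            source_platform=target.source_platform,
            target_dataset=topic,
            target_platform="kafka",
        )
        for topic in topics
    ]


# Example combining the recipe and integration test above: the Datagen
# connector "generic_source" feeding the "my-topic" Kafka topic.
print(
    build_generic_lineages(
        "generic_source",
        ["my-topic"],
        [GenericConnectorConfig("generic_source", "generic-dataset", "generic-platform")],
    )
)

Run with these values, the sketch yields a single lineage edge from generic-dataset on generic-platform to my-topic on kafka, which is the shape of lineage the patched source emits for user-defined generic connectors.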