diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 15e3acd556..dfd5c5a71f 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -74,7 +74,7 @@ plugins: Dict[str, Set[str]] = {
     "airflow": {"apache-airflow >= 1.10.2"},
     # Source plugins
     "kafka": kafka_common,
-    "kafka-connect": {"requests"},
+    "kafka-connect": sql_common | {"requests"},
     "sqlalchemy": sql_common,
     "athena": sql_common | {"PyAthena[SQLAlchemy]"},
     "bigquery": sql_common | {"pybigquery >= 0.6.0"},
diff --git a/metadata-ingestion/source_docs/kafka-connect.md b/metadata-ingestion/source_docs/kafka-connect.md
index cee946fa87..3efe5f9b27 100644
--- a/metadata-ingestion/source_docs/kafka-connect.md
+++ b/metadata-ingestion/source_docs/kafka-connect.md
@@ -16,7 +16,7 @@ This plugin extracts the following:
 
 Current limitations:
 
-- Currently works only for Debezium source connectors.
+- Currently works for JDBC and Debezium source connectors only.
 
 ## Quickstart recipe
 
@@ -52,7 +52,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
 | `cluster_name`                 | | `"connect-cluster"` | Cluster to ingest from.                                           |
 | `connector_patterns.deny`      | |                     | List of regex patterns for connectors to exclude from ingestion. |
 | `connector_patterns.allow`     | |                     | List of regex patterns for connectors to include in ingestion.   |
-| `connector_pattern.ignoreCase` | | `True`              | Whether to ignore case sensitivity during pattern matching.      |
+| `connector_pattern.ignoreCase` | | `True`              | Whether to ignore case during pattern matching.                  |
 | `env`                          | | `"PROD"`            | Environment to use in namespace when constructing URNs.          |
 
 ## Compatibility
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index e60df579c5..510f4bc203 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -5,6 +5,7 @@ from typing import Dict, Iterable, List, Optional
 
 import requests
 from pydantic import BaseModel
+from sqlalchemy.engine.url import make_url
 
 import datahub.emitter.mce_builder as builder
 import datahub.metadata.schema_classes as models
@@ -58,6 +59,18 @@ class ConnectorManifest(BaseModel):
     url: Optional[str]
 
 
+def get_jdbc_source_connector_parser(connector_manifest):
+    connector_class = connector_manifest.config.get("connector.class", "")
+    parser = {
+        # https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
+        "io.confluent.connect.jdbc.JdbcSourceConnector": {
+            "db_connection_url": connector_manifest.config.get("connection.url"),
+        }
+    }
+
+    return parser.get(connector_class)
+
+
 def get_debezium_source_connector_parser(connector_manifest):
     connector_class = connector_manifest.config.get("connector.class", "")
     parser = {
@@ -113,6 +126,67 @@ def get_debezium_source_connector_parser(connector_manifest):
     return parser.get(connector_class)
 
 
+def remove_prefix(text: str, prefix: str) -> str:
+    # str.removeprefix is only available on Python 3.9+, hence this helper.
+    if text.startswith(prefix):
+        index = len(prefix)
+        return text[index:]
+    return text
+
+
+@dataclass
+class JDBCSourceConnectorLineages:
+    connector_manifest: ConnectorManifest
+
+    def get_lineages(self):
+        lineages: List[KafkaConnectLineage] = list()
+        parser = get_jdbc_source_connector_parser(self.connector_manifest)
+        db_connection_url = parser.get("db_connection_url")
+        url = remove_prefix(str(db_connection_url), "jdbc:")
+        url_instance = make_url(url)
+        source_platform = url_instance.drivername
+        database_name = url_instance.database
+
+        logging.debug(
+            f"Extracting source platform: {source_platform} and database name: {database_name} from connection url"
+        )
+
+        topic_prefix = (
+            str(self.connector_manifest.config.get("topic.prefix"))
+            if self.connector_manifest.config.get("topic.prefix")
+            else None
+        )
+
+        query = (
+            self.connector_manifest.config.get("query")
+            if self.connector_manifest.config.get("query")
+            else None
+        )
+
+        if not self.connector_manifest.topic_names:
+            return lineages
+
+        for topic in self.connector_manifest.topic_names:
+            # strip the topic prefix unless the connector uses a custom query
+            if topic_prefix and not query:
+                source_table = remove_prefix(topic, topic_prefix)
+            else:
+                source_table = topic
+
+            dataset_name = (
+                database_name + "." + source_table if database_name else source_table
+            )
+
+            lineage = KafkaConnectLineage(
+                source_dataset=dataset_name,
+                source_platform=source_platform,
+                target_dataset=topic,
+                target_platform="kafka",
+            )
+            lineages.append(lineage)
+
+        return lineages
+
+
 @dataclass
 class DebeziumSourceConnectorLineages:
     connector_manifest: ConnectorManifest
@@ -203,14 +277,12 @@ class KafkaConnectSource(Source):
             manifest = connector_response.json()
             connector_manifest = ConnectorManifest(**manifest)
 
-            # Initialize connector lineages
             connector_manifest.lineages = list()
             connector_manifest.url = connector_url
 
             # Populate Source Connector metadata
             if connector_manifest.type == "source":
-                # connector_config = manifest.get("config", {})
                 topics_response = self.session.get(
                     f"{self.config.connect_uri}/connectors/{c}/topics",
                 )
@@ -218,13 +290,24 @@
                 topics = topics_response.json()
                 connector_manifest.topic_names = topics[c]["topics"]
 
-                # Currently we only support Debezium Source Connector lineages
-                debezium_source_lineages = DebeziumSourceConnectorLineages(
-                    connector_manifest=connector_manifest
-                )
-                connector_manifest.lineages.extend(
-                    debezium_source_lineages.get_lineages()
-                )
+                # JDBC source connector lineages
+                connector_class = connector_manifest.config.get("connector.class")
+                if connector_class == "io.confluent.connect.jdbc.JdbcSourceConnector":
+                    jdbc_source_lineages = JDBCSourceConnectorLineages(
+                        connector_manifest=connector_manifest
+                    )
+                    connector_manifest.lineages.extend(
+                        jdbc_source_lineages.get_lineages()
+                    )
+                else:
+                    # Debezium Source Connector lineages
+                    debezium_source_lineages = DebeziumSourceConnectorLineages(
+                        connector_manifest=connector_manifest
+                    )
+                    connector_manifest.lineages.extend(
+                        debezium_source_lineages.get_lineages()
+                    )
 
             if connector_manifest.type == "sink":
                 # TODO: Sink Connector not yet implemented
@@ -273,6 +356,7 @@ class KafkaConnectSource(Source):
     def construct_job_workunits(
         self, connector: ConnectorManifest
     ) -> Iterable[MetadataWorkUnit]:
+
        connector_name = connector.name
         flow_urn = builder.make_data_flow_urn(
             "kafka-connect", connector_name, self.config.env
@@ -357,9 +441,7 @@ class KafkaConnectSource(Source):
             yield wu
 
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-
         connectors_manifest = self.get_connectors_manifest()
-
         for connector in connectors_manifest:
             name = connector.name
             if self.config.connector_patterns.allowed(name):
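
For reviewers, here is a minimal, self-contained sketch of the lineage mapping the new JDBC branch produces. The connector config and topic names below are hypothetical examples, not taken from the patch; only the `make_url` / `remove_prefix` handling mirrors the code above.

```python
from sqlalchemy.engine.url import make_url

def remove_prefix(text: str, prefix: str) -> str:
    return text[len(prefix):] if text.startswith(prefix) else text

# Hypothetical JdbcSourceConnector config; values are illustrative only.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/testdb",
    "topic.prefix": "test-postgres-jdbc-",
}
topic_names = ["test-postgres-jdbc-member", "test-postgres-jdbc-payment"]

# Strip the "jdbc:" scheme so SQLAlchemy can parse the rest of the URL.
url = make_url(remove_prefix(config["connection.url"], "jdbc:"))
source_platform = url.drivername  # "postgresql"
database_name = url.database      # "testdb"

# No custom query is configured, so each topic maps back to a source table
# by stripping the topic prefix.
for topic in topic_names:
    source_table = remove_prefix(topic, config["topic.prefix"])
    dataset_name = f"{database_name}.{source_table}"
    print(f"{dataset_name} ({source_platform}) -> {topic} (kafka)")

# Expected output:
# testdb.member (postgresql) -> test-postgres-jdbc-member (kafka)
# testdb.payment (postgresql) -> test-postgres-jdbc-payment (kafka)
```

This is also why the `kafka-connect` plugin now depends on `sql_common` in setup.py: `make_url` does the connection-URL parsing that would otherwise need a hand-rolled regex.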