From dc7e05dd74a73cdb71a05aa9f5e1d277199f7324 Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Wed, 18 Aug 2021 00:41:24 -0700
Subject: [PATCH] Ingestion: Add Confluent Kafka topic and schema connector

---
 catalog-rest-service/pom.xml                  |   1 -
 .../examples/workflows/confluent_kafka.json   |  28 ++++
 ingestion/requirements.txt                    |   4 +-
 ingestion/setup.py                            |   3 +-
 .../src/metadata/ingestion/source/kafka.py    | 128 ++++++++++++++++++
 5 files changed, 161 insertions(+), 3 deletions(-)
 create mode 100644 ingestion/examples/workflows/confluent_kafka.json
 create mode 100644 ingestion/src/metadata/ingestion/source/kafka.py

diff --git a/catalog-rest-service/pom.xml b/catalog-rest-service/pom.xml
index 8406d420701..8db2855b1d5 100644
--- a/catalog-rest-service/pom.xml
+++ b/catalog-rest-service/pom.xml
@@ -213,7 +213,6 @@
       <groupId>org.glassfish.jersey.core</groupId>
       <artifactId>jersey-client</artifactId>
       <version>${jersey-client.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.junit.jupiter</groupId>
diff --git a/ingestion/examples/workflows/confluent_kafka.json b/ingestion/examples/workflows/confluent_kafka.json
new file mode 100644
index 00000000000..e73c0d225f8
--- /dev/null
+++ b/ingestion/examples/workflows/confluent_kafka.json
@@ -0,0 +1,28 @@
+{
+  "source": {
+    "type": "kafka",
+    "config": {
+      "service_name": "local_kafka",
+      "service_type": "kafka",
+      "bootstrap_servers": "192.168.1.32:9092",
+      "schema_registry_url": "http://192.168.1.32:8081",
+      "filter_pattern": {
+        "excludes": ["_confluent.*"]
+      }
+    }
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  },
+  "cron": {
+    "minute": "*/5",
+    "hour": null,
+    "day": null,
+    "month": null,
+    "day_of_week": null
+  }
+}
diff --git a/ingestion/requirements.txt b/ingestion/requirements.txt
index eac7ca6a1f7..8cce91be5da 100644
--- a/ingestion/requirements.txt
+++ b/ingestion/requirements.txt
@@ -11,4 +11,6 @@ spacy~=3.0.5
 commonregex~=1.5.4
 setuptools~=57.0.0
 PyHive~=0.6.4
-ldap3~=2.9.1
\ No newline at end of file
+ldap3~=2.9.1
+confluent_kafka>=1.5.0
+fastavro>=1.2.0
\ No newline at end of file
diff --git a/ingestion/setup.py b/ingestion/setup.py
index c96eef718c8..54c5dfe8770 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -79,12 +79,13 @@ plugins: Dict[str, Set[str]] = {
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "elasticsearch": {"elasticsearch~=7.13.1"},
     "hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
-    "presto": {"pyhive~=0.6.3"},
+    "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
     "ldap-users": {"ldap3==2.9.1"},
     "mssql": {"sqlalchemy-pytds>=0.3"},
     "mssql-odbc": {"pyodbc"},
     "mysql": {"pymysql>=1.0.2"},
     "oracle": {"cx_Oracle"},
+    "presto": {"pyhive~=0.6.3"},
     "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
     "redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
     "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py
new file mode 100644
index 00000000000..79ac3a0ea29
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/kafka.py
@@ -0,0 +1,128 @@
+import json
+from dataclasses import dataclass, field
+from typing import Iterable, List, Optional
+
+import confluent_kafka
+from confluent_kafka.schema_registry.schema_registry_client import (
+    Schema,
+    SchemaRegistryClient,
+)
+from fastavro import parse_schema
+
+from metadata.config.common import ConfigModel
+from metadata.ingestion.api.common import IncludeFilterPattern, Record, logger, WorkflowContext
+from metadata.ingestion.api.source import Source, SourceStatus
+
+
+@dataclass
+class KafkaSourceStatus(SourceStatus):
+    topics_scanned: List[str] = field(default_factory=list)
+    filtered: List[str] = field(default_factory=list)
+
+    def topic_scanned(self, topic: str) -> None:
+        self.topics_scanned.append(topic)
+
+    def dropped(self, topic: str) -> None:
+        self.filtered.append(topic)
+
+
+class KafkaSourceConfig(ConfigModel):
+    bootstrap_servers: str = "localhost:9092"
+    schema_registry_url: str = "http://localhost:8081"
+    consumer_config: dict = {}
+    service_name: str
+    service_type: str
+    filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
+
+
+@dataclass
+class KafkaSource(Source):
+    config: KafkaSourceConfig
+    consumer: confluent_kafka.Consumer
+    status: KafkaSourceStatus
+
+    def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext):
+        super().__init__(ctx)
+        self.config = config
+        self.status = KafkaSourceStatus()
+        self.schema_registry_client = SchemaRegistryClient(
+            {"url": self.config.schema_registry_url}
+        )
+        self.consumer = confluent_kafka.Consumer(
+            {
+                "group.id": "test",
+                "bootstrap.servers": self.config.bootstrap_servers,
+                **self.config.consumer_config,
+            }
+        )
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = KafkaSourceConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[Record]:
+        topics = self.consumer.list_topics().topics
+        for topic in topics:
+            if self.config.filter_pattern.included(topic):
+                topic_schema = self._parse_topic_metadata(topic)
+                self.status.topic_scanned(topic)
+                if topic_schema is not None:
+                    yield topic_schema
+            else:
+                self.status.dropped(topic)
+
+    def _parse_topic_metadata(self, topic: str) -> Optional[Record]:
+        logger.debug(f"topic = {topic}")
+
+        # Fetch the value schema from the schema registry
+        schema: Optional[Schema] = None
+        try:
+            registered_schema = self.schema_registry_client.get_latest_version(
+                topic + "-value"
+            )
+            schema = registered_schema.schema
+        except Exception as e:
+            self.status.warning(topic, f"failed to get schema: {e} for topic {topic}")
+
+        # Validate the value schema; fastavro expects a dict, so decode the
+        # registry's JSON string before parsing
+        if schema and schema.schema_type == "AVRO":
+            parse_schema(json.loads(schema.schema_str))
+        elif schema is not None:
+            self.status.warning(
+                topic, f"schema type {schema.schema_type} is not supported"
+            )
+
+        # Fetch the key schema from the schema registry
+        key_schema: Optional[Schema] = None
+        try:
+            registered_schema = self.schema_registry_client.get_latest_version(
+                topic + "-key"
+            )
+            key_schema = registered_schema.schema
+        except Exception as e:
+            # do not report a warning; it is okay for a topic to have no key schema
+            logger.debug(f"{topic}: no key schema found. {e}")
+
+        # Validate the key schema
+        if key_schema and key_schema.schema_type == "AVRO":
+            parse_schema(json.loads(key_schema.schema_str))
+        elif key_schema is not None:
+            self.status.warning(
+                topic,
+                f"Parsing kafka schema type {key_schema.schema_type} is currently not implemented",
+            )
+
+        # Building a topic record from the parsed schemas is not implemented yet
+        return None
+
+    def get_status(self):
+        return self.status
+
+    def close(self):
+        if self.consumer:
+            self.consumer.close()
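
As a quick way to exercise the new connector, here is a minimal smoke-test
sketch. It assumes a broker on localhost:9092 and a schema registry on
http://localhost:8081 (adjust both to your environment), and it drives the
source directly rather than through the ingestion workflow runner; passing
ctx=None is only a stand-in for the WorkflowContext the framework normally
supplies when it instantiates the source via KafkaSource.create().

    from metadata.ingestion.source.kafka import KafkaSource, KafkaSourceConfig

    # Assumed endpoints; match them to your local broker and schema registry.
    config = KafkaSourceConfig.parse_obj(
        {
            "service_name": "local_kafka",
            "service_type": "kafka",
            "bootstrap_servers": "localhost:9092",
            "schema_registry_url": "http://localhost:8081",
            "filter_pattern": {"excludes": ["_confluent.*"]},
        }
    )

    # ctx=None is a placeholder for the framework-provided WorkflowContext.
    source = KafkaSource(config, ctx=None)
    try:
        for record in source.next_record():
            print(record)
    finally:
        source.close()

    print(source.get_status())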