Ingestion: Add Confluent Kafka topic and schema connector

Suresh Srinivas 2021-08-18 00:41:24 -07:00
parent 9ab269b74e
commit dc7e05dd74
5 changed files with 163 additions and 3 deletions

View File

@@ -213,7 +213,6 @@
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>${jersey-client.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@@ -0,0 +1,28 @@
{
  "source": {
    "type": "kafka",
    "config": {
      "service_name": "local_kafka",
      "service_type": "kafka",
      "bootstrap_servers": "192.168.1.32:9092",
      "schema_registry_url": "http://192.168.1.32:8081",
      "filter_pattern": {
        "excludes": ["_confluent.*"]
      }
    }
  },
  "metadata_server": {
    "type": "metadata-server",
    "config": {
      "api_endpoint": "http://localhost:8585/api",
      "auth_provider_type": "no-auth"
    }
  },
  "cron": {
    "minute": "*/5",
    "hour": null,
    "day": null,
    "month": null,
    "day_of_week": null
  }
}
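
The "source.config" block above is exactly what the connector's create() hook receives and validates via KafkaSourceConfig.parse_obj (see the new source file below). A minimal sketch of that round trip, assuming an illustrative file path and module path for the connector (neither is named in this commit):

import json

from metadata.ingestion.source.kafka import KafkaSourceConfig  # module path assumed

# Load the pipeline file and pull out the source-level config block.
with open("kafka_pipeline.json") as f:  # hypothetical path
    pipeline = json.load(f)

# Same validation step the workflow performs inside KafkaSource.create().
source_config = KafkaSourceConfig.parse_obj(pipeline["source"]["config"])

print(source_config.bootstrap_servers)      # 192.168.1.32:9092
print(source_config.schema_registry_url)    # http://192.168.1.32:8081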

View File

@@ -12,3 +12,5 @@ commonregex~=1.5.4
setuptools~=57.0.0
PyHive~=0.6.4
ldap3~=2.9.1
confluent_kafka>=1.5.0
fastavro>=1.2.0

View File

@@ -79,12 +79,13 @@ plugins: Dict[str, Set[str]] = {
    "bigquery-usage": {"google-cloud-logging", "cachetools"},
    "elasticsearch": {"elasticsearch~=7.13.1"},
    "hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
    "presto": {"pyhive~=0.6.3"},
    "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
    "ldap-users": {"ldap3==2.9.1"},
    "mssql": {"sqlalchemy-pytds>=0.3"},
    "mssql-odbc": {"pyodbc"},
    "mysql": {"pymysql>=1.0.2"},
    "oracle": {"cx_Oracle"},
    "presto": {"pyhive~=0.6.3"},
    "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
    "redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
    "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},

View File

@@ -0,0 +1,130 @@
import json
from dataclasses import dataclass, field
from typing import Iterable, List, Optional

import confluent_kafka
from confluent_kafka.schema_registry.schema_registry_client import (
    Schema,
    SchemaRegistryClient,
)
from fastavro import parse_schema

from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import (
    IncludeFilterPattern,
    Record,
    WorkflowContext,
    logger,
)
from metadata.ingestion.api.source import Source, SourceStatus


@dataclass
class KafkaSourceStatus(SourceStatus):
    """Tracks which topics were scanned and which were filtered out."""

    topics_scanned: List[str] = field(default_factory=list)
    filtered: List[str] = field(default_factory=list)

    def topic_scanned(self, topic: str) -> None:
        self.topics_scanned.append(topic)

    def dropped(self, topic: str) -> None:
        self.filtered.append(topic)


class KafkaSourceConfig(ConfigModel):
    """Connection settings for the Kafka brokers and the Confluent schema registry."""

    bootstrap_servers: str = "localhost:9092"
    schema_registry_url: str = "http://localhost:8081"
    consumer_config: dict = {}
    service_name: str
    service_type: str
    filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()


@dataclass
class KafkaSource(Source):
    config: KafkaSourceConfig
    consumer: confluent_kafka.Consumer
    status: KafkaSourceStatus

    def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext):
        super().__init__(ctx)
        self.config = config
        self.status = KafkaSourceStatus()
        self.schema_registry_client = SchemaRegistryClient(
            {"url": self.config.schema_registry_url}
        )
        self.consumer = confluent_kafka.Consumer(
            {
                "group.id": "test",
                "bootstrap.servers": self.config.bootstrap_servers,
                **self.config.consumer_config,
            }
        )

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        config = KafkaSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[Record]:
        topics = self.consumer.list_topics().topics
        for t in topics:
            if self.config.filter_pattern.included(t):
                topic_schema = self._parse_topic_metadata(t)
                self.status.topic_scanned(t)
                yield topic_schema
            else:
                self.status.dropped(t)

    def _parse_topic_metadata(self, topic: str) -> Optional[Record]:
        logger.debug(f"topic = {topic}")
        dataset_name = topic

        # Fetch the value schema from the registry
        schema: Optional[Schema] = None
        try:
            registered_schema = self.schema_registry_client.get_latest_version(
                topic + "-value"
            )
            schema = registered_schema.schema
        except Exception as e:
            self.status.warning(topic, f"failed to get value schema for topic {topic}: {e}")

        # Parse the value schema
        fields: List[str] = []
        if schema and schema.schema_type == "AVRO":
            # "value.id" or "value.[type=string]id"
            parsed_schema = parse_schema(json.loads(schema.schema_str))
        elif schema is not None:
            self.status.warning(
                topic,
                f"{schema.schema_type} is not supported",
            )

        # Fetch the key schema from the registry
        key_schema: Optional[Schema] = None
        try:
            registered_schema = self.schema_registry_client.get_latest_version(
                topic + "-key"
            )
            key_schema = registered_schema.schema
        except Exception as e:
            # do not report warnings because it is okay to not have key schemas
            logger.debug(f"{topic}: no key schema found. {e}")

        # Parse the key schema
        key_fields: List[str] = []
        if key_schema and key_schema.schema_type == "AVRO":
            logger.debug(key_schema.schema_str)
        elif key_schema is not None:
            self.status.warning(
                topic,
                f"Parsing kafka schema type {key_schema.schema_type} is currently not implemented",
            )

        key_schema_str: Optional[str] = None
        # Mapping the parsed schemas onto a topic Record is not wired up yet,
        # so the caller currently receives None for every topic.
        return None

    def get_status(self):
        return self.status

    def close(self):
        if self.consumer:
            self.consumer.close()
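
For a rough idea of how the source behaves on its own, outside the ingestion workflow, the sketch below instantiates it directly against a local broker and schema registry and drains next_record(). The module path and the zero-argument WorkflowContext() construction are assumptions for illustration; in practice the workflow runner calls KafkaSource.create() with the JSON config shown earlier.

from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.source.kafka import KafkaSource, KafkaSourceConfig  # module path assumed

# Point the source at a locally running broker and schema registry.
config = KafkaSourceConfig(
    service_name="local_kafka",
    service_type="kafka",
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081",
)
source = KafkaSource(config, WorkflowContext())  # zero-arg context assumed for this sketch

source.prepare()
for record in source.next_record():
    # At this stage of the connector, records are still None placeholders;
    # the scanned and filtered topic names land in the status report.
    pass
source.close()

print(source.get_status().topics_scanned)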