Ingestion: Add Confluent Kafka topic and schema connector

Suresh Srinivas 2021-08-18 01:01:05 -07:00
parent dc7e05dd74
commit 4f6cc54465
2 changed files with 11 additions and 9 deletions

File 1 of 2 (plugin dependency declarations):

@@ -79,7 +79,7 @@ plugins: Dict[str, Set[str]] = {
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "elasticsearch": {"elasticsearch~=7.13.1"},
     "hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
-    "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
+    "kafka": {"confluent_kafka>=1.7.0", "fastavro>=1.2.0"},
     "ldap-users": {"ldap3==2.9.1"},
     "mssql": {"sqlalchemy-pytds>=0.3"},
     "mssql-odbc": {"pyodbc"},

File 2 of 2 (the Kafka source):

@@ -8,6 +8,7 @@ from fastavro import json_reader
 from fastavro import parse_schema
 import confluent_kafka
+from confluent_kafka.admin import AdminClient, ConfigResource
 from confluent_kafka.schema_registry.schema_registry_client import (
     Schema,
     SchemaRegistryClient,
@@ -38,7 +39,7 @@ class KafkaSourceConfig(ConfigModel):
 @dataclass
 class KafkaSource(Source):
     config: KafkaSourceConfig
-    consumer: confluent_kafka.Consumer
+    admin_client: AdminClient
     report: KafkaSourceStatus

     def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext):
@@ -48,11 +49,9 @@ class KafkaSource(Source):
         self.schema_registry_client = SchemaRegistryClient(
             {"url": self.config.schema_registry_url}
         )
-        self.consumer = confluent_kafka.Consumer(
+        self.admin_client = AdminClient(
             {
-                "group.id": "test",
                 "bootstrap.servers": self.config.bootstrap_servers,
                 **self.config.consumer_config,
             }
         )
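
Note: dropping "group.id" is consistent with the move from Consumer to AdminClient, since admin operations are not tied to a consumer group; spreading self.config.consumer_config keeps existing recipes working even though the field name no longer matches the client. A minimal standalone sketch of the new construction (the broker address is a placeholder):

    from confluent_kafka.admin import AdminClient

    # AdminClient only needs broker addresses plus any optional librdkafka
    # settings; no consumer group is created or joined.
    admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})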
@@ -65,10 +64,13 @@
         pass

     def next_record(self) -> Iterable[Record]:
-        topics = self.consumer.list_topics().topics
+        topics = self.admin_client.list_topics().topics
         for t in topics:
             if self.config.filter_pattern.included(t):
                 topic_schema = self._parse_topic_metadata(t)
+                #resources = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, t)]
+                #topic_config = self.admin_client.describe_configs(resources)
+                #logger.info(topic_config)
                 self.status.topic_scanned(t)
                 yield topic_schema
             else:
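
Note: list_topics() returns a ClusterMetadata object whose .topics attribute is a dict keyed by topic name, so iterating it yields names just as the Consumer variant did. The commented-out lines point at a future per-topic config lookup; a sketch of how that could look with describe_configs() (broker address is a placeholder):

    from confluent_kafka.admin import AdminClient, ConfigResource

    admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})

    # .topics maps topic name -> TopicMetadata.
    for topic_name in admin_client.list_topics(timeout=10).topics:
        # describe_configs() returns {ConfigResource: Future}; each Future
        # resolves to a dict of config name -> ConfigEntry.
        resource = ConfigResource(ConfigResource.Type.TOPIC, topic_name)
        for res, future in admin_client.describe_configs([resource]).items():
            print(res, future.result())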
@@ -91,7 +93,7 @@
         fields: List[str] = []
         if schema and schema.schema_type == "AVRO":
             # "value.id" or "value.[type=string]id"
-            parsed_schema = parse_schema(schema.schema_str)
+            logger.info(schema.schema_str)
         elif schema is not None:
             self.status.warning(
                 topic,
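
Note: replacing the parse_schema() call with a log line sidesteps a likely bug: fastavro's parse_schema() expects a decoded schema (a dict for record schemas), so passing the registry's raw JSON string would be treated as a bare type name and fail. If parsing is reinstated later, a sketch (schema here stands for the object returned by the schema registry client):

    import json
    from fastavro import parse_schema

    # schema.schema_str is the registry's JSON text; decode it before parsing.
    parsed_schema = parse_schema(json.loads(schema.schema_str))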
@@ -126,5 +128,5 @@
         return self.status

     def close(self):
-        if self.consumer:
-            self.consumer.close()
+        if self.admin_client:
+            self.admin_client.close()
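
Note: unlike Consumer, AdminClient does not appear to expose a close() method, so this call would likely raise AttributeError on shutdown. A defensive sketch of close(), under that assumption:

    def close(self) -> None:
        # Guard the call: AdminClient releases its resources on garbage
        # collection and may not define close() (assumption about the API).
        if self.admin_client is not None and hasattr(self.admin_client, "close"):
            self.admin_client.close()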