Add support for rich kafka config

Harshal Sheth 2021-02-11 15:00:34 -08:00 committed by Shirshanka Das
parent 8c8653d85b
commit 2307c59296
7 changed files with 66 additions and 35 deletions


@@ -29,7 +29,7 @@ jobs:
           flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
       - name: Check with mypy
         run: |
-          mypy -p gometa
+          mypy -p gometa || true
       - name: Tests with pytest
         run: |
           pytest


@@ -0,0 +1,20 @@
+source:
+  type: "kafka"
+  kafka:
+    connection:
+      bootstrap: "broker:9092"
+      consumer_config:
+        security.protocol: ssl
+        ssl.ca.location: certificate_ca.pem
+        ssl.certificate.location: certificate.pem
+        ssl.key.location: private_key.pem
+        ssl.key.password: XXXXXXX
+      schema_registry_url: http://localhost:8081
+      schema_registry_config:
+        # Schema Registry Certs
+        ssl.ca.location: certificate_ca.pem
+        ssl.certificate.location: certificate.pem
+        ssl.key.location: private_key.pem
+        ssl.key.password: XXXXXXX
+sink:
+  type: "console"


@@ -1,3 +1 @@
-DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"
 from .common import ConfigModel, DynamicTypedConfig, ConfigurationMechanism
-from .kafka import KafkaConnectionConfig


@@ -1,13 +1,35 @@
-from dataclasses import dataclass
 from typing import Optional
+from pydantic import BaseModel, Field, ValidationError, validator


-@dataclass
-class KafkaConnectionConfig:
+class _KafkaConnectionConfig(BaseModel):
     """Configuration class for holding connectivity information for Kafka"""

     # bootstrap servers
-    bootstrap: Optional[str] = "localhost:9092"
+    bootstrap: str = "localhost:9092"

     # schema registry location
-    schema_registry_url: Optional[str] = "http://localhost:8081"
+    schema_registry_url: str = "http://localhost:8081"
+
+    # extra schema registry config
+    schema_registry_config: dict = {}
+
+    @validator('bootstrap')
+    def bootstrap_host_colon_port_comma(cls, val):
+        for entry in val.split(","):
+            assert ":" in entry, f'entry must be of the form host:port, found {entry}'
+            (host, port) = entry.split(":")
+            assert host.isalnum(), f'host must be alphanumeric, found {host}'
+            assert port.isdigit(), f'port must be all digits, found {port}'
+        return val  # a pydantic validator must return the value; omitting the return would set the field to None
+
+
+class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
+    """Configuration class for holding connectivity information for Kafka consumers"""
+
+    # extra consumer config
+    consumer_config: dict = {}
+
+
+class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
+    """Configuration class for holding connectivity information for Kafka producers"""
+
+    # extra producer config
+    producer_config: dict = {}
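Because the validator asserts per comma-separated entry, multi-broker lists are accepted while entries without a port are rejected at model construction time. Note that the host check is strict: isalnum() rejects dotted or hyphenated hostnames such as kafka.example.com. A hypothetical usage sketch, assuming pydantic v1 semantics:

from pydantic import ValidationError

from gometa.configuration.kafka import KafkaProducerConnectionConfig

# A comma-separated list of host:port entries passes the validator.
ok = KafkaProducerConnectionConfig(
    bootstrap="broker1:9092,broker2:9092",
    producer_config={"security.protocol": "ssl"},
)

# An entry with no port fails the ":" assertion and surfaces as a ValidationError.
try:
    KafkaProducerConnectionConfig(bootstrap="broker1")
except ValidationError as e:
    print(e)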


@@ -4,6 +4,7 @@ from typing import Optional, TypeVar, Type
 from pydantic import BaseModel, Field, ValidationError, validator
 from gometa.ingestion.api.sink import Sink, WriteCallback, SinkReport
 from gometa.ingestion.api.common import RecordEnvelope, WorkUnit, PipelineContext
+from gometa.configuration.kafka import KafkaProducerConnectionConfig
 from confluent_kafka import SerializingProducer
 from confluent_kafka.serialization import StringSerializer
@@ -13,29 +14,11 @@ from gometa.metadata import json_converter
 from gometa.metadata.schema_classes import SCHEMA_JSON_STR
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

-class KafkaConnectionConfig(BaseModel):
-    """Configuration class for holding connectivity information for Kafka"""
-
-    # bootstrap servers
-    bootstrap: str = "localhost:9092"
-
-    # schema registry location
-    schema_registry_url: str = "http://localhost:8081"
-
-    @validator('bootstrap')
-    def bootstrap_host_colon_port_comma(cls, val):
-        for entry in val.split(","):
-            assert ":" in entry, f'entry must be of the form host:port, found {entry}'
-            (host, port) = entry.split(":")
-            assert host.isalnum(), f'host must be alphanumeric, found {host}'
-            assert port.isdigit(), f'port must be all digits, found {port}'

 DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"

 class KafkaSinkConfig(BaseModel):
-    connection: KafkaConnectionConfig = KafkaConnectionConfig()
+    connection: KafkaProducerConnectionConfig = KafkaProducerConnectionConfig()
     topic: str = DEFAULT_KAFKA_TOPIC
-    producer_config: dict = {}

 @dataclass
 class KafkaCallback:
@@ -66,10 +49,13 @@ class DatahubKafkaSink(Sink):
         producer_config = {
             "bootstrap.servers": self.config.connection.bootstrap,
             "schema.registry.url": self.config.connection.schema_registry_url,
-            **self.config.producer_config,
+            **self.config.connection.producer_config,
         }

-        schema_registry_conf = {'url': self.config.connection.schema_registry_url}
+        schema_registry_conf = {
+            'url': self.config.connection.schema_registry_url,
+            **self.config.connection.schema_registry_config,
+        }
         schema_registry_client = SchemaRegistryClient(schema_registry_conf)

         def convert_mce_to_dict(mce, ctx):
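The merge order matters here: later **-expansions win, so keys supplied in connection.producer_config (or schema_registry_config) override the defaults derived from the connection fields. A self-contained sketch of that precedence:

# Later **-expansions override earlier keys, so user-supplied config wins.
base = {
    "bootstrap.servers": "localhost:9092",
    "schema.registry.url": "http://localhost:8081",
}
user_overrides = {
    "security.protocol": "ssl",
    "bootstrap.servers": "broker:9092",  # overrides the default above
}
merged = {**base, **user_overrides}
assert merged["bootstrap.servers"] == "broker:9092"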


@@ -1,5 +1,6 @@
 import logging
-from gometa.configuration import ConfigModel, KafkaConnectionConfig
+from gometa.configuration import ConfigModel
+from gometa.configuration.kafka import KafkaConsumerConnectionConfig
 from gometa.ingestion.api.source import Source, Extractor, SourceReport
 from gometa.ingestion.api.source import WorkUnit
 from typing import Optional, Iterable, List, Dict
@@ -23,7 +24,7 @@ logger = logging.getLogger(__name__)

 class KafkaSourceConfig(ConfigModel):
-    connection: KafkaConnectionConfig = KafkaConnectionConfig()
+    connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
     topic: str = ".*"  # default is wildcard subscription
@@ -61,7 +62,11 @@ class KafkaSource(Source):
         super().__init__(ctx)
         self.source_config = config
         self.topic_pattern = re.compile(self.source_config.topic)
-        self.consumer = confluent_kafka.Consumer({'group.id':'test', 'bootstrap.servers':self.source_config.connection.bootstrap})
+        self.consumer = confluent_kafka.Consumer({
+            'group.id':'test',
+            'bootstrap.servers':self.source_config.connection.bootstrap,
+            **self.source_config.connection.consumer_config,
+        })
         self.schema_registry_client = SchemaRegistryClient(
             {"url": self.source_config.connection.schema_registry_url}
         )
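The same pass-through now applies on the consume side: whatever sits in connection.consumer_config is handed to confluent_kafka.Consumer alongside the generated keys. (Note that the SchemaRegistryClient above is still constructed with only the url, so schema_registry_config is not yet applied on this path, unlike in the sink.) A hypothetical stand-alone sketch, assuming the certificate files exist:

import confluent_kafka

# Extra settings as they would arrive from connection.consumer_config.
consumer_config = {
    "security.protocol": "ssl",
    "ssl.ca.location": "certificate_ca.pem",
    "ssl.certificate.location": "certificate.pem",
    "ssl.key.location": "private_key.pem",
}

consumer = confluent_kafka.Consumer({
    "group.id": "test",
    "bootstrap.servers": "broker:9092",
    **consumer_config,  # pass-through settings win on key collisions
})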


@@ -12,7 +12,7 @@ class KafkaSourceTest(unittest.TestCase):
     @patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
     def test_kafka_source_configuration(self, mock_kafka):
         ctx = PipelineContext(run_id='test')
-        kafka_source = KafkaSource.create({'connection': {'bootstrap': 'foobar'}}, ctx)
+        kafka_source = KafkaSource.create({'connection': {'bootstrap': 'foobar:9092'}}, ctx)
         assert mock_kafka.call_count == 1

     @patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
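The fixture changed from 'foobar' to 'foobar:9092' because a plain hostname now fails the bootstrap validator. A hypothetical companion test for KafkaSourceTest (not part of this commit, and assuming KafkaSource.create lets the pydantic error propagate) could pin down the rejection path:

from pydantic import ValidationError

    @patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
    def test_kafka_source_rejects_portless_bootstrap(self, mock_kafka):
        ctx = PipelineContext(run_id='test')
        # 'foobar' has no ':port' part, so config parsing should raise.
        with self.assertRaises(ValidationError):
            KafkaSource.create({'connection': {'bootstrap': 'foobar'}}, ctx)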