Added consumer config in kafka (#4365)

Ayush Shah 2022-04-23 20:54:25 +05:30 committed by GitHub
parent 6cd30ce945
commit 7c8c51e862
5 changed files with 57 additions and 17 deletions

View File

@@ -33,6 +33,20 @@
       "format": "uri",
       "default": "http://localhost:8081"
     },
+    "consumerConfig": {
+      "title": "Consumer Config",
+      "description": "Confluent Kafka Consumer Config",
+      "type": "object",
+      "default": {},
+      "additionalProperties": true
+    },
+    "schemaRegistryConfig": {
+      "title": "Schema Registry Config",
+      "description": "Confluent Kafka Schema Registry Config.",
+      "type": "object",
+      "default": {},
+      "additionalProperties": true
+    },
     "supportsMetadataExtraction": {
       "title": "Supports Metadata Extraction",
       "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"

View File

@@ -6,7 +6,9 @@
       "config": {
         "type": "Kafka",
         "bootstrapServers": "localhost:9092",
-        "schemaRegistryURL": "http://192.168.1.43:8081"
+        "schemaRegistryURL": "http://192.168.1.43:8081",
+        "consumerConfig": {},
+        "schemaRegistryConfig": {}
       }
     },
     "sourceConfig": {
@@ -20,11 +22,10 @@
     "config": {
     }
   },
-  "metadata_server": {
-    "type": "metadata-server",
-    "config": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }

View File

@@ -31,6 +31,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
     OpenMetadataConnection,
 )
 from metadata.generated.schema.entity.services.messagingService import (
+    MessagingService,
     MessagingServiceType,
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
@@ -39,6 +40,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import logger
 from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.connection_clients import KafkaClient
+from metadata.utils.connections import test_connection
 from metadata.utils.filters import filter_by_topic
 from metadata.utils.helpers import get_messaging_service_or_create
@@ -71,19 +75,16 @@ class KafkaSource(Source[CreateTopicRequest]):
         self.source_config = self.config.sourceConfig.config
         self.service_connection = self.config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
+        self.metadata = OpenMetadata(self.metadata_config)
         self.status = KafkaSourceStatus()
-        self.kafka_service_config = {
-            "bootstrapServers": self.service_connection.bootstrapServers,
-            "schemaRegistryURL": self.service_connection.schemaRegistryURL,
-        }
-        self.service = get_messaging_service_or_create(
-            service_name=config.serviceName,
-            message_service_type=MessagingServiceType.Kafka.name,
-            config=self.kafka_service_config,
-            metadata_config=metadata_config,
+        self.service = self.metadata.get_service_or_create(
+            entity=MessagingService, config=config
         )
+        self.service_connection.schemaRegistryConfig[
+            "url"
+        ] = self.service_connection.schemaRegistryURL
         self.schema_registry_client = SchemaRegistryClient(
-            {"url": self.service_connection.schemaRegistryURL}
+            self.service_connection.schemaRegistryConfig
         )
         self.admin_client = AdminClient(
             {
@@ -190,4 +191,6 @@ class KafkaSource(Source[CreateTopicRequest]):
         pass

     def test_connection(self) -> None:
-        pass
+        test_connection(KafkaClient(client=self.admin_client))
+        if self.service_connection.schemaRegistryURL:
+            test_connection(KafkaClient(client=self.schema_registry_client))
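
A minimal sketch of what this new connection check exercises, written directly against confluent_kafka (the broker and registry endpoints are illustrative, not from this commit):

from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

# Broker reachability: mirrors the list_topics() check in the KafkaClient
# handler registered in connections.py below (a timeout is added here for safety).
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
admin.list_topics(timeout=10).topics

# Registry reachability: mirrors the get_subjects() branch of the same handler.
registry = SchemaRegistryClient({"url": "http://localhost:8081"})
registry.get_subjects()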

View File

@@ -34,3 +34,9 @@ class SalesforceClient:
 class DeltaLakeClient:
     def __init__(self, client) -> None:
         self.client = client
+
+
+@dataclass
+class KafkaClient:
+    def __init__(self, client) -> None:
+        self.client = client

View File

@@ -50,6 +50,7 @@ from metadata.utils.connection_clients import (
     DeltaLakeClient,
     DynamoClient,
     GlueClient,
+    KafkaClient,
     SalesforceClient,
 )
 from metadata.utils.credentials import set_google_credentials
@@ -295,6 +296,21 @@ def _(connection: DeltaLakeConnection, verbose: bool = False):
     return deltalake_connection


+@test_connection.register
+def _(connection: KafkaClient) -> None:
+    from confluent_kafka.admin import AdminClient
+
+    try:
+        if isinstance(connection.client, AdminClient):
+            return connection.client.list_topics().topics
+        else:
+            return connection.client.get_subjects()
+    except Exception as err:
+        raise SourceConnectionException(
+            f"Unknown error connecting with {connection} - {err}."
+        )
+
+
 @test_connection.register
 def _(connection: DeltaLakeClient) -> None:
     try:
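
A hypothetical usage of the new handler, using the imports this commit adds to kafka.py (the broker address is illustrative):

from confluent_kafka.admin import AdminClient

from metadata.utils.connection_clients import KafkaClient
from metadata.utils.connections import test_connection

# test_connection dispatches on the wrapper type, so wrapping an AdminClient
# (or a schema registry client) routes to the KafkaClient overload above.
kafka_client = KafkaClient(client=AdminClient({"bootstrap.servers": "localhost:9092"}))
test_connection(kafka_client)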