From 710e782f33f30459e8ac6e6b05bd6cf8b475c7c6 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 24 Jun 2022 20:39:41 -0700 Subject: [PATCH] Fix #5455: Ingestion from a kafka topic fails when sample data is enabled (#5536) Co-authored-by: ulixius9 --- ingestion/setup.py | 7 +--- .../ingestion/source/messaging/kafka.py | 37 ++++++++----------- ingestion/src/metadata/utils/connections.py | 22 ++++++++--- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index 1384e8fef2c..b8adcae19d0 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -94,12 +94,7 @@ plugins: Dict[str, Set[str]] = { "thrift-sasl==0.4.3", "presto-types-parser==0.0.2", }, - "kafka": { - "confluent_kafka==1.8.2", - "fastavro>=1.2.0", - "avro-python3", - "confluent_avro", - }, + "kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"}, "ldap-users": {"ldap3==2.9.1"}, "looker": {"looker-sdk>=22.4.0"}, "mssql": {"sqlalchemy-pytds>=0.3"}, diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka.py b/ingestion/src/metadata/ingestion/source/messaging/kafka.py index 3ce968764fd..875e00398ae 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka.py @@ -17,12 +17,8 @@ from dataclasses import dataclass, field from typing import Iterable, List, Optional import confluent_kafka -from confluent_avro import AvroKeyValueSerde, SchemaRegistry from confluent_kafka.admin import AdminClient, ConfigResource -from confluent_kafka.schema_registry.schema_registry_client import ( - Schema, - SchemaRegistryClient, -) +from confluent_kafka.schema_registry.schema_registry_client import Schema from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.entity.data.topic import ( @@ -190,7 +186,6 @@ class KafkaSource(Source[CreateTopicRequest]): self.status.failure(topic_name, repr(err)) def _parse_topic_metadata(self, topic: str) -> Optional[Schema]: - logger.debug(f"topic = {topic}") schema: Optional[Schema] = None try: registered_schema = self.schema_registry_client.get_latest_version( @@ -203,31 +198,31 @@ class KafkaSource(Source[CreateTopicRequest]): return schema def _get_sample_data(self, topic_name): + sample_data = [] try: self.consumer_client.subscribe([topic_name.__root__]) - registry_client = SchemaRegistry( - self.service_connection.schemaRegistryURL, - headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}, + logger.info( + f"Kafka consumer polling for sample messages in topic {topic_name.__root__}" ) - avro_serde = AvroKeyValueSerde(registry_client, topic_name.__root__) - logger.info("Kafka consumer polling for sample messages") messages = self.consumer_client.consume(num_messages=10, timeout=10) - sample_data = [] - if len(messages) > 0: - for message in messages: - sample_data.append( - str(avro_serde.value.deserialize(message.value())) - ) - topic_sample_data = TopicSampleData(messages=sample_data) - self.consumer_client.unsubscribe() - return topic_sample_data except Exception as e: logger.error( f"Failed to fetch sample data from topic {topic_name.__root__}" ) logger.error(traceback.format_exc()) logger.error(sys.exc_info()[2]) - return None + else: + if messages: + for message in messages: + sample_data.append( + str( + self.consumer_client._serializer.decode_message( + message.value() + ) + ) + ) + self.consumer_client.unsubscribe() + return TopicSampleData(messages=sample_data) def on_assign(self, a_consumer, partitions): # get offset tuple from the first partition diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index a6d0d26283f..c17d37cdb85 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -269,7 +269,6 @@ def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient: from confluent_kafka.admin import AdminClient, ConfigResource from confluent_kafka.avro import AvroConsumer from confluent_kafka.schema_registry.schema_registry_client import ( - Schema, SchemaRegistryClient, ) @@ -282,11 +281,22 @@ def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient: if connection.schemaRegistryURL: connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig) - admin_client_config["schema.registry.url"] = connection.schemaRegistryURL - admin_client_config["group.id"] = "openmetadata-consumer-1" - admin_client_config["auto.offset.reset"] = "earliest" - admin_client_config["enable.auto.commit"] = False - consumer_client = AvroConsumer(admin_client_config) + connection.schemaRegistryConfig["url"] = str(connection.schemaRegistryURL) + consumer_config = { + **connection.consumerConfig, + "bootstrap.servers": connection.bootstrapServers, + } + if "group.id" not in consumer_config: + consumer_config["group.id"] = "openmetadata-consumer" + if "auto.offset.reset" not in consumer_config: + consumer_config["auto.offset.reset"] = "earliest" + + for key in connection.schemaRegistryConfig: + consumer_config["schema.registry." + key] = connection.schemaRegistryConfig[ + key + ] + logger.debug(consumer_config) + consumer_client = AvroConsumer(consumer_config) return KafkaClient( admin_client=admin_client,