Fix #5455: Ingestion from a Kafka topic fails when sample data is enabled (#5536)

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
Sriharsha Chintalapani 2022-06-24 20:39:41 -07:00 committed by GitHub
parent ad0c6dad88
commit 710e782f33
3 changed files with 33 additions and 33 deletions


@@ -94,12 +94,7 @@ plugins: Dict[str, Set[str]] = {
         "thrift-sasl==0.4.3",
         "presto-types-parser==0.0.2",
     },
-    "kafka": {
-        "confluent_kafka==1.8.2",
-        "fastavro>=1.2.0",
-        "avro-python3",
-        "confluent_avro",
-    },
+    "kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
     "ldap-users": {"ldap3==2.9.1"},
     "looker": {"looker-sdk>=22.4.0"},
     "mssql": {"sqlalchemy-pytds>=0.3"},


@@ -17,12 +17,8 @@ from dataclasses import dataclass, field
 from typing import Iterable, List, Optional

 import confluent_kafka
-from confluent_avro import AvroKeyValueSerde, SchemaRegistry
 from confluent_kafka.admin import AdminClient, ConfigResource
-from confluent_kafka.schema_registry.schema_registry_client import (
-    Schema,
-    SchemaRegistryClient,
-)
+from confluent_kafka.schema_registry.schema_registry_client import Schema
 from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
 from metadata.generated.schema.entity.data.topic import (
@@ -190,7 +186,6 @@ class KafkaSource(Source[CreateTopicRequest]):
             self.status.failure(topic_name, repr(err))

     def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
-        logger.debug(f"topic = {topic}")
         schema: Optional[Schema] = None
         try:
             registered_schema = self.schema_registry_client.get_latest_version(
@@ -203,31 +198,31 @@ class KafkaSource(Source[CreateTopicRequest]):
         return schema

     def _get_sample_data(self, topic_name):
+        sample_data = []
         try:
             self.consumer_client.subscribe([topic_name.__root__])
-            registry_client = SchemaRegistry(
-                self.service_connection.schemaRegistryURL,
-                headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
+            logger.info(
+                f"Kafka consumer polling for sample messages in topic {topic_name.__root__}"
             )
-            avro_serde = AvroKeyValueSerde(registry_client, topic_name.__root__)
-            logger.info("Kafka consumer polling for sample messages")
             messages = self.consumer_client.consume(num_messages=10, timeout=10)
-            sample_data = []
-            if len(messages) > 0:
-                for message in messages:
-                    sample_data.append(
-                        str(avro_serde.value.deserialize(message.value()))
-                    )
-            topic_sample_data = TopicSampleData(messages=sample_data)
-            self.consumer_client.unsubscribe()
-            return topic_sample_data
         except Exception as e:
             logger.error(
                 f"Failed to fetch sample data from topic {topic_name.__root__}"
             )
             logger.error(traceback.format_exc())
-            logger.error(sys.exc_info()[2])
-            return None
+        else:
+            if messages:
+                for message in messages:
+                    sample_data.append(
+                        str(
+                            self.consumer_client._serializer.decode_message(
+                                message.value()
+                            )
+                        )
+                    )
+        self.consumer_client.unsubscribe()
+        return TopicSampleData(messages=sample_data)

     def on_assign(self, a_consumer, partitions):
         # get offset tuple from the first partition
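
A note on the decode path in the hunk above: AvroConsumer only deserializes inside poll(); the batch consume() call it inherits from the plain Consumer returns raw Confluent-framed bytes (magic byte, 4-byte schema id, Avro payload). That is why the fix reaches into the consumer's internal MessageSerializer via _serializer.decode_message. A standalone sketch of the same pattern, assuming a reachable broker and registry and a hypothetical topic name:

    from confluent_kafka.avro import AvroConsumer

    consumer = AvroConsumer(
        {
            "bootstrap.servers": "localhost:9092",  # illustrative
            "group.id": "openmetadata-consumer",
            "auto.offset.reset": "earliest",
            "schema.registry.url": "http://localhost:8081",  # illustrative
        }
    )
    consumer.subscribe(["customer-events"])  # hypothetical topic
    # consume() bypasses AvroConsumer.poll(), so values arrive undecoded ...
    messages = consumer.consume(num_messages=10, timeout=10)
    # ... and must be run through the registry-aware serializer by hand.
    sample = [
        str(consumer._serializer.decode_message(msg.value()))
        for msg in messages
        if msg.value() is not None
    ]
    consumer.unsubscribe()

Leaning on the private _serializer attribute works because AvroConsumer builds that MessageSerializer around the same registry client it uses for poll(), so both paths decode against identical schemas.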


@@ -269,7 +269,6 @@ def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
     from confluent_kafka.admin import AdminClient, ConfigResource
     from confluent_kafka.avro import AvroConsumer
     from confluent_kafka.schema_registry.schema_registry_client import (
-        Schema,
         SchemaRegistryClient,
     )
@@ -282,11 +281,22 @@ def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
     if connection.schemaRegistryURL:
         connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL
         schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig)
-        admin_client_config["schema.registry.url"] = connection.schemaRegistryURL
-        admin_client_config["group.id"] = "openmetadata-consumer-1"
-        admin_client_config["auto.offset.reset"] = "earliest"
-        admin_client_config["enable.auto.commit"] = False
-        consumer_client = AvroConsumer(admin_client_config)
+        connection.schemaRegistryConfig["url"] = str(connection.schemaRegistryURL)
+        consumer_config = {
+            **connection.consumerConfig,
+            "bootstrap.servers": connection.bootstrapServers,
+        }
+        if "group.id" not in consumer_config:
+            consumer_config["group.id"] = "openmetadata-consumer"
+        if "auto.offset.reset" not in consumer_config:
+            consumer_config["auto.offset.reset"] = "earliest"
+        for key in connection.schemaRegistryConfig:
+            consumer_config["schema.registry." + key] = connection.schemaRegistryConfig[
+                key
+            ]
+        logger.debug(consumer_config)
+        consumer_client = AvroConsumer(consumer_config)
     return KafkaClient(
         admin_client=admin_client,
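
For orientation, after the prefix-flattening loop in the hunk above, the dictionary handed to AvroConsumer would look roughly like this for a typical connection (values are illustrative; any extra schemaRegistryConfig entries would be flattened under the same "schema.registry." prefix):

    consumer_config = {
        "bootstrap.servers": "localhost:9092",  # from bootstrapServers
        "group.id": "openmetadata-consumer",  # default when not supplied
        "auto.offset.reset": "earliest",  # default when not supplied
        "schema.registry.url": "http://localhost:8081",  # the "url" key, prefixed
    }

Building the consumer from consumerConfig plus explicit defaults, rather than mutating admin_client_config in place, keeps the AdminClient settings untouched and lets user-supplied group.id and auto.offset.reset values win over the defaults.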