# datahub/metadata-ingestion/tests/integration/kafka/create_key_value_topic.py
#
# 95 lines
# 2.6 KiB
# Python

import json
import uuid
from argparse import ArgumentParser
from typing import Tuple
from avro.schema import Schema
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def parse_command_line_args(argv=None):
    """Parse the command-line options of this script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) ``argparse`` falls back to ``sys.argv[1:]``, so
            existing zero-argument callers behave exactly as before, while
            tests and other callers can supply their own argument list.

    Returns:
        The parsed ``argparse.Namespace`` with the attributes ``topic``,
        ``bootstrap_servers``, ``schema_registry``, ``record_key``,
        ``key_schema_file``, ``record_value`` and ``value_schema_file``.
        Only ``topic`` is required; the optional string options without a
        default are ``None`` when omitted.
    """
    arg_parser = ArgumentParser()
    arg_parser.add_argument("--topic", required=True, help="Topic name")
    arg_parser.add_argument(
        "--bootstrap-servers",
        required=False,
        default="localhost:9092",
        help="Bootstrap server address",
    )
    arg_parser.add_argument(
        "--schema-registry",
        required=False,
        default="http://localhost:8081",
        help="Schema Registry url",
    )
    arg_parser.add_argument(
        "--record-key",
        required=False,
        type=str,
        help="Record key. If not provided, will be a random UUID",
    )
    arg_parser.add_argument(
        "--key-schema-file", required=False, help="File name of key Avro schema to use"
    )
    arg_parser.add_argument("--record-value", required=False, help="Record value")
    arg_parser.add_argument(
        "--value-schema-file", required=False, help="File name of Avro schema to use"
    )
    # Passing None makes argparse read sys.argv[1:], preserving the old behavior.
    return arg_parser.parse_args(argv)
def load_avro_schema_from_file(
    key_schema_file: str, value_schema_file: str
) -> Tuple[Schema, Schema]:
    """Load the Avro schemas used for the record key and the record value.

    Args:
        key_schema_file: File name of the key schema. When ``None``, a plain
            Avro ``string`` schema is used for the key instead.
        value_schema_file: File name of the value schema. When ``None``, no
            value schema is loaded.

    Returns:
        A ``(key_schema, value_schema)`` pair; ``value_schema`` is ``None``
        when no value schema file was given.
    """
    if key_schema_file is None:
        # No key schema supplied: fall back to a bare string key.
        key_schema = avro.loads('{"type": "string"}')
    else:
        key_schema = avro.load(key_schema_file)

    value_schema = None
    if value_schema_file is not None:
        value_schema = avro.load(value_schema_file)

    return key_schema, value_schema
def send_record(args):
    """Produce a single Avro-encoded record to ``args.topic``.

    The key is ``args.record_key`` parsed as JSON, or a random UUID string
    when no key was given. The value is ``args.record_value`` parsed as JSON,
    or ``None`` when no value was given. Schemas come from
    ``args.key_schema_file`` / ``args.value_schema_file``.

    Args:
        args: Parsed command-line namespace providing ``topic``,
            ``bootstrap_servers``, ``schema_registry``, ``record_key``,
            ``key_schema_file``, ``record_value`` and ``value_schema_file``.

    Raises:
        RuntimeError: If the delivery callback reports that the record could
            not be delivered.
        Exception: Any error raised while producing or flushing is printed
            and re-raised unchanged.
    """
    key_schema, value_schema = load_avro_schema_from_file(
        args.key_schema_file, args.value_schema_file
    )

    producer_config = {
        "bootstrap.servers": args.bootstrap_servers,
        "schema.registry.url": args.schema_registry,
    }
    producer = AvroProducer(
        producer_config,
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    key = json.loads(args.record_key) if args.record_key else str(uuid.uuid4())
    value = json.loads(args.record_value) if args.record_value else None

    # produce() only enqueues the message; the delivery outcome is reported
    # through the delivery callback, which runs while flush() serves events.
    # Without a callback a failed delivery would go unnoticed and the success
    # message below would be printed anyway.
    delivery_errors = []

    def _on_delivery(err, _msg):
        if err is not None:
            delivery_errors.append(err)

    try:
        producer.produce(
            topic=args.topic, key=key, value=value, on_delivery=_on_delivery
        )
        producer.flush()
        if delivery_errors:
            raise RuntimeError(f"Delivery failed: {delivery_errors[0]}")
    except Exception as e:
        print(
            f"Exception while producing record value - {value} to topic - {args.topic}: {e}"
        )
        # Bare raise keeps the original traceback intact.
        raise
    else:
        print(f"Successfully producing record value - {value} to topic - {args.topic}")
"""
Creates a Key-Value topic by sending a record to it
"""
# Script entry point: parse the command-line options, then produce the record.
if __name__ == "__main__":
    cli_args = parse_command_line_args()
    send_record(cli_args)