mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-08 01:31:55 +00:00
95 lines
2.6 KiB
Python
95 lines
2.6 KiB
Python
import json
|
|
import uuid
|
|
from argparse import ArgumentParser
|
|
from typing import Tuple
|
|
|
|
from avro.schema import Schema
|
|
from confluent_kafka import avro
|
|
from confluent_kafka.avro import AvroProducer
|
|
|
|
|
|
def parse_command_line_args(argv=None):
    """Parse the command-line options of the record-producing script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) the arguments are taken from ``sys.argv``, which
            is what ``ArgumentParser.parse_args`` does on its own.

    Returns:
        The ``argparse`` namespace with the attributes ``topic``,
        ``bootstrap_servers``, ``schema_registry``, ``record_key``,
        ``key_schema_file``, ``record_value`` and ``value_schema_file``.
        Options that are not supplied and have no default are ``None``.
    """
    arg_parser = ArgumentParser()

    # The topic is the only mandatory option.
    arg_parser.add_argument("--topic", required=True, help="Topic name")

    # Connection settings, defaulting to a locally running stack.
    arg_parser.add_argument(
        "--bootstrap-servers",
        required=False,
        default="localhost:9092",
        help="Bootstrap server address",
    )
    arg_parser.add_argument(
        "--schema-registry",
        required=False,
        default="http://localhost:8081",
        help="Schema Registry url",
    )

    # Record key and the (optional) Avro schema describing it.
    arg_parser.add_argument(
        "--record-key",
        required=False,
        type=str,
        help="Record key. If not provided, will be a random UUID",
    )
    arg_parser.add_argument(
        "--key-schema-file", required=False, help="File name of key Avro schema to use"
    )

    # Record value and the (optional) Avro schema describing it.
    arg_parser.add_argument("--record-value", required=False, help="Record value")
    arg_parser.add_argument(
        "--value-schema-file", required=False, help="File name of Avro schema to use"
    )

    return arg_parser.parse_args(argv)
|
|
|
|
|
|
def load_avro_schema_from_file(
    key_schema_file: str, value_schema_file: str
) -> Tuple[Schema, Schema]:
    """Load the key and value Avro schemas for the producer.

    Args:
        key_schema_file: File name of the key schema. When ``None``, a plain
            ``{"type": "string"}`` schema is used for the key instead.
        value_schema_file: File name of the value schema. When ``None``, no
            value schema is loaded.

    Returns:
        A ``(key_schema, value_schema)`` pair; ``value_schema`` is ``None``
        when no value schema file was given.
    """
    # Key: fall back to a simple string schema when no file is supplied.
    if key_schema_file is None:
        key_schema = avro.loads('{"type": "string"}')
    else:
        key_schema = avro.load(key_schema_file)

    # Value: there is no fallback schema, so it stays None without a file.
    value_schema = None
    if value_schema_file is not None:
        value_schema = avro.load(value_schema_file)

    return key_schema, value_schema
|
|
|
|
|
|
def send_record(args):
    """Produce one Avro-encoded record to the Kafka topic named in ``args``.

    Args:
        args: Parsed command-line namespace providing ``topic``,
            ``bootstrap_servers``, ``schema_registry``, ``record_key``,
            ``key_schema_file``, ``record_value`` and ``value_schema_file``.

    Raises:
        RuntimeError: If the delivery callback reports that the record could
            not be delivered.
        Exception: Any error raised while producing or flushing is printed
            and then re-raised unchanged.
    """
    key_schema, value_schema = load_avro_schema_from_file(
        args.key_schema_file, args.value_schema_file
    )

    producer_config = {
        "bootstrap.servers": args.bootstrap_servers,
        "schema.registry.url": args.schema_registry,
    }

    producer = AvroProducer(
        producer_config,
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    if args.record_key:
        try:
            key = json.loads(args.record_key)
        except json.JSONDecodeError:
            # A bare key such as `my-key` is not valid JSON; use it verbatim
            # so it works with the default string key schema.
            key = args.record_key
    else:
        key = str(uuid.uuid4())
    value = json.loads(args.record_value) if args.record_value else None

    # produce() is asynchronous: delivery failures are only reported through
    # the delivery callback, so collect them here and check after flush().
    delivery_errors = []

    def on_delivery(err, _msg):
        if err is not None:
            delivery_errors.append(err)

    try:
        producer.produce(
            topic=args.topic, key=key, value=value, on_delivery=on_delivery
        )
        producer.flush()
        if delivery_errors:
            raise RuntimeError(f"Message delivery failed: {delivery_errors[0]}")
    except Exception as e:
        print(
            f"Exception while producing record value - {value} to topic - {args.topic}: {e}"
        )
        # Bare raise keeps the original traceback intact.
        raise
    else:
        print(f"Successfully producing record value - {value} to topic - {args.topic}")
|
|
|
|
|
|
"""
|
|
Creates a Key-Value topic by sending a record to it
|
|
"""
|
|
if __name__ == "__main__":
    # Parse the CLI options first, then publish a single record with them.
    cli_args = parse_command_line_args()
    send_record(cli_args)
|