# Mirror of https://github.com/datahub-project/datahub.git
# Synced 2025-07-08 17:53:11 +00:00
# 89 lines, 2.4 KiB, Python
import json
|
||
|
import uuid
|
||
|
from argparse import ArgumentParser
|
||
|
from typing import Tuple
|
||
|
|
||
|
from avro.schema import Schema
|
||
|
from confluent_kafka import avro
|
||
|
from confluent_kafka.avro import AvroProducer
|
||
|
|
||
|
|
||
|
def parse_command_line_args():
    """Parse the command-line options of the record-sending script.

    Options:
        --topic              (required) Topic name.
        --bootstrap-servers  Bootstrap server address; defaults to
                             "localhost:9092".
        --schema-registry    Schema Registry url; defaults to
                             "http://localhost:8081".
        --record-key         Record key; left as ``None`` when omitted.
        --key-schema-file    (required) File name of the key Avro schema.
        --record-value       (required) Record value.
        --value-schema-file  (required) File name of the value Avro schema.

    Returns:
        The namespace returned by ``ArgumentParser.parse_args()``, which
        reads the options from ``sys.argv``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an unknown option is given.
    """
    arg_parser = ArgumentParser()

    arg_parser.add_argument("--topic", required=True, help="Topic name")
    arg_parser.add_argument(
        "--bootstrap-servers",
        required=False,
        default="localhost:9092",
        help="Bootstrap server address",
    )
    arg_parser.add_argument(
        "--schema-registry",
        required=False,
        default="http://localhost:8081",
        help="Schema Registry url",
    )
    arg_parser.add_argument(
        "--record-key",
        required=False,
        type=str,
        help="Record key. If not provided, will be a random UUID",
    )
    # The key schema file is always loaded by the schema loader, so leaving
    # it out could never succeed. Requiring it here turns that late failure
    # into an immediate, readable usage error.
    arg_parser.add_argument(
        "--key-schema-file", required=True, help="File name of key Avro schema to use"
    )
    arg_parser.add_argument("--record-value", required=True, help="Record value")
    arg_parser.add_argument(
        "--value-schema-file", required=True, help="File name of Avro schema to use"
    )

    return arg_parser.parse_args()
|
||
|
|
||
|
|
||
|
def load_avro_schema_from_file(
    key_schema_file: str, value_schema_file: str
) -> Tuple[Schema, Schema]:
    """Load the key and value Avro schemas with ``avro.load``.

    Args:
        key_schema_file: File name of the key Avro schema.
        value_schema_file: File name of the value Avro schema.

    Returns:
        A ``(key_schema, value_schema)`` pair, in that order.
    """
    # Tuple elements are evaluated left to right, so the key schema is still
    # loaded before the value schema.
    return avro.load(key_schema_file), avro.load(value_schema_file)
|
||
|
|
||
|
|
||
|
def send_record(args):
    """Send a single Avro-encoded key/value record to a Kafka topic.

    Args:
        args: Parsed command-line namespace. The attributes read here are
            ``key_schema_file``, ``value_schema_file``, ``bootstrap_servers``,
            ``schema_registry``, ``record_key``, ``record_value`` and
            ``topic``.

    The record value is decoded from JSON. The key is decoded from JSON when
    ``record_key`` is provided (and non-empty); otherwise a random UUID string
    is used.

    Failures are printed rather than raised: both errors raised by
    ``produce()`` itself and delivery errors reported by the client once the
    queue is flushed.

    Raises:
        json.JSONDecodeError: if ``record_key`` or ``record_value`` is not
            valid JSON.
    """
    key_schema, value_schema = load_avro_schema_from_file(
        args.key_schema_file, args.value_schema_file
    )

    producer_config = {
        "bootstrap.servers": args.bootstrap_servers,
        "schema.registry.url": args.schema_registry,
    }

    producer = AvroProducer(
        producer_config,
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    key = json.loads(args.record_key) if args.record_key else str(uuid.uuid4())
    value = json.loads(args.record_value)

    def report_delivery(err, _msg):
        # produce() only enqueues the record; the real outcome is known when
        # the client invokes this callback (from flush() below). Reporting
        # here avoids claiming success for a record the broker rejected.
        if err is not None:
            print(
                f"Exception while producing record value - {value} to topic - {args.topic}: {err}"
            )
        else:
            print(f"Successfully producing record value - {value} to topic - {args.topic}")

    try:
        producer.produce(
            topic=args.topic, key=key, value=value, on_delivery=report_delivery
        )
    except Exception as e:
        # Errors raised before the record is enqueued (e.g. serialization
        # problems) never reach the delivery callback, so report them here.
        print(
            f"Exception while producing record value - {value} to topic - {args.topic}: {e}"
        )

    # Wait for outstanding deliveries; this is what triggers report_delivery.
    producer.flush()
|
||
|
|
||
|
|
||
|
"""
|
||
|
Creates a Key-Value topic by sending a record to it
|
||
|
"""
|
||
|
if __name__ == "__main__":
    # Parse the command-line options, then send the one record they describe.
    cli_args = parse_command_line_args()
    send_record(cli_args)
|