mirror of https://github.com/datahub-project/datahub.git, synced 2025-08-01 13:58:01 +00:00
64 lines · 2.4 KiB · Python
#! /usr/bin/python
import sys
import time

from kazoo.client import KazooClient
from confluent.schemaregistry.client import CachedSchemaRegistryClient

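# The endpoints below assume a local deployment of ZooKeeper, Kafka, and the
# schema registry; adjust them to match your environment. AVROLOADPATH is
# resolved against the current working directory.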
ZOOKEEPER = 'localhost:2181'
AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'

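# For every user topic found in ZooKeeper, the driver loop at the bottom looks
# up the latest value schema in the schema registry and calls
# build_kafka_dataset_mce() to wrap it in a MetadataChangeEvent (MCE).
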
def build_kafka_dataset_mce(dataset_name, schema, schema_version):
    """
    Create the MetadataChangeEvent via dataset_name and schema.
    """
    # int() replaces the Python 2-only long(); the timestamp is in seconds.
    actor, sys_time = "urn:li:corpuser:", int(time.time())

    # SchemaMetadata aspect describing the topic's value schema.
    schema_name = {"schemaName": dataset_name,
                   "platform": "urn:li:dataPlatform:kafka",
                   "version": schema_version,
                   "created": {"time": sys_time, "actor": actor},
                   "lastModified": {"time": sys_time, "actor": actor},
                   "hash": "",
                   "platformSchema": {"documentSchema": schema},
                   "fields": [{"fieldPath": "", "description": "", "nativeDataType": "string",
                               "type": {"type": {"com.linkedin.pegasus2avro.schema.StringType": {}}}}]}

    # MCE envelope: a DatasetSnapshot carrying the schema aspect above.
    mce = {"auditHeader": None,
           "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
                                {"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka," + dataset_name + ",PROD)",
                                 "aspects": [schema_name]}),
           "proposedDelta": None}

    produce_kafka_dataset_mce(mce)

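# Hypothetical example: a topic named PageViewEvent would yield the dataset
# URN urn:li:dataset:(urn:li:dataPlatform:kafka,PageViewEvent,PROD).
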
def produce_kafka_dataset_mce(mce):
    """
    Produce MetadataChangeEvent records.
    """
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    # The AvroProducer serializes each MCE against the event schema from the repo.
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.poll(0)
        sys.stdout.write('\n%s has been successfully produced!\n' % mce)
    except ValueError as e:
        sys.stdout.write('Message serialization failed %s' % e)
    # flush() blocks until outstanding messages are delivered.
    producer.flush()

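# Kafka registers every topic under /brokers/topics in ZooKeeper; walk that
# registry and emit one MCE per user topic.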
zk = KazooClient(ZOOKEEPER)
zk.start()
client = CachedSchemaRegistryClient(SCHEMAREGISTRY)

topics = zk.get_children("/brokers/topics")

for dataset_name in topics:
    # Skip internal topics such as __consumer_offsets.
    if dataset_name.startswith('_'):
        continue
    topic = dataset_name + '-value'
    schema_id, schema, schema_version = client.get_latest_schema(topic)
    if schema is None:
        # No value schema registered for this topic; nothing to ingest.
        continue
    print(topic)
    build_kafka_dataset_mce(dataset_name, str(schema), int(schema_version))

zk.stop()
sys.exit(0)
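
# To run (a sketch, assuming the localhost endpoints above and a working
# directory from which AVROLOADPATH resolves):
#   python <this-script>.py
# Dependencies: kazoo, confluent-kafka[avro], and the package providing
# confluent.schemaregistry.client.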