fix(metadata-ingestion): pass schema_record to mce-cli cosumer (#1646)

This commit is contained in:
Chris Lee 2020-04-24 14:34:16 -07:00 committed by GitHub
parent ec5fbbc496
commit 2a59070d54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,7 +42,7 @@ def produce(conf, data_file, schema_record):
producer.flush()
def consume(conf):
def consume(conf, schema_record):
"""
Consume MetadataChangeEvent records
"""
@ -51,7 +51,7 @@ def consume(conf):
print("Consuming MetadataChangeEvent records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))
c = AvroConsumer(conf, reader_value_schema=record_schema)
c = AvroConsumer(conf, reader_value_schema=avro.load(schema_record))
c.subscribe([topic])
while True:
@ -90,7 +90,7 @@ def main(args):
# Fallback to earliest to ensure all messages are consumed
conf['group.id'] = topic
conf['auto.offset.reset'] = "earliest"
consume(conf)
consume(conf, args.schema_record)
if __name__ == '__main__':
@ -105,4 +105,4 @@ if __name__ == '__main__':
help="Avro schema record; required if running 'producer' mode")
parser.add_argument('-d', dest="data_file", default="bootstrap_mce.dat",
help="MCE data file; required if running 'producer' mode")
main(parser.parse_args())
main(parser.parse_args())