datahub/metadata-ingestion/examples/library/lineage_emitter_kafka.py

37 lines
994 B
Python

import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
[
builder.make_dataset_urn("bigquery", "upstream1"),
builder.make_dataset_urn("bigquery", "upstream2"),
],
builder.make_dataset_urn("bigquery", "downstream"),
)
# Create an emitter to DataHub's Kafka broker.
emitter = DatahubKafkaEmitter(
KafkaEmitterConfig.parse_obj(
# This is the same config format as the standard Kafka sink's YAML.
{
"connection": {
"bootstrap": "broker:9092",
"producer_config": {},
"schema_registry_url": "http://schema-registry:8081",
}
}
)
)
# Emit metadata!
def callback(err, msg):
if err:
# Handle the metadata emission error.
print("error:", err)
emitter.emit_mce_async(lineage_mce, callback)
emitter.flush()