feat(ingest): add Kafka-based emitter example (#2412)

This commit is contained in:
Harshal Sheth 2021-04-16 17:54:50 -07:00 committed by GitHub
parent 43075c4a91
commit 27a672e697
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 4 deletions

View File

@ -518,10 +518,8 @@ sink:
In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code.
- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`)
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`)
For a basic usage example, see the [lineage_emitter.py](./examples/library/lineage_emitter.py) example.
- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`). Basic usage [example](./examples/library/lineage_emitter_rest.py).
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`). Basic usage [example](./examples/library/lineage_emitter_kafka.py).
## Usage with Airflow

View File

@ -0,0 +1,36 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
[
builder.make_dataset_urn("bigquery", "upstream1"),
builder.make_dataset_urn("bigquery", "upstream2"),
],
builder.make_dataset_urn("bigquery", "downstream"),
)
# Create an emitter to DataHub's Kafka broker.
emitter = DatahubKafkaEmitter(
KafkaEmitterConfig.parse_obj(
# This is the same config format as the standard Kafka sink's YAML.
{
"connection": {
"bootstrap": "broker:9092",
"producer_config": {},
"schema_registry_url": "http://schema-registry:8081",
}
}
)
)
# Emit metadata!
def callback(err, msg):
if err:
# Handle the metadata emission error.
print("error:", err)
emitter.emit_mce_async(lineage_mce, callback)
emitter.flush()