diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index b9b3ec56d8..10b8cbf5a3 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -518,10 +518,8 @@ sink: In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code. -- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`) -- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`) - -For a basic usage example, see the [lineage_emitter.py](./examples/library/lineage_emitter.py) example. +- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`). Basic usage [example](./examples/library/lineage_emitter_rest.py). +- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`). Basic usage [example](./examples/library/lineage_emitter_kafka.py). ## Usage with Airflow diff --git a/metadata-ingestion/examples/library/lineage_emitter_kafka.py b/metadata-ingestion/examples/library/lineage_emitter_kafka.py new file mode 100644 index 0000000000..81a294908e --- /dev/null +++ b/metadata-ingestion/examples/library/lineage_emitter_kafka.py @@ -0,0 +1,36 @@ +import datahub.emitter.mce_builder as builder +from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig + +# Construct a lineage object. +lineage_mce = builder.make_lineage_mce( + [ + builder.make_dataset_urn("bigquery", "upstream1"), + builder.make_dataset_urn("bigquery", "upstream2"), + ], + builder.make_dataset_urn("bigquery", "downstream"), +) + +# Create an emitter to DataHub's Kafka broker. +emitter = DatahubKafkaEmitter( + KafkaEmitterConfig.parse_obj( + # This is the same config format as the standard Kafka sink's YAML. + { + "connection": { + "bootstrap": "broker:9092", + "producer_config": {}, + "schema_registry_url": "http://schema-registry:8081", + } + } + ) +) + + +# Emit metadata! +def callback(err, msg): + if err: + # Handle the metadata emission error. + print("error:", err) + + +emitter.emit_mce_async(lineage_mce, callback) +emitter.flush() diff --git a/metadata-ingestion/examples/library/lineage_emitter.py b/metadata-ingestion/examples/library/lineage_emitter_rest.py similarity index 100% rename from metadata-ingestion/examples/library/lineage_emitter.py rename to metadata-ingestion/examples/library/lineage_emitter_rest.py