from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInputOutputClass

# Construct the DataJobInputOutput aspect.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="mysql", name="librarydb.member", env="PROD"),
    builder.make_dataset_urn(platform="mysql", name="librarydb.checkout", env="PROD"),
]
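
# For reference, make_dataset_urn assembles the standard DataHub dataset URN,
# e.g. urn:li:dataset:(urn:li:dataPlatform:mysql,librarydb.member,PROD).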

output_datasets: List[str] = [
    builder.make_dataset_urn(
        platform="kafka", name="debezium.topics.librarydb.member_checkout", env="PROD"
    )
]

input_data_jobs: List[str] = [
    builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job0", cluster="PROD"
    )
]
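
# Listing job0 under the job inputs records job-to-job lineage: the job this
# aspect is attached to (job1, below) is declared downstream of job0.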

datajob_input_output = DataJobInputOutputClass(
    inputDatasets=input_datasets,
    outputDatasets=output_datasets,
    inputDatajobs=input_data_jobs,
)

# Construct a MetadataChangeProposalWrapper object.
# NOTE: This will overwrite all of the existing lineage information associated with this job.
datajob_input_output_mcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD"
    ),
    aspect=datajob_input_output,
)
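
# MCPs default to changeType UPSERT, which is why the NOTE above warns that
# the full aspect is replaced. If you need to merge with existing lineage
# instead, a minimal read-modify-write sketch (assuming a reachable GMS and a
# hypothetical job_urn variable holding job1's URN) would look like:
#
#   from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
#
#   graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
#   existing = graph.get_aspect(job_urn, DataJobInputOutputClass)
#   if existing is not None:
#       datajob_input_output.inputDatasets = sorted(
#           set(existing.inputDatasets or []) | set(input_datasets)
#       )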

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
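
# For deployments with authentication enabled, DatahubRestEmitter also takes a
# token argument (the value below is a placeholder):
#   emitter = DatahubRestEmitter("http://localhost:8080", token="<access-token>")
# To fail fast when GMS is unreachable, you can check connectivity first:
#   emitter.test_connection()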

# Emit metadata!
emitter.emit_mcp(datajob_input_output_mcp)
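
# emit_mcp() makes a synchronous HTTP call and raises on error, so if this
# script exits cleanly the lineage has been accepted by GMS.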