mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-25 07:54:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			50 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			50 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Emit DataJobInputOutput lineage for an Airflow data job to DataHub.

The script declares two MySQL input datasets, one Kafka output dataset and
one upstream data job (``flow1``/``job0``), attaches them to the data job
``flow1``/``job1`` through a ``dataJobInputOutput`` aspect, and sends the
resulting change proposal to the GMS REST API at ``http://localhost:8080``.
"""

from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInputOutputClass
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct the DataJobInputOutput aspect.

# Datasets the job reads from.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="mysql", name="librarydb.member", env="PROD"),
    builder.make_dataset_urn(platform="mysql", name="librarydb.checkout", env="PROD"),
]

# Datasets the job writes to.
output_datasets: List[str] = [
    builder.make_dataset_urn(
        platform="kafka", name="debezium.topics.librarydb.member_checkout", env="PROD"
    )
]

# Data jobs listed as inputs of this job (job0 of the same flow).
input_data_jobs: List[str] = [
    builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job0", cluster="PROD"
    )
]

datajob_input_output = DataJobInputOutputClass(
    inputDatasets=input_datasets,
    outputDatasets=output_datasets,
    inputDatajobs=input_data_jobs,
)

# Construct a MetadataChangeProposalWrapper object.
# NOTE: This will overwrite all of the existing lineage information associated with this job.
datajob_input_output_mcp = MetadataChangeProposalWrapper(
    entityType="datajob",
    changeType=ChangeTypeClass.UPSERT,
    # The aspect is attached to job1; job0 above is only referenced as an input.
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD"
    ),
    aspectName="dataJobInputOutput",
    aspect=datajob_input_output,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(datajob_input_output_mcp)
