mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-31 10:49:00 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			86 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			86 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import datahub.emitter.mce_builder as builder
 | |
| from datahub.emitter.mcp import MetadataChangeProposalWrapper
 | |
| from datahub.emitter.rest_emitter import DatahubRestEmitter
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
 | |
|     DatasetLineageType,
 | |
|     FineGrainedLineage,
 | |
|     FineGrainedLineageDownstreamType,
 | |
|     FineGrainedLineageUpstreamType,
 | |
|     Upstream,
 | |
|     UpstreamLineage,
 | |
| )
 | |
| 
 | |
| 
 | |
def datasetUrn(tbl):
    """Return the dataset URN for table ``tbl`` on the "postgres" platform."""
    platform = "postgres"
    return builder.make_dataset_urn(platform, tbl)
 | |
| 
 | |
| 
 | |
def fldUrn(tbl, fld):
    """Return the schema-field URN for field ``fld`` of table ``tbl``."""
    # Resolve the owning dataset first, then address the field within it.
    parent_urn = datasetUrn(tbl)
    return builder.make_schema_field_urn(parent_urn, fld)
 | |
| 
 | |
| 
 | |
| # Lineage of fields in a dataset
 | |
| # c1      <-- unknownFunc(bar2.c1, bar4.c1)
 | |
| # c2      <-- myfunc(bar3.c2)
 | |
| # {c3,c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
 | |
| # c5      <-- unknownFunc(bar3)
 | |
| # {c6,c7} <-- unknownFunc(bar4)
 | |
| 
 | |
| # Note that the semantics of the "transformOperation" value are contextual.
 | |
| # In the example above, it is regarded as some kind of UDF, but it could also be an expression, etc.
 | |
| 
 | |
# Field-level lineage entries for dataset "bar"; each entry lists the
# downstream field(s) first and then the upstream fields/dataset feeding them.
fineGrainedLineages = [
    # bar.c1 <-- bar2.c1, bar4.c1
    FineGrainedLineage(
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c1")],
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[
            fldUrn("bar2", "c1"),
            fldUrn("bar4", "c1"),
        ],
    ),
    # bar.c2 <-- myfunc(bar3.c2)
    FineGrainedLineage(
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c2")],
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("bar3", "c2")],
        transformOperation="myfunc",
        confidenceScore=0.8,
    ),
    # {bar.c3, bar.c4} <-- bar2.c2, bar2.c3, bar3.c1
    FineGrainedLineage(
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[
            fldUrn("bar", "c3"),
            fldUrn("bar", "c4"),
        ],
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[
            fldUrn("bar2", "c2"),
            fldUrn("bar2", "c3"),
            fldUrn("bar3", "c1"),
        ],
        confidenceScore=0.7,
    ),
    # bar.c5 <-- dataset bar3 (whole dataset as the upstream)
    FineGrainedLineage(
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c5")],
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=[datasetUrn("bar3")],
    ),
    # {bar.c6, bar.c7} <-- dataset bar4 (whole dataset as the upstream)
    FineGrainedLineage(
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[
            fldUrn("bar", "c6"),
            fldUrn("bar", "c7"),
        ],
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=[datasetUrn("bar4")],
    ),
]
 | |
| 
 | |
| 
 | |
# A dataset-level upstream is included alongside the fine-grained entries to
# check for conflicts with an existing Upstream, particularly the DownstreamOf
# relationship.
upstream = Upstream(
    dataset=datasetUrn("bar2"),
    type=DatasetLineageType.TRANSFORMED,
)

# Combine the dataset-level and field-level lineage into one aspect.
fieldLineages = UpstreamLineage(
    upstreams=[upstream],
    fineGrainedLineages=fineGrainedLineages,
)

# Wrap the aspect in a change proposal targeting dataset "bar".
lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn("bar"),
    aspect=fieldLineages,
)

# Emitter pointed at the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Send the lineage proposal.
emitter.emit_mcp(lineageMcp)
 | 
