| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | import datahub.emitter.mce_builder as builder | 
					
						
							|  |  |  | from datahub.emitter.mcp import MetadataChangeProposalWrapper | 
					
						
							|  |  |  | from datahub.emitter.rest_emitter import DatahubRestEmitter | 
					
						
							|  |  |  | from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( | 
					
						
							|  |  |  |     FineGrainedLineage, | 
					
						
							|  |  |  |     FineGrainedLineageDownstreamType, | 
					
						
							|  |  |  |     FineGrainedLineageUpstreamType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-12-30 01:26:47 -05:00
										 |  |  | from datahub.metadata.schema_classes import DataJobInputOutputClass | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | def datasetUrn(tbl): | 
					
						
							|  |  |  |     return builder.make_dataset_urn("postgres", tbl) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | def fldUrn(tbl, fld): | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  |     return builder.make_schema_field_urn(datasetUrn(tbl), fld) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-25 21:05:48 +05:30
										 |  |  | # Lineage of fields output by a job | 
					
						
							|  |  |  | # bar.c1          <-- unknownFunc(bar2.c1, bar4.c1) | 
					
						
							|  |  |  | # bar.c2          <-- myfunc(bar3.c2) | 
					
						
							|  |  |  | # {bar.c3,bar.c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1) | 
					
						
							|  |  |  | # bar.c5          <-- unknownFunc(bar3) | 
					
						
							|  |  |  | # {bar.c6,bar.c7} <-- unknownFunc(bar4) | 
					
						
							|  |  |  | # bar2.c9 has no upstream, i.e. its values are somehow created independently within this job. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Note that the semantics of the "transformOperation" value are contextual. | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | # In the above example, it is regarded as some kind of UDF; but it could also be an expression etc. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  | fineGrainedLineages = [ | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, | 
					
						
							|  |  |  |         upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar", "c1")], | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, | 
					
						
							|  |  |  |         upstreams=[fldUrn("bar3", "c2")], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar", "c2")], | 
					
						
							|  |  |  |         confidenceScore=0.8, | 
					
						
							|  |  |  |         transformOperation="myfunc", | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, | 
					
						
							|  |  |  |         upstreams=[fldUrn("bar2", "c2"), fldUrn("bar2", "c3"), fldUrn("bar3", "c1")], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD_SET, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")], | 
					
						
							|  |  |  |         confidenceScore=0.7, | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.DATASET, | 
					
						
							|  |  |  |         upstreams=[datasetUrn("bar3")], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar", "c5")], | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.DATASET, | 
					
						
							|  |  |  |         upstreams=[datasetUrn("bar4")], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD_SET, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")], | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     FineGrainedLineage( | 
					
						
							|  |  |  |         upstreamType=FineGrainedLineageUpstreamType.NONE, | 
					
						
							|  |  |  |         upstreams=[], | 
					
						
							|  |  |  |         downstreamType=FineGrainedLineageDownstreamType.FIELD, | 
					
						
							|  |  |  |         downstreams=[fldUrn("bar2", "c9")], | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  | ] | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-25 21:05:48 +05:30
										 |  |  | # The lineage of output col bar.c9 is unknown. So there is no lineage for it above. | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | # Note that bar2 is an input as well as an output dataset, but some fields are inputs while other fields are outputs. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | dataJobInputOutput = DataJobInputOutputClass( | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  |     inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")], | 
					
						
							|  |  |  |     outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")], | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  |     inputDatajobs=None, | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  |     inputDatasetFields=[ | 
					
						
							|  |  |  |         fldUrn("bar2", "c1"), | 
					
						
							|  |  |  |         fldUrn("bar2", "c2"), | 
					
						
							|  |  |  |         fldUrn("bar2", "c3"), | 
					
						
							|  |  |  |         fldUrn("bar3", "c1"), | 
					
						
							|  |  |  |         fldUrn("bar3", "c2"), | 
					
						
							|  |  |  |         fldUrn("bar4", "c1"), | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  |     outputDatasetFields=[ | 
					
						
							|  |  |  |         fldUrn("bar", "c1"), | 
					
						
							|  |  |  |         fldUrn("bar", "c2"), | 
					
						
							|  |  |  |         fldUrn("bar", "c3"), | 
					
						
							|  |  |  |         fldUrn("bar", "c4"), | 
					
						
							|  |  |  |         fldUrn("bar", "c5"), | 
					
						
							|  |  |  |         fldUrn("bar", "c6"), | 
					
						
							|  |  |  |         fldUrn("bar", "c7"), | 
					
						
							|  |  |  |         fldUrn("bar", "c9"), | 
					
						
							|  |  |  |         fldUrn("bar2", "c9"), | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  |     fineGrainedLineages=fineGrainedLineages, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  | dataJobLineageMcp = MetadataChangeProposalWrapper( | 
					
						
							|  |  |  |     entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"), | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  |     aspect=dataJobInputOutput, | 
					
						
							| 
									
										
										
										
											2022-02-24 14:36:19 -08:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-02-25 21:05:48 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | # Create an emitter to the GMS REST API. | 
					
						
							|  |  |  | emitter = DatahubRestEmitter("http://localhost:8080") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Emit metadata! | 
					
						
							| 
									
										
										
										
											2022-03-30 01:55:51 +05:30
										 |  |  | emitter.emit_mcp(dataJobLineageMcp) |