datahub/metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained_sample.py

54 lines
1.5 KiB
Python
Raw Permalink Normal View History

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
)
def datasetUrn(tbl):
return builder.make_dataset_urn("hive", tbl)
def fldUrn(tbl, fld):
return builder.make_schema_field_urn(datasetUrn(tbl), fld)
fineGrainedLineages = [
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
fldUrn("fct_users_deleted", "browser_id"),
fldUrn("fct_users_created", "user_id"),
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("logging_events", "browser")],
),
]
# this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
upstream = Upstream(
dataset=datasetUrn("fct_users_deleted"), type=DatasetLineageType.TRANSFORMED
)
fieldLineages = UpstreamLineage(
upstreams=[upstream], fineGrainedLineages=fineGrainedLineages
)
lineageMcp = MetadataChangeProposalWrapper(
entityUrn=datasetUrn("logging_events"),
aspect=fieldLineages,
)
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
# Emit metadata!
emitter.emit_mcp(lineageMcp)