datahub/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py

37 lines
1.2 KiB
Python

from typing import List
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
UpstreamClass,
UpstreamLineage,
)
# Describe the two upstream BigQuery tables feeding the downstream dataset.
# Each one is recorded as a TRANSFORMED lineage edge.
upstream_table_1 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_table_2 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables: List[UpstreamClass] = [upstream_table_1, upstream_table_2]

# Bundle the upstream entries into a single lineage aspect.
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)

# Wrap the aspect in a change proposal addressed to the downstream dataset.
# No env argument is passed for the downstream URN, so make_dataset_urn
# falls back to whatever its default is.
downstream_urn = builder.make_dataset_urn("bigquery", "downstream")
lineage_mcp = MetadataChangeProposalWrapper(
    entityUrn=downstream_urn,
    aspect=upstream_lineage,
)

# Point an emitter at the GMS REST API and send the proposal.
gms_server = "http://localhost:8080"
emitter = DatahubRestEmitter(gms_server)
emitter.emit_mcp(lineage_mcp)