# Mirror of https://github.com/datahub-project/datahub.git
# (synced 2025-07-03 23:28:11 +00:00; 63 lines, 2.1 KiB, Python)
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
)
from datahub.specific.dataset import DatasetPatchBuilder

# Example: patch the upstream lineage of the Snowflake dataset
# "fct_users_created". It replaces the S3 upstream "fct_users_old" with
# "fct_users_new" at the dataset level and for the "profile_id" column,
# then emits the resulting patch to the DataHub server at localhost:8080.
# NOTE: this runs network I/O at module level; it is meant to be executed
# as a script, not imported.

# Create DataHub Client
datahub_client = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))

# Create Dataset URN
dataset_urn = make_dataset_urn(
    platform="snowflake", name="fct_users_created", env="PROD"
)
upstream_to_remove_urn = make_dataset_urn(
    platform="s3", name="fct_users_old", env="PROD"
)
upstream_to_add_urn = make_dataset_urn(platform="s3", name="fct_users_new", env="PROD")

# Create Dataset Patch to Add & Remove Upstream Lineage Edges
patch_builder = DatasetPatchBuilder(dataset_urn)
patch_builder.remove_upstream_lineage(upstream_to_remove_urn)
patch_builder.add_upstream_lineage(
    UpstreamClass(
        dataset=upstream_to_add_urn,
        type=DatasetLineageTypeClass.TRANSFORMED,
    )
)

# ...And also include schema field lineage.
# upstreamType and downstreamType are typed by two different enums
# (FineGrainedLineageUpstreamTypeClass / FineGrainedLineageDownstreamTypeClass);
# each side must use its own enum. Keyword arguments make the two sides
# explicit so they cannot be transposed by argument order.
upstream_field_to_add_urn = make_schema_field_urn(upstream_to_add_urn, "profile_id")
downstream_field_to_add_urn = make_schema_field_urn(dataset_urn, "profile_id")

patch_builder.add_fine_grained_upstream_lineage(
    FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
        upstreams=[upstream_field_to_add_urn],
        downstreams=[downstream_field_to_add_urn],
    )
)

# Drop the column-level edge that came from the old upstream dataset.
upstream_field_to_remove_urn = make_schema_field_urn(
    upstream_to_remove_urn, "profile_id"
)
downstream_field_to_remove_urn = make_schema_field_urn(dataset_urn, "profile_id")

patch_builder.remove_fine_grained_upstream_lineage(
    FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
        upstreams=[upstream_field_to_remove_urn],
        downstreams=[downstream_field_to_remove_urn],
    )
)

# build() returns the accumulated patch operations, which are emitted one
# at a time below.
patch_mcps = patch_builder.build()

# Emit Dataset Patch
for patch_mcp in patch_mcps:
    datahub_client.emit(patch_mcp)