# Source path: datahub/metadata-ingestion/tests/unit/patch/test_patch_builder.py
# (Repository-viewer listing: 128 lines, 4.4 KiB, Python.)

import json
import pathlib
import pytest
from datahub.emitter.mce_builder import (
make_chart_urn,
make_dashboard_urn,
make_dataset_urn,
make_tag_urn,
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
GenericAspectClass,
MetadataChangeProposalClass,
TagAssociationClass,
UpstreamClass,
)
from datahub.specific.chart import ChartPatchBuilder
from datahub.specific.dashboard import DashboardPatchBuilder
from datahub.specific.dataset import DatasetPatchBuilder
def test_basic_dataset_patch_builder():
    """Check that tagging a dataset builds exactly one ``globalTags`` PATCH proposal.

    The expected proposal carries a JSON-patch payload with a single ``add``
    operation keyed by the tag URN.
    """
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    tag_association = TagAssociationClass(tag=make_tag_urn("test_tag"))
    builder = DatasetPatchBuilder(dataset_urn).add_tag(tag_association)

    proposals = builder.build()

    # The payload is compared as raw bytes, so the literal below must match
    # the serialized patch exactly (including spacing).
    expected_aspect = GenericAspectClass(
        value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]',
        contentType="application/json-patch+json",
    )
    expected_proposal = MetadataChangeProposalClass(
        entityType="dataset",
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
        changeType="PATCH",
        aspectName="globalTags",
        aspect=expected_aspect,
    )
    assert proposals == [expected_proposal]
def test_complex_dataset_patch(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Build a multi-aspect dataset patch and compare it with the golden file.

    The patch sets a description, two custom properties, a tag, an upstream
    lineage edge, a fine-grained lineage entry and a field-level tag. The
    built proposals are written to a temporary JSON file, and the parsed
    content must equal the parsed ``complex_dataset_patch.json`` located
    under the pytest root path.
    """
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    upstream_urn = make_dataset_urn(
        platform="hive", name="fct_users_created_upstream", env="PROD"
    )

    upstream_edge = UpstreamClass(
        dataset=upstream_urn,
        type=DatasetLineageTypeClass.TRANSFORMED,
    )
    fine_grained_edge = FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
        upstreams=[upstream_urn],
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
        transformOperation="TRANSFORM",
        confidenceScore=1.0,
    )

    builder = (
        DatasetPatchBuilder(dataset_urn)
        .set_description("test description")
        .add_custom_property("test_key_1", "test_value_1")
        .add_custom_property("test_key_2", "test_value_2")
        .add_tag(TagAssociationClass(tag=make_tag_urn("test_tag")))
        .add_upstream_lineage(upstream=upstream_edge)
        .add_fine_grained_upstream_lineage(fine_grained_lineage=fine_grained_edge)
    )
    # Field-level tag: the return value is intentionally discarded, and the
    # result is picked up through the later build() on the same builder.
    builder.for_field("field1").add_tag(
        TagAssociationClass(tag=make_tag_urn("tag1"))
    )

    written_file = tmp_path / "patch.json"
    write_metadata_file(written_file, builder.build())
    golden_file = pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json"

    # Compare parsed JSON rather than raw text so formatting differences
    # between the two files do not matter.
    actual = json.loads(written_file.read_text())
    expected = json.loads(golden_file.read_text())
    assert actual == expected
def test_basic_chart_patch_builder():
    """Check that tagging a chart builds exactly one ``globalTags`` PATCH proposal.

    The expected proposal carries a JSON-patch payload with a single ``add``
    operation keyed by the tag URN.
    """
    chart_urn = make_chart_urn(platform="hive", name="fct_users_created")
    tag_association = TagAssociationClass(tag=make_tag_urn("test_tag"))
    builder = ChartPatchBuilder(chart_urn).add_tag(tag_association)

    proposals = builder.build()

    # The payload is compared as raw bytes, so the literal below must match
    # the serialized patch exactly (including spacing).
    expected_aspect = GenericAspectClass(
        value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]',
        contentType="application/json-patch+json",
    )
    expected_proposal = MetadataChangeProposalClass(
        entityType="chart",
        entityUrn="urn:li:chart:(hive,fct_users_created)",
        changeType="PATCH",
        aspectName="globalTags",
        aspect=expected_aspect,
    )
    assert proposals == [expected_proposal]
def test_basic_dashboard_patch_builder():
    """Check that tagging a dashboard builds exactly one ``globalTags`` PATCH proposal.

    The expected proposal carries a JSON-patch payload with a single ``add``
    operation keyed by the tag URN.
    """
    dashboard_urn = make_dashboard_urn(platform="hive", name="fct_users_created")
    tag_association = TagAssociationClass(tag=make_tag_urn("test_tag"))
    builder = DashboardPatchBuilder(dashboard_urn).add_tag(tag_association)

    proposals = builder.build()

    # The payload is compared as raw bytes, so the literal below must match
    # the serialized patch exactly (including spacing).
    expected_aspect = GenericAspectClass(
        value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]',
        contentType="application/json-patch+json",
    )
    expected_proposal = MetadataChangeProposalClass(
        entityType="dashboard",
        entityUrn="urn:li:dashboard:(hive,fct_users_created)",
        changeType="PATCH",
        aspectName="globalTags",
        aspect=expected_aspect,
    )
    assert proposals == [expected_proposal]