mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-05 08:07:04 +00:00
111 lines
3.7 KiB
Python
111 lines
3.7 KiB
Python
import json
|
|
|
|
from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
|
from datahub.metadata.schema_classes import (
|
|
ChangeTypeClass,
|
|
ContainerClass,
|
|
DatasetSnapshotClass,
|
|
GenericAspectClass,
|
|
MetadataChangeEventClass,
|
|
MetadataChangeProposalClass,
|
|
StatusClass,
|
|
UpstreamLineageClass,
|
|
)
|
|
|
|
|
|
def test_get_aspects_of_type_mcp():
|
|
aspect = StatusClass(False)
|
|
wu = MetadataChangeProposalWrapper(
|
|
entityUrn="urn:li:container:asdf", aspect=aspect
|
|
).as_workunit()
|
|
assert wu.get_aspect_of_type(StatusClass) == aspect
|
|
assert wu.get_aspect_of_type(ContainerClass) is None
|
|
|
|
|
|
def test_get_aspects_of_type_mce():
|
|
status_aspect = StatusClass(False)
|
|
status_aspect_2 = StatusClass(True)
|
|
lineage_aspect = UpstreamLineageClass(upstreams=[])
|
|
mce = MetadataChangeEventClass(
|
|
proposedSnapshot=DatasetSnapshotClass(
|
|
urn="urn:li:dataset:asdf",
|
|
aspects=[status_aspect, lineage_aspect, status_aspect_2],
|
|
)
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mce=mce)
|
|
assert wu.get_aspect_of_type(StatusClass) == status_aspect_2
|
|
assert wu.get_aspect_of_type(UpstreamLineageClass) == lineage_aspect
|
|
assert wu.get_aspect_of_type(ContainerClass) is None
|
|
|
|
|
|
def test_get_aspects_of_type_mcpc():
|
|
aspect = StatusClass(False)
|
|
mcpc = MetadataChangeProposalClass(
|
|
entityUrn="urn:li:container:asdf",
|
|
entityType="container",
|
|
changeType=ChangeTypeClass.UPSERT,
|
|
aspectName=StatusClass.ASPECT_NAME,
|
|
aspect=GenericAspectClass(
|
|
value=json.dumps(aspect.to_obj()).encode(),
|
|
contentType=JSON_CONTENT_TYPE,
|
|
),
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
|
|
assert wu.get_aspect_of_type(StatusClass) == aspect
|
|
assert wu.get_aspect_of_type(ContainerClass) is None
|
|
|
|
# Failure scenarios
|
|
mcpc = MetadataChangeProposalClass(
|
|
entityUrn="urn:li:container:asdf",
|
|
entityType="container",
|
|
changeType=ChangeTypeClass.UPSERT,
|
|
aspectName="not status",
|
|
aspect=GenericAspectClass(
|
|
value=json.dumps(aspect.to_obj()).encode(),
|
|
contentType=JSON_CONTENT_TYPE,
|
|
),
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
|
|
assert wu.get_aspect_of_type(StatusClass) is None
|
|
|
|
mcpc = MetadataChangeProposalClass(
|
|
entityUrn="urn:li:container:asdf",
|
|
entityType="container",
|
|
changeType=ChangeTypeClass.PATCH,
|
|
aspectName=StatusClass.ASPECT_NAME,
|
|
aspect=GenericAspectClass(
|
|
value=json.dumps({"not_status": True}).encode(),
|
|
contentType=JSON_PATCH_CONTENT_TYPE,
|
|
),
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
|
|
assert wu.get_aspect_of_type(StatusClass) is None
|
|
|
|
mcpc = MetadataChangeProposalClass(
|
|
entityUrn="urn:li:container:asdf",
|
|
entityType="container",
|
|
changeType=ChangeTypeClass.UPSERT,
|
|
aspectName=StatusClass.ASPECT_NAME,
|
|
aspect=GenericAspectClass(
|
|
value=(json.dumps(aspect.to_obj()) + "aaa").encode(),
|
|
contentType=JSON_CONTENT_TYPE,
|
|
),
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
|
|
assert wu.get_aspect_of_type(StatusClass) is None
|
|
|
|
mcpc = MetadataChangeProposalClass(
|
|
entityUrn="urn:li:container:asdf",
|
|
entityType="container",
|
|
changeType=ChangeTypeClass.UPSERT,
|
|
aspectName=StatusClass.ASPECT_NAME,
|
|
aspect=GenericAspectClass(
|
|
value='{"ß": 2}'.encode("latin_1"),
|
|
contentType=JSON_CONTENT_TYPE,
|
|
),
|
|
)
|
|
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
|
|
assert wu.get_aspect_of_type(StatusClass) is None
|