feat(sdk): support patches as MCPs in file source (#8220)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
This commit is contained in:
Harshal Sheth 2023-06-14 14:56:27 -07:00 committed by GitHub
parent 9254a1b614
commit 2d7692a245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 175 additions and 3 deletions

View File

@ -197,6 +197,11 @@ class MetadataChangeProposalWrapper:
Raises:
Exception if the generic aspect is invalid, e.g. contains invalid json.
"""
if mcpc.changeType != ChangeTypeClass.UPSERT:
# We can only generate MCPWs for upserts.
return None
converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect)
if converted:
return cls(

View File

@ -0,0 +1,42 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]",
"contentType": "application/json-patch+json"
}
}
]

View File

@ -0,0 +1,66 @@
import json
import pathlib
import pytest
from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
GenericAspectClass,
MetadataChangeProposalClass,
TagAssociationClass,
UpstreamClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
def test_basic_dataset_patch_builder():
    """Adding a single tag must build exactly one ``globalTags`` PATCH proposal.

    The builder is created for the hive dataset ``fct_users_created`` in
    ``PROD`` and given one tag association; the result of ``build()`` is
    compared against a hand-written ``MetadataChangeProposalClass`` whose
    aspect carries the JSON-patch payload as bytes.
    """
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    tag_association = TagAssociationClass(tag=make_tag_urn("test_tag"))
    builder = DatasetPatchBuilder(dataset_urn).add_tag(tag_association)

    built_proposals = builder.build()

    # The patch body is compared as raw bytes, so key order and spacing matter.
    expected_aspect = GenericAspectClass(
        value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]',
        contentType="application/json-patch+json",
    )
    expected_proposal = MetadataChangeProposalClass(
        entityType="dataset",
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
        changeType="PATCH",
        aspectName="globalTags",
        aspect=expected_aspect,
    )
    assert built_proposals == [expected_proposal]
def test_complex_dataset_patch(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Compare a multi-aspect dataset patch with its golden JSON file.

    The patch sets a description, adds two custom properties, a dataset-level
    tag, one upstream lineage edge and a tag on ``field1``. The built
    proposals are written to ``patch.json`` under ``tmp_path`` and the parsed
    JSON is compared with ``tests/unit/patch/complex_dataset_patch.json``
    resolved against ``pytestconfig.rootpath``.
    """
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    upstream_urn = make_dataset_urn(
        platform="hive", name="fct_users_created_upstream", env="PROD"
    )
    upstream_edge = UpstreamClass(
        dataset=upstream_urn,
        type=DatasetLineageTypeClass.TRANSFORMED,
    )
    dataset_tag = TagAssociationClass(tag=make_tag_urn("test_tag"))
    field_tag = TagAssociationClass(tag=make_tag_urn("tag1"))

    builder = (
        DatasetPatchBuilder(dataset_urn)
        .set_description("test description")
        .add_custom_property("test_key_1", "test_value_1")
        .add_custom_property("test_key_2", "test_value_2")
        .add_tag(dataset_tag)
        .add_upstream_lineage(upstream=upstream_edge)
    )
    # The field-level helper's return value is intentionally discarded; the
    # proposals are still taken from the dataset-level builder below.
    builder.for_field("field1").add_tag(field_tag)

    written_path = tmp_path / "patch.json"
    write_metadata_file(written_path, builder.build())

    golden_path = pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json"
    actual_payload = json.loads(written_path.read_text())
    expected_payload = json.loads(golden_path.read_text())
    assert actual_payload == expected_payload

View File

@ -36,15 +36,16 @@ FROZEN_TIME = "2021-07-22 18:54:06"
"tests/unit/serde/test_serde_usage.json",
# Profiles with the MetadataChangeProposal format.
"tests/unit/serde/test_serde_profile.json",
# Test one that uses patch.
"tests/unit/serde/test_serde_patch.json",
],
)
def test_serde_to_json(
pytestconfig: PytestConfig, tmp_path: pathlib.Path, json_filename: str
) -> None:
golden_file = pytestconfig.rootpath / json_filename
output_file = tmp_path / "output.json"
output_filename = "output.json"
output_file = tmp_path / output_filename
pipeline = Pipeline.create(
{
"source": {"type": "file", "config": {"filename": str(golden_file)}},
@ -57,7 +58,7 @@ def test_serde_to_json(
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_filename}",
output_path=f"{output_file}",
golden_path=golden_file,
)

View File

@ -0,0 +1,58 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
}
]