diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 027dc768a4..20e0c659ae 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -197,6 +197,11 @@ class MetadataChangeProposalWrapper: Raises: Exception if the generic aspect is invalid, e.g. contains invalid json. """ + + if mcpc.changeType != ChangeTypeClass.UPSERT: + # We can only generate MCPWs for upserts. + return None + converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect) if converted: return cls( diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json new file mode 100644 index 0000000000..08e100140c --- /dev/null +++ b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json @@ -0,0 +1,42 @@ +[ + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]", + "contentType": "application/json-patch+json" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "globalTags", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]", + "contentType": "application/json-patch+json" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]", + "contentType": "application/json-patch+json" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "editableSchemaMetadata", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]", + "contentType": "application/json-patch+json" + } + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py new file mode 100644 index 0000000000..0701b3d696 --- /dev/null +++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py @@ -0,0 +1,66 @@ +import json +import pathlib + +import pytest + +from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn +from datahub.ingestion.sink.file import write_metadata_file +from datahub.metadata.schema_classes import ( + DatasetLineageTypeClass, + GenericAspectClass, + MetadataChangeProposalClass, + TagAssociationClass, + UpstreamClass, +) +from datahub.specific.dataset import DatasetPatchBuilder + + +def test_basic_dataset_patch_builder(): + patcher = DatasetPatchBuilder( + make_dataset_urn(platform="hive", name="fct_users_created", env="PROD") + ).add_tag(TagAssociationClass(tag=make_tag_urn("test_tag"))) + + assert patcher.build() == [ + MetadataChangeProposalClass( + entityType="dataset", + entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + changeType="PATCH", + aspectName="globalTags", + aspect=GenericAspectClass( + value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]', + contentType="application/json-patch+json", + ), + ), + ] + + +def test_complex_dataset_patch( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + patcher = ( + DatasetPatchBuilder( + make_dataset_urn(platform="hive", name="fct_users_created", env="PROD") + ) + .set_description("test description") + .add_custom_property("test_key_1", "test_value_1") + .add_custom_property("test_key_2", "test_value_2") + .add_tag(TagAssociationClass(tag=make_tag_urn("test_tag"))) + .add_upstream_lineage( + upstream=UpstreamClass( + dataset=make_dataset_urn( + platform="hive", name="fct_users_created_upstream", env="PROD" + ), + type=DatasetLineageTypeClass.TRANSFORMED, + ) + ) + ) + patcher.for_field("field1").add_tag(TagAssociationClass(tag=make_tag_urn("tag1"))) + + out_path = tmp_path / "patch.json" + write_metadata_file(out_path, patcher.build()) + + assert json.loads(out_path.read_text()) == json.loads( + ( + pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json" + ).read_text() + ) diff --git a/metadata-ingestion/tests/unit/serde/test_serde.py b/metadata-ingestion/tests/unit/serde/test_serde.py index 4be6df15f9..d116f1f547 100644 --- a/metadata-ingestion/tests/unit/serde/test_serde.py +++ b/metadata-ingestion/tests/unit/serde/test_serde.py @@ -36,15 +36,16 @@ FROZEN_TIME = "2021-07-22 18:54:06" "tests/unit/serde/test_serde_usage.json", # Profiles with the MetadataChangeProposal format. "tests/unit/serde/test_serde_profile.json", + # Test one that uses patch. + "tests/unit/serde/test_serde_patch.json", ], ) def test_serde_to_json( pytestconfig: PytestConfig, tmp_path: pathlib.Path, json_filename: str ) -> None: golden_file = pytestconfig.rootpath / json_filename + output_file = tmp_path / "output.json" - output_filename = "output.json" - output_file = tmp_path / output_filename pipeline = Pipeline.create( { "source": {"type": "file", "config": {"filename": str(golden_file)}}, @@ -57,7 +58,7 @@ def test_serde_to_json( mce_helpers.check_golden_file( pytestconfig, - output_path=f"{tmp_path}/{output_filename}", + output_path=f"{output_file}", golden_path=golden_file, ) diff --git a/metadata-ingestion/tests/unit/serde/test_serde_patch.json b/metadata-ingestion/tests/unit/serde/test_serde_patch.json new file mode 100644 index 0000000000..33684ea68e --- /dev/null +++ b/metadata-ingestion/tests/unit/serde/test_serde_patch.json @@ -0,0 +1,58 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]", + "contentType": "application/json-patch+json" + }, + "systemMetadata": { + "lastObserved": 1626980046000, + "runId": "serde_test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "globalTags", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]", + "contentType": "application/json-patch+json" + }, + "systemMetadata": { + "lastObserved": 1626980046000, + "runId": "serde_test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]", + "contentType": "application/json-patch+json" + }, + "systemMetadata": { + "lastObserved": 1626980046000, + "runId": "serde_test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "editableSchemaMetadata", + "aspect": { + "value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]", + "contentType": "application/json-patch+json" + }, + "systemMetadata": { + "lastObserved": 1626980046000, + "runId": "serde_test" + } +} +] \ No newline at end of file