From 094433c3618983206dbbe17eb152b3db39a28820 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 26 Nov 2024 12:57:47 -0600 Subject: [PATCH] fix(dataproduct): optimize data product sideeffect (#11961) --- .../DataProductUnsetSideEffect.java | 159 +++++++++++------- .../DataProductUnsetSideEffectTest.java | 112 +++++++++++- .../SpringStandardPluginConfiguration.java | 3 +- 3 files changed, 202 insertions(+), 72 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java index 9c4bb52f01..dae1a8ff51 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java @@ -9,6 +9,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.dataproduct.DataProductAssociation; import com.linkedin.dataproduct.DataProductAssociationArray; import com.linkedin.dataproduct.DataProductProperties; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.batch.MCLItem; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Getter; @@ -65,77 +67,108 @@ public class DataProductUnsetSideEffect extends MCPSideEffect { MCLItem mclItem, @Nonnull RetrieverContext retrieverContext) { if (DATA_PRODUCT_PROPERTIES_ASPECT_NAME.equals(mclItem.getAspectName())) { - List mcpItems = new ArrayList<>(); + DataProductProperties dataProductProperties = mclItem.getAspect(DataProductProperties.class); if (dataProductProperties == null) { log.error("Unable to process data product properties for urn: {}", mclItem.getUrn()); return Stream.empty(); } - Map> patchOpMap = new HashMap<>(); - for (DataProductAssociation dataProductAssociation : + DataProductAssociationArray newDataProductAssociationArray = Optional.ofNullable(dataProductProperties.getAssets()) - .orElse(new DataProductAssociationArray())) { - RelatedEntitiesScrollResult result = - retrieverContext - .getGraphRetriever() - .scrollRelatedEntities( - null, - QueryUtils.newFilter( - "urn", dataProductAssociation.getDestinationUrn().toString()), - null, - EMPTY_FILTER, - ImmutableList.of("DataProductContains"), - QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), - Collections.emptyList(), - null, - 10, // Should only ever be one, if ever greater than ten will decrease over time - // to become consistent - null, - null); - if (!result.getEntities().isEmpty()) { - for (RelatedEntities entity : result.getEntities()) { - if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) { - GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); - patchOp.setOp(PatchOperationType.REMOVE.getValue()); - patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn())); - patchOpMap - .computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>()) - .add(patchOp); - } - } - } + .orElse(new DataProductAssociationArray()); + + DataProductProperties previousDataProductProperties = + mclItem.getPreviousAspect(DataProductProperties.class); + + if (!ChangeType.UPSERT.equals(mclItem.getChangeType()) + || previousDataProductProperties == null) { + // CREATE/CREATE_ENTITY/RESTATE + return generateUnsetMCPs(mclItem, newDataProductAssociationArray, retrieverContext); + } else { + // UPSERT with previous + DataProductAssociationArray oldDataProductAssociationArray = + Optional.ofNullable(previousDataProductProperties.getAssets()) + .orElse(new DataProductAssociationArray()); + + DataProductAssociationArray additions = + newDataProductAssociationArray.stream() + .filter(association -> !oldDataProductAssociationArray.contains(association)) + .collect(Collectors.toCollection(DataProductAssociationArray::new)); + + return generateUnsetMCPs(mclItem, additions, retrieverContext); } - for (String urn : patchOpMap.keySet()) { - EntitySpec entitySpec = - retrieverContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(DATA_PRODUCT_ENTITY_NAME); - mcpItems.add( - PatchItemImpl.builder() - .urn(UrnUtils.getUrn(urn)) - .entitySpec( - retrieverContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) - .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) - .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) - .patch( - GenericJsonPatch.builder() - .arrayPrimaryKeys( - Map.of( - DataProductPropertiesTemplate.ASSETS_FIELD_NAME, - List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME))) - .patch(patchOpMap.get(urn)) - .build() - .getJsonPatch()) - .auditStamp(mclItem.getAuditStamp()) - .systemMetadata(mclItem.getSystemMetadata()) - .build(retrieverContext.getAspectRetriever().getEntityRegistry())); - } - return mcpItems.stream(); } return Stream.empty(); } + + private static Stream generateUnsetMCPs( + @Nonnull MCLItem dataProductItem, + @Nonnull DataProductAssociationArray dataProductAssociations, + @Nonnull RetrieverContext retrieverContext) { + List mcpItems = new ArrayList<>(); + Map> patchOpMap = new HashMap<>(); + + for (DataProductAssociation dataProductAssociation : dataProductAssociations) { + RelatedEntitiesScrollResult result = + retrieverContext + .getGraphRetriever() + .scrollRelatedEntities( + null, + QueryUtils.newFilter( + "urn", dataProductAssociation.getDestinationUrn().toString()), + null, + EMPTY_FILTER, + ImmutableList.of("DataProductContains"), + QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), + Collections.emptyList(), + null, + 10, // Should only ever be one, if ever greater than ten will decrease over time + // to become consistent + null, + null); + if (!result.getEntities().isEmpty()) { + for (RelatedEntities entity : result.getEntities()) { + if (!dataProductItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) { + GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); + patchOp.setOp(PatchOperationType.REMOVE.getValue()); + patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn())); + patchOpMap + .computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>()) + .add(patchOp); + } + } + } + } + for (String urn : patchOpMap.keySet()) { + EntitySpec entitySpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME); + mcpItems.add( + PatchItemImpl.builder() + .urn(UrnUtils.getUrn(urn)) + .entitySpec( + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) + .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) + .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys( + Map.of( + DataProductPropertiesTemplate.ASSETS_FIELD_NAME, + List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME))) + .patch(patchOpMap.get(urn)) + .build() + .getJsonPatch()) + .auditStamp(dataProductItem.getAuditStamp()) + .systemMetadata(dataProductItem.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry())); + } + + return mcpItems.stream(); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java index 12dd57f94d..976b165fea 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java @@ -17,6 +17,7 @@ import com.linkedin.dataproduct.DataProductProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.SystemAspect; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.models.graph.RelatedEntities; import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; @@ -47,13 +48,7 @@ import org.testng.annotations.Test; public class DataProductUnsetSideEffectTest { private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry(); private static final List SUPPORTED_CHANGE_TYPES = - List.of( - ChangeType.CREATE, - ChangeType.PATCH, - ChangeType.CREATE_ENTITY, - ChangeType.UPSERT, - ChangeType.DELETE, - ChangeType.RESTATE); + List.of(ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.UPSERT, ChangeType.RESTATE); private static final Urn TEST_PRODUCT_URN = UrnUtils.getUrn("urn:li:dataProduct:someDataProductId"); @@ -358,6 +353,109 @@ public class DataProductUnsetSideEffectTest { } } + @Test + public void testUpsertWithPreviousAspect() { + DataProductUnsetSideEffect test = new DataProductUnsetSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + + // Case 1: UPSERT with new additions + DataProductProperties previousProperties = new DataProductProperties(); + DataProductAssociationArray previousAssociations = new DataProductAssociationArray(); + DataProductAssociation previousAssociation = new DataProductAssociation(); + previousAssociation.setDestinationUrn(DATASET_URN_1); + previousAssociations.add(previousAssociation); + previousProperties.setAssets(previousAssociations); + + // New properties include both old and new datasets + DataProductProperties newProperties = new DataProductProperties(); + DataProductAssociationArray newAssociations = new DataProductAssociationArray(); + DataProductAssociation association1 = new DataProductAssociation(); + association1.setDestinationUrn(DATASET_URN_1); + DataProductAssociation association2 = new DataProductAssociation(); + association2.setDestinationUrn(DATASET_URN_2); + newAssociations.add(association1); + newAssociations.add(association2); + newProperties.setAssets(newAssociations); + + // Create change item with previous aspect + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()).thenReturn(previousProperties); + + ChangeItemImpl dataProductPropertiesChangeItem = + ChangeItemImpl.builder() + .urn(TEST_PRODUCT_URN) + .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME) + .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) + .recordTemplate(newProperties) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + List testOutput = + test.postMCPSideEffect( + List.of( + MCLItemImpl.builder() + .build( + dataProductPropertiesChangeItem, + null, + null, + retrieverContext.getAspectRetriever())), + retrieverContext) + .toList(); + + // Verify that only one patch is generated for the new dataset + assertEquals( + testOutput.size(), 1, "Expected removal of previous data product for new dataset only"); + MCPItem patchItem = testOutput.get(0); + assertEquals( + patchItem.getUrn(), TEST_PRODUCT_URN_2, "Patch should target the old data product"); + GenericJsonPatch.PatchOp expectedPatchOp = new GenericJsonPatch.PatchOp(); + expectedPatchOp.setOp(PatchOperationType.REMOVE.getValue()); + expectedPatchOp.setPath(String.format("/assets/%s", DATASET_URN_2)); + + // Case 2: UPSERT with no new additions + DataProductProperties sameProperties = new DataProductProperties(); + DataProductAssociationArray sameAssociations = new DataProductAssociationArray(); + DataProductAssociation sameAssociation = new DataProductAssociation(); + sameAssociation.setDestinationUrn(DATASET_URN_1); + sameAssociations.add(sameAssociation); + sameProperties.setAssets(sameAssociations); + + SystemAspect prevSameData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()).thenReturn(sameProperties); + + ChangeItemImpl noChangeItem = + ChangeItemImpl.builder() + .urn(TEST_PRODUCT_URN) + .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME) + .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) + .recordTemplate(sameProperties) + .previousSystemAspect(prevSameData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + List noChangeOutput = + test.postMCPSideEffect( + List.of( + MCLItemImpl.builder() + .build(noChangeItem, null, null, retrieverContext.getAspectRetriever())), + retrieverContext) + .toList(); + + // Verify no patches are generated when there are no new additions + assertEquals(noChangeOutput.size(), 0, "Expected no changes when assets are the same"); + } + private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) { DataProductProperties dataProductProperties = new DataProductProperties(); DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray(); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index b2db0857a6..26e0da8e6f 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -98,8 +98,7 @@ public class SpringStandardPluginConfiguration { AspectPluginConfig.builder() .enabled(true) .className(DataProductUnsetSideEffect.class.getName()) - .supportedOperations( - List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE", "DELETE", "PATCH")) + .supportedOperations(List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE")) .supportedEntityAspectNames( List.of( AspectPluginConfig.EntityAspectName.builder()