From ba2f1d3147a59d3736c8dbb79278b1163a63bca3 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Fri, 4 Oct 2024 16:32:08 -0500 Subject: [PATCH] feat(dataProduct): add data product unset side effect (#11512) Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com> --- .../DataProductPropertiesTemplate.java | 4 +- .../DataProductUnsetSideEffect.java | 134 +++++++++ .../DataProductUnsetSideEffectTest.java | 263 ++++++++++++++++++ .../src/main/resources/application.yaml | 2 + .../SpringStandardPluginConfiguration.java | 24 ++ .../metadata/service/DataProductService.java | 8 +- 6 files changed, 426 insertions(+), 9 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataproduct/DataProductPropertiesTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataproduct/DataProductPropertiesTemplate.java index 9b11711439..f1b83732f7 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataproduct/DataProductPropertiesTemplate.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataproduct/DataProductPropertiesTemplate.java @@ -10,8 +10,8 @@ import javax.annotation.Nonnull; public class DataProductPropertiesTemplate implements ArrayMergingTemplate { - private static final String ASSETS_FIELD_NAME = "assets"; - private static final String KEY_FIELD_NAME = "destinationUrn"; + public static final String ASSETS_FIELD_NAME = "assets"; + public static final String KEY_FIELD_NAME = "destinationUrn"; @Override public DataProductProperties getSubtype(RecordTemplate recordTemplate) throws ClassCastException { diff --git 
a/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java
new file mode 100644
index 0000000000..544040d14f
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java
@@ -0,0 +1,186 @@
+package com.linkedin.metadata.dataproducts.sideeffects;
+
+import static com.linkedin.metadata.Constants.DATA_PRODUCT_ENTITY_NAME;
+import static com.linkedin.metadata.Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME;
+import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER;
+
+import com.google.common.collect.ImmutableList;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.dataproduct.DataProductAssociation;
+import com.linkedin.dataproduct.DataProductAssociationArray;
+import com.linkedin.dataproduct.DataProductProperties;
+import com.linkedin.metadata.aspect.RetrieverContext;
+import com.linkedin.metadata.aspect.batch.ChangeMCP;
+import com.linkedin.metadata.aspect.batch.MCLItem;
+import com.linkedin.metadata.aspect.batch.MCPItem;
+import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
+import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
+import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
+import com.linkedin.metadata.aspect.patch.PatchOperationType;
+import com.linkedin.metadata.aspect.patch.template.dataproduct.DataProductPropertiesTemplate;
+import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
+import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
+import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
+import com.linkedin.metadata.models.EntitySpec;
+import com.linkedin.metadata.query.filter.RelationshipDirection;
+import com.linkedin.metadata.search.utils.QueryUtils;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Side effect that keeps every asset in at most one data product. When a Data Product Properties
+ * aspect is written, any other data product that the graph still links to one of its assets is
+ * patched to drop that asset.
+ */
+@Slf4j
+@Getter
+@Setter
+@Accessors(chain = true)
+public class DataProductUnsetSideEffect extends MCPSideEffect {
+  /** Graph relationship type queried to find the data products that already hold an asset. */
+  private static final String DATA_PRODUCT_CONTAINS = "DataProductContains";
+
+  /**
+   * Page size of the lookup. Should only ever be one match; if ever greater than ten, the surplus
+   * will decrease over time to become consistent.
+   */
+  private static final int EXISTING_RELATIONSHIP_LIMIT = 10;
+
+  @Nonnull private AspectPluginConfig config;
+
+  /** No-op: this side effect emits nothing at this stage; see {@code postMCPSideEffect}. */
+  @Override
+  protected Stream<ChangeMCP> applyMCPSideEffect(
+      Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
+    return Stream.empty();
+  }
+
+  /**
+   * Emits a REMOVE patch for each data product, other than the one just written, that still
+   * holds an asset listed in a written Data Product Properties aspect.
+   *
+   * @param mclItems change log items to inspect
+   * @param retrieverContext supplies the graph retriever and the entity registry
+   * @return the patch proposals; empty when no other data product holds any of the assets
+   */
+  @Override
+  protected Stream<MCPItem> postMCPSideEffect(
+      Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
+    return mclItems.stream().flatMap(item -> generatePatchRemove(item, retrieverContext));
+  }
+
+  /**
+   * Builds the removal patches for a single change log item.
+   *
+   * @param mclItem item to inspect; anything but a Data Product Properties aspect is ignored
+   * @param retrieverContext supplies the graph retriever and the entity registry
+   * @return one patch per (other data product, asset) relationship found in the graph
+   */
+  private static Stream<MCPItem> generatePatchRemove(
+      MCLItem mclItem, @Nonnull RetrieverContext retrieverContext) {
+    if (!DATA_PRODUCT_PROPERTIES_ASPECT_NAME.equals(mclItem.getAspectName())) {
+      return Stream.empty();
+    }
+
+    DataProductProperties dataProductProperties = mclItem.getAspect(DataProductProperties.class);
+    if (dataProductProperties == null) {
+      log.error("Unable to process data product properties for urn: {}", mclItem.getUrn());
+      return Stream.empty();
+    }
+
+    List<MCPItem> mcpItems = new ArrayList<>();
+    for (DataProductAssociation association :
+        Optional.ofNullable(dataProductProperties.getAssets())
+            .orElseGet(DataProductAssociationArray::new)) {
+      RelatedEntitiesScrollResult result =
+          retrieverContext
+              .getGraphRetriever()
+              .scrollRelatedEntities(
+                  null,
+                  QueryUtils.newFilter("urn", association.getDestinationUrn().toString()),
+                  null,
+                  EMPTY_FILTER,
+                  ImmutableList.of(DATA_PRODUCT_CONTAINS),
+                  QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
+                  Collections.emptyList(),
+                  null,
+                  EXISTING_RELATIONSHIP_LIMIT,
+                  null,
+                  null);
+      for (RelatedEntities entity : result.getEntities()) {
+        Urn owningProductUrn = UrnUtils.getUrn(entity.getSourceUrn());
+        // The data product being written keeps the asset; only other holders are patched.
+        if (!mclItem.getUrn().equals(owningProductUrn)) {
+          mcpItems.add(
+              buildRemoveAssetPatch(
+                  owningProductUrn, entity.getDestinationUrn(), mclItem, retrieverContext));
+        }
+      }
+    }
+    return mcpItems.stream();
+  }
+
+  /**
+   * Builds a patch that removes {@code assetUrn} from the assets of {@code dataProductUrn}.
+   *
+   * @param dataProductUrn data product whose Data Product Properties aspect is patched
+   * @param assetUrn destination urn of the relationship, as returned by the graph lookup
+   * @param mclItem triggering item; its audit stamp and system metadata are reused
+   * @param retrieverContext supplies the entity registry
+   * @return the REMOVE patch item
+   */
+  private static PatchItemImpl buildRemoveAssetPatch(
+      Urn dataProductUrn,
+      String assetUrn,
+      MCLItem mclItem,
+      @Nonnull RetrieverContext retrieverContext) {
+    // Resolve the entity spec once; it feeds both entitySpec(...) and aspectSpec(...) below.
+    EntitySpec entitySpec =
+        retrieverContext
+            .getAspectRetriever()
+            .getEntityRegistry()
+            .getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
+
+    GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
+    patchOp.setOp(PatchOperationType.REMOVE.getValue());
+    patchOp.setPath(String.format("/assets/%s", escapeJsonPointerToken(assetUrn)));
+
+    return PatchItemImpl.builder()
+        .urn(dataProductUrn)
+        .entitySpec(entitySpec)
+        .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
+        .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
+        .patch(
+            GenericJsonPatch.builder()
+                .arrayPrimaryKeys(
+                    Map.of(
+                        DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
+                        List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
+                .patch(List.of(patchOp))
+                .build()
+                .getJsonPatch())
+        .auditStamp(mclItem.getAuditStamp())
+        .systemMetadata(mclItem.getSystemMetadata())
+        .build(retrieverContext.getAspectRetriever().getEntityRegistry());
+  }
+
+  /**
+   * Escapes a value for use as one JSON Pointer reference token (RFC 6901): {@code ~} becomes
+   * {@code ~0} and {@code /} becomes {@code ~1}. Without this, a urn containing {@code /} would
+   * be read as several path segments.
+   */
+  private static String escapeJsonPointerToken(String token) {
+    return token.replace("~", "~0").replace("/", "~1");
+  }
+}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java
new file mode 100644
index 0000000000..1151014bf1
--- /dev/null
+++ b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java
@@ -0,0 +1,263 @@
+package com.linkedin.metadata.dataproducts.sideeffects;
+
+import static
com.linkedin.metadata.Constants.DATA_PRODUCT_ENTITY_NAME; +import static com.linkedin.metadata.Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME; +import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.dataproduct.DataProductAssociation; +import com.linkedin.dataproduct.DataProductAssociationArray; +import com.linkedin.dataproduct.DataProductProperties; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.models.graph.RelatedEntities; +import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.PatchOperationType; +import com.linkedin.metadata.aspect.patch.template.dataproduct.DataProductPropertiesTemplate; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.entity.SearchRetriever; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl; +import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.search.utils.QueryUtils; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import io.datahubproject.metadata.context.RetrieverContext; +import java.util.ArrayList; +import 
java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link DataProductUnsetSideEffect}. The mocked graph already reports {@code
+ * DATASET_URN_1} under {@code TEST_PRODUCT_URN} and {@code DATASET_URN_2} under {@code
+ * TEST_PRODUCT_URN_2}; every test writes properties for {@code TEST_PRODUCT_URN}.
+ */
+public class DataProductUnsetSideEffectTest {
+  private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry();
+  private static final List<ChangeType> SUPPORTED_CHANGE_TYPES =
+      List.of(
+          ChangeType.CREATE,
+          ChangeType.PATCH,
+          ChangeType.CREATE_ENTITY,
+          ChangeType.UPSERT,
+          ChangeType.DELETE,
+          ChangeType.RESTATE);
+  private static final Urn TEST_PRODUCT_URN =
+      UrnUtils.getUrn("urn:li:dataProduct:someDataProductId");
+
+  private static final Urn TEST_PRODUCT_URN_2 =
+      UrnUtils.getUrn("urn:li:dataProduct:someOtherDataProductId");
+
+  private static final Urn DATASET_URN_1 =
+      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)");
+  private static final Urn DATASET_URN_2 =
+      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)");
+  private static final AspectPluginConfig TEST_PLUGIN_CONFIG =
+      AspectPluginConfig.builder()
+          .className(DataProductUnsetSideEffect.class.getName())
+          .enabled(true)
+          .supportedOperations(
+              SUPPORTED_CHANGE_TYPES.stream()
+                  .map(ChangeType::toString)
+                  .collect(Collectors.toList()))
+          .supportedEntityAspectNames(
+              List.of(
+                  AspectPluginConfig.EntityAspectName.builder()
+                      .entityName(DATA_PRODUCT_ENTITY_NAME)
+                      .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
+                      .build()))
+          .build();
+
+  private AspectRetriever mockAspectRetriever;
+  private RetrieverContext retrieverContext;
+
+  /** Rebuilds the mocks so each test starts from the same two pre-existing relationships. */
+  @BeforeMethod
+  public void setup() {
+    mockAspectRetriever = mock(AspectRetriever.class);
+    when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY);
+
+    GraphRetriever graphRetriever = mock(GraphRetriever.class);
+    stubExistingProduct(graphRetriever, TEST_PRODUCT_URN, DATASET_URN_1);
+    stubExistingProduct(graphRetriever, TEST_PRODUCT_URN_2, DATASET_URN_2);
+
+    retrieverContext =
+        RetrieverContext.builder()
+            .searchRetriever(mock(SearchRetriever.class))
+            .aspectRetriever(mockAspectRetriever)
+            .graphRetriever(graphRetriever)
+            .build();
+  }
+
+  /** Writing the data product that already holds the asset must not emit any patch. */
+  @Test
+  public void testDPAlreadySetToSame() {
+    ChangeItemImpl changeItem = upsertProperties(TEST_PRODUCT_URN, DATASET_URN_1);
+
+    List<MCPItem> testOutput = runPostSideEffect(changeItem);
+
+    assertEquals(testOutput.size(), 0, "Expected no additional changes: " + testOutput);
+  }
+
+  /** Writing an asset held by another data product must emit a REMOVE patch for that product. */
+  @Test
+  public void testDPRemoveOld() {
+    ChangeItemImpl changeItem = upsertProperties(TEST_PRODUCT_URN, DATASET_URN_2);
+
+    List<MCPItem> testOutput = runPostSideEffect(changeItem);
+
+    assertEquals(testOutput.size(), 1, "Expected removal of previous data product: " + testOutput);
+
+    GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
+    patchOp.setOp(PatchOperationType.REMOVE.getValue());
+    patchOp.setPath(String.format("/assets/%s", DATASET_URN_2));
+
+    assertEquals(
+        testOutput,
+        List.of(
+            PatchItemImpl.builder()
+                .urn(TEST_PRODUCT_URN_2)
+                .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
+                .patch(
+                    GenericJsonPatch.builder()
+                        .arrayPrimaryKeys(
+                            Map.of(
+                                DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
+                                List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
+                        .patch(List.of(patchOp))
+                        .build()
+                        .getJsonPatch())
+                .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
+                .aspectSpec(
+                    TEST_REGISTRY
+                        .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
+                        .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
+                .auditStamp(changeItem.getAuditStamp())
+                .systemMetadata(changeItem.getSystemMetadata())
+                .build(mockAspectRetriever.getEntityRegistry())));
+  }
+
+  /**
+   * Makes the mocked graph return one existing relationship between {@code productUrn} and {@code
+   * assetUrn}, but only for the exact lookup arguments the side effect is expected to pass.
+   *
+   * @param graphRetriever mock to stub
+   * @param productUrn data product reported as already holding the asset
+   * @param assetUrn asset whose lookup is stubbed
+   */
+  private static void stubExistingProduct(
+      GraphRetriever graphRetriever, Urn productUrn, Urn assetUrn) {
+    List<RelatedEntities> relatedEntitiesList = new ArrayList<>();
+    relatedEntitiesList.add(
+        new RelatedEntities(
+            "DataProductContains",
+            productUrn.toString(),
+            assetUrn.toString(),
+            RelationshipDirection.INCOMING,
+            null));
+    RelatedEntitiesScrollResult scrollResult =
+        new RelatedEntitiesScrollResult(1, 10, null, relatedEntitiesList);
+
+    when(graphRetriever.scrollRelatedEntities(
+            eq(null),
+            eq(QueryUtils.newFilter("urn", assetUrn.toString())),
+            eq(null),
+            eq(EMPTY_FILTER),
+            eq(ImmutableList.of("DataProductContains")),
+            eq(QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)),
+            eq(Collections.emptyList()),
+            eq(null),
+            eq(10), // Should only ever be one, if ever greater than ten will decrease over time to
+            // become consistent
+            eq(null),
+            eq(null)))
+        .thenReturn(scrollResult);
+  }
+
+  /**
+   * Builds an UPSERT of Data Product Properties for {@code productUrn} whose only asset is {@code
+   * assetUrn}.
+   */
+  private ChangeItemImpl upsertProperties(Urn productUrn, Urn assetUrn) {
+    return ChangeItemImpl.builder()
+        .urn(productUrn)
+        .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
+        .changeType(ChangeType.UPSERT)
+        .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
+        .aspectSpec(
+            TEST_REGISTRY
+                .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
+                .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
+        .recordTemplate(getTestDataProductProperties(assetUrn))
+        .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+        .build(mockAspectRetriever);
+  }
+
+  /**
+   * Wraps {@code changeItem} in a change log item and passes it to a freshly configured side
+   * effect.
+   *
+   * @return the items emitted by {@code postMCPSideEffect}
+   */
+  private List<MCPItem> runPostSideEffect(ChangeItemImpl changeItem) {
+    DataProductUnsetSideEffect test = new DataProductUnsetSideEffect();
+    test.setConfig(TEST_PLUGIN_CONFIG);
+
+    return test.postMCPSideEffect(
+            List.of(
+                MCLItemImpl.builder()
+                    .build(changeItem, null, null, retrieverContext.getAspectRetriever())),
+            retrieverContext)
+        .toList();
+  }
+
+  /** Creates properties whose asset list holds a single association to {@code destinationUrn}. */
+  private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) {
+    DataProductAssociation association = new DataProductAssociation();
+    association.setDestinationUrn(destinationUrn);
+
+    DataProductAssociationArray associations = new DataProductAssociationArray();
+    associations.add(association);
+
+    DataProductProperties properties = new DataProductProperties();
+    properties.setAssets(associations);
+    return properties;
+  }
+}
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 24764b512c..ef3ae76d81 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -542,6 +542,8 @@
metadataChangeProposal: sideEffects: schemaField: enabled: ${MCP_SIDE_EFFECTS_SCHEMA_FIELD_ENABLED:false} + dataProductUnset: + enabled: ${MCP_SIDE_EFFECTS_DATA_PRODUCT_UNSET_ENABLED:true} throttle: updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java
index 67fe2dd6d6..4a2095685a 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java
@@ -7,6 +7,7 @@ import com.linkedin.metadata.aspect.hooks.IgnoreUnknownMutator;
 import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
 import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
 import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
+import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect;
 import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect;
 import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry;
 import com.linkedin.metadata.timeline.eventgenerator.SchemaMetadataChangeEventGenerator;
@@ -80,4 +81,35 @@ public class SpringStandardPluginConfiguration {
         .setConfig(config)
         .setEntityChangeEventGeneratorRegistry(entityChangeEventGeneratorRegistry);
   }
+
+  /**
+   * Registers the {@link DataProductUnsetSideEffect} plugin for the data product properties
+   * aspect. Created only when {@code metadataChangeProposal.sideEffects.dataProductUnset.enabled}
+   * is {@code true}.
+   *
+   * @return the configured side effect
+   */
+  @Bean
+  @ConditionalOnProperty(
+      name = "metadataChangeProposal.sideEffects.dataProductUnset.enabled",
+      havingValue = "true")
+  public MCPSideEffect dataProductUnsetSideEffect() {
+    AspectPluginConfig config =
+        AspectPluginConfig.builder()
+            .enabled(true)
+            .className(DataProductUnsetSideEffect.class.getName())
+            .supportedOperations(
+                List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE", "DELETE", "PATCH"))
+            .supportedEntityAspectNames(
+                List.of(
+                    AspectPluginConfig.EntityAspectName.builder()
+                        .entityName(Constants.DATA_PRODUCT_ENTITY_NAME)
+                        .aspectName(Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
+                        .build()))
+            .build();
+
+    // Log the plugin that is actually being registered, not SchemaFieldSideEffect.
+    log.info("Initialized {}", DataProductUnsetSideEffect.class.getName());
+    return new DataProductUnsetSideEffect().setConfig(config);
+  }
 }
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/service/DataProductService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/service/DataProductService.java index 3abd663832..f222c31d08 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/service/DataProductService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/service/DataProductService.java @@ -340,12 +340,6 @@ public class DataProductService { .filter(urn -> !existingResourceUrns.contains(urn)) .collect(Collectors.toList()); - // unset existing data product on resources first as we only allow one data product on an - // entity at a time - for (Urn resourceUrn : resourceUrns) { - unsetDataProduct(opContext, resourceUrn, actorUrn); - } - AuditStamp nowAuditStamp = new AuditStamp().setTime(System.currentTimeMillis()).setActor(actorUrn); for (Urn resourceUrn : newResourceUrns) { @@ -390,7 +384,7 @@ public class DataProductService { 10, // should never be more than 1 as long as we only allow one actorUrn.toString()); - if (relationships.hasRelationships() && relationships.getRelationships().size() > 0) { + if (relationships.hasRelationships() && !relationships.getRelationships().isEmpty()) { relationships .getRelationships() .forEach(