feat(dataProduct): add data product unset side effect (#11512)

Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
This commit is contained in:
RyanHolstien 2024-10-04 16:32:08 -05:00 committed by GitHub
parent e71918c56a
commit ba2f1d3147
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 426 additions and 9 deletions

View File

@ -10,8 +10,8 @@ import javax.annotation.Nonnull;
public class DataProductPropertiesTemplate implements ArrayMergingTemplate<DataProductProperties> {
private static final String ASSETS_FIELD_NAME = "assets";
private static final String KEY_FIELD_NAME = "destinationUrn";
public static final String ASSETS_FIELD_NAME = "assets";
public static final String KEY_FIELD_NAME = "destinationUrn";
@Override
public DataProductProperties getSubtype(RecordTemplate recordTemplate) throws ClassCastException {

View File

@ -0,0 +1,134 @@
package com.linkedin.metadata.dataproducts.sideeffects;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.dataproduct.DataProductAssociation;
import com.linkedin.dataproduct.DataProductAssociationArray;
import com.linkedin.dataproduct.DataProductProperties;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.aspect.patch.template.dataproduct.DataProductPropertiesTemplate;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.utils.QueryUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
/**
 * Side effect that enforces a single data product being associated with each entity by removing
 * any previous relation when evaluating updates to Data Product Properties aspects.
 *
 * <p>For every asset listed in an incoming Data Product Properties aspect, the graph is queried for
 * data products that already contain that asset. For each such data product other than the one
 * being written, a JSON Patch {@code remove} operation against its {@code assets} array is
 * emitted.
 */
@Slf4j
@Getter
@Setter
@Accessors(chain = true)
public class DataProductUnsetSideEffect extends MCPSideEffect {
  @Nonnull private AspectPluginConfig config;

  /** This side effect produces no additional items from the change proposals themselves. */
  @Override
  protected Stream<ChangeMCP> applyMCPSideEffect(
      Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
    return Stream.of();
  }

  /**
   * Emits the removal patches for every item in {@code mclItems}.
   *
   * @param mclItems the change log items to inspect
   * @param retrieverContext provides graph and entity registry access
   * @return patch items that remove assets from the data products that previously contained them
   */
  @Override
  protected Stream<MCPItem> postMCPSideEffect(
      Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
    return mclItems.stream().flatMap(item -> generatePatchRemove(item, retrieverContext));
  }

  /**
   * Generates the removal patches for a single change log item.
   *
   * @param mclItem the item to inspect; items for other aspects yield an empty stream
   * @param retrieverContext provides graph and entity registry access
   * @return one patch per (asset, other containing data product) pair found in the graph
   */
  private static Stream<MCPItem> generatePatchRemove(
      MCLItem mclItem, @Nonnull RetrieverContext retrieverContext) {
    if (!DATA_PRODUCT_PROPERTIES_ASPECT_NAME.equals(mclItem.getAspectName())) {
      return Stream.empty();
    }

    DataProductProperties dataProductProperties = mclItem.getAspect(DataProductProperties.class);
    if (dataProductProperties == null) {
      log.error("Unable to process data product properties for urn: {}", mclItem.getUrn());
      return Stream.empty();
    }

    List<MCPItem> mcpItems = new ArrayList<>();
    for (DataProductAssociation dataProductAssociation :
        Optional.ofNullable(dataProductProperties.getAssets())
            .orElse(new DataProductAssociationArray())) {
      RelatedEntitiesScrollResult result =
          findContainingDataProducts(dataProductAssociation, retrieverContext);
      for (RelatedEntities entity : result.getEntities()) {
        // Skip the relationship that points at the data product currently being written.
        if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
          mcpItems.add(buildAssetRemovalPatch(mclItem, entity, retrieverContext));
        }
      }
    }
    return mcpItems.stream();
  }

  /** Queries the graph for incoming {@code DataProductContains} edges on the associated asset. */
  private static RelatedEntitiesScrollResult findContainingDataProducts(
      DataProductAssociation dataProductAssociation, @Nonnull RetrieverContext retrieverContext) {
    return retrieverContext
        .getGraphRetriever()
        .scrollRelatedEntities(
            null,
            QueryUtils.newFilter("urn", dataProductAssociation.getDestinationUrn().toString()),
            null,
            EMPTY_FILTER,
            ImmutableList.of("DataProductContains"),
            QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
            Collections.emptyList(),
            null,
            10, // Should only ever be one, if ever greater than ten will decrease over time
            // to become consistent
            null,
            null);
  }

  /**
   * Builds the patch that removes the related entity's destination asset from the data product
   * identified by the related entity's source urn.
   */
  private static MCPItem buildAssetRemovalPatch(
      MCLItem mclItem, RelatedEntities entity, @Nonnull RetrieverContext retrieverContext) {
    // Resolve the entity spec once and reuse it for both the entity and aspect spec.
    EntitySpec entitySpec =
        retrieverContext
            .getAspectRetriever()
            .getEntityRegistry()
            .getEntitySpec(DATA_PRODUCT_ENTITY_NAME);

    GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
    patchOp.setOp(PatchOperationType.REMOVE.getValue());
    // The urn is a single JSON Pointer reference token, so '/' and '~' inside it must be escaped;
    // otherwise an urn containing '/' would be read as several path segments.
    patchOp.setPath(
        String.format("/assets/%s", escapeJsonPointerToken(entity.getDestinationUrn())));

    return PatchItemImpl.builder()
        .urn(UrnUtils.getUrn(entity.getSourceUrn()))
        .entitySpec(entitySpec)
        .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
        .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
        .patch(
            GenericJsonPatch.builder()
                .arrayPrimaryKeys(
                    Map.of(
                        DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
                        List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
                .patch(List.of(patchOp))
                .build()
                .getJsonPatch())
        .auditStamp(mclItem.getAuditStamp())
        .systemMetadata(mclItem.getSystemMetadata())
        .build(retrieverContext.getAspectRetriever().getEntityRegistry());
  }

  /**
   * Escapes a JSON Pointer reference token per RFC 6901: {@code ~} becomes {@code ~0} and {@code /}
   * becomes {@code ~1}. The tilde must be replaced first so the {@code ~1} produced for slashes is
   * not escaped again.
   */
  private static String escapeJsonPointerToken(String token) {
    return token.replace("~", "~0").replace("/", "~1");
  }
}

View File

@ -0,0 +1,263 @@
package com.linkedin.metadata.dataproducts.sideeffects;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.dataproduct.DataProductAssociation;
import com.linkedin.dataproduct.DataProductAssociationArray;
import com.linkedin.dataproduct.DataProductProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.aspect.patch.template.dataproduct.DataProductPropertiesTemplate;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.entity.SearchRetriever;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import io.datahubproject.metadata.context.RetrieverContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Unit tests for {@link DataProductUnsetSideEffect}.
 *
 * <p>The mocked graph reports that {@code TEST_PRODUCT_URN} contains {@code DATASET_URN_1} and
 * {@code TEST_PRODUCT_URN_2} contains {@code DATASET_URN_2}.
 */
public class DataProductUnsetSideEffectTest {
  private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry();
  private static final List<ChangeType> SUPPORTED_CHANGE_TYPES =
      List.of(
          ChangeType.CREATE,
          ChangeType.PATCH,
          ChangeType.CREATE_ENTITY,
          ChangeType.UPSERT,
          ChangeType.DELETE,
          ChangeType.RESTATE);
  private static final Urn TEST_PRODUCT_URN =
      UrnUtils.getUrn("urn:li:dataProduct:someDataProductId");
  private static final Urn TEST_PRODUCT_URN_2 =
      UrnUtils.getUrn("urn:li:dataProduct:someOtherDataProductId");
  private static final Urn DATASET_URN_1 =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)");
  private static final Urn DATASET_URN_2 =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)");
  private static final AspectPluginConfig TEST_PLUGIN_CONFIG =
      AspectPluginConfig.builder()
          .className(DataProductUnsetSideEffect.class.getName())
          .enabled(true)
          .supportedOperations(
              SUPPORTED_CHANGE_TYPES.stream()
                  .map(ChangeType::toString)
                  .collect(Collectors.toList()))
          .supportedEntityAspectNames(
              List.of(
                  AspectPluginConfig.EntityAspectName.builder()
                      .entityName(DATA_PRODUCT_ENTITY_NAME)
                      .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
                      .build()))
          .build();

  private AspectRetriever mockAspectRetriever;
  private RetrieverContext retrieverContext;

  @BeforeMethod
  public void setup() {
    mockAspectRetriever = mock(AspectRetriever.class);
    when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY);

    GraphRetriever graphRetriever = mock(GraphRetriever.class);
    stubContainingProduct(graphRetriever, TEST_PRODUCT_URN, DATASET_URN_1);
    stubContainingProduct(graphRetriever, TEST_PRODUCT_URN_2, DATASET_URN_2);

    retrieverContext =
        RetrieverContext.builder()
            .searchRetriever(mock(SearchRetriever.class))
            .aspectRetriever(mockAspectRetriever)
            .graphRetriever(graphRetriever)
            .build();
  }

  /** Writing an asset to the product that already contains it must not emit any patch. */
  @Test
  public void testDPAlreadySetToSame() {
    ChangeItemImpl dataProductPropertiesChangeItem = upsertOnTestProduct(DATASET_URN_1);

    // Run test
    List<MCPItem> testOutput = runPostSideEffect(dataProductPropertiesChangeItem);

    // Verify test
    assertEquals(testOutput.size(), 0, "Expected no additional changes: " + testOutput);
  }

  /** Writing an asset owned by another product must emit a removal patch for that product. */
  @Test
  public void testDPRemoveOld() {
    ChangeItemImpl dataProductPropertiesChangeItem = upsertOnTestProduct(DATASET_URN_2);

    // Run test
    List<MCPItem> testOutput = runPostSideEffect(dataProductPropertiesChangeItem);

    // Verify test
    assertEquals(testOutput.size(), 1, "Expected removal of previous data product: " + testOutput);

    GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
    patchOp.setOp(PatchOperationType.REMOVE.getValue());
    patchOp.setPath(String.format("/assets/%s", DATASET_URN_2));

    assertEquals(
        testOutput,
        List.of(
            PatchItemImpl.builder()
                .urn(TEST_PRODUCT_URN_2)
                .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
                .patch(
                    GenericJsonPatch.builder()
                        .arrayPrimaryKeys(
                            Map.of(
                                DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
                                List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
                        .patch(List.of(patchOp))
                        .build()
                        .getJsonPatch())
                .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
                .aspectSpec(
                    TEST_REGISTRY
                        .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
                        .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
                .auditStamp(dataProductPropertiesChangeItem.getAuditStamp())
                .systemMetadata(dataProductPropertiesChangeItem.getSystemMetadata())
                .build(mockAspectRetriever.getEntityRegistry())));
  }

  /**
   * Stubs the graph so that scrolling incoming {@code DataProductContains} relationships for
   * {@code datasetUrn} returns a single edge whose source is {@code productUrn}.
   */
  private static void stubContainingProduct(
      GraphRetriever graphRetriever, Urn productUrn, Urn datasetUrn) {
    RelatedEntities relatedEntities =
        new RelatedEntities(
            "DataProductContains",
            productUrn.toString(),
            datasetUrn.toString(),
            RelationshipDirection.INCOMING,
            null);
    List<RelatedEntities> relatedEntitiesList = new ArrayList<>();
    relatedEntitiesList.add(relatedEntities);
    RelatedEntitiesScrollResult relatedEntitiesScrollResult =
        new RelatedEntitiesScrollResult(1, 10, null, relatedEntitiesList);

    when(graphRetriever.scrollRelatedEntities(
            eq(null),
            eq(QueryUtils.newFilter("urn", datasetUrn.toString())),
            eq(null),
            eq(EMPTY_FILTER),
            eq(ImmutableList.of("DataProductContains")),
            eq(QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)),
            eq(Collections.emptyList()),
            eq(null),
            eq(10), // Should only ever be one, if ever greater than ten will decrease over time to
            // become consistent
            eq(null),
            eq(null)))
        .thenReturn(relatedEntitiesScrollResult);
  }

  /** Builds an UPSERT of Data Product Properties on {@code TEST_PRODUCT_URN} with one asset. */
  private ChangeItemImpl upsertOnTestProduct(Urn assetUrn) {
    return ChangeItemImpl.builder()
        .urn(TEST_PRODUCT_URN)
        .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
        .changeType(ChangeType.UPSERT)
        .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
        .aspectSpec(
            TEST_REGISTRY
                .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
                .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
        .recordTemplate(getTestDataProductProperties(assetUrn))
        .auditStamp(AuditStampUtils.createDefaultAuditStamp())
        .build(mockAspectRetriever);
  }

  /** Wraps the change item in an MCL item and runs it through a freshly configured side effect. */
  private List<MCPItem> runPostSideEffect(ChangeItemImpl changeItem) {
    DataProductUnsetSideEffect test = new DataProductUnsetSideEffect();
    test.setConfig(TEST_PLUGIN_CONFIG);
    return test.postMCPSideEffect(
            List.of(
                MCLItemImpl.builder()
                    .build(changeItem, null, null, retrieverContext.getAspectRetriever())),
            retrieverContext)
        .toList();
  }

  /** Creates Data Product Properties whose assets array holds only {@code destinationUrn}. */
  private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) {
    DataProductProperties dataProductProperties = new DataProductProperties();
    DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();
    DataProductAssociation dataProductAssociation1 = new DataProductAssociation();
    dataProductAssociation1.setDestinationUrn(destinationUrn);
    dataProductAssociations.add(dataProductAssociation1);
    dataProductProperties.setAssets(dataProductAssociations);
    return dataProductProperties;
  }
}

View File

@ -542,6 +542,8 @@ metadataChangeProposal:
sideEffects:
schemaField:
enabled: ${MCP_SIDE_EFFECTS_SCHEMA_FIELD_ENABLED:false}
dataProductUnset:
enabled: ${MCP_SIDE_EFFECTS_DATA_PRODUCT_UNSET_ENABLED:true}
throttle:
updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000}

View File

@ -7,6 +7,7 @@ import com.linkedin.metadata.aspect.hooks.IgnoreUnknownMutator;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect;
import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect;
import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry;
import com.linkedin.metadata.timeline.eventgenerator.SchemaMetadataChangeEventGenerator;
@ -80,4 +81,27 @@ public class SpringStandardPluginConfiguration {
.setConfig(config)
.setEntityChangeEventGeneratorRegistry(entityChangeEventGeneratorRegistry);
}
/**
 * Registers the {@link DataProductUnsetSideEffect} for the data product properties aspect.
 *
 * <p>The bean is only created when {@code
 * metadataChangeProposal.sideEffects.dataProductUnset.enabled} is {@code true}.
 *
 * @return the configured side effect
 */
@Bean
@ConditionalOnProperty(
    name = "metadataChangeProposal.sideEffects.dataProductUnset.enabled",
    havingValue = "true")
public MCPSideEffect dataProductUnsetSideEffect() {
  AspectPluginConfig config =
      AspectPluginConfig.builder()
          .enabled(true)
          .className(DataProductUnsetSideEffect.class.getName())
          .supportedOperations(
              List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE", "DELETE", "PATCH"))
          .supportedEntityAspectNames(
              List.of(
                  AspectPluginConfig.EntityAspectName.builder()
                      .entityName(Constants.DATA_PRODUCT_ENTITY_NAME)
                      .aspectName(Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
                      .build()))
          .build();
  // Log the class actually being initialized (previously reported SchemaFieldSideEffect).
  log.info("Initialized {}", DataProductUnsetSideEffect.class.getName());
  return new DataProductUnsetSideEffect().setConfig(config);
}
}

View File

@ -340,12 +340,6 @@ public class DataProductService {
.filter(urn -> !existingResourceUrns.contains(urn))
.collect(Collectors.toList());
// unset existing data product on resources first as we only allow one data product on an
// entity at a time
for (Urn resourceUrn : resourceUrns) {
unsetDataProduct(opContext, resourceUrn, actorUrn);
}
AuditStamp nowAuditStamp =
new AuditStamp().setTime(System.currentTimeMillis()).setActor(actorUrn);
for (Urn resourceUrn : newResourceUrns) {
@ -390,7 +384,7 @@ public class DataProductService {
10, // should never be more than 1 as long as we only allow one
actorUrn.toString());
if (relationships.hasRelationships() && relationships.getRelationships().size() > 0) {
if (relationships.hasRelationships() && !relationships.getRelationships().isEmpty()) {
relationships
.getRelationships()
.forEach(