mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-04 22:52:54 +00:00
fix(dataproduct): optimize data product sideeffect (#11961)
This commit is contained in:
parent
3b1a8ca926
commit
094433c361
@ -9,6 +9,7 @@ import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.dataproduct.DataProductAssociation;
|
||||
import com.linkedin.dataproduct.DataProductAssociationArray;
|
||||
import com.linkedin.dataproduct.DataProductProperties;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.RetrieverContext;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.batch.MCLItem;
|
||||
@ -31,6 +32,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.Getter;
|
||||
@ -65,77 +67,108 @@ public class DataProductUnsetSideEffect extends MCPSideEffect {
|
||||
MCLItem mclItem, @Nonnull RetrieverContext retrieverContext) {
|
||||
|
||||
if (DATA_PRODUCT_PROPERTIES_ASPECT_NAME.equals(mclItem.getAspectName())) {
|
||||
List<MCPItem> mcpItems = new ArrayList<>();
|
||||
|
||||
DataProductProperties dataProductProperties = mclItem.getAspect(DataProductProperties.class);
|
||||
if (dataProductProperties == null) {
|
||||
log.error("Unable to process data product properties for urn: {}", mclItem.getUrn());
|
||||
return Stream.empty();
|
||||
}
|
||||
Map<String, List<GenericJsonPatch.PatchOp>> patchOpMap = new HashMap<>();
|
||||
for (DataProductAssociation dataProductAssociation :
|
||||
DataProductAssociationArray newDataProductAssociationArray =
|
||||
Optional.ofNullable(dataProductProperties.getAssets())
|
||||
.orElse(new DataProductAssociationArray())) {
|
||||
RelatedEntitiesScrollResult result =
|
||||
retrieverContext
|
||||
.getGraphRetriever()
|
||||
.scrollRelatedEntities(
|
||||
null,
|
||||
QueryUtils.newFilter(
|
||||
"urn", dataProductAssociation.getDestinationUrn().toString()),
|
||||
null,
|
||||
EMPTY_FILTER,
|
||||
ImmutableList.of("DataProductContains"),
|
||||
QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
|
||||
Collections.emptyList(),
|
||||
null,
|
||||
10, // Should only ever be one, if ever greater than ten will decrease over time
|
||||
// to become consistent
|
||||
null,
|
||||
null);
|
||||
if (!result.getEntities().isEmpty()) {
|
||||
for (RelatedEntities entity : result.getEntities()) {
|
||||
if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
|
||||
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
|
||||
patchOp.setOp(PatchOperationType.REMOVE.getValue());
|
||||
patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn()));
|
||||
patchOpMap
|
||||
.computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>())
|
||||
.add(patchOp);
|
||||
}
|
||||
}
|
||||
}
|
||||
.orElse(new DataProductAssociationArray());
|
||||
|
||||
DataProductProperties previousDataProductProperties =
|
||||
mclItem.getPreviousAspect(DataProductProperties.class);
|
||||
|
||||
if (!ChangeType.UPSERT.equals(mclItem.getChangeType())
|
||||
|| previousDataProductProperties == null) {
|
||||
// CREATE/CREATE_ENTITY/RESTATE
|
||||
return generateUnsetMCPs(mclItem, newDataProductAssociationArray, retrieverContext);
|
||||
} else {
|
||||
// UPSERT with previous
|
||||
DataProductAssociationArray oldDataProductAssociationArray =
|
||||
Optional.ofNullable(previousDataProductProperties.getAssets())
|
||||
.orElse(new DataProductAssociationArray());
|
||||
|
||||
DataProductAssociationArray additions =
|
||||
newDataProductAssociationArray.stream()
|
||||
.filter(association -> !oldDataProductAssociationArray.contains(association))
|
||||
.collect(Collectors.toCollection(DataProductAssociationArray::new));
|
||||
|
||||
return generateUnsetMCPs(mclItem, additions, retrieverContext);
|
||||
}
|
||||
for (String urn : patchOpMap.keySet()) {
|
||||
EntitySpec entitySpec =
|
||||
retrieverContext
|
||||
.getAspectRetriever()
|
||||
.getEntityRegistry()
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
|
||||
mcpItems.add(
|
||||
PatchItemImpl.builder()
|
||||
.urn(UrnUtils.getUrn(urn))
|
||||
.entitySpec(
|
||||
retrieverContext
|
||||
.getAspectRetriever()
|
||||
.getEntityRegistry()
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
|
||||
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
|
||||
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(
|
||||
Map.of(
|
||||
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
|
||||
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
|
||||
.patch(patchOpMap.get(urn))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(mclItem.getAuditStamp())
|
||||
.systemMetadata(mclItem.getSystemMetadata())
|
||||
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
|
||||
}
|
||||
return mcpItems.stream();
|
||||
}
|
||||
return Stream.empty();
|
||||
}
|
||||
|
||||
private static Stream<MCPItem> generateUnsetMCPs(
|
||||
@Nonnull MCLItem dataProductItem,
|
||||
@Nonnull DataProductAssociationArray dataProductAssociations,
|
||||
@Nonnull RetrieverContext retrieverContext) {
|
||||
List<MCPItem> mcpItems = new ArrayList<>();
|
||||
Map<String, List<GenericJsonPatch.PatchOp>> patchOpMap = new HashMap<>();
|
||||
|
||||
for (DataProductAssociation dataProductAssociation : dataProductAssociations) {
|
||||
RelatedEntitiesScrollResult result =
|
||||
retrieverContext
|
||||
.getGraphRetriever()
|
||||
.scrollRelatedEntities(
|
||||
null,
|
||||
QueryUtils.newFilter(
|
||||
"urn", dataProductAssociation.getDestinationUrn().toString()),
|
||||
null,
|
||||
EMPTY_FILTER,
|
||||
ImmutableList.of("DataProductContains"),
|
||||
QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
|
||||
Collections.emptyList(),
|
||||
null,
|
||||
10, // Should only ever be one, if ever greater than ten will decrease over time
|
||||
// to become consistent
|
||||
null,
|
||||
null);
|
||||
if (!result.getEntities().isEmpty()) {
|
||||
for (RelatedEntities entity : result.getEntities()) {
|
||||
if (!dataProductItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
|
||||
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
|
||||
patchOp.setOp(PatchOperationType.REMOVE.getValue());
|
||||
patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn()));
|
||||
patchOpMap
|
||||
.computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>())
|
||||
.add(patchOp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (String urn : patchOpMap.keySet()) {
|
||||
EntitySpec entitySpec =
|
||||
retrieverContext
|
||||
.getAspectRetriever()
|
||||
.getEntityRegistry()
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
|
||||
mcpItems.add(
|
||||
PatchItemImpl.builder()
|
||||
.urn(UrnUtils.getUrn(urn))
|
||||
.entitySpec(
|
||||
retrieverContext
|
||||
.getAspectRetriever()
|
||||
.getEntityRegistry()
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
|
||||
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
|
||||
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(
|
||||
Map.of(
|
||||
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
|
||||
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
|
||||
.patch(patchOpMap.get(urn))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(dataProductItem.getAuditStamp())
|
||||
.systemMetadata(dataProductItem.getSystemMetadata())
|
||||
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
|
||||
}
|
||||
|
||||
return mcpItems.stream();
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import com.linkedin.dataproduct.DataProductProperties;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.AspectRetriever;
|
||||
import com.linkedin.metadata.aspect.GraphRetriever;
|
||||
import com.linkedin.metadata.aspect.SystemAspect;
|
||||
import com.linkedin.metadata.aspect.batch.MCPItem;
|
||||
import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
|
||||
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
|
||||
@ -47,13 +48,7 @@ import org.testng.annotations.Test;
|
||||
public class DataProductUnsetSideEffectTest {
|
||||
private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry();
|
||||
private static final List<ChangeType> SUPPORTED_CHANGE_TYPES =
|
||||
List.of(
|
||||
ChangeType.CREATE,
|
||||
ChangeType.PATCH,
|
||||
ChangeType.CREATE_ENTITY,
|
||||
ChangeType.UPSERT,
|
||||
ChangeType.DELETE,
|
||||
ChangeType.RESTATE);
|
||||
List.of(ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.UPSERT, ChangeType.RESTATE);
|
||||
private static final Urn TEST_PRODUCT_URN =
|
||||
UrnUtils.getUrn("urn:li:dataProduct:someDataProductId");
|
||||
|
||||
@ -358,6 +353,109 @@ public class DataProductUnsetSideEffectTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertWithPreviousAspect() {
|
||||
DataProductUnsetSideEffect test = new DataProductUnsetSideEffect();
|
||||
test.setConfig(TEST_PLUGIN_CONFIG);
|
||||
|
||||
// Case 1: UPSERT with new additions
|
||||
DataProductProperties previousProperties = new DataProductProperties();
|
||||
DataProductAssociationArray previousAssociations = new DataProductAssociationArray();
|
||||
DataProductAssociation previousAssociation = new DataProductAssociation();
|
||||
previousAssociation.setDestinationUrn(DATASET_URN_1);
|
||||
previousAssociations.add(previousAssociation);
|
||||
previousProperties.setAssets(previousAssociations);
|
||||
|
||||
// New properties include both old and new datasets
|
||||
DataProductProperties newProperties = new DataProductProperties();
|
||||
DataProductAssociationArray newAssociations = new DataProductAssociationArray();
|
||||
DataProductAssociation association1 = new DataProductAssociation();
|
||||
association1.setDestinationUrn(DATASET_URN_1);
|
||||
DataProductAssociation association2 = new DataProductAssociation();
|
||||
association2.setDestinationUrn(DATASET_URN_2);
|
||||
newAssociations.add(association1);
|
||||
newAssociations.add(association2);
|
||||
newProperties.setAssets(newAssociations);
|
||||
|
||||
// Create change item with previous aspect
|
||||
SystemAspect prevData = mock(SystemAspect.class);
|
||||
when(prevData.getRecordTemplate()).thenReturn(previousProperties);
|
||||
|
||||
ChangeItemImpl dataProductPropertiesChangeItem =
|
||||
ChangeItemImpl.builder()
|
||||
.urn(TEST_PRODUCT_URN)
|
||||
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
|
||||
.aspectSpec(
|
||||
TEST_REGISTRY
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
|
||||
.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
|
||||
.recordTemplate(newProperties)
|
||||
.previousSystemAspect(prevData)
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(mockAspectRetriever);
|
||||
|
||||
List<MCPItem> testOutput =
|
||||
test.postMCPSideEffect(
|
||||
List.of(
|
||||
MCLItemImpl.builder()
|
||||
.build(
|
||||
dataProductPropertiesChangeItem,
|
||||
null,
|
||||
null,
|
||||
retrieverContext.getAspectRetriever())),
|
||||
retrieverContext)
|
||||
.toList();
|
||||
|
||||
// Verify that only one patch is generated for the new dataset
|
||||
assertEquals(
|
||||
testOutput.size(), 1, "Expected removal of previous data product for new dataset only");
|
||||
MCPItem patchItem = testOutput.get(0);
|
||||
assertEquals(
|
||||
patchItem.getUrn(), TEST_PRODUCT_URN_2, "Patch should target the old data product");
|
||||
GenericJsonPatch.PatchOp expectedPatchOp = new GenericJsonPatch.PatchOp();
|
||||
expectedPatchOp.setOp(PatchOperationType.REMOVE.getValue());
|
||||
expectedPatchOp.setPath(String.format("/assets/%s", DATASET_URN_2));
|
||||
|
||||
// Case 2: UPSERT with no new additions
|
||||
DataProductProperties sameProperties = new DataProductProperties();
|
||||
DataProductAssociationArray sameAssociations = new DataProductAssociationArray();
|
||||
DataProductAssociation sameAssociation = new DataProductAssociation();
|
||||
sameAssociation.setDestinationUrn(DATASET_URN_1);
|
||||
sameAssociations.add(sameAssociation);
|
||||
sameProperties.setAssets(sameAssociations);
|
||||
|
||||
SystemAspect prevSameData = mock(SystemAspect.class);
|
||||
when(prevData.getRecordTemplate()).thenReturn(sameProperties);
|
||||
|
||||
ChangeItemImpl noChangeItem =
|
||||
ChangeItemImpl.builder()
|
||||
.urn(TEST_PRODUCT_URN)
|
||||
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
|
||||
.aspectSpec(
|
||||
TEST_REGISTRY
|
||||
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
|
||||
.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
|
||||
.recordTemplate(sameProperties)
|
||||
.previousSystemAspect(prevSameData)
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(mockAspectRetriever);
|
||||
|
||||
List<MCPItem> noChangeOutput =
|
||||
test.postMCPSideEffect(
|
||||
List.of(
|
||||
MCLItemImpl.builder()
|
||||
.build(noChangeItem, null, null, retrieverContext.getAspectRetriever())),
|
||||
retrieverContext)
|
||||
.toList();
|
||||
|
||||
// Verify no patches are generated when there are no new additions
|
||||
assertEquals(noChangeOutput.size(), 0, "Expected no changes when assets are the same");
|
||||
}
|
||||
|
||||
private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) {
|
||||
DataProductProperties dataProductProperties = new DataProductProperties();
|
||||
DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();
|
||||
|
||||
@ -98,8 +98,7 @@ public class SpringStandardPluginConfiguration {
|
||||
AspectPluginConfig.builder()
|
||||
.enabled(true)
|
||||
.className(DataProductUnsetSideEffect.class.getName())
|
||||
.supportedOperations(
|
||||
List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE", "DELETE", "PATCH"))
|
||||
.supportedOperations(List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE"))
|
||||
.supportedEntityAspectNames(
|
||||
List.of(
|
||||
AspectPluginConfig.EntityAspectName.builder()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user