mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 18:07:57 +00:00
fix(batch-patch): fix patches in batches (#11928)
This commit is contained in:
parent
e7e208fdfe
commit
28cc8caf65
@ -9,6 +9,7 @@ import com.linkedin.mxe.SystemMetadata;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -49,7 +50,8 @@ public interface AspectsBatch {
|
||||
* various hooks
|
||||
*/
|
||||
Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
|
||||
Map<String, Map<String, SystemAspect>> latestAspects);
|
||||
Map<String, Map<String, SystemAspect>> latestAspects,
|
||||
Map<String, Map<String, Long>> nextVersions);
|
||||
|
||||
/**
|
||||
* Apply read mutations to batch
|
||||
@ -227,4 +229,39 @@ public interface AspectsBatch {
|
||||
+ StringUtils.abbreviate(itemsAbbreviated.toString(), maxWidth)
|
||||
+ '}';
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment aspect within a batch, tracking both the next aspect version and the most recent
|
||||
*
|
||||
* @param changeMCP changeMCP to be incremented
|
||||
* @param latestAspects lastest aspects within the batch
|
||||
* @param nextVersions next version for the aspects in the batch
|
||||
* @return the incremented changeMCP
|
||||
*/
|
||||
static ChangeMCP incrementBatchVersion(
|
||||
ChangeMCP changeMCP,
|
||||
Map<String, Map<String, SystemAspect>> latestAspects,
|
||||
Map<String, Map<String, Long>> nextVersions) {
|
||||
long nextVersion =
|
||||
nextVersions
|
||||
.getOrDefault(changeMCP.getUrn().toString(), Collections.emptyMap())
|
||||
.getOrDefault(changeMCP.getAspectName(), 0L);
|
||||
|
||||
changeMCP.setPreviousSystemAspect(
|
||||
latestAspects
|
||||
.getOrDefault(changeMCP.getUrn().toString(), Collections.emptyMap())
|
||||
.getOrDefault(changeMCP.getAspectName(), null));
|
||||
|
||||
changeMCP.setNextAspectVersion(nextVersion);
|
||||
|
||||
// support inner-batch upserts
|
||||
latestAspects
|
||||
.computeIfAbsent(changeMCP.getUrn().toString(), key -> new HashMap<>())
|
||||
.put(changeMCP.getAspectName(), changeMCP.getSystemAspect(nextVersion));
|
||||
nextVersions
|
||||
.computeIfAbsent(changeMCP.getUrn().toString(), key -> new HashMap<>())
|
||||
.put(changeMCP.getAspectName(), nextVersion + 1);
|
||||
|
||||
return changeMCP;
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,7 +47,8 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
*/
|
||||
@Override
|
||||
public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
|
||||
final Map<String, Map<String, SystemAspect>> latestAspects) {
|
||||
Map<String, Map<String, SystemAspect>> latestAspects,
|
||||
Map<String, Map<String, Long>> nextVersions) {
|
||||
|
||||
// Process proposals to change items
|
||||
Stream<? extends BatchItem> mutatedProposalsStream =
|
||||
@ -56,6 +57,7 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
.filter(item -> item instanceof ProposedItem)
|
||||
.map(item -> (MCPItem) item)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
// Regular change items
|
||||
Stream<? extends BatchItem> changeMCPStream =
|
||||
items.stream().filter(item -> !(item instanceof ProposedItem));
|
||||
@ -83,10 +85,8 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
currentValue, retrieverContext.getAspectRetriever());
|
||||
}
|
||||
|
||||
// Populate old aspect for write hooks
|
||||
upsertItem.setPreviousSystemAspect(latest);
|
||||
|
||||
return upsertItem;
|
||||
return AspectsBatch.incrementBatchVersion(
|
||||
upsertItem, latestAspects, nextVersions);
|
||||
})
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
|
||||
@ -96,6 +96,7 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
LinkedList<ChangeMCP> newItems =
|
||||
applyMCPSideEffects(upsertBatchItems).collect(Collectors.toCollection(LinkedList::new));
|
||||
upsertBatchItems.addAll(newItems);
|
||||
|
||||
Map<String, Set<String>> newUrnAspectNames =
|
||||
getNewUrnAspectsMap(getUrnAspectsMap(), upsertBatchItems);
|
||||
|
||||
|
||||
@ -41,6 +41,7 @@ import com.linkedin.util.Pair;
|
||||
import io.datahubproject.metadata.context.RetrieverContext;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
@ -120,7 +121,7 @@ public class AspectsBatchImplTest {
|
||||
AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build();
|
||||
|
||||
assertEquals(
|
||||
testBatch.toUpsertBatchItems(Map.of()),
|
||||
testBatch.toUpsertBatchItems(new HashMap<>(), new HashMap<>()),
|
||||
Pair.of(Map.of(), testItems),
|
||||
"Expected noop, pass through with no additional MCPs or changes");
|
||||
}
|
||||
@ -176,7 +177,7 @@ public class AspectsBatchImplTest {
|
||||
AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build();
|
||||
|
||||
assertEquals(
|
||||
testBatch.toUpsertBatchItems(Map.of()),
|
||||
testBatch.toUpsertBatchItems(new HashMap<>(), new HashMap<>()),
|
||||
Pair.of(
|
||||
Map.of(),
|
||||
List.of(
|
||||
@ -195,7 +196,7 @@ public class AspectsBatchImplTest {
|
||||
.recordTemplate(
|
||||
new StructuredProperties()
|
||||
.setProperties(new StructuredPropertyValueAssignmentArray()))
|
||||
.systemMetadata(testItems.get(0).getSystemMetadata())
|
||||
.systemMetadata(testItems.get(0).getSystemMetadata().setVersion("1"))
|
||||
.build(mockAspectRetriever),
|
||||
ChangeItemImpl.builder()
|
||||
.urn(
|
||||
@ -212,7 +213,7 @@ public class AspectsBatchImplTest {
|
||||
.recordTemplate(
|
||||
new StructuredProperties()
|
||||
.setProperties(new StructuredPropertyValueAssignmentArray()))
|
||||
.systemMetadata(testItems.get(1).getSystemMetadata())
|
||||
.systemMetadata(testItems.get(1).getSystemMetadata().setVersion("1"))
|
||||
.build(mockAspectRetriever))),
|
||||
"Expected patch items converted to upsert change items");
|
||||
}
|
||||
@ -264,7 +265,7 @@ public class AspectsBatchImplTest {
|
||||
AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build();
|
||||
|
||||
assertEquals(
|
||||
testBatch.toUpsertBatchItems(Map.of()),
|
||||
testBatch.toUpsertBatchItems(new HashMap<>(), new HashMap<>()),
|
||||
Pair.of(
|
||||
Map.of(),
|
||||
List.of(
|
||||
@ -280,7 +281,7 @@ public class AspectsBatchImplTest {
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(STATUS_ASPECT_NAME))
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.systemMetadata(testItems.get(0).getSystemMetadata())
|
||||
.systemMetadata(testItems.get(0).getSystemMetadata().setVersion("1"))
|
||||
.recordTemplate(new Status().setRemoved(false))
|
||||
.build(mockAspectRetriever),
|
||||
ChangeItemImpl.builder()
|
||||
@ -295,7 +296,7 @@ public class AspectsBatchImplTest {
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(STATUS_ASPECT_NAME))
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.systemMetadata(testItems.get(1).getSystemMetadata())
|
||||
.systemMetadata(testItems.get(1).getSystemMetadata().setVersion("1"))
|
||||
.recordTemplate(new Status().setRemoved(false))
|
||||
.build(mockAspectRetriever))),
|
||||
"Mutation to status aspect");
|
||||
@ -328,7 +329,7 @@ public class AspectsBatchImplTest {
|
||||
.build();
|
||||
|
||||
assertEquals(
|
||||
testBatch.toUpsertBatchItems(Map.of()).getSecond().size(),
|
||||
testBatch.toUpsertBatchItems(new HashMap<>(), new HashMap<>()).getSecond().size(),
|
||||
1,
|
||||
"Expected 1 valid mcp to be passed through.");
|
||||
}
|
||||
|
||||
@ -868,71 +868,64 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
// Read before write is unfortunate, however batch it
|
||||
final Map<String, Set<String>> urnAspects = batchWithDefaults.getUrnAspectsMap();
|
||||
// read #1
|
||||
final Map<String, Map<String, SystemAspect>> latestAspects =
|
||||
Map<String, Map<String, EntityAspect>> databaseAspects =
|
||||
aspectDao.getLatestAspects(urnAspects, true);
|
||||
|
||||
final Map<String, Map<String, SystemAspect>> batchAspects =
|
||||
EntityUtils.toSystemAspects(
|
||||
opContext.getRetrieverContext().get(),
|
||||
aspectDao.getLatestAspects(urnAspects, true));
|
||||
opContext.getRetrieverContext().get(), databaseAspects);
|
||||
|
||||
// read #2 (potentially)
|
||||
final Map<String, Map<String, Long>> nextVersions =
|
||||
EntityUtils.calculateNextVersions(
|
||||
txContext, aspectDao, latestAspects, urnAspects);
|
||||
EntityUtils.calculateNextVersions(txContext, aspectDao, batchAspects, urnAspects);
|
||||
|
||||
// 1. Convert patches to full upserts
|
||||
// 2. Run any entity/aspect level hooks
|
||||
Pair<Map<String, Set<String>>, List<ChangeMCP>> updatedItems =
|
||||
batchWithDefaults.toUpsertBatchItems(latestAspects);
|
||||
batchWithDefaults.toUpsertBatchItems(batchAspects, nextVersions);
|
||||
|
||||
// Fetch additional information if needed
|
||||
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
|
||||
final Map<String, Map<String, Long>> updatedNextVersions;
|
||||
final List<ChangeMCP> changeMCPs;
|
||||
|
||||
if (!updatedItems.getFirst().isEmpty()) {
|
||||
// These items are new items from side effects
|
||||
Map<String, Set<String>> sideEffects = updatedItems.getFirst();
|
||||
|
||||
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
|
||||
final Map<String, Map<String, Long>> updatedNextVersions;
|
||||
|
||||
Map<String, Map<String, SystemAspect>> newLatestAspects =
|
||||
EntityUtils.toSystemAspects(
|
||||
opContext.getRetrieverContext().get(),
|
||||
aspectDao.getLatestAspects(updatedItems.getFirst(), true));
|
||||
// merge
|
||||
updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects);
|
||||
updatedLatestAspects = AspectsBatch.merge(batchAspects, newLatestAspects);
|
||||
|
||||
Map<String, Map<String, Long>> newNextVersions =
|
||||
EntityUtils.calculateNextVersions(
|
||||
txContext, aspectDao, updatedLatestAspects, updatedItems.getFirst());
|
||||
// merge
|
||||
updatedNextVersions = AspectsBatch.merge(nextVersions, newNextVersions);
|
||||
|
||||
changeMCPs =
|
||||
updatedItems.getSecond().stream()
|
||||
.peek(
|
||||
changeMCP -> {
|
||||
// Add previous version to each side-effect
|
||||
if (sideEffects
|
||||
.getOrDefault(
|
||||
changeMCP.getUrn().toString(), Collections.emptySet())
|
||||
.contains(changeMCP.getAspectName())) {
|
||||
|
||||
AspectsBatch.incrementBatchVersion(
|
||||
changeMCP, updatedLatestAspects, updatedNextVersions);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
updatedLatestAspects = latestAspects;
|
||||
updatedNextVersions = nextVersions;
|
||||
changeMCPs = updatedItems.getSecond();
|
||||
}
|
||||
|
||||
// Add previous version to each upsert
|
||||
List<ChangeMCP> changeMCPs =
|
||||
updatedItems.getSecond().stream()
|
||||
.peek(
|
||||
changeMCP -> {
|
||||
String urnStr = changeMCP.getUrn().toString();
|
||||
long nextVersion =
|
||||
updatedNextVersions
|
||||
.getOrDefault(urnStr, Map.of())
|
||||
.getOrDefault(changeMCP.getAspectName(), 0L);
|
||||
|
||||
changeMCP.setPreviousSystemAspect(
|
||||
updatedLatestAspects
|
||||
.getOrDefault(urnStr, Map.of())
|
||||
.getOrDefault(changeMCP.getAspectName(), null));
|
||||
|
||||
changeMCP.setNextAspectVersion(nextVersion);
|
||||
|
||||
// support inner-batch upserts
|
||||
updatedLatestAspects
|
||||
.computeIfAbsent(urnStr, key -> new HashMap<>())
|
||||
.put(
|
||||
changeMCP.getAspectName(),
|
||||
changeMCP.getSystemAspect(nextVersion));
|
||||
updatedNextVersions
|
||||
.computeIfAbsent(urnStr, key -> new HashMap<>())
|
||||
.put(changeMCP.getAspectName(), nextVersion + 1);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// No changes, return
|
||||
if (changeMCPs.isEmpty()) {
|
||||
return Collections.<UpdateAspectResult>emptyList();
|
||||
@ -954,40 +947,50 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
List<UpdateAspectResult> upsertResults =
|
||||
changeMCPs.stream()
|
||||
.map(
|
||||
item -> {
|
||||
final EntityAspect.EntitySystemAspect latest =
|
||||
(EntityAspect.EntitySystemAspect) item.getPreviousSystemAspect();
|
||||
writeItem -> {
|
||||
|
||||
/*
|
||||
database*Aspect - should be used for comparisons of before batch operation information
|
||||
*/
|
||||
final EntityAspect databaseAspect =
|
||||
databaseAspects
|
||||
.getOrDefault(writeItem.getUrn().toString(), Map.of())
|
||||
.get(writeItem.getAspectName());
|
||||
final EntityAspect.EntitySystemAspect databaseSystemAspect =
|
||||
databaseAspect == null
|
||||
? null
|
||||
: EntityAspect.EntitySystemAspect.builder()
|
||||
.build(
|
||||
writeItem.getEntitySpec(),
|
||||
writeItem.getAspectSpec(),
|
||||
databaseAspect);
|
||||
|
||||
final UpdateAspectResult result;
|
||||
if (overwrite || latest == null) {
|
||||
/*
|
||||
This condition is specifically for an older conditional write ingestAspectIfNotPresent()
|
||||
overwrite is always true otherwise
|
||||
*/
|
||||
if (overwrite || databaseAspect == null) {
|
||||
result =
|
||||
ingestAspectToLocalDB(
|
||||
txContext,
|
||||
item.getUrn(),
|
||||
item.getAspectName(),
|
||||
item.getRecordTemplate(),
|
||||
item.getAuditStamp(),
|
||||
item.getSystemMetadata(),
|
||||
latest == null ? null : latest,
|
||||
item.getNextAspectVersion())
|
||||
ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect)
|
||||
.toBuilder()
|
||||
.request(item)
|
||||
.request(writeItem)
|
||||
.build();
|
||||
|
||||
} else {
|
||||
RecordTemplate oldValue = latest.getRecordTemplate();
|
||||
SystemMetadata oldMetadata = latest.getSystemMetadata();
|
||||
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
|
||||
SystemMetadata oldMetadata = databaseSystemAspect.getSystemMetadata();
|
||||
result =
|
||||
UpdateAspectResult.<ChangeItemImpl>builder()
|
||||
.urn(item.getUrn())
|
||||
.request(item)
|
||||
.urn(writeItem.getUrn())
|
||||
.request(writeItem)
|
||||
.oldValue(oldValue)
|
||||
.newValue(oldValue)
|
||||
.oldSystemMetadata(oldMetadata)
|
||||
.newSystemMetadata(oldMetadata)
|
||||
.operation(MetadataAuditOperation.UPDATE)
|
||||
.auditStamp(item.getAuditStamp())
|
||||
.maxVersion(latest.getVersion())
|
||||
.auditStamp(writeItem.getAuditStamp())
|
||||
.maxVersion(databaseAspect.getVersion())
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -1011,8 +1014,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
// Only consider retention when there was a previous version
|
||||
.filter(
|
||||
result ->
|
||||
latestAspects.containsKey(result.getUrn().toString())
|
||||
&& latestAspects
|
||||
batchAspects.containsKey(result.getUrn().toString())
|
||||
&& batchAspects
|
||||
.get(result.getUrn().toString())
|
||||
.containsKey(result.getRequest().getAspectName()))
|
||||
.filter(
|
||||
@ -1102,9 +1105,11 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time
|
||||
* @param systemMetadata
|
||||
* @return the {@link RecordTemplate} representation of the written aspect object
|
||||
* @deprecated See Conditional Write ChangeType CREATE
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
@Deprecated
|
||||
public RecordTemplate ingestAspectIfNotPresent(
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Urn urn,
|
||||
@ -2495,87 +2500,107 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
((EntityAspect.EntitySystemAspect) systemAspect).toEnvelopedAspects()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param txContext Transaction context, keeps track of retries, exceptions etc.
|
||||
* @param writeItem The aspect being written
|
||||
* @param databaseAspect The aspect as it exists in the database.
|
||||
* @return result object
|
||||
*/
|
||||
@Nonnull
|
||||
private UpdateAspectResult ingestAspectToLocalDB(
|
||||
@Nullable TransactionContext txContext,
|
||||
@Nonnull final Urn urn,
|
||||
@Nonnull final String aspectName,
|
||||
@Nonnull final RecordTemplate newValue,
|
||||
@Nonnull final AuditStamp auditStamp,
|
||||
@Nonnull final SystemMetadata providedSystemMetadata,
|
||||
@Nullable final EntityAspect.EntitySystemAspect latest,
|
||||
@Nonnull final Long nextVersion) {
|
||||
@Nonnull final ChangeMCP writeItem,
|
||||
@Nullable final EntityAspect.EntitySystemAspect databaseAspect) {
|
||||
|
||||
// Set the "last run id" to be the run id provided with the new system metadata. This will be
|
||||
// stored in index
|
||||
// for all aspects that have a run id, regardless of whether they change.
|
||||
providedSystemMetadata.setLastRunId(
|
||||
providedSystemMetadata.getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
|
||||
writeItem
|
||||
.getSystemMetadata()
|
||||
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
|
||||
|
||||
// 2. Compare the latest existing and new.
|
||||
final RecordTemplate oldValue = latest == null ? null : latest.getRecordTemplate();
|
||||
final EntityAspect.EntitySystemAspect previousBatchAspect =
|
||||
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
|
||||
final RecordTemplate previousValue =
|
||||
previousBatchAspect == null ? null : previousBatchAspect.getRecordTemplate();
|
||||
|
||||
// 3. If there is no difference between existing and new, we just update
|
||||
// the lastObserved in system metadata. RunId should stay as the original runId
|
||||
if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) {
|
||||
SystemMetadata latestSystemMetadata = latest.getSystemMetadata();
|
||||
latestSystemMetadata.setLastObserved(providedSystemMetadata.getLastObserved());
|
||||
if (previousValue != null
|
||||
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
|
||||
|
||||
SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
|
||||
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
|
||||
latestSystemMetadata.setLastRunId(
|
||||
providedSystemMetadata.getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL);
|
||||
writeItem.getSystemMetadata().getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL);
|
||||
|
||||
latest.getEntityAspect().setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
|
||||
previousBatchAspect
|
||||
.getEntityAspect()
|
||||
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
|
||||
|
||||
log.info("Ingesting aspect with name {}, urn {}", aspectName, urn);
|
||||
aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);
|
||||
log.info(
|
||||
"Ingesting aspect with name {}, urn {}",
|
||||
previousBatchAspect.getAspectName(),
|
||||
previousBatchAspect.getUrn());
|
||||
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
|
||||
|
||||
// metrics
|
||||
aspectDao.incrementWriteMetrics(
|
||||
aspectName, 1, latest.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
|
||||
previousBatchAspect.getAspectName(),
|
||||
1,
|
||||
previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
|
||||
|
||||
return UpdateAspectResult.builder()
|
||||
.urn(urn)
|
||||
.oldValue(oldValue)
|
||||
.newValue(oldValue)
|
||||
.oldSystemMetadata(latest.getSystemMetadata())
|
||||
.urn(writeItem.getUrn())
|
||||
.oldValue(previousValue)
|
||||
.newValue(previousValue)
|
||||
.oldSystemMetadata(previousBatchAspect.getSystemMetadata())
|
||||
.newSystemMetadata(latestSystemMetadata)
|
||||
.operation(MetadataAuditOperation.UPDATE)
|
||||
.auditStamp(auditStamp)
|
||||
.auditStamp(writeItem.getAuditStamp())
|
||||
.maxVersion(0)
|
||||
.build();
|
||||
}
|
||||
|
||||
// 4. Save the newValue as the latest version
|
||||
log.debug("Ingesting aspect with name {}, urn {}", aspectName, urn);
|
||||
String newValueStr = EntityApiUtils.toJsonAspect(newValue);
|
||||
log.debug(
|
||||
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
|
||||
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
|
||||
long versionOfOld =
|
||||
aspectDao.saveLatestAspect(
|
||||
txContext,
|
||||
urn.toString(),
|
||||
aspectName,
|
||||
latest == null ? null : EntityApiUtils.toJsonAspect(oldValue),
|
||||
latest == null ? null : latest.getCreatedBy(),
|
||||
latest == null ? null : latest.getEntityAspect().getCreatedFor(),
|
||||
latest == null ? null : latest.getCreatedOn(),
|
||||
latest == null ? null : latest.getSystemMetadataRaw(),
|
||||
writeItem.getUrn().toString(),
|
||||
writeItem.getAspectName(),
|
||||
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
|
||||
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
|
||||
previousBatchAspect == null
|
||||
? null
|
||||
: previousBatchAspect.getEntityAspect().getCreatedFor(),
|
||||
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
|
||||
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
|
||||
newValueStr,
|
||||
auditStamp.getActor().toString(),
|
||||
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
|
||||
new Timestamp(auditStamp.getTime()),
|
||||
EntityApiUtils.toJsonAspect(providedSystemMetadata),
|
||||
nextVersion);
|
||||
writeItem.getAuditStamp().getActor().toString(),
|
||||
writeItem.getAuditStamp().hasImpersonator()
|
||||
? writeItem.getAuditStamp().getImpersonator().toString()
|
||||
: null,
|
||||
new Timestamp(writeItem.getAuditStamp().getTime()),
|
||||
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
|
||||
writeItem.getNextAspectVersion());
|
||||
|
||||
// metrics
|
||||
aspectDao.incrementWriteMetrics(
|
||||
aspectName, 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);
|
||||
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);
|
||||
|
||||
return UpdateAspectResult.builder()
|
||||
.urn(urn)
|
||||
.oldValue(oldValue)
|
||||
.newValue(newValue)
|
||||
.oldSystemMetadata(latest == null ? null : latest.getSystemMetadata())
|
||||
.newSystemMetadata(providedSystemMetadata)
|
||||
.urn(writeItem.getUrn())
|
||||
.oldValue(previousValue)
|
||||
.newValue(writeItem.getRecordTemplate())
|
||||
.oldSystemMetadata(
|
||||
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
|
||||
.newSystemMetadata(writeItem.getSystemMetadata())
|
||||
.operation(MetadataAuditOperation.UPDATE)
|
||||
.auditStamp(auditStamp)
|
||||
.auditStamp(writeItem.getAuditStamp())
|
||||
.maxVersion(versionOfOld)
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package com.linkedin.metadata.entity;
|
||||
|
||||
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
|
||||
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
|
||||
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
@ -8,7 +10,11 @@ import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.GlobalTags;
|
||||
import com.linkedin.common.Status;
|
||||
import com.linkedin.common.TagAssociation;
|
||||
import com.linkedin.common.TagAssociationArray;
|
||||
import com.linkedin.common.urn.TagUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.DataTemplateUtil;
|
||||
@ -18,17 +24,21 @@ import com.linkedin.identity.CorpUserInfo;
|
||||
import com.linkedin.metadata.AspectGenerationUtils;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.EbeanTestUtils;
|
||||
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
|
||||
import com.linkedin.metadata.aspect.patch.PatchOperationType;
|
||||
import com.linkedin.metadata.config.EbeanConfiguration;
|
||||
import com.linkedin.metadata.config.PreProcessHooks;
|
||||
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
|
||||
import com.linkedin.metadata.entity.ebean.EbeanRetentionService;
|
||||
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
|
||||
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
|
||||
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
|
||||
import com.linkedin.metadata.event.EventProducer;
|
||||
import com.linkedin.metadata.key.CorpUserKey;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistryException;
|
||||
import com.linkedin.metadata.query.ListUrnsResult;
|
||||
import com.linkedin.metadata.service.UpdateIndicesService;
|
||||
import com.linkedin.metadata.utils.AuditStampUtils;
|
||||
import com.linkedin.metadata.utils.PegasusUtils;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
@ -433,6 +443,220 @@ public class EbeanEntityServiceTest
|
||||
"Expected 2nd item to be the latest");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchPatchWithTrailingNoOp() throws Exception {
|
||||
Urn entityUrn =
|
||||
UrnUtils.getUrn(
|
||||
"urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)");
|
||||
TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
|
||||
Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2");
|
||||
Urn tagOther = UrnUtils.getUrn("urn:li:tag:other");
|
||||
|
||||
SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
|
||||
|
||||
ChangeItemImpl initialAspectTag1 =
|
||||
ChangeItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.recordTemplate(
|
||||
new GlobalTags()
|
||||
.setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
|
||||
.systemMetadata(systemMetadata.copy())
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(TestOperationContexts.emptyAspectRetriever(null));
|
||||
|
||||
PatchItemImpl patchAdd2 =
|
||||
PatchItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.aspectSpec(
|
||||
_testEntityRegistry
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(Map.of("properties", List.of("tag")))
|
||||
.patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(_testEntityRegistry);
|
||||
|
||||
PatchItemImpl patchRemoveNonExistent =
|
||||
PatchItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.aspectSpec(
|
||||
_testEntityRegistry
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(Map.of("properties", List.of("tag")))
|
||||
.patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther)))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(_testEntityRegistry);
|
||||
|
||||
// establish base entity
|
||||
_entityServiceImpl.ingestAspects(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext().get())
|
||||
.items(List.of(initialAspectTag1))
|
||||
.build(),
|
||||
false,
|
||||
true);
|
||||
|
||||
_entityServiceImpl.ingestAspects(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext().get())
|
||||
.items(List.of(patchAdd2, patchRemoveNonExistent))
|
||||
.build(),
|
||||
false,
|
||||
true);
|
||||
|
||||
// List aspects urns
|
||||
ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
|
||||
|
||||
assertEquals(batch.getStart().intValue(), 0);
|
||||
assertEquals(batch.getCount().intValue(), 1);
|
||||
assertEquals(batch.getTotal().intValue(), 1);
|
||||
assertEquals(batch.getEntities().size(), 1);
|
||||
assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
|
||||
|
||||
EnvelopedAspect envelopedAspect =
|
||||
_entityServiceImpl.getLatestEnvelopedAspect(
|
||||
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
|
||||
assertEquals(
|
||||
envelopedAspect.getSystemMetadata().getVersion(),
|
||||
"2",
|
||||
"Expected version 2. 1 - Initial, + 1 batch operation (1 add, 1 remove)");
|
||||
assertEquals(
|
||||
new GlobalTags(envelopedAspect.getValue().data())
|
||||
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
|
||||
Set.of(tag1, tag2),
|
||||
"Expected both tags");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchPatchAdd() throws Exception {
|
||||
Urn entityUrn =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
|
||||
TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
|
||||
TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2");
|
||||
TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3");
|
||||
|
||||
SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
|
||||
|
||||
ChangeItemImpl initialAspectTag1 =
|
||||
ChangeItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.recordTemplate(
|
||||
new GlobalTags()
|
||||
.setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
|
||||
.systemMetadata(systemMetadata.copy())
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(TestOperationContexts.emptyAspectRetriever(null));
|
||||
|
||||
PatchItemImpl patchAdd3 =
|
||||
PatchItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.aspectSpec(
|
||||
_testEntityRegistry
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(Map.of("properties", List.of("tag")))
|
||||
.patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3)))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(_testEntityRegistry);
|
||||
|
||||
PatchItemImpl patchAdd2 =
|
||||
PatchItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.aspectSpec(
|
||||
_testEntityRegistry
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(Map.of("properties", List.of("tag")))
|
||||
.patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(_testEntityRegistry);
|
||||
|
||||
PatchItemImpl patchAdd1 =
|
||||
PatchItemImpl.builder()
|
||||
.urn(entityUrn)
|
||||
.entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
|
||||
.aspectName(GLOBAL_TAGS_ASPECT_NAME)
|
||||
.aspectSpec(
|
||||
_testEntityRegistry
|
||||
.getEntitySpec(DATASET_ENTITY_NAME)
|
||||
.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
|
||||
.patch(
|
||||
GenericJsonPatch.builder()
|
||||
.arrayPrimaryKeys(Map.of("properties", List.of("tag")))
|
||||
.patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
|
||||
.build()
|
||||
.getJsonPatch())
|
||||
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
|
||||
.build(_testEntityRegistry);
|
||||
|
||||
// establish base entity
|
||||
_entityServiceImpl.ingestAspects(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext().get())
|
||||
.items(List.of(initialAspectTag1))
|
||||
.build(),
|
||||
false,
|
||||
true);
|
||||
|
||||
_entityServiceImpl.ingestAspects(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext().get())
|
||||
.items(List.of(patchAdd3, patchAdd2, patchAdd1))
|
||||
.build(),
|
||||
false,
|
||||
true);
|
||||
|
||||
// List aspects urns
|
||||
ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
|
||||
|
||||
assertEquals(batch.getStart().intValue(), 0);
|
||||
assertEquals(batch.getCount().intValue(), 1);
|
||||
assertEquals(batch.getTotal().intValue(), 1);
|
||||
assertEquals(batch.getEntities().size(), 1);
|
||||
assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
|
||||
|
||||
EnvelopedAspect envelopedAspect =
|
||||
_entityServiceImpl.getLatestEnvelopedAspect(
|
||||
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
|
||||
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
|
||||
assertEquals(
|
||||
new GlobalTags(envelopedAspect.getValue().data())
|
||||
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
|
||||
Set.of(tag1, tag2, tag3),
|
||||
"Expected all tags");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataGeneratorThreadingTest() {
|
||||
DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
|
||||
@ -659,4 +883,14 @@ public class EbeanEntityServiceTest
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) {
|
||||
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
|
||||
patchOp.setOp(op.getValue());
|
||||
patchOp.setPath(String.format("/tags/%s", tagUrn));
|
||||
if (PatchOperationType.ADD.equals(op)) {
|
||||
patchOp.setValue(Map.of("tag", tagUrn.toString()));
|
||||
}
|
||||
return patchOp;
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,10 +50,8 @@ import com.linkedin.metadata.entity.validation.ValidationException;
|
||||
import com.linkedin.metadata.event.EventProducer;
|
||||
import com.linkedin.metadata.key.CorpUserKey;
|
||||
import com.linkedin.metadata.models.AspectSpec;
|
||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistryException;
|
||||
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
|
||||
import com.linkedin.metadata.run.AspectRowSummary;
|
||||
import com.linkedin.metadata.service.UpdateIndicesService;
|
||||
import com.linkedin.metadata.snapshot.CorpUserSnapshot;
|
||||
@ -75,6 +73,7 @@ import com.linkedin.structured.StructuredPropertyValueAssignment;
|
||||
import com.linkedin.structured.StructuredPropertyValueAssignmentArray;
|
||||
import com.linkedin.util.Pair;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import io.datahubproject.test.metadata.context.TestOperationContexts;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -113,18 +112,13 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
|
||||
protected T_RS _retentionService;
|
||||
|
||||
protected static final AuditStamp TEST_AUDIT_STAMP = AspectGenerationUtils.createAuditStamp();
|
||||
protected final EntityRegistry _snapshotEntityRegistry = new TestEntityRegistry();
|
||||
protected final EntityRegistry _configEntityRegistry =
|
||||
new ConfigEntityRegistry(
|
||||
Snapshot.class.getClassLoader().getResourceAsStream("entity-registry.yml"));
|
||||
protected final EntityRegistry _testEntityRegistry =
|
||||
new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry);
|
||||
protected OperationContext opContext = TestOperationContexts.systemContextNoSearchAuthorization();
|
||||
protected final EntityRegistry _testEntityRegistry = opContext.getEntityRegistry();
|
||||
protected EventProducer _mockProducer;
|
||||
protected UpdateIndicesService _mockUpdateIndicesService;
|
||||
|
||||
protected OperationContext opContext;
|
||||
protected final AspectSpec structuredPropertiesDefinitionAspect =
|
||||
_configEntityRegistry.getAspectSpecs().get(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME);
|
||||
_testEntityRegistry.getAspectSpecs().get(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME);
|
||||
|
||||
protected EntityServiceTest() throws EntityRegistryException {}
|
||||
|
||||
|
||||
@ -363,7 +363,9 @@ public interface EntityService<U extends ChangeMCP> {
|
||||
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time
|
||||
* @param systemMetadata
|
||||
* @return the {@link RecordTemplate} representation of the written aspect object
|
||||
* @deprecated See Conditional Write ChangeType CREATE
|
||||
*/
|
||||
@Deprecated
|
||||
RecordTemplate ingestAspectIfNotPresent(
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Urn urn,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user