fix(entity-service): handle no-op system-metadata batches (#12055)

This commit is contained in:
david-leifker 2024-12-09 23:15:07 -06:00 committed by GitHub
parent 638a0e370e
commit 0a2ac70d38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 924 additions and 482 deletions

View File

@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
public interface AspectsBatch {
Collection<? extends BatchItem> getItems();
Collection<? extends BatchItem> getInitialItems();
RetrieverContext getRetrieverContext();
/**
* Returns MCP items. Could be patch, upsert, etc.
* Returns MCP items. Could be one of patch, upsert, etc.
*
* @return batch items
*/
@ -160,13 +162,24 @@ public interface AspectsBatch {
}
/**
* Reports whether the batch contains repeated items, identifying each item by a key built from
* its class name and {@code hashCode()}.
*
* <p>NOTE(review): the distinct count is taken over {@code getInitialItems()} but compared with
* {@code getItems().size()}. If {@code getItems()} is already de-duplicated, the two numbers can
* match even though the initial input repeated an item - confirm whether
* {@code getInitialItems().size()} was intended.
*
* @return true when the number of distinct keys differs from the compared size
*/
default boolean containsDuplicateAspects() {
return getItems().stream()
.map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode()))
return getInitialItems().stream()
.map(i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode()))
.distinct()
.count()
!= getItems().size();
}
/**
* Groups the initial items by a key of simple class name plus {@code hashCode()} and returns only
* the groups that hold more than one item.
*
* @return map from key ({@code SimpleClassName_hashCode}) to the items sharing that key
*/
default Map<String, List<? extends BatchItem>> duplicateAspects() {
return getInitialItems().stream()
.collect(
Collectors.groupingBy(
i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode())))
.entrySet()
.stream()
// keep only keys that were produced by two or more items
.filter(entry -> entry.getValue() != null && entry.getValue().size() > 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
default Map<String, Set<String>> getUrnAspectsMap() {
return getItems().stream()
.map(aspect -> Pair.of(aspect.getUrn().toString(), aspect.getAspectName()))

View File

@ -23,4 +23,11 @@ public interface BatchItem extends ReadItem {
*/
@Nonnull
ChangeType getChangeType();
/**
* Determines if this item is a duplicate of another item in terms of the operation it represents
* to the database. Each implementation can define what constitutes a duplicate based on its
* specific fields which are persisted.
*
* @param other the item to compare this item against
* @return true if this item is considered a duplicate of {@code other}
*/
boolean isDatabaseDuplicateOf(BatchItem other);
}

View File

@ -4,10 +4,12 @@ import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeLog;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
@ -29,4 +31,23 @@ public class TestMCL implements MCLItem {
public String getAspectName() {
return getAspectSpec().getName();
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
/**
 * Compares by exact runtime class and by the wrapped {@code metadataChangeLog}.
 *
 * @param o candidate object, may be null
 * @return true when {@code o} is this instance, or has the same class and an equal change log
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  final boolean sameClass = o != null && o.getClass() == getClass();
  if (!sameClass) {
    return false;
  }
  final TestMCL that = (TestMCL) o;
  return Objects.equals(this.metadataChangeLog, that.metadataChangeLog);
}
/** Hashes the wrapped {@code metadataChangeLog}; a null change log hashes to 0. */
@Override
public int hashCode() {
  return metadataChangeLog == null ? 0 : metadataChangeLog.hashCode();
}
}

View File

@ -6,6 +6,7 @@ import static org.mockito.Mockito.when;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.ReadItem;
@ -21,6 +22,7 @@ import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -140,4 +142,40 @@ public class TestMCP implements ChangeMCP {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
/**
 * Field-by-field equality for test items. The record template is compared with {@code
 * DataTemplateUtil.areEqual}; the remaining fields use {@code equals}.
 *
 * @param o candidate object, may be null
 * @return true when {@code o} has the same class and all compared fields match
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final TestMCP that = (TestMCP) o;
  // Checks run in the same order as a short-circuiting && chain over these fields.
  if (!urn.equals(that.urn)) {
    return false;
  }
  if (!DataTemplateUtil.areEqual(recordTemplate, that.recordTemplate)) {
    return false;
  }
  if (!Objects.equals(systemAspect, that.systemAspect)) {
    return false;
  }
  if (!Objects.equals(previousSystemAspect, that.previousSystemAspect)) {
    return false;
  }
  if (!Objects.equals(auditStamp, that.auditStamp)) {
    return false;
  }
  if (!Objects.equals(changeType, that.changeType)) {
    return false;
  }
  return Objects.equals(metadataChangeProposal, that.metadataChangeProposal);
}
/**
 * Combines the urn hash with the null-safe hashes of the remaining fields using the usual
 * multiply-by-31 accumulation.
 */
@Override
public int hashCode() {
  final Object[] remainingFields = {
    recordTemplate,
    systemAspect,
    previousSystemAspect,
    auditStamp,
    changeType,
    metadataChangeProposal
  };
  int hash = urn.hashCode();
  for (Object field : remainingFields) {
    hash = 31 * hash + Objects.hashCode(field);
  }
  return hash;
}
}

View File

@ -52,6 +52,26 @@ public class EntityAspect {
private String createdFor;
/**
 * Renders urn, aspect, version, metadata and systemMetadata as a single-line string of the form
 * {@code EntityAspect{urn='...', aspect='...', version=..., metadata='...', systemMetadata='...'}}.
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("EntityAspect{");
  text.append("urn='").append(urn).append('\'');
  text.append(", aspect='").append(aspect).append('\'');
  text.append(", version=").append(version);
  text.append(", metadata='").append(metadata).append('\'');
  text.append(", systemMetadata='").append(systemMetadata).append('\'');
  text.append('}');
  return text.toString();
}
/**
* Provide a typed EntityAspect without breaking the existing public contract with generic types.
*/
@ -144,6 +164,11 @@ public class EntityAspect {
return envelopedAspect;
}
/** Delegates to the string form of the wrapped {@code entityAspect}. */
@Override
public String toString() {
return entityAspect.toString();
}
public static class EntitySystemAspectBuilder {
private EntityAspect.EntitySystemAspect build() {

View File

@ -1,6 +1,7 @@
package com.linkedin.metadata.entity.ebean.batch;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
@ -15,7 +16,9 @@ import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollec
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -29,12 +32,23 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Builder(toBuilder = true)
public class AspectsBatchImpl implements AspectsBatch {
@Nonnull private final Collection<? extends BatchItem> items;
@Nonnull private final RetrieverContext retrieverContext;
@Nonnull private final Collection<? extends BatchItem> nonRepeatedItems;
@Getter @Nonnull private final RetrieverContext retrieverContext;
/**
* Returns the {@code nonRepeatedItems} collection rather than the raw {@code items} field.
*
* @return the non-repeated batch items
*/
@Override
@Nonnull
public Collection<? extends BatchItem> getItems() {
return nonRepeatedItems;
}
/**
* Returns the {@code items} field, i.e. the collection before any repeat filtering.
*
* @return the initial batch items
*/
@Override
public Collection<? extends BatchItem> getInitialItems() {
return items;
}
/**
* Convert patches to upserts, apply hooks at the aspect and batch level.
@ -207,14 +221,32 @@ public class AspectsBatchImpl implements AspectsBatch {
return this;
}
/**
 * Drops items that repeat the immediately preceding item for the same urn and aspect name.
 *
 * <p>Walks the input in order and remembers the most recent item seen per (urn, aspect name)
 * pair. An item is kept when it is the first for its pair, or when {@code
 * isDatabaseDuplicateOf} reports it is not a duplicate of the most recent item for that pair.
 * The remembered item is always replaced by the current one, kept or not.
 *
 * @param items items in their original order
 * @return the retained items, in their original relative order
 */
private static <T extends BatchItem> List<T> filterRepeats(Collection<T> items) {
  final Map<Pair<Urn, String>, T> mostRecentByKey = new HashMap<>();
  final List<T> retained = new ArrayList<>();
  for (T candidate : items) {
    final Pair<Urn, String> key = Pair.of(candidate.getUrn(), candidate.getAspectName());
    // put() hands back the previously remembered item for this key, or null on first sight
    final T predecessor = mostRecentByKey.put(key, candidate);
    if (predecessor == null || !candidate.isDatabaseDuplicateOf(predecessor)) {
      retained.add(candidate);
    }
  }
  return retained;
}
/**
* Builds the batch: computes {@code nonRepeatedItems} via {@code filterRepeats}, runs {@code
* AspectsBatch.validateProposed}, and constructs the {@code AspectsBatchImpl}.
*
* @return the constructed batch
* @throws IllegalArgumentException if validation reports any exceptions
*/
public AspectsBatchImpl build() {
this.nonRepeatedItems = filterRepeats(this.items);
ValidationExceptionCollection exceptions =
AspectsBatch.validateProposed(this.items, this.retrieverContext);
AspectsBatch.validateProposed(this.nonRepeatedItems, this.retrieverContext);
if (!exceptions.isEmpty()) {
throw new IllegalArgumentException("Failed to validate MCP due to: " + exceptions);
}
return new AspectsBatchImpl(this.items, this.retrieverContext);
return new AspectsBatchImpl(this.items, this.nonRepeatedItems, this.retrieverContext);
}
}

View File

@ -3,11 +3,13 @@ package com.linkedin.metadata.entity.ebean.batch;
import com.datahub.util.exception.ModelConversionException;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
@ -269,6 +271,11 @@ public class ChangeItemImpl implements ChangeMCP {
}
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -280,13 +287,15 @@ public class ChangeItemImpl implements ChangeMCP {
ChangeItemImpl that = (ChangeItemImpl) o;
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
&& changeType.equals(that.changeType)
&& Objects.equals(systemMetadata, that.systemMetadata)
&& recordTemplate.equals(that.recordTemplate);
&& Objects.equals(auditStamp, that.auditStamp)
&& DataTemplateUtil.areEqual(recordTemplate, that.recordTemplate);
}
@Override
public int hashCode() {
return Objects.hash(urn, aspectName, systemMetadata, recordTemplate);
return Objects.hash(urn, aspectName, changeType, systemMetadata, auditStamp, recordTemplate);
}
@Override

View File

@ -6,6 +6,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityAspect;
@ -115,6 +116,11 @@ public class DeleteItemImpl implements ChangeMCP {
}
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -5,6 +5,7 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.entity.AspectUtils;
@ -158,6 +159,11 @@ public class MCLItemImpl implements MCLItem {
}
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -14,6 +14,7 @@ import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.batch.PatchMCP;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
@ -216,6 +217,11 @@ public class PatchItemImpl implements PatchMCP {
}
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -228,12 +234,13 @@ public class PatchItemImpl implements PatchMCP {
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
&& Objects.equals(systemMetadata, that.systemMetadata)
&& auditStamp.equals(that.auditStamp)
&& patch.equals(that.patch);
}
@Override
public int hashCode() {
return Objects.hash(urn, aspectName, systemMetadata, patch);
return Objects.hash(urn, aspectName, systemMetadata, auditStamp, patch);
}
@Override

View File

@ -4,6 +4,7 @@ import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
@ -86,6 +87,32 @@ public class ProposedItem implements MCPItem {
return metadataChangeProposal.getChangeType();
}
/** Treats {@code other} as a database duplicate exactly when {@link #equals(Object)} is true. */
@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}
/**
 * Two proposed items are equal when they share the exact class, an equal {@code
 * metadataChangeProposal} and an equal {@code auditStamp}.
 *
 * @param o candidate object, may be null
 * @return true when {@code o} matches this item on the fields above
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final ProposedItem other = (ProposedItem) o;
  if (!metadataChangeProposal.equals(other.metadataChangeProposal)) {
    return false;
  }
  return auditStamp.equals(other.auditStamp);
}
/** Combines the hashes of {@code metadataChangeProposal} and {@code auditStamp} (31-multiplier). */
@Override
public int hashCode() {
  return 31 * metadataChangeProposal.hashCode() + auditStamp.hashCode();
}
public static class ProposedItemBuilder {
public ProposedItem build() {
// Ensure systemMetadata

View File

@ -6,6 +6,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.Status;
import com.linkedin.common.urn.DataPlatformUrn;
@ -220,6 +221,7 @@ public class AspectsBatchImplTest {
@Test
public void toUpsertBatchItemsProposedItemTest() {
AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
List<ProposedItem> testItems =
List.of(
ProposedItem.builder()
@ -239,7 +241,7 @@ public class AspectsBatchImplTest {
ByteString.copyString(
"{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
.setSystemMetadata(new SystemMetadata()))
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.auditStamp(auditStamp)
.build(),
ProposedItem.builder()
.entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME))
@ -258,7 +260,7 @@ public class AspectsBatchImplTest {
ByteString.copyString(
"{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
.setSystemMetadata(new SystemMetadata()))
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.auditStamp(auditStamp)
.build());
AspectsBatchImpl testBatch =
@ -280,7 +282,7 @@ public class AspectsBatchImplTest {
testRegistry
.getEntitySpec(DATASET_ENTITY_NAME)
.getAspectSpec(STATUS_ASPECT_NAME))
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.auditStamp(auditStamp)
.systemMetadata(testItems.get(0).getSystemMetadata().setVersion("1"))
.recordTemplate(new Status().setRemoved(false))
.build(mockAspectRetriever),
@ -295,7 +297,7 @@ public class AspectsBatchImplTest {
testRegistry
.getEntitySpec(DATASET_ENTITY_NAME)
.getAspectSpec(STATUS_ASPECT_NAME))
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.auditStamp(auditStamp)
.systemMetadata(testItems.get(1).getSystemMetadata().setVersion("1"))
.recordTemplate(new Status().setRemoved(false))
.build(mockAspectRetriever))),

View File

@ -854,7 +854,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
boolean overwrite) {
if (inputBatch.containsDuplicateAspects()) {
log.warn(String.format("Batch contains duplicates: %s", inputBatch));
log.warn("Batch contains duplicates: {}", inputBatch.duplicateAspects());
MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc();
}
@ -968,39 +968,20 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
writeItem.getAspectSpec(),
databaseAspect);
final UpdateAspectResult result;
/*
This condition is specifically for an older conditional write ingestAspectIfNotPresent()
overwrite is always true otherwise
*/
if (overwrite || databaseAspect == null) {
result =
Optional.ofNullable(
ingestAspectToLocalDB(
txContext, writeItem, databaseSystemAspect))
.map(
optResult ->
optResult.toBuilder().request(writeItem).build())
.orElse(null);
} else {
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
SystemMetadata oldMetadata = databaseSystemAspect.getSystemMetadata();
result =
UpdateAspectResult.<ChangeItemImpl>builder()
.urn(writeItem.getUrn())
.request(writeItem)
.oldValue(oldValue)
.newValue(oldValue)
.oldSystemMetadata(oldMetadata)
.newSystemMetadata(oldMetadata)
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(databaseAspect.getVersion())
.build();
return Optional.ofNullable(
ingestAspectToLocalDB(
txContext, writeItem, databaseSystemAspect))
.map(
optResult -> optResult.toBuilder().request(writeItem).build())
.orElse(null);
}
return result;
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
@ -1051,7 +1032,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
}
} else {
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
log.warn("Empty transaction detected. {}", inputBatch);
// This includes no-op batches. i.e. patch removing non-existent items
log.debug("Empty transaction detected");
}
return upsertResults;
@ -1150,7 +1132,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
.build();
List<UpdateAspectResult> ingested = ingestAspects(opContext, aspectsBatch, true, false);
return ingested.stream().findFirst().get().getNewValue();
return ingested.stream().findFirst().map(UpdateAspectResult::getNewValue).orElse(null);
}
/**
@ -2525,6 +2507,14 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
@Nonnull final ChangeMCP writeItem,
@Nullable final EntityAspect.EntitySystemAspect databaseAspect) {
if (writeItem.getRecordTemplate() == null) {
log.error(
"Unexpected write of null aspect with name {}, urn {}",
writeItem.getAspectName(),
writeItem.getUrn());
return null;
}
// Set the "last run id" to be the run id provided with the new system metadata. This will be
// stored in index
// for all aspects that have a run id, regardless of whether they change.
@ -2533,9 +2523,6 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
// 2. Compare the latest existing and new.
final RecordTemplate databaseValue =
databaseAspect == null ? null : databaseAspect.getRecordTemplate();
final EntityAspect.EntitySystemAspect previousBatchAspect =
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
final RecordTemplate previousValue =
@ -2544,45 +2531,86 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (previousValue != null
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
latestSystemMetadata.setLastRunId(
writeItem.getSystemMetadata().getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL);
Optional<SystemMetadata> latestSystemMetadataDiff =
systemMetadataDiff(
txContext,
previousBatchAspect.getSystemMetadata(),
writeItem.getSystemMetadata(),
databaseAspect == null ? null : databaseAspect.getSystemMetadata());
previousBatchAspect
.getEntityAspect()
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
if (latestSystemMetadataDiff.isPresent()) {
// Update previous version since that is what is re-written
previousBatchAspect
.getEntityAspect()
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadataDiff.get()));
log.info(
"Ingesting aspect with name {}, urn {}",
previousBatchAspect.getAspectName(),
previousBatchAspect.getUrn());
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
// Inserts & update order is not guaranteed, flush the insert for potential updates within
// same tx
if (databaseAspect == null && txContext != null) {
conditionalLogLevel(
txContext,
String.format(
"Flushing for systemMetadata update aspect with name %s, urn %s",
writeItem.getAspectName(), writeItem.getUrn()));
txContext.flush();
}
// metrics
aspectDao.incrementWriteMetrics(
previousBatchAspect.getAspectName(),
1,
previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
conditionalLogLevel(
txContext,
String.format(
"Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newAspect: %s",
previousBatchAspect.getAspectName(),
previousBatchAspect.getUrn(),
txContext != null,
databaseAspect == null ? null : databaseAspect.getEntityAspect(),
previousBatchAspect.getEntityAspect()));
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(previousValue)
.oldSystemMetadata(previousBatchAspect.getSystemMetadata())
.newSystemMetadata(latestSystemMetadata)
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(0)
.build();
// metrics
aspectDao.incrementWriteMetrics(
previousBatchAspect.getAspectName(),
1,
previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(previousValue)
.oldSystemMetadata(previousBatchAspect.getSystemMetadata())
.newSystemMetadata(latestSystemMetadataDiff.get())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(0)
.build();
} else {
MetricUtils.counter(EntityServiceImpl.class, "batch_with_noop_sysmetadata").inc();
return null;
}
}
// 4. Save the newValue as the latest version
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
if (writeItem.getRecordTemplate() != null
&& !DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
conditionalLogLevel(
txContext,
String.format(
"Insert aspect with name %s, urn %s", writeItem.getAspectName(), writeItem.getUrn()));
// Inserts & update order is not guaranteed, flush the insert for potential updates within
// same tx
if (databaseAspect == null
&& !ASPECT_LATEST_VERSION.equals(writeItem.getNextAspectVersion())
&& txContext != null) {
conditionalLogLevel(
txContext,
String.format(
"Flushing for update aspect with name %s, urn %s",
writeItem.getAspectName(), writeItem.getUrn()));
txContext.flush();
}
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
@ -2630,4 +2658,41 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
aspectSpec.getRelationshipFieldSpecs();
return relationshipFieldSpecs.stream().anyMatch(RelationshipFieldSpec::isLineageRelationship);
}
/**
 * Computes the system metadata to persist when only system metadata may have changed.
 *
 * <p>Starts from a copy of {@code previous}, moves the previous run id into {@code lastRunId},
 * and takes {@code lastObserved} and {@code runId} from {@code current}. The merged record is
 * returned only when it differs from both {@code previous} and {@code database}.
 *
 * @param txContext transaction context, used only to choose the log level; may be null
 * @param previous system metadata of the previous aspect; may be null, in which case the merge
 *     starts from an empty record
 * @param current system metadata supplied with the incoming write
 * @param database system metadata currently stored, or null when nothing is stored
 * @return the merged metadata when a write is needed, otherwise {@link Optional#empty()}
 */
private static Optional<SystemMetadata> systemMetadataDiff(
    @Nullable TransactionContext txContext,
    @Nullable SystemMetadata previous,
    @Nonnull SystemMetadata current,
    @Nullable SystemMetadata database) {

  // previous is declared @Nullable (and null-checked in the log message below), so it must not
  // be dereferenced unconditionally.
  final SystemMetadata latestSystemMetadata =
      previous == null
          ? new SystemMetadata()
          : GenericRecordUtils.copy(previous, SystemMetadata.class);
  final String previousRunId = previous == null ? null : previous.getRunId();

  latestSystemMetadata.setLastRunId(previousRunId, SetMode.REMOVE_IF_NULL);
  latestSystemMetadata.setLastObserved(current.getLastObserved(), SetMode.IGNORE_NULL);
  latestSystemMetadata.setRunId(current.getRunId(), SetMode.REMOVE_IF_NULL);

  // Only report a diff when the merged record matches neither the previous nor the stored one.
  if (!DataTemplateUtil.areEqual(latestSystemMetadata, previous)
      && !DataTemplateUtil.areEqual(latestSystemMetadata, database)) {

    conditionalLogLevel(
        txContext,
        String.format(
            "systemMetdataDiff: %s != %s AND %s",
            RecordUtils.toJsonString(latestSystemMetadata),
            previous == null ? null : RecordUtils.toJsonString(previous),
            database == null ? null : RecordUtils.toJsonString(database)));

    return Optional.of(latestSystemMetadata);
  }

  return Optional.empty();
}
/**
 * Logs {@code message} at WARN when the transaction context reports more than one failed
 * attempt, and at DEBUG otherwise (including when no context is given).
 *
 * @param txContext transaction context, may be null
 * @param message text to log
 */
private static void conditionalLogLevel(@Nullable TransactionContext txContext, String message) {
  final boolean repeatedFailures = txContext != null && txContext.getFailedAttempts() > 1;
  if (!repeatedFailures) {
    log.debug(message);
    return;
  }
  log.warn(message);
}
}

View File

@ -66,4 +66,10 @@ public class TransactionContext {
}
success();
}
/** Flushes the underlying transaction; does nothing when no transaction is set. */
public void flush() {
  if (tx == null) {
    return;
  }
  tx.flush();
}
}

View File

@ -590,7 +590,7 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
// Save oldValue as the largest version + 1
long largestVersion = ASPECT_LATEST_VERSION;
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
if (oldAspectMetadata != null && oldTime != null) {
if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) {
largestVersion = nextVersion;
final EntityAspect aspect =
new EntityAspect(
@ -616,7 +616,7 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
newTime,
newActor,
newImpersonator);
batch = batch.add(generateSaveStatement(aspect, oldAspectMetadata == null));
batch = batch.add(generateSaveStatement(aspect, ASPECT_LATEST_VERSION.equals(nextVersion)));
_cqlSession.execute(batch);
return largestVersion;
}

View File

@ -165,7 +165,7 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
}
// Save oldValue as the largest version + 1
long largestVersion = ASPECT_LATEST_VERSION;
if (oldAspectMetadata != null && oldTime != null) {
if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) {
largestVersion = nextVersion;
saveAspect(
txContext,
@ -191,7 +191,7 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
newTime,
newSystemMetadata,
ASPECT_LATEST_VERSION,
oldAspectMetadata == null);
ASPECT_LATEST_VERSION.equals(nextVersion));
return largestVersion;
}

View File

@ -34,19 +34,19 @@ public class AspectGenerationUtils {
}
@Nonnull
public static SystemMetadata createSystemMetadata(long nextAspectVersion) {
public static SystemMetadata createSystemMetadata(int nextAspectVersion) {
return createSystemMetadata(
1625792689, "run-123", "run-123", String.valueOf(nextAspectVersion));
}
@Nonnull
public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) {
public static SystemMetadata createSystemMetadata(int lastObserved, @Nonnull String runId) {
return createSystemMetadata(lastObserved, runId, runId, null);
}
@Nonnull
public static SystemMetadata createSystemMetadata(
long lastObserved,
int lastObserved, // for test comparison must be int
@Nonnull String runId,
@Nonnull String lastRunId,
@Nullable String version) {

View File

@ -1,10 +1,6 @@
package com.linkedin.metadata.entity;
import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
@ -12,36 +8,27 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.Status;
import com.linkedin.common.TagAssociation;
import com.linkedin.common.TagAssociationArray;
import com.linkedin.common.urn.TagUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.AspectGenerationUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.EbeanTestUtils;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.config.EbeanConfiguration;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
import com.linkedin.metadata.entity.ebean.EbeanRetentionService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
@ -64,7 +51,6 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@ -396,360 +382,6 @@ public class EbeanEntityServiceTest
"Expected version 0 with systemMeta version 3 accounting for the the collision");
}
/**
* Ingests two items for the same urn and aspect in a single batch, differing only in the
* {@code removed} flag, and verifies that the second item ends up as the latest value with
* system metadata version "2".
*/
@Test
public void testBatchDuplicate() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
// first write: status removed=true
ChangeItemImpl item1 =
ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(STATUS_ASPECT_NAME)
.recordTemplate(new Status().setRemoved(true))
.systemMetadata(systemMetadata.copy())
.auditStamp(TEST_AUDIT_STAMP)
.build(TestOperationContexts.emptyAspectRetriever(null));
// second write to the same urn/aspect: status removed=false
ChangeItemImpl item2 =
ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(STATUS_ASPECT_NAME)
.recordTemplate(new Status().setRemoved(false))
.systemMetadata(systemMetadata.copy())
.auditStamp(TEST_AUDIT_STAMP)
.build(TestOperationContexts.emptyAspectRetriever(null));
// both items go through one ingestAspects call
_entityServiceImpl.ingestAspects(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(item1, item2))
.build(),
false,
true);
// List aspects urns
ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
assertEquals(batch.getStart().intValue(), 0);
assertEquals(batch.getCount().intValue(), 1);
assertEquals(batch.getTotal().intValue(), 1);
assertEquals(batch.getEntities().size(), 1);
assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
// the latest stored aspect must reflect the second item
EnvelopedAspect envelopedAspect =
_entityServiceImpl.getLatestEnvelopedAspect(
opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
assertEquals(
envelopedAspect.getSystemMetadata().getVersion(),
"2",
"Expected version 2 accounting for duplicates");
assertEquals(
envelopedAspect.getValue().toString(),
"{removed=false}",
"Expected 2nd item to be the latest");
}
/**
 * Seeds {@code globalTags} with tag1, then ingests one batch of two patches: add tag2, followed by
 * a remove of a tag that was never added. Checks that exactly one entity is listed, that the
 * latest aspect reports system-metadata version "3", and that the tags are exactly tag1 and tag2.
 */
@Test
public void testBatchPatchWithTrailingNoOp() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
  Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2");
  Urn tagOther = UrnUtils.getUrn("urn:li:tag:other");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(
              new GlobalTags()
                  .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // Removes a tag that is in neither the initial aspect nor the preceding patch.
  PatchItemImpl patchRemoveNonExistent =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd2, patchRemoveNonExistent))
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  assertEquals(
      envelopedAspect.getSystemMetadata().getVersion(),
      "3",
      "Expected version 3. 1 - Initial, + 1 add, 1 remove");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(tag1, tag2),
      "Expected both tags");
}
/**
 * Seeds {@code globalTags} with tag1, then ingests one batch of three add-patches (tag3, tag2,
 * tag1, in that order). Checks that exactly one entity is listed, that the latest aspect reports
 * system-metadata version "4", and that the tags are exactly tag1, tag2 and tag3.
 */
@Test
public void testBatchPatchAdd() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
  TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2");
  TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(
              new GlobalTags()
                  .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd3 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // Adds the tag that the initial aspect already carries.
  PatchItemImpl patchAdd1 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd3, patchAdd2, patchAdd1))
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(tag1, tag2, tag3),
      "Expected all tags");
}
/**
 * Seeds {@code globalTags} with four tags, then ingests one batch containing the same add-patch
 * item twice. Checks that exactly one entity is listed, that the latest aspect reports
 * system-metadata version "3", and that the tags are the initial four plus the patched one.
 */
@Test
public void testBatchPatchAddDuplicate() throws Exception {
  // Urn is named after this test (it previously reused the testBatchPatchAdd dataset name).
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAddDuplicate,PROD)");
  List<TagAssociation> initialTags =
      Stream.of(
              TagUrn.createFromString("urn:li:tag:__default_large_table"),
              TagUrn.createFromString("urn:li:tag:__default_low_queries"),
              TagUrn.createFromString("urn:li:tag:__default_low_changes"),
              TagUrn.createFromString("urn:li:tag:!10TB+ tables"))
          .map(tag -> new TagAssociation().setTag(tag))
          .collect(Collectors.toList());
  TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  // The patch carries its own system metadata, observed one tick after the initial aspect's.
  SystemMetadata patchSystemMetadata = new SystemMetadata();
  patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1);
  patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE)));

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags)))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .systemMetadata(patchSystemMetadata)
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd2, patchAdd2)) // duplicate
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2))
          .collect(Collectors.toSet()),
      "Expected all tags");
}
@Test
public void dataGeneratorThreadingTest() {
DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
@ -976,14 +608,4 @@ public class EbeanEntityServiceTest
}
}
}
/**
 * Builds a single patch operation addressing {@code /tags/<tagUrn>}.
 *
 * @param op operation type; its {@code getValue()} becomes the op string
 * @param tagUrn tag urn used for the path and, for ADD, for the {@code tag} value entry
 * @return the populated patch op; a value is set only when {@code op} is ADD
 */
private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) {
  final GenericJsonPatch.PatchOp result = new GenericJsonPatch.PatchOp();
  result.setOp(op.getValue());
  result.setPath(String.format("/tags/%s", tagUrn));

  // Only ADD carries a payload; every other operation is fully described by op + path.
  if (!PatchOperationType.ADD.equals(op)) {
    return result;
  }
  result.setValue(Map.of("tag", tagUrn.toString()));
  return result;
}
}

View File

@ -11,14 +11,18 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.Owner;
import com.linkedin.common.OwnerArray;
import com.linkedin.common.Ownership;
import com.linkedin.common.OwnershipType;
import com.linkedin.common.Status;
import com.linkedin.common.TagAssociation;
import com.linkedin.common.TagAssociationArray;
import com.linkedin.common.UrnArray;
import com.linkedin.common.VersionedUrn;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.TagUrn;
import com.linkedin.common.urn.TupleKey;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
@ -42,8 +46,11 @@ import com.linkedin.metadata.aspect.Aspect;
import com.linkedin.metadata.aspect.CorpUserAspect;
import com.linkedin.metadata.aspect.CorpUserAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.entity.validation.ValidationException;
@ -52,10 +59,12 @@ import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.snapshot.CorpUserSnapshot;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
@ -605,6 +614,9 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
entityUrn,
_testEntityRegistry.getEntitySpec(entityUrn.getEntityType()).getKeyAspectSpec())));
SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1);
futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1);
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
@ -612,10 +624,10 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(aspect);
restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1));
restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(aspect);
restateChangeLog.setPreviousSystemMetadata(
simulatePullFromDB(initialSystemMetadata, SystemMetadata.class));
simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
restateChangeLog.setEntityKeyAspect(
GenericRecordUtils.serializeAspect(
EntityKeyUtils.convertUrnToEntityKey(
@ -636,11 +648,7 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
clearInvocations(_mockProducer);
_entityServiceImpl.ingestAspects(
opContext,
entityUrn,
pairToIngest,
TEST_AUDIT_STAMP,
AspectGenerationUtils.createSystemMetadata());
opContext, entityUrn, pairToIngest, TEST_AUDIT_STAMP, futureSystemMetadata);
verify(_mockProducer, times(1))
.produceMetadataChangeLog(
@ -682,6 +690,12 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
initialChangeLog.setAspect(genericAspect);
initialChangeLog.setSystemMetadata(metadata1);
SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1);
futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1);
MetadataChangeProposal mcp2 = new MetadataChangeProposal(mcp1.data().copy());
mcp2.getSystemMetadata().setLastObserved(futureSystemMetadata.getLastObserved());
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
@ -689,9 +703,10 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(genericAspect);
restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1));
restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(genericAspect);
restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class));
restateChangeLog.setPreviousSystemMetadata(
simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
Map<String, RecordTemplate> latestAspects =
_entityServiceImpl.getLatestAspectsForUrn(
@ -706,7 +721,7 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
// unless invocations are cleared
clearInvocations(_mockProducer);
_entityServiceImpl.ingestProposal(opContext, mcp1, TEST_AUDIT_STAMP, false);
_entityServiceImpl.ingestProposal(opContext, mcp2, TEST_AUDIT_STAMP, false);
verify(_mockProducer, times(1))
.produceMetadataChangeLog(
@ -1390,7 +1405,7 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123");
SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456");
SystemMetadata metadata3 =
AspectGenerationUtils.createSystemMetadata(1635792689, "run-123", "run-456", "1");
AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", "run-123", "1");
List<ChangeItemImpl> items =
List.of(
@ -1482,6 +1497,9 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
assertTrue(
DataTemplateUtil.areEqual(
EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3),
String.format(
"Expected %s == %s",
EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3));
verify(_mockProducer, times(0))
@ -2179,6 +2197,474 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
Set.of(existentUrn, noStatusUrn, softDeletedUrn));
}
/**
 * Ingests a single batch holding two {@code status} items for the same urn (removed=true followed
 * by removed=false) and checks that exactly one entity is listed, that the latest aspect reports
 * system-metadata version "2", and that the second item's value is the one returned as latest.
 */
@Test
public void testBatchDuplicate() throws Exception {
  Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  // Both items share urn, aspect name and (copied) system metadata; only the value differs.
  ChangeItemImpl item1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(STATUS_ASPECT_NAME)
          .recordTemplate(new Status().setRemoved(true))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));
  ChangeItemImpl item2 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(STATUS_ASPECT_NAME)
          .recordTemplate(new Status().setRemoved(false))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(item1, item2))
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
  assertEquals(
      envelopedAspect.getSystemMetadata().getVersion(),
      "2",
      "Expected version 2 after accounting for sequential duplicates");
  assertEquals(
      envelopedAspect.getValue().toString(),
      "{removed=false}",
      "Expected 2nd item to be the latest");
}
/**
 * Seeds {@code globalTags} with tag1, then ingests one batch of two patches: add tag2, followed by
 * a remove of a tag that was never added. Checks that exactly one entity is listed, that the
 * latest aspect reports system-metadata version "2" (the trailing remove is expected not to
 * produce another version), and that the tags are exactly tag1 and tag2.
 */
@Test
public void testBatchPatchWithTrailingNoOp() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
  Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2");
  Urn tagOther = UrnUtils.getUrn("urn:li:tag:other");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(
              new GlobalTags()
                  .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // Removes a tag that is in neither the initial aspect nor the preceding patch.
  PatchItemImpl patchRemoveNonExistent =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd2, patchRemoveNonExistent))
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  // Message now agrees with the asserted value (it used to read "Expected version 3 ...").
  assertEquals(
      envelopedAspect.getSystemMetadata().getVersion(),
      "2",
      "Expected version 2. 1 - Initial, + 1 add; the trailing remove is a no-op");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(tag1, tag2),
      "Expected both tags");
}
/**
 * Seeds {@code globalTags} with tag1, then ingests one batch of three add-patches (tag3, tag2,
 * tag1, in that order). Checks that exactly one entity is listed, that the latest aspect reports
 * system-metadata version "3", and that the tags are exactly tag1, tag2 and tag3.
 */
@Test
public void testBatchPatchAdd() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
  TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2");
  TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(
              new GlobalTags()
                  .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd3 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // Adds the tag that the initial aspect already carries.
  PatchItemImpl patchAdd1 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd3, patchAdd2, patchAdd1))
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  // Message now agrees with the asserted value (it used to read "Expected version 4").
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(tag1, tag2, tag3),
      "Expected all tags");
}
/**
 * Seeds {@code globalTags} with four tags, then ingests one batch containing the same add-patch
 * item twice. Checks that exactly one entity is listed, that the latest aspect reports
 * system-metadata version "2", and that the tags are the initial four plus the patched one.
 */
@Test
public void testBatchPatchAddDuplicate() throws Exception {
  // Urn is named after this test (it previously reused the testBatchPatchAdd dataset name).
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAddDuplicate,PROD)");
  List<TagAssociation> initialTags =
      Stream.of(
              TagUrn.createFromString("urn:li:tag:__default_large_table"),
              TagUrn.createFromString("urn:li:tag:__default_low_queries"),
              TagUrn.createFromString("urn:li:tag:__default_low_changes"),
              TagUrn.createFromString("urn:li:tag:!10TB+ tables"))
          .map(tag -> new TagAssociation().setTag(tag))
          .collect(Collectors.toList());
  TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  // The patch carries its own system metadata, observed one tick after the initial aspect's.
  SystemMetadata patchSystemMetadata = new SystemMetadata();
  patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1);
  patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE)));

  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags)))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .systemMetadata(patchSystemMetadata)
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd2, patchAdd2)) // duplicate
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "2", "Expected version 2");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2))
          .collect(Collectors.toSet()),
      "Expected all tags");
}
/**
 * Ingests a single remove-patch for a tag against a dataset urn for which this test has not
 * ingested anything beforehand. Checks that four results come back, that exactly one entity is
 * listed, that {@code globalTags} reports system-metadata version "1", and that its tag set is
 * empty.
 */
@Test
public void testPatchRemoveNonExistent() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchRemoveNonExistent,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");

  PatchItemImpl patchRemove =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tag1)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  List<UpdateAspectResult> results =
      _entityServiceImpl.ingestAspects(
          opContext,
          AspectsBatchImpl.builder()
              .retrieverContext(opContext.getRetrieverContext().get())
              .items(List.of(patchRemove))
              .build(),
          false,
          true);

  assertEquals(results.size(), 4, "Expected default aspects + empty globalTags");

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  // Message now agrees with the asserted value (it used to read "Expected version 4").
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 1");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(),
      "Expected empty tags");
}
/**
 * Ingests a single add-patch for tag1 against a dataset urn for which this test has not ingested
 * anything beforehand. Checks that four results come back, that exactly one entity is listed,
 * that {@code globalTags} reports system-metadata version "1", and that its tag set is exactly
 * tag1.
 */
@Test
public void testPatchAddNonExistent() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn(
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchAddNonExistent,PROD)");
  TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");

  PatchItemImpl patchAdd =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
                  .build()
                  .getJsonPatch())
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  List<UpdateAspectResult> results =
      _entityServiceImpl.ingestAspects(
          opContext,
          AspectsBatchImpl.builder()
              .retrieverContext(opContext.getRetrieverContext().get())
              .items(List.of(patchAdd))
              .build(),
          false,
          true);

  assertEquals(results.size(), 4, "Expected default aspects + globalTags");

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG order is (actual, expected), matching the other assertions in this method.
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  // Message now agrees with the asserted value (it used to read "Expected version 4").
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 1");
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Set.of(tag1),
      "Expected all tags");
}
@Nonnull
protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email)
throws Exception {
@ -2210,4 +2696,14 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect));
return new Pair<>(AspectGenerationUtils.getAspectName(aspect), recordTemplate);
}
/**
 * Builds a single patch operation addressing the {@code /tags/<tagUrn>} path.
 *
 * @param op the operation type; its value becomes the operation's {@code op} field
 * @param tagUrn the tag urn used both in the path and, for ADD, in the value
 * @return the populated patch operation; a value is attached only for ADD operations
 */
private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) {
  final GenericJsonPatch.PatchOp operation = new GenericJsonPatch.PatchOp();
  operation.setOp(op.getValue());
  operation.setPath(String.format("/tags/%s", tagUrn));

  // Only ADD carries a payload; every other operation type is returned without a value.
  final boolean isAdd = PatchOperationType.ADD.equals(op);
  if (!isAdd) {
    return operation;
  }

  operation.setValue(Map.of("tag", tagUrn.toString()));
  return operation;
}
}

View File

@ -0,0 +1,41 @@
package com.linkedin.metadata.entity.ebean.batch;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.testng.Assert.assertFalse;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.AspectGenerationUtils;
import com.linkedin.mxe.SystemMetadata;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import org.testng.annotations.Test;
/** Tests for the database-duplicate comparison of {@link ChangeItemImpl}. */
public class ChangeItemImplTest {
  private static final AuditStamp TEST_AUDIT_STAMP = AspectGenerationUtils.createAuditStamp();

  /**
   * Two items for the same urn and aspect, built from copies of the same system metadata and the
   * same audit stamp but with different {@code removed} values, must not be reported as database
   * duplicates of one another.
   */
  @Test
  public void testBatchDuplicate() throws Exception {
    final Urn corpUserUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
    final SystemMetadata sharedMetadata = AspectGenerationUtils.createSystemMetadata();

    final ChangeItemImpl removedItem = statusItem(corpUserUrn, true, sharedMetadata);
    final ChangeItemImpl notRemovedItem = statusItem(corpUserUrn, false, sharedMetadata);

    assertFalse(removedItem.isDatabaseDuplicateOf(notRemovedItem));
  }

  /**
   * Builds a status-aspect change item for the urn with the given {@code removed} flag, using a
   * copy of the supplied system metadata and the shared test audit stamp.
   */
  private static ChangeItemImpl statusItem(Urn urn, boolean removed, SystemMetadata metadata)
      throws Exception {
    return ChangeItemImpl.builder()
        .urn(urn)
        .aspectName(STATUS_ASPECT_NAME)
        .recordTemplate(new Status().setRemoved(removed))
        .systemMetadata(metadata.copy())
        .auditStamp(TEST_AUDIT_STAMP)
        .build(TestOperationContexts.emptyAspectRetriever(null));
  }
}

View File

@ -151,7 +151,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -172,7 +172,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -248,7 +248,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(STATUS_ASPECT_NAME)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -263,7 +263,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(STATUS_ASPECT_NAME)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -324,7 +324,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(STATUS_ASPECT_NAME)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -339,7 +339,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(STATUS_ASPECT_NAME)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -354,7 +354,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@ -375,7 +375,7 @@ public class SchemaFieldSideEffectTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
.changeType(changeType)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY

View File

@ -159,7 +159,7 @@ ebean:
autoCreateDdl: ${EBEAN_AUTOCREATE:false}
postgresUseIamAuth: ${EBEAN_POSTGRES_USE_AWS_IAM_AUTH:false}
locking:
enabled: ${EBEAN_LOCKING_ENABLED:true}
enabled: ${EBEAN_LOCKING_ENABLED:false}
durationSeconds: ${EBEAN_LOCKING_DURATION_SECONDS:60}
maximumLocks: ${EBEAN_LOCKING_MAXIMUM_LOCKS:20000}

View File

@ -4,6 +4,7 @@ import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
@ -13,6 +14,8 @@ import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.GenericPayload;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
@ -23,6 +26,22 @@ public class GenericRecordUtils {
/** Private no-op constructor: this class is used through its static methods only. */
private GenericRecordUtils() {}
/**
 * Creates a new instance of {@code clazz} backed by a copy of the input's underlying data map.
 *
 * <p>The new instance is created reflectively through the public constructor of {@code clazz}
 * that accepts a single {@link DataMap}.
 *
 * @param input the record to copy; may be {@code null}
 * @param clazz the concrete record class to instantiate
 * @return the new record, or {@code null} when {@code input} is {@code null}
 * @throws RuntimeException wrapping the underlying exception when the data map cannot be copied
 *     or the constructor cannot be found or invoked
 */
public static <T extends RecordTemplate> T copy(T input, Class<T> clazz) {
  if (input == null) {
    return null;
  }

  try {
    // Resolve the constructor before copying the data, so lookup failures surface first.
    final Constructor<T> dataMapConstructor = clazz.getConstructor(DataMap.class);
    final DataMap copiedData = input.data().copy();
    return dataMapConstructor.newInstance(copiedData);
  } catch (CloneNotSupportedException | ReflectiveOperationException e) {
    // ReflectiveOperationException is the common supertype of the reflection failures that
    // getConstructor/newInstance can raise here.
    throw new RuntimeException(e);
  }
}
/** Deserialize the given value into the aspect based on the input aspectSpec */
@Nonnull
public static RecordTemplate deserializeAspect(