feat(delete): delete logic non-strict monotonically increasing version (#12242)

This commit is contained in:
david-leifker 2024-12-30 17:44:09 -06:00 committed by GitHub
parent abb64433fc
commit ee54f1fb61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 188 additions and 32 deletions

View File

@ -6,6 +6,7 @@ import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.util.Pair;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
@ -155,6 +156,16 @@ public interface AspectDao {
long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName);
/**
* Return the min/max version for the given URN & aspect
*
* @param urn the urn
* @param aspectName the aspect
* @return the range of versions, if they do not exist -1 is returned
*/
@Nonnull
Pair<Long, Long> getVersionRange(@Nonnull final String urn, @Nonnull final String aspectName);
void setWritable(boolean canWrite);
@Nonnull

View File

@ -2229,8 +2229,9 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
}
/** Does not emit MCL */
@VisibleForTesting
@Nullable
private RollbackResult deleteAspectWithoutMCL(
RollbackResult deleteAspectWithoutMCL(
@Nonnull OperationContext opContext,
String urn,
String aspectName,
@ -2288,11 +2289,14 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
// 4. Fetch all preceding aspects, that match
List<EntityAspect> aspectsToDelete = new ArrayList<>();
long maxVersion = aspectDao.getMaxVersion(urn, aspectName);
Pair<Long, Long> versionRange = aspectDao.getVersionRange(urn, aspectName);
long minVersion = Math.max(0, versionRange.getFirst());
long maxVersion = Math.max(0, versionRange.getSecond());
EntityAspect.EntitySystemAspect survivingAspect = null;
String previousMetadata = null;
boolean filterMatch = true;
while (maxVersion > 0 && filterMatch) {
while (maxVersion > minVersion && filterMatch) {
EntityAspect.EntitySystemAspect candidateAspect =
(EntityAspect.EntitySystemAspect)
EntityUtils.toSystemAspect(
@ -2305,11 +2309,13 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
previousSysMetadata != null && filterMatch(previousSysMetadata, conditions);
if (filterMatch) {
aspectsToDelete.add(candidateAspect.getEntityAspect());
maxVersion = maxVersion - 1;
} else if (candidateAspect == null) {
// potential gap
filterMatch = true;
} else {
survivingAspect = candidateAspect;
previousMetadata = survivingAspect.getMetadataRaw();
}
maxVersion = maxVersion - 1;
}
// Delete validation hooks

View File

@ -36,6 +36,7 @@ import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import com.linkedin.util.Pair;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
@ -110,7 +111,14 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
validateConnection();
Map<String, Long> result = getMaxVersions(urn, ImmutableSet.of(aspectName));
Map<String, Pair<Long, Long>> result = getVersionRanges(urn, ImmutableSet.of(aspectName));
return result.get(aspectName).getSecond();
}
@Override
@Nonnull
public Pair<Long, Long> getVersionRange(@Nonnull String urn, @Nonnull String aspectName) {
Map<String, Pair<Long, Long>> result = getVersionRanges(urn, ImmutableSet.of(aspectName));
return result.get(aspectName);
}
@ -148,15 +156,17 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
return rs.one() != null;
}
private Map<String, Long> getMaxVersions(
private Map<String, Pair<Long, Long>> getVersionRanges(
@Nonnull final String urn, @Nonnull final Set<String> aspectNames) {
SimpleStatement ss =
selectFrom(CassandraAspect.TABLE_NAME)
.selectors(
Selector.column(CassandraAspect.URN_COLUMN),
Selector.column(CassandraAspect.ASPECT_COLUMN),
Selector.function("min", Selector.column(CassandraAspect.VERSION_COLUMN))
.as("min_version"),
Selector.function("max", Selector.column(CassandraAspect.VERSION_COLUMN))
.as(CassandraAspect.VERSION_COLUMN))
.as("max_version"))
.whereColumn(CassandraAspect.URN_COLUMN)
.isEqualTo(literal(urn))
.whereColumn(CassandraAspect.ASPECT_COLUMN)
@ -168,21 +178,21 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
.build();
ResultSet rs = _cqlSession.execute(ss);
Map<String, Long> aspectVersions =
Map<String, Pair<Long, Long>> aspectVersionRanges =
rs.all().stream()
.collect(
Collectors.toMap(
row -> row.getString(CassandraAspect.ASPECT_COLUMN),
row -> row.getLong(CassandraAspect.VERSION_COLUMN)));
row -> Pair.of(row.getLong("min_version"), row.getLong("max_version"))));
// For each requested aspect that didn't come back from DB, add a version -1
// For each requested aspect that didn't come back from DB, add a version range of (-1, -1)
for (String aspect : aspectNames) {
if (!aspectVersions.containsKey(aspect)) {
aspectVersions.put(aspect, -1L);
if (!aspectVersionRanges.containsKey(aspect)) {
aspectVersionRanges.put(aspect, Pair.of(-1L, -1L));
}
}
return aspectVersions;
return aspectVersionRanges;
}
@Override
@ -551,11 +561,12 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
Map<String, Map<String, Long>> result = new HashMap<>();
for (Map.Entry<String, Set<String>> aspectNames : urnAspectMap.entrySet()) {
Map<String, Long> maxVersions = getMaxVersions(aspectNames.getKey(), aspectNames.getValue());
Map<String, Pair<Long, Long>> maxVersions =
getVersionRanges(aspectNames.getKey(), aspectNames.getValue());
Map<String, Long> nextVersions = new HashMap<>();
for (String aspectName : aspectNames.getValue()) {
long latestVersion = maxVersions.get(aspectName);
long latestVersion = maxVersions.get(aspectName).getSecond();
long nextVal = latestVersion < 0 ? ASPECT_LATEST_VERSION : latestVersion + 1L;
nextVersions.put(aspectName, nextVal);
}

View File

@ -38,6 +38,8 @@ import io.ebean.PagedList;
import io.ebean.Query;
import io.ebean.RawSql;
import io.ebean.RawSqlBuilder;
import io.ebean.SqlQuery;
import io.ebean.SqlRow;
import io.ebean.Transaction;
import io.ebean.TxScope;
import io.ebean.annotation.TxIsolation;
@ -247,10 +249,18 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
@Nonnull final EbeanAspectV2 ebeanAspect,
final boolean insert) {
validateConnection();
if (insert) {
_server.insert(ebeanAspect, txContext.tx());
if (txContext != null && txContext.tx() != null) {
if (insert) {
_server.insert(ebeanAspect, txContext.tx());
} else {
_server.update(ebeanAspect, txContext.tx());
}
} else {
_server.update(ebeanAspect, txContext.tx());
if (insert) {
_server.insert(ebeanAspect);
} else {
_server.update(ebeanAspect);
}
}
}
@ -864,20 +874,33 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
}
@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
@Nonnull
public Pair<Long, Long> getVersionRange(
@Nonnull final String urn, @Nonnull final String aspectName) {
validateConnection();
final List<EbeanAspectV2.PrimaryKey> result =
_server
.find(EbeanAspectV2.class)
.where()
.eq(EbeanAspectV2.URN_COLUMN, urn.toString())
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.orderBy()
.desc(EbeanAspectV2.VERSION_COLUMN)
.setMaxRows(1)
.findIds();
return result.isEmpty() ? -1 : result.get(0).getVersion();
// Use SQL aggregation to get both min and max in a single query
SqlQuery query =
_server.sqlQuery(
"SELECT MIN(version) as min_version, MAX(version) as max_version "
+ "FROM metadata_aspect_v2 "
+ "WHERE urn = :urn AND aspect = :aspect");
query.setParameter("urn", urn);
query.setParameter("aspect", aspectName);
SqlRow result = query.findOne();
if (result == null) {
return Pair.of(-1L, -1L);
}
return Pair.of(result.getLong("min_version"), result.getLong("max_version"));
}
@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
return getVersionRange(urn, aspectName).getSecond();
}
/**

View File

@ -84,6 +84,8 @@ import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import jakarta.annotation.Nonnull;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -2670,6 +2672,109 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
"Expected all tags");
}
@Test
public void testDeleteUrnWithRunIdFilterNonMatch() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch");
// Create aspects with different run IDs
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata();
metadata1.setRunId("run-123");
SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata();
metadata2.setRunId("run-456"); // Different run ID
String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());
// First ingest the aspect that should survive (run-456)
CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("first@test.com");
List<Pair<String, RecordTemplate>> firstPair = new ArrayList<>();
firstPair.add(getAspectRecordPair(writeAspect1, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata2);
// Then ingest the aspect that should be deleted (run-123)
CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("second@test.com");
List<Pair<String, RecordTemplate>> secondPair = new ArrayList<>();
secondPair.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata1);
// When we try to delete with runId=run-123, the version with runId=run-456 should survive
RollbackResult result =
_entityServiceImpl.deleteAspectWithoutMCL(
opContext,
entityUrn.toString(),
aspectName,
Collections.singletonMap("runId", "run-123"),
true);
// The aspect with run-456 should still exist
RecordTemplate survivingAspect =
_entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect1, survivingAspect));
// Verify the RollbackResult details
assertNotNull(result);
assertEquals(result.getUrn(), entityUrn);
assertEquals(result.getEntityName(), "corpuser");
assertEquals(result.getAspectName(), aspectName);
}
@Test
public void testDeleteUrnWithRunIdFilterNonMatchVersionGap() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch");
String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());
// Metadata that should be preserved (run-456)
SystemMetadata metadata456 = AspectGenerationUtils.createSystemMetadata();
metadata456.setRunId("run-456"); // Different run ID
CorpUserInfo writeAspect456 = AspectGenerationUtils.createCorpUserInfo("first@test.com");
List<Pair<String, RecordTemplate>> firstPair = new ArrayList<>();
firstPair.add(getAspectRecordPair(writeAspect456, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(
opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata456);
// Metadata that should be deleted (run-123)
SystemMetadata metadata123 = AspectGenerationUtils.createSystemMetadata();
metadata123.setRunId("run-123");
CorpUserInfo writeAspect123 = AspectGenerationUtils.createCorpUserInfo("second@test.com");
List<Pair<String, RecordTemplate>> secondPair = new ArrayList<>();
secondPair.add(getAspectRecordPair(writeAspect123, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(
opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata123);
// Then insert another run-123 with version gap
_aspectDao.saveAspect(
null,
entityUrn.toString(),
aspectName,
RecordUtils.toJsonString(writeAspect123),
TEST_AUDIT_STAMP.getActor().toString(),
null,
Timestamp.from(Instant.ofEpochMilli(TEST_AUDIT_STAMP.getTime())),
RecordUtils.toJsonString(metadata123),
10L,
true);
// When we try to delete with runId=run-123, the version with runId=run-456 should survive
RollbackResult result =
_entityServiceImpl.deleteAspectWithoutMCL(
opContext,
entityUrn.toString(),
aspectName,
Collections.singletonMap("runId", "run-123"),
true);
// The aspect with run-456 should still exist
RecordTemplate survivingAspect =
_entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect456, survivingAspect));
// Verify the RollbackResult details
assertNotNull(result);
assertEquals(result.getUrn(), entityUrn);
assertEquals(result.getEntityName(), "corpuser");
assertEquals(result.getAspectName(), aspectName);
}
@Nonnull
protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email)
throws Exception {