mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-24 18:10:11 +00:00
fix(version): forUpdate needed for versioning (#11328)
This commit is contained in:
parent
cf49f80e77
commit
c6eea1eec3
@ -51,16 +51,30 @@ public interface AspectDao {
|
||||
List<EntityAspect> getAspectsInRange(
|
||||
@Nonnull Urn urn, Set<String> aspectNames, long startTimeMillis, long endTimeMillis);
|
||||
|
||||
/**
|
||||
* @param urn urn to fetch
|
||||
* @param aspectName aspect to fetch
|
||||
* @param forUpdate set to true if the result is used for versioning <a
|
||||
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
|
||||
* @return
|
||||
*/
|
||||
@Nullable
|
||||
default EntityAspect getLatestAspect(
|
||||
@Nonnull final String urn, @Nonnull final String aspectName) {
|
||||
return getLatestAspects(Map.of(urn, Set.of(aspectName)))
|
||||
@Nonnull final String urn, @Nonnull final String aspectName, boolean forUpdate) {
|
||||
return getLatestAspects(Map.of(urn, Set.of(aspectName)), forUpdate)
|
||||
.getOrDefault(urn, Map.of())
|
||||
.getOrDefault(aspectName, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param urnAspects urn/aspects to fetch
|
||||
* @param forUpdate set to true if the result is used for versioning <a
|
||||
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
|
||||
* @return the data
|
||||
*/
|
||||
@Nonnull
|
||||
Map<String, Map<String, EntityAspect>> getLatestAspects(Map<String, Set<String>> urnAspects);
|
||||
Map<String, Map<String, EntityAspect>> getLatestAspects(
|
||||
Map<String, Set<String>> urnAspects, boolean forUpdate);
|
||||
|
||||
void saveAspect(
|
||||
@Nullable Transaction tx,
|
||||
|
@ -849,7 +849,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
final Map<String, Map<String, SystemAspect>> latestAspects =
|
||||
EntityUtils.toSystemAspects(
|
||||
opContext.getRetrieverContext().get(),
|
||||
aspectDao.getLatestAspects(urnAspects));
|
||||
aspectDao.getLatestAspects(urnAspects, true));
|
||||
// read #2 (potentially)
|
||||
final Map<String, Map<String, Long>> nextVersions =
|
||||
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
|
||||
@ -866,7 +866,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
Map<String, Map<String, SystemAspect>> newLatestAspects =
|
||||
EntityUtils.toSystemAspects(
|
||||
opContext.getRetrieverContext().get(),
|
||||
aspectDao.getLatestAspects(updatedItems.getFirst()));
|
||||
aspectDao.getLatestAspects(updatedItems.getFirst(), true));
|
||||
// merge
|
||||
updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects);
|
||||
|
||||
@ -2064,7 +2064,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
|
||||
EntityAspect latestKey = null;
|
||||
try {
|
||||
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName);
|
||||
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName, false);
|
||||
} catch (EntityNotFoundException e) {
|
||||
log.warn("Entity to delete does not exist. {}", urn.toString());
|
||||
}
|
||||
@ -2217,7 +2217,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
(EntityAspect.EntitySystemAspect)
|
||||
EntityUtils.toSystemAspect(
|
||||
opContext.getRetrieverContext().get(),
|
||||
aspectDao.getLatestAspect(urn, aspectName))
|
||||
aspectDao.getLatestAspect(urn, aspectName, false))
|
||||
.orElse(null);
|
||||
|
||||
// 1.1 If no latest exists, skip this aspect
|
||||
|
@ -81,14 +81,15 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityAspect getLatestAspect(@Nonnull String urn, @Nonnull String aspectName) {
|
||||
public EntityAspect getLatestAspect(
|
||||
@Nonnull String urn, @Nonnull String aspectName, boolean forUpdate) {
|
||||
validateConnection();
|
||||
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, EntityAspect>> getLatestAspects(
|
||||
Map<String, Set<String>> urnAspects) {
|
||||
Map<String, Set<String>> urnAspects, boolean forUpdate) {
|
||||
return urnAspects.entrySet().stream()
|
||||
.map(
|
||||
entry ->
|
||||
@ -97,7 +98,8 @@ public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
|
||||
entry.getValue().stream()
|
||||
.map(
|
||||
aspectName -> {
|
||||
EntityAspect aspect = getLatestAspect(entry.getKey(), aspectName);
|
||||
EntityAspect aspect =
|
||||
getLatestAspect(entry.getKey(), aspectName, forUpdate);
|
||||
return aspect != null ? Map.entry(aspectName, aspect) : null;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
|
@ -242,7 +242,7 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, EntityAspect>> getLatestAspects(
|
||||
@Nonnull Map<String, Set<String>> urnAspects) {
|
||||
@Nonnull Map<String, Set<String>> urnAspects, boolean forUpdate) {
|
||||
validateConnection();
|
||||
|
||||
List<EbeanAspectV2.PrimaryKey> keys =
|
||||
@ -256,7 +256,12 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
|
||||
entry.getKey(), aspect, ASPECT_LATEST_VERSION)))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<EbeanAspectV2> results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
|
||||
final List<EbeanAspectV2> results;
|
||||
if (forUpdate) {
|
||||
results = _server.find(EbeanAspectV2.class).where().idIn(keys).forUpdate().findList();
|
||||
} else {
|
||||
results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
|
||||
}
|
||||
|
||||
return toUrnAspectMap(results);
|
||||
}
|
||||
@ -814,7 +819,8 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
|
||||
return result;
|
||||
}
|
||||
|
||||
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().findIds();
|
||||
// forUpdate is required to avoid duplicate key violations
|
||||
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().forUpdate().findIds();
|
||||
|
||||
for (EbeanAspectV2.PrimaryKey key : dbResults) {
|
||||
if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user