MINOR: Add pagination to entity history versions (#17587)

* Add pagination to EntityHistory Versions

* Fix endless loop

(cherry picked from commit b8af2019b403a733244239881df85769d50028f9)
This commit is contained in:
IceS2 2024-08-28 11:08:15 +02:00 committed by mohitdeuex
parent f42e684857
commit 13fd18d5ad
3 changed files with 73 additions and 25 deletions

View File

@ -21,7 +21,6 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.TagLabel;
@ -84,39 +83,52 @@ public class DataInsightsEntityEnricherProcessor
Long endTimestamp = (Long) contextData.get(END_TIMESTAMP_KEY);
Long startTimestamp = (Long) contextData.get(START_TIMESTAMP_KEY);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
EntityHistory entityHistory = entityRepository.listVersions(entity.getId());
Long pointerTimestamp = endTimestamp;
List<Map<String, Object>> entityVersions = new java.util.ArrayList<>();
boolean historyDone = false;
int nextOffset = 0;
for (Object version : entityHistory.getVersions()) {
EntityInterface versionEntity =
JsonUtils.readOrConvertValue(
version, ENTITY_TYPE_TO_CLASS_MAP.get(entityType.toLowerCase()));
Long versionTimestamp = TimestampUtils.getStartOfDayTimestamp(versionEntity.getUpdatedAt());
if (versionTimestamp > pointerTimestamp) {
continue;
} else if (versionTimestamp < startTimestamp) {
Map<String, Object> versionMap = new HashMap<>();
versionMap.put("endTimestamp", pointerTimestamp);
versionMap.put("startTimestamp", startTimestamp);
versionMap.put("versionEntity", versionEntity);
entityVersions.add(versionMap);
while (!historyDone) {
EntityRepository.EntityHistoryWithOffset entityHistoryWithOffset =
entityRepository.listVersionsWithOffset(entity.getId(), 100, nextOffset);
List<Object> versions = entityHistoryWithOffset.entityHistory().getVersions();
if (versions.isEmpty()) {
break;
} else {
Map<String, Object> versionMap = new HashMap<>();
}
nextOffset = entityHistoryWithOffset.nextOffset();
versionMap.put("endTimestamp", pointerTimestamp);
versionMap.put("startTimestamp", TimestampUtils.getEndOfDayTimestamp(versionTimestamp));
versionMap.put("versionEntity", versionEntity);
for (Object version : versions) {
EntityInterface versionEntity =
JsonUtils.readOrConvertValue(
version, ENTITY_TYPE_TO_CLASS_MAP.get(entityType.toLowerCase()));
Long versionTimestamp = TimestampUtils.getStartOfDayTimestamp(versionEntity.getUpdatedAt());
if (versionTimestamp > pointerTimestamp) {
continue;
} else if (versionTimestamp < startTimestamp) {
Map<String, Object> versionMap = new HashMap<>();
entityVersions.add(versionMap);
pointerTimestamp =
TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(versionTimestamp, 1));
versionMap.put("endTimestamp", pointerTimestamp);
versionMap.put("startTimestamp", startTimestamp);
versionMap.put("versionEntity", versionEntity);
entityVersions.add(versionMap);
historyDone = true;
break;
} else {
Map<String, Object> versionMap = new HashMap<>();
versionMap.put("endTimestamp", pointerTimestamp);
versionMap.put("startTimestamp", TimestampUtils.getEndOfDayTimestamp(versionTimestamp));
versionMap.put("versionEntity", versionEntity);
entityVersions.add(versionMap);
pointerTimestamp =
TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(versionTimestamp, 1));
}
}
}
return entityVersions;
}

View File

@ -703,6 +703,18 @@ public interface CollectionDAO {
List<ExtensionRecord> getExtensions(
@BindUUID("id") UUID id, @Bind("extensionPrefix") String extensionPrefix);
@RegisterRowMapper(ExtensionMapper.class)
@SqlQuery(
"SELECT extension, json FROM entity_extension WHERE id = :id AND extension "
+ "LIKE CONCAT (:extensionPrefix, '.%') "
+ "ORDER BY extension DESC "
+ "LIMIT :limit OFFSET :offset")
List<ExtensionRecord> getExtensionsWithOffset(
@BindUUID("id") UUID id,
@Bind("extensionPrefix") String extensionPrefix,
@Bind("limit") int limit,
@Bind("offset") int offset);
@SqlUpdate("DELETE FROM entity_extension WHERE id = :id AND extension = :extension")
void delete(@BindUUID("id") UUID id, @Bind("extension") String extension);

View File

@ -207,6 +207,7 @@ import org.openmetadata.service.util.ResultList;
@Slf4j
@Repository()
public abstract class EntityRepository<T extends EntityInterface> {
public record EntityHistoryWithOffset(EntityHistory entityHistory, int nextOffset) {}
public static final LoadingCache<Pair<String, String>, EntityInterface> CACHE_WITH_NAME =
CacheBuilder.newBuilder()
@ -806,6 +807,29 @@ public abstract class EntityRepository<T extends EntityInterface> {
CatalogExceptionMessage.entityVersionNotFound(entityType, id, requestedVersion));
}
public final EntityHistoryWithOffset listVersionsWithOffset(UUID id, int limit, int offset) {
T latest = setFieldsInternal(find(id, ALL), putFields);
setInheritedFields(latest, putFields);
String extensionPrefix = EntityUtil.getVersionExtensionPrefix(entityType);
List<ExtensionRecord> records =
daoCollection
.entityExtensionDAO()
.getExtensionsWithOffset(id, extensionPrefix, limit, offset);
List<EntityVersionPair> oldVersions = new ArrayList<>();
records.forEach(r -> oldVersions.add(new EntityVersionPair(r)));
oldVersions.sort(EntityUtil.compareVersion.reversed());
final List<Object> versions = new ArrayList<>();
if (offset == 0) {
versions.add(JsonUtils.pojoToJson(latest));
}
oldVersions.forEach(version -> versions.add(version.getEntityJson()));
return new EntityHistoryWithOffset(
new EntityHistory().withEntityType(entityType).withVersions(versions), offset + limit);
}
public final EntityHistory listVersions(UUID id) {
T latest = setFieldsInternal(find(id, ALL), putFields);
setInheritedFields(latest, putFields);