Update Pagination with offset and limit for Reindexing (#12553)

This commit is contained in:
Mohit Yadav 2023-07-25 18:39:23 +05:30 committed by GitHub
parent be5883bd3a
commit c86bda9d94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 24 deletions

View File

@ -58,7 +58,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -466,41 +465,30 @@ public abstract class EntityRepository<T extends EntityInterface> {
@Transaction @Transaction
public ResultList<T> listAfterWithSkipFailure( public ResultList<T> listAfterWithSkipFailure(
UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) throws IOException { UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) throws IOException {
Map<UUID, String> errors = new LinkedHashMap<>(); List<String> errors = new ArrayList<>();
Map<UUID, T> entities = new LinkedHashMap<>(); List<T> entities = new ArrayList<>();
int beforeOffset = Integer.parseInt(RestUtil.decodeCursor(after));
int currentOffset = beforeOffset;
int total = dao.listCount(filter); int total = dao.listCount(filter);
if (limitParam > 0) { if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked // forward scrolling, if after == null then first page is being asked
List<String> jsons = dao.listAfter(filter, limitParam + 1, after == null ? "" : RestUtil.decodeCursor(after)); List<String> jsons = dao.listAfterWithOffset(limitParam, currentOffset);
for (String json : jsons) { for (String json : jsons) {
try { try {
T entity = withHref(uriInfo, setFieldsInternal(JsonUtils.readValue(json, entityClass), fields)); T entity = withHref(uriInfo, setFieldsInternal(JsonUtils.readValue(json, entityClass), fields));
entities.put(entity.getId(), entity); entities.add(entity);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed in Set Fields for Entity with Json : {}", json); LOG.error("Failed in Set Fields for Entity with Json : {}", json);
errors.put(JsonUtils.readValue(json, entityClass).getId(), json); errors.add(json);
} }
} }
currentOffset = currentOffset + limitParam;
String beforeCursor; String newAfter = currentOffset > total ? null : String.valueOf(currentOffset);
String afterCursor = null; return getResultList(entities, errors, String.valueOf(beforeOffset), newAfter, total);
beforeCursor = after == null ? null : JsonUtils.readValue(jsons.get(0), entityClass).getName();
if (jsons.size() > limitParam) {
T lastReadEntity = JsonUtils.readValue(jsons.get(limitParam), entityClass);
entities.remove(lastReadEntity.getId());
afterCursor = JsonUtils.readValue(jsons.get(limitParam - 1), entityClass).getName();
errors.forEach((key, value) -> entities.remove(key));
// Remove the Last Json Entry if present in error, since the read was actually just till limitParam , and if
// error
// is there it will come in next read
errors.remove(lastReadEntity.getId());
}
return getResultList(
new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), beforeCursor, afterCursor, total);
} else { } else {
// limit == 0 , return total count of entity. // limit == 0 , return total count of entity.
return getResultList(new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), null, null, total); return getResultList(entities, errors, null, null, total);
} }
} }

View File

@ -27,6 +27,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SourceException; import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source; import org.openmetadata.service.workflows.interfaces.Source;
@ -38,7 +39,7 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
private String lastFailedCursor = null; private String lastFailedCursor = null;
private String cursor = null; private String cursor = RestUtil.encodeCursor("0");
@Getter private boolean isDone = false; @Getter private boolean isDone = false;
public PaginatedEntitiesSource(String entityType, int batchSize, List<String> fields) { public PaginatedEntitiesSource(String entityType, int batchSize, List<String> fields) {