[Reindexing] Fixes and Addition for Reindexing (#11447)

* Remove Entries on Reindexing

* Fix Reindexing Count Issues

* Print the reindexing cursor in case of error

* Add afterCursor to Request

* Add comment for function
Mohit Yadav 2023-05-05 14:19:51 +05:30 committed by GitHub
parent a0e2078320
commit 1b7fb192d0
7 changed files with 130 additions and 53 deletions

View File

@ -2821,6 +2821,11 @@ public interface CollectionDAO {
@SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN")
void deleteAll(@Bind("entityFQN") String entityFQN);
// Keeps only the most recent :records rows for the given extension and removes all older ones
@SqlUpdate(
"DELETE FROM entity_extension_time_series WHERE extension = :extension AND entityFQN NOT IN(SELECT entityFQN FROM (select * from entity_extension_time_series WHERE extension = :extension ORDER BY timestamp DESC LIMIT :records) AS subquery)")
void deleteLastRecords(@Bind("extension") String extension, @Bind("records") int noOfRecord);
@SqlUpdate(
"DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp")
void deleteAtTimestamp(
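
The new deleteLastRecords query is a retention sweep: for a given extension it keeps only the newest :records rows by timestamp and deletes every older one. A minimal usage sketch, assuming a CollectionDAO handle named dao as used by the handler later in this PR:

// Keep only the five most recent reindexing job entries for this extension,
// dropping every older row that shares it (matches the call made in ReIndexingHandler below).
dao.entityExtensionTimeSeriesDao().deleteLastRecords(REINDEXING_JOB_EXTENSION, 5);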

View File

@ -54,6 +54,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -394,9 +395,9 @@ public abstract class EntityRepository<T extends EntityInterface> {
@Transaction
public ResultList<T> listAfterWithSkipFailure(
UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) throws IOException {
List<String> errors = new ArrayList<>();
Map<UUID, String> errors = new LinkedHashMap<>();
Map<UUID, T> entities = new LinkedHashMap<>();
int total = dao.listCount(filter);
List<T> entities = new ArrayList<>();
if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked
List<String> jsons = dao.listAfter(filter, limitParam + 1, after == null ? "" : RestUtil.decodeCursor(after));
@ -404,24 +405,31 @@ public abstract class EntityRepository<T extends EntityInterface> {
for (String json : jsons) {
try {
T entity = withHref(uriInfo, setFieldsInternal(JsonUtils.readValue(json, entityClass), fields));
entities.add(entity);
entities.put(entity.getId(), entity);
} catch (Exception e) {
LOG.error("Failed in Set Fields for Entity with Json : {}", json);
errors.add(json);
errors.put(JsonUtils.readValue(json, entityClass).getId(), json);
}
}
String beforeCursor;
String afterCursor = null;
beforeCursor = after == null ? null : entities.get(0).getFullyQualifiedName();
if (entities.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
entities.remove(limitParam);
afterCursor = entities.get(limitParam - 1).getFullyQualifiedName();
beforeCursor = after == null ? null : JsonUtils.readValue(jsons.get(0), entityClass).getFullyQualifiedName();
if (jsons.size() > limitParam) {
T lastReadEntity = JsonUtils.readValue(jsons.get(limitParam), entityClass);
entities.remove(lastReadEntity.getId());
afterCursor = JsonUtils.readValue(jsons.get(limitParam - 1), entityClass).getFullyQualifiedName();
errors.forEach((key, value) -> entities.remove(key));
// Remove the last JSON entry from the error map if present: the effective read stops at limitParam,
// so if that entry failed it will be picked up in the next page read
errors.remove(lastReadEntity.getId());
}
return getResultList(entities, errors, beforeCursor, afterCursor, total);
return getResultList(
new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), beforeCursor, afterCursor, total);
} else {
// limit == 0 , return total count of entity.
return getResultList(entities, errors, null, null, total);
return getResultList(new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), null, null, total);
}
}
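
listAfterWithSkipFailure now keys both successes and failures by entity UUID and derives the before/after cursors from the raw JSON page rather than the filtered entity list, so a record that fails deserialization no longer shifts the cursor or skews the page count. A caller-side sketch, assuming a concrete Table repository and uriInfo/fields variables already in scope (IOException handling omitted):

// Hedged usage sketch: the repository and entity class names are placeholders.
ResultList<Table> page =
    tableRepository.listAfterWithSkipFailure(uriInfo, fields, new ListFilter(Include.ALL), 100, null);
// Failed records come back as raw JSON strings instead of silently vanishing.
page.getErrors().forEach(json -> LOG.error("Skipped unreadable entity: {}", json));
// The after cursor is computed from the raw page, so the next call resumes correctly
// even when some records in this page could not be deserialized.
String nextCursor = page.getPaging().getAfter();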

View File

@ -35,6 +35,7 @@ import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
@ -42,6 +43,7 @@ import org.openmetadata.schema.system.Stats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow;
@ -91,10 +93,10 @@ public class ReIndexingHandler {
// Create new Task
if (taskQueue.size() >= 5) {
throw new RuntimeException("Cannot create new Reindexing Jobs. There are pending jobs.");
throw new UnhandledServerException("Cannot create new Reindexing Jobs. There are pending jobs.");
}
if (((ThreadPoolExecutor) threadScheduler).getActiveCount() > 5) {
throw new RuntimeException("Thread unavailable to run the jobs. There are pending jobs.");
throw new UnhandledServerException("Thread unavailable to run the jobs. There are pending jobs.");
} else {
EventPublisherJob jobData = getReindexJob(startedBy, createReindexingJob);
List<SearchIndexWorkflow> activeJobs = new ArrayList<>(REINDEXING_JOB_MAP.values());
@ -106,7 +108,14 @@ public class ReIndexingHandler {
LOG.info("Reindexing triggered for the following Entities: {}", entityList);
if (entityList.size() > 0) {
if (!entityList.isEmpty()) {
// Check if the after cursor is provided
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor()) && entityList.size() > 1) {
throw new IllegalArgumentException("After Cursor can only be associated with one entity");
}
// Remove records from previous runs, keeping only the five most recent job entries
dao.entityExtensionTimeSeriesDao().deleteLastRecords(REINDEXING_JOB_EXTENSION, 5);
// Create Entry in the DB
dao.entityExtensionTimeSeriesDao()
.insert(
@ -120,7 +129,8 @@ public class ReIndexingHandler {
REINDEXING_JOB_MAP.put(jobData.getId(), job);
return jobData;
} else {
throw new RuntimeException("There are already executing Jobs working on the same Entities. Please try later.");
throw new UnhandledServerException(
"There are already executing Jobs working on the same Entities. Please try later.");
}
}
}
@ -129,7 +139,7 @@ public class ReIndexingHandler {
REINDEXING_JOB_MAP
.entrySet()
.removeIf(
(entry) ->
entry ->
entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.STARTED
&& entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.RUNNING);
}
@ -144,12 +154,13 @@ public class ReIndexingHandler {
}
private void validateJob(CreateEventPublisherJob job) {
// Check valid Entities are provided
Objects.requireNonNull(job);
Set<String> storedEntityList = new HashSet<>(Entity.getEntityList());
if (job.getEntities().size() > 0) {
if (!job.getEntities().isEmpty()) {
job.getEntities()
.forEach(
(entityType) -> {
entityType -> {
if (!storedEntityList.contains(entityType) && !ReindexingUtil.isDataInsightIndex(entityType)) {
throw new IllegalArgumentException(
String.format("Entity Type : %s is not a valid Entity", entityType));
@ -176,7 +187,7 @@ public class ReIndexingHandler {
public EventPublisherJob getLatestJob() throws IOException {
List<SearchIndexWorkflow> activeJobs = new ArrayList<>(REINDEXING_JOB_MAP.values());
if (activeJobs.size() > 0) {
if (!activeJobs.isEmpty()) {
return activeJobs.get(activeJobs.size() - 1).getJobData();
} else {
String recordString = dao.entityExtensionTimeSeriesDao().getLatestByExtension(REINDEXING_JOB_EXTENSION);
@ -193,7 +204,7 @@ public class ReIndexingHandler {
JsonUtils.readObjects(
dao.entityExtensionTimeSeriesDao().getAllByExtension(REINDEXING_JOB_EXTENSION), EventPublisherJob.class);
jobsFromDatabase.removeIf(
(job) -> {
job -> {
for (EventPublisherJob active : activeEventPubJob) {
if (active.getId().equals(job.getId())) {
return true;
@ -222,6 +233,7 @@ public class ReIndexingHandler {
.withBatchSize(job.getBatchSize())
.withFailure(new Failure())
.withRecreateIndex(job.getRecreateIndex())
.withSearchIndexMappingLanguage(job.getSearchIndexMappingLanguage());
.withSearchIndexMappingLanguage(job.getSearchIndexMappingLanguage())
.withAfterCursor(job.getAfterCursor());
}
}
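
The handler now trims old job records before inserting a new one, persists the requested afterCursor, and rejects an afterCursor that is paired with more than one entity. A request-building sketch for resuming a single failed entity; the with-style setters on the schema-generated CreateEventPublisherJob, the handler entry point, and the concrete values are assumptions:

// Hedged sketch: resuming reindexing of one entity from a known cursor
// (entity name, cursor value, batch size and the generated setters are assumed).
CreateEventPublisherJob resumeRequest =
    new CreateEventPublisherJob()
        .withEntities(Set.of("table"))       // more than one entity here would fail validation
        .withBatchSize(100)
        .withRecreateIndex(false)            // recreating the index would defeat a resume
        .withAfterCursor("service.database.schema.lastIndexedTable");
// Submitting through the handler; the exact method name and its exceptions are assumptions.
EventPublisherJob job = ReIndexingHandler.getInstance().createReindexingJob("admin", resumeRequest);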

View File

@ -36,6 +36,8 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
@Getter private final String entityType;
@Getter private final List<String> fields;
private final StepStats stats = new StepStats();
private String lastFailedCursor = null;
private String cursor = null;
@Getter private boolean isDone = false;
@ -68,10 +70,16 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
result =
entityRepository.listAfterWithSkipFailure(
null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor);
if (result.getErrors().size() > 0) {
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
result
.getErrors()
.forEach((error) -> LOG.error("[PaginatedEntitiesSource] Failed in getting Record, RECORD: {}", error));
.forEach(
error ->
LOG.error(
"[PaginatedEntitiesSource] Failed in getting Record, After Cursor : {} , RECORD: {}",
result.getPaging().getAfter(),
error));
}
LOG.debug(
@ -82,15 +90,24 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
updateStats(result.getData().size(), result.getErrors().size());
} catch (IOException e) {
lastFailedCursor = this.cursor;
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
"[PaginatedEntitiesSource] After Cursor : {}, Batch Stats :- Submitted : {} Success: {} Failed: {}",
this.lastFailedCursor,
batchSize,
0,
batchSize);
if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) {
isDone = true;
updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords());
} else {
updateStats(0, batchSize);
}
throw new SourceException("[PaginatedEntitiesSource] Batch encountered Exception. Failing Completely.", e);
throw new SourceException(
String.format(
"[PaginatedEntitiesSource] After Cursor : %s, Batch encountered Exception. Failing Completely.",
this.lastFailedCursor),
e);
}
return result;
@ -111,4 +128,12 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
public StepStats getStats() {
return stats;
}
public String getLastFailedCursor() {
return lastFailedCursor;
}
public void setCursor(String cursor) {
this.cursor = cursor;
}
}
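
PaginatedEntitiesSource can now be seeded with a starting cursor and remembers the cursor of the batch that failed, which is what the workflow logs and what a later run can pass back as afterCursor. A small sketch, assuming placeholder entity type, field list and cursor value:

// Hedged sketch: the constructor, setCursor, readNext and getLastFailedCursor all appear in this PR.
PaginatedEntitiesSource source = new PaginatedEntitiesSource("table", 100, List.of("owner", "tags"));
source.setCursor("service.database.schema.lastIndexedTable"); // resume after an already-indexed page
try {
  ResultList<? extends EntityInterface> batch = source.readNext(null);
  if (!batch.getErrors().isEmpty()) {
    // This cursor can be reported to the user and reused as afterCursor on the next job.
    LOG.error("Batch had failures, last failed cursor: {}", source.getLastFailedCursor());
  }
} catch (SourceException ex) {
  LOG.error("Read failed completely at cursor {}", source.getLastFailedCursor(), ex);
}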

View File

@ -36,6 +36,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.system.EventPublisherJob;
@ -55,6 +56,7 @@ import org.openmetadata.service.util.ResultList;
@Slf4j
public class SearchIndexWorkflow implements Runnable {
private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
private final EsEntitiesProcessor entitiesProcessor;
@ -75,12 +77,17 @@ public class SearchIndexWorkflow implements Runnable {
request
.getEntities()
.forEach(
(entityType) -> {
entityType -> {
if (!isDataInsightIndex(entityType)) {
List<String> fields =
new ArrayList<>(
Objects.requireNonNull(getIndexFields(entityType, jobData.getSearchIndexMappingLanguage())));
paginatedEntitiesSources.add(new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields));
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields);
if (!CommonUtil.nullOrEmpty(request.getAfterCursor())) {
source.setCursor(request.getAfterCursor());
}
paginatedEntitiesSources.add(source);
} else {
paginatedDataInsightSources.add(
new PaginatedDataInsightSource(dao, entityType, jobData.getBatchSize()));
@ -107,7 +114,7 @@ public class SearchIndexWorkflow implements Runnable {
} catch (Exception ex) {
String error =
String.format(
"Reindexing Job Has Encountered an Exception. \n Job Data: %s, \n Stack : %s ",
"Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
jobData.toString(), ExceptionUtils.getStackTrace(ex));
LOG.error(error);
jobData.setStatus(EventPublisherJob.Status.FAILED);
@ -136,13 +143,13 @@ public class SearchIndexWorkflow implements Runnable {
try {
resultList = paginatedEntitiesSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
if (resultList.getData().size() > 0) {
if (!resultList.getData().isEmpty()) {
// process data to build Reindex Request
BulkRequest requests = entitiesProcessor.process(resultList, contextData);
// write the data to ElasticSearch
BulkResponse response = searchIndexSink.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
handleErrors(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
@ -153,22 +160,28 @@ public class SearchIndexWorkflow implements Runnable {
handleSourceError(
rx.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
currentTime);
} finally {
updateStats(
@ -197,14 +210,14 @@ public class SearchIndexWorkflow implements Runnable {
try {
resultList = paginatedDataInsightSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
if (resultList.getData().size() > 0) {
if (!resultList.getData().isEmpty()) {
// process data to build Reindex Request
BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
// write the data to ElasticSearch
// write the data to ElasticSearch
BulkResponse response = searchIndexSink.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
handleErrors(resultList, "", response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
@ -215,22 +228,28 @@ public class SearchIndexWorkflow implements Runnable {
handleSourceError(
rx.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
currentTime);
} finally {
updateStats(
@ -290,7 +309,7 @@ public class SearchIndexWorkflow implements Runnable {
}
private void reCreateIndexes(String entityType) {
if (!jobData.getRecreateIndex()) {
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
return;
}
@ -302,8 +321,8 @@ public class SearchIndexWorkflow implements Runnable {
elasticSearchIndexDefinition.createIndex(indexType, jobData.getSearchIndexMappingLanguage().value());
}
private void handleErrors(ResultList<?> data, BulkResponse response, long time) {
handleSourceError(data, time);
private void handleErrors(ResultList<?> data, String lastCursor, BulkResponse response, long time) {
handleSourceError(data, lastCursor, time);
handleEsSinkErrors(response, time);
}
@ -336,10 +355,10 @@ public class SearchIndexWorkflow implements Runnable {
}
@SneakyThrows
private void handleSourceError(ResultList<?> data, long time) {
if (data.getErrors().size() > 0) {
private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
if (!data.getErrors().isEmpty()) {
handleSourceError(
"SourceContext: Encountered Error While Reading Data",
String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor),
String.format(
"Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())),
time);
@ -356,20 +375,20 @@ public class SearchIndexWorkflow implements Runnable {
new FailureDetails()
.withContext(
String.format(
"EsWriterContext: Encountered Error While Writing Data \n Entity \n ID : [%s] ",
"EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ",
failure.getId()))
.withLastFailedReason(
String.format(
"Index Type: [%s], Reason: [%s] \n Trace : [%s]",
"Index Type: [%s], Reason: [%s] %n Trace : [%s]",
failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())))
.withLastFailedAt(System.currentTimeMillis());
details.add(esFailure);
}
}
if (details.size() > 0) {
if (!details.isEmpty()) {
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: \n %s ", JsonUtils.pojoToJson(details)),
String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
time);
}
}
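
The per-entity error strings are consolidated into the ENTITY_TYPE_ERROR_MSG constant, and literal \n escapes in format strings become %n, the java.util.Formatter conversion that expands to the platform line separator at format time (a bare \n is passed through untouched and is commonly flagged by linters). A one-line illustration using the constant's template:

// %n is resolved by String.format; \n in a format template would bypass the formatter entirely.
String msg = String.format("EntityType: %s %n Cause: %s %n Stack: %s",
    "table", "connection reset", "<stack trace omitted>");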

View File

@ -38,6 +38,10 @@
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
},
"afterCursor": {
"description": "Provide After in case of failure to start reindexing after the issue is solved",
"type": "string"
}
},
"additionalProperties": false

View File

@ -177,6 +177,10 @@
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
},
"afterCursor": {
"description": "Provide After in case of failure to start reindexing after the issue is solved",
"type": "string"
}
},
"required": ["id", "runMode", "timestamp", "status"],