diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
index c3e14b0bf9f..c2b5a36591a 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
@@ -2821,6 +2821,11 @@ public interface CollectionDAO {
     @SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN")
     void deleteAll(@Bind("entityFQN") String entityFQN);
 
+    // Keeps only the latest :records entries for the given extension and deletes all others
+    @SqlUpdate(
+        "DELETE FROM entity_extension_time_series WHERE extension = :extension AND entityFQN NOT IN(SELECT entityFQN FROM (select * from entity_extension_time_series WHERE extension = :extension ORDER BY timestamp DESC LIMIT :records) AS subquery)")
+    void deleteLastRecords(@Bind("extension") String extension, @Bind("records") int noOfRecord);
+
     @SqlUpdate(
         "DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp")
     void deleteAtTimestamp(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java
index d979894a3bb..6d9db7a00eb 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java
@@ -54,6 +54,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -394,9 +395,9 @@ public abstract class EntityRepository<T extends EntityInterface> {
   @Transaction
   public ResultList<T> listAfterWithSkipFailure(
       UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) throws IOException {
-    List<String> errors = new ArrayList<>();
+    Map<UUID, String> errors = new LinkedHashMap<>();
+    Map<UUID, T> entities = new LinkedHashMap<>();
     int total = dao.listCount(filter);
-    List<T> entities = new ArrayList<>();
     if (limitParam > 0) {
       // forward scrolling, if after == null then first page is being asked
       List<String> jsons = dao.listAfter(filter, limitParam + 1, after == null ? "" : RestUtil.decodeCursor(after));
@@ -404,24 +405,31 @@ public abstract class EntityRepository<T extends EntityInterface> {
       for (String json : jsons) {
         try {
           T entity = withHref(uriInfo, setFieldsInternal(JsonUtils.readValue(json, entityClass), fields));
-          entities.add(entity);
+          entities.put(entity.getId(), entity);
         } catch (Exception e) {
           LOG.error("Failed in Set Fields for Entity with Json : {}", json);
-          errors.add(json);
+          errors.put(JsonUtils.readValue(json, entityClass).getId(), json);
         }
       }
       String beforeCursor;
       String afterCursor = null;
-      beforeCursor = after == null ? null : entities.get(0).getFullyQualifiedName();
-      if (entities.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
-        entities.remove(limitParam);
-        afterCursor = entities.get(limitParam - 1).getFullyQualifiedName();
+      beforeCursor =
+          after == null ? null : JsonUtils.readValue(jsons.get(0), entityClass).getFullyQualifiedName();
+      if (jsons.size() > limitParam) {
+        T lastReadEntity = JsonUtils.readValue(jsons.get(limitParam), entityClass);
+        entities.remove(lastReadEntity.getId());
+        afterCursor = JsonUtils.readValue(jsons.get(limitParam - 1), entityClass).getFullyQualifiedName();
+        errors.forEach((key, value) -> entities.remove(key));
+        // Remove the last JSON entry if it is present in errors, since the read actually only went up to
+        // limitParam; if that entry failed, it will be picked up again in the next read
+        errors.remove(lastReadEntity.getId());
       }
-      return getResultList(entities, errors, beforeCursor, afterCursor, total);
+      return getResultList(
+          new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), beforeCursor, afterCursor, total);
     } else {
       // limit == 0 , return total count of entity.
-      return getResultList(entities, errors, null, null, total);
+      return getResultList(new ArrayList<>(entities.values()), new ArrayList<>(errors.values()), null, null, total);
     }
   }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java
index 72384d9724d..524cecde067 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java
@@ -35,6 +35,7 @@ import javax.ws.rs.core.Response;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.openmetadata.common.utils.CommonUtil;
 import org.openmetadata.schema.api.CreateEventPublisherJob;
 import org.openmetadata.schema.system.EventPublisherJob;
 import org.openmetadata.schema.system.Failure;
@@ -42,6 +43,7 @@ import org.openmetadata.schema.system.Stats;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
 import org.openmetadata.service.exception.CustomExceptionMessage;
+import org.openmetadata.service.exception.UnhandledServerException;
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
 import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow;
@@ -91,10 +93,10 @@ public class ReIndexingHandler {
     // Create new Task
     if (taskQueue.size() >= 5) {
-      throw new RuntimeException("Cannot create new Reindexing Jobs. There are pending jobs.");
+      throw new UnhandledServerException("Cannot create new Reindexing Jobs. There are pending jobs.");
     }
 
     if (((ThreadPoolExecutor) threadScheduler).getActiveCount() > 5) {
-      throw new RuntimeException("Thread unavailable to run the jobs. There are pending jobs.");
+      throw new UnhandledServerException("Thread unavailable to run the jobs. There are pending jobs.");
     } else {
       EventPublisherJob jobData = getReindexJob(startedBy, createReindexingJob);
       List<SearchIndexWorkflow> activeJobs = new ArrayList<>(REINDEXING_JOB_MAP.values());
@@ -106,7 +108,14 @@ public class ReIndexingHandler {
 
       LOG.info("Reindexing triggered for the following Entities: {}", entityList);
 
-      if (entityList.size() > 0) {
+      if (!entityList.isEmpty()) {
+        // Check if the after cursor is provided
+        if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor()) && entityList.size() > 1) {
+          throw new IllegalArgumentException("After Cursor can only be associated with one entity");
+        }
+
+        // Remove records from previous runs, keeping only the latest 5
+        dao.entityExtensionTimeSeriesDao().deleteLastRecords(REINDEXING_JOB_EXTENSION, 5);
         // Create Entry in the DB
         dao.entityExtensionTimeSeriesDao()
             .insert(
@@ -120,7 +129,8 @@ public class ReIndexingHandler {
         REINDEXING_JOB_MAP.put(jobData.getId(), job);
         return jobData;
       } else {
-        throw new RuntimeException("There are already executing Jobs working on the same Entities. Please try later.");
+        throw new UnhandledServerException(
+            "There are already executing Jobs working on the same Entities. Please try later.");
       }
     }
   }
@@ -129,7 +139,7 @@ public class ReIndexingHandler {
     REINDEXING_JOB_MAP
         .entrySet()
         .removeIf(
-            (entry) ->
+            entry ->
                 entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.STARTED
                     && entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.RUNNING);
   }
@@ -144,12 +154,13 @@ public class ReIndexingHandler {
   }
 
   private void validateJob(CreateEventPublisherJob job) {
+    // Check that valid entities are provided
     Objects.requireNonNull(job);
     Set<String> storedEntityList = new HashSet<>(Entity.getEntityList());
-    if (job.getEntities().size() > 0) {
+    if (!job.getEntities().isEmpty()) {
       job.getEntities()
           .forEach(
-              (entityType) -> {
+              entityType -> {
                 if (!storedEntityList.contains(entityType) && !ReindexingUtil.isDataInsightIndex(entityType)) {
                   throw new IllegalArgumentException(
                       String.format("Entity Type : %s is not a valid Entity", entityType));
@@ -176,7 +187,7 @@ public class ReIndexingHandler {
 
   public EventPublisherJob getLatestJob() throws IOException {
     List<SearchIndexWorkflow> activeJobs = new ArrayList<>(REINDEXING_JOB_MAP.values());
-    if (activeJobs.size() > 0) {
+    if (!activeJobs.isEmpty()) {
       return activeJobs.get(activeJobs.size() - 1).getJobData();
     } else {
       String recordString = dao.entityExtensionTimeSeriesDao().getLatestByExtension(REINDEXING_JOB_EXTENSION);
@@ -193,7 +204,7 @@ public class ReIndexingHandler {
         JsonUtils.readObjects(
             dao.entityExtensionTimeSeriesDao().getAllByExtension(REINDEXING_JOB_EXTENSION), EventPublisherJob.class);
     jobsFromDatabase.removeIf(
-        (job) -> {
+        job -> {
           for (EventPublisherJob active : activeEventPubJob) {
             if (active.getId().equals(job.getId())) {
               return true;
@@ -222,6 +233,7 @@ public class ReIndexingHandler {
         .withBatchSize(job.getBatchSize())
         .withFailure(new Failure())
         .withRecreateIndex(job.getRecreateIndex())
-        .withSearchIndexMappingLanguage(job.getSearchIndexMappingLanguage());
+        .withSearchIndexMappingLanguage(job.getSearchIndexMappingLanguage())
+        .withAfterCursor(job.getAfterCursor());
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
index 75648414fe4..49490f391ff 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
@@ -36,6 +36,8 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends EntityInterface>> {
   private final List<String> fields;
   private final StepStats stats = new StepStats();
+  private String lastFailedCursor = null;
+  private String cursor = null;
   @Getter private boolean isDone = false;
@@ -68,10 +70,16 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends EntityInterface>> {
-      if (result.getErrors().size() > 0) {
+      if (!result.getErrors().isEmpty()) {
+        lastFailedCursor = this.cursor;
         result
             .getErrors()
-            .forEach((error) -> LOG.error("[PaginatedEntitiesSource] Failed in getting Record, RECORD: {}", error));
+            .forEach(
+                error ->
+                    LOG.error(
+                        "[PaginatedEntitiesSource] Failed in getting Record, After Cursor : {} , RECORD: {}",
+                        result.getPaging().getAfter(),
+                        error));
       }
       LOG.debug(
@@ -82,15 +90,24 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends EntityInterface>> {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
   private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
   private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
   private final EsEntitiesProcessor entitiesProcessor;
@@ -75,12 +77,17 @@ public class SearchIndexWorkflow implements Runnable {
     request
         .getEntities()
         .forEach(
-            (entityType) -> {
+            entityType -> {
               if (!isDataInsightIndex(entityType)) {
                 List<String> fields =
                     new ArrayList<>(
                         Objects.requireNonNull(getIndexFields(entityType, jobData.getSearchIndexMappingLanguage())));
-                paginatedEntitiesSources.add(new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields));
+                PaginatedEntitiesSource source =
+                    new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields);
+                if (!CommonUtil.nullOrEmpty(request.getAfterCursor())) {
+                  source.setCursor(request.getAfterCursor());
+                }
+                paginatedEntitiesSources.add(source);
               } else {
                 paginatedDataInsightSources.add(
                     new PaginatedDataInsightSource(dao, entityType, jobData.getBatchSize()));
@@ -107,7 +114,7 @@ public class SearchIndexWorkflow implements Runnable {
     } catch (Exception ex) {
       String error =
           String.format(
-              "Reindexing Job Has Encountered an Exception. \n Job Data: %s, \n Stack : %s ",
+              "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
               jobData.toString(), ExceptionUtils.getStackTrace(ex));
       LOG.error(error);
       jobData.setStatus(EventPublisherJob.Status.FAILED);
@@ -136,13 +143,13 @@ public class SearchIndexWorkflow implements Runnable {
       try {
         resultList = paginatedEntitiesSource.readNext(null);
         requestToProcess = resultList.getData().size() + resultList.getErrors().size();
-        if (resultList.getData().size() > 0) {
+        if (!resultList.getData().isEmpty()) {
           // process data to build Reindex Request
           BulkRequest requests = entitiesProcessor.process(resultList, contextData);
           // write the data to ElasticSearch
           BulkResponse response = searchIndexSink.write(requests, contextData);
           // update Status
-          handleErrors(resultList, response, currentTime);
+          handleErrors(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
           // Update stats
           success = getSuccessFromBulkResponse(response);
           failed = requestToProcess - success;
@@ -153,22 +160,28 @@ public class SearchIndexWorkflow implements Runnable {
         handleSourceError(
             rx.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedEntitiesSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedEntitiesSource.getEntityType(),
+                rx.getCause(),
+                ExceptionUtils.getStackTrace(rx)),
             currentTime);
       } catch (ProcessorException px) {
         handleProcessorError(
             px.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedEntitiesSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedEntitiesSource.getEntityType(),
+                px.getCause(),
+                ExceptionUtils.getStackTrace(px)),
             currentTime);
       } catch (SinkException wx) {
         handleEsSinkError(
             wx.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedEntitiesSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedEntitiesSource.getEntityType(),
+                wx.getCause(),
+                ExceptionUtils.getStackTrace(wx)),
             currentTime);
       } finally {
         updateStats(
@@ -197,14 +210,14 @@ public class SearchIndexWorkflow implements Runnable {
       try {
         resultList = paginatedDataInsightSource.readNext(null);
         requestToProcess = resultList.getData().size() + resultList.getErrors().size();
-        if (resultList.getData().size() > 0) {
+        if (!resultList.getData().isEmpty()) {
           // process data to build Reindex Request
           BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
           // write the data to ElasticSearch
           // write the data to ElasticSearch
           BulkResponse response = searchIndexSink.write(requests, contextData);
           // update Status
-          handleErrors(resultList, response, currentTime);
+          handleErrors(resultList, "", response, currentTime);
           // Update stats
           success = getSuccessFromBulkResponse(response);
           failed = requestToProcess - success;
@@ -215,22 +228,28 @@ public class SearchIndexWorkflow implements Runnable {
         handleSourceError(
             rx.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedDataInsightSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedDataInsightSource.getEntityType(),
+                rx.getCause(),
+                ExceptionUtils.getStackTrace(rx)),
             currentTime);
       } catch (ProcessorException px) {
         handleProcessorError(
             px.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedDataInsightSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedDataInsightSource.getEntityType(),
+                px.getCause(),
+                ExceptionUtils.getStackTrace(px)),
             currentTime);
       } catch (SinkException wx) {
         handleEsSinkError(
             wx.getMessage(),
             String.format(
-                "EntityType: %s \n Cause: %s \n Stack: %s",
-                paginatedDataInsightSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
+                ENTITY_TYPE_ERROR_MSG,
+                paginatedDataInsightSource.getEntityType(),
+                wx.getCause(),
+                ExceptionUtils.getStackTrace(wx)),
             currentTime);
       } finally {
         updateStats(
@@ -290,7 +309,7 @@ public class SearchIndexWorkflow implements Runnable {
   }
 
   private void reCreateIndexes(String entityType) {
-    if (!jobData.getRecreateIndex()) {
+    if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
       return;
     }
 
@@ -302,8 +321,8 @@ public class SearchIndexWorkflow implements Runnable {
     elasticSearchIndexDefinition.createIndex(indexType, jobData.getSearchIndexMappingLanguage().value());
   }
 
-  private void handleErrors(ResultList<?> data, BulkResponse response, long time) {
-    handleSourceError(data, time);
+  private void handleErrors(ResultList<?> data, String lastCursor, BulkResponse response, long time) {
+    handleSourceError(data, lastCursor, time);
     handleEsSinkErrors(response, time);
   }
 
@@ -336,10 +355,10 @@ public class SearchIndexWorkflow implements Runnable {
   }
 
   @SneakyThrows
-  private void handleSourceError(ResultList<?> data, long time) {
-    if (data.getErrors().size() > 0) {
+  private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
+    if (!data.getErrors().isEmpty()) {
       handleSourceError(
-          "SourceContext: Encountered Error While Reading Data",
+          String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor),
           String.format(
              "Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())),
           time);
@@ -356,20 +375,20 @@ public class SearchIndexWorkflow implements Runnable {
             new FailureDetails()
                 .withContext(
                     String.format(
-                        "EsWriterContext: Encountered Error While Writing Data \n Entity \n ID : [%s] ",
+                        "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ",
                         failure.getId()))
                 .withLastFailedReason(
                     String.format(
-                        "Index Type: [%s], Reason: [%s] \n Trace : [%s]",
+                        "Index Type: [%s], Reason: [%s] %n Trace : [%s]",
                         failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())))
                 .withLastFailedAt(System.currentTimeMillis());
         details.add(esFailure);
       }
     }
-    if (details.size() > 0) {
+    if (!details.isEmpty()) {
       handleEsSinkError(
           "[EsWriter] BulkResponseItems",
-          String.format("[BulkItemResponse] Got Following Error Responses: \n %s ", JsonUtils.pojoToJson(details)),
+          String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
           time);
     }
   }
diff --git a/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json
index 619956f85a8..11489c09c7d 100644
--- a/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json
+++ b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json
@@ -38,6 +38,10 @@
     "searchIndexMappingLanguage": {
       "description": "Recreate Indexes with updated Language",
       "$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
+    },
+    "afterCursor": {
+      "description": "Provide the 'after' cursor from a failed run to resume reindexing once the issue is resolved",
+      "type": "string"
     }
   },
   "additionalProperties": false
diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
index ba70e989059..3c17442b7f1 100644
--- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
+++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
@@ -177,6 +177,10 @@
     "searchIndexMappingLanguage": {
       "description": "Recreate Indexes with updated Language",
       "$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
+    },
+    "afterCursor": {
+      "description": "Provide the 'after' cursor from a failed run to resume reindexing once the issue is resolved",
+      "type": "string"
     }
   },
   "required": ["id", "runMode", "timestamp", "status"],