diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index bd78b640ef8..b7693514ab4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -22,6 +22,7 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; +import org.openmetadata.schema.entity.app.AppRunType; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; @@ -111,6 +112,16 @@ public class SearchIndexApp extends AbstractNativeApplication { LOG.info("Executing Reindexing Job with JobData : {}", jobData); // Update Job Status jobData.setStatus(EventPublisherJob.Status.RUNNING); + + // Make recreate as false for onDemand + AppRunType runType = + AppRunType.fromValue((String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType")); + + // Schedule Run has recreate as false always + if (runType.equals(AppRunType.Scheduled)) { + jobData.setRecreateIndex(false); + } + // Run ReIndexing entitiesReIndex(); dataInsightReindex(); @@ -123,7 +134,7 @@ public class SearchIndexApp extends AbstractNativeApplication { jobData.toString(), ExceptionUtils.getStackTrace(ex)); LOG.error(error); jobData.setStatus(EventPublisherJob.Status.FAILED); - handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis()); + handleJobError(error, System.currentTimeMillis()); } finally { // store job details in Database jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); @@ -185,30 +196,15 @@ public class SearchIndexApp extends AbstractNativeApplication { } } catch (SourceException rx) { handleSourceError( - rx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - rx.getCause(), - ExceptionUtils.getStackTrace(rx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ""), currentTime); } catch (ProcessorException px) { handleProcessorError( - px.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - px.getCause(), - ExceptionUtils.getStackTrace(px)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ""), currentTime); } catch (SinkException wx) { handleEsSinkError( - wx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedEntitiesSource.getEntityType(), - wx.getCause(), - ExceptionUtils.getStackTrace(wx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ""), currentTime); } } @@ -246,30 +242,15 @@ public class SearchIndexApp extends AbstractNativeApplication { } } catch (SourceException rx) { handleSourceError( - rx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - rx.getCause(), - ExceptionUtils.getStackTrace(rx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ""), currentTime); } catch (ProcessorException px) { handleProcessorError( - px.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - px.getCause(), - ExceptionUtils.getStackTrace(px)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ""), currentTime); } catch (SinkException wx) { handleEsSinkError( - wx.getMessage(), - String.format( - ENTITY_TYPE_ERROR_MSG, - paginatedDataInsightSource.getEntityType(), - wx.getCause(), - ExceptionUtils.getStackTrace(wx)), + String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ""), currentTime); } } @@ -335,29 +316,28 @@ public class SearchIndexApp extends AbstractNativeApplication { handleEsSinkErrors(response, time); } - private void handleSourceError(String context, String reason, long time) { - handleError("source", context, reason, time); + private void handleSourceError(String reason, long time) { + handleError("source", reason, time); } - private void handleProcessorError(String context, String reason, long time) { - handleError("processor", context, reason, time); + private void handleProcessorError(String reason, long time) { + handleError("processor", reason, time); } - private void handleError(String errType, String context, String reason, long time) { + private void handleError(String errType, String reason, long time) { Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure(); failures.withAdditionalProperty("errorFrom", errType); - failures.withAdditionalProperty("context", context); failures.withAdditionalProperty("lastFailedReason", reason); failures.withAdditionalProperty("lastFailedAt", time); jobData.setFailure(failures); } - private void handleEsSinkError(String context, String reason, long time) { - handleError("sink", context, reason, time); + private void handleEsSinkError(String reason, long time) { + handleError("sink", reason, time); } - private void handleJobError(String context, String reason, long time) { - handleError("job", context, reason, time); + private void handleJobError(String reason, long time) { + handleError("job", reason, time); } @SneakyThrows @@ -369,8 +349,9 @@ public class SearchIndexApp extends AbstractNativeApplication { builder.append("%n"); } handleSourceError( - String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), - String.format("Following Entities were not fetched Successfully : %s", builder), + String.format( + "SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s", + lastCursor, builder), time); } } @@ -397,15 +378,15 @@ public class SearchIndexApp extends AbstractNativeApplication { } if (!details.isEmpty()) { handleEsSinkError( - "[EsWriter] BulkResponseItems", - String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)), + String.format( + "[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ", + JsonUtils.pojoToJson(details, true)), time); } } @SneakyThrows private void handleEsSinkErrors(BulkResponse response, long time) { - List> details = new ArrayList<>(); for (BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { @@ -426,8 +407,8 @@ public class SearchIndexApp extends AbstractNativeApplication { } if (!details.isEmpty()) { handleEsSinkError( - "[EsWriter] BulkResponseItems", - String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)), + String.format( + "[EsWriter][BulkItemResponse] Got Following Error Responses: %s ", JsonUtils.pojoToJson(details, true)), time); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index 5df6b13124b..5e6a8de5014 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -27,7 +27,6 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.exception.SourceException; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; -import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @@ -76,16 +75,18 @@ public class PaginatedEntitiesSource implements Source