Format Error on Indexing Application, and in schedule turn off recreate (#14225)

This commit is contained in:
Mohit Yadav 2023-12-05 21:29:36 +05:30 committed by GitHub
parent 8486b351f3
commit 7481bcc526
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 64 deletions

View File

@ -22,6 +22,7 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
@ -111,6 +112,16 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING);
// Make recreate as false for onDemand
AppRunType runType =
AppRunType.fromValue((String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"));
// Schedule Run has recreate as false always
if (runType.equals(AppRunType.Scheduled)) {
jobData.setRecreateIndex(false);
}
// Run ReIndexing
entitiesReIndex();
dataInsightReindex();
@ -123,7 +134,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
jobData.toString(), ExceptionUtils.getStackTrace(ex));
LOG.error(error);
jobData.setStatus(EventPublisherJob.Status.FAILED);
handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
handleJobError(error, System.currentTimeMillis());
} finally {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
@ -185,30 +196,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ""),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ""),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ""),
currentTime);
}
}
@ -246,30 +242,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ""),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
px.getCause(),
ExceptionUtils.getStackTrace(px)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ""),
currentTime);
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedDataInsightSource.getEntityType(),
wx.getCause(),
ExceptionUtils.getStackTrace(wx)),
String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ""),
currentTime);
}
}
@ -335,29 +316,28 @@ public class SearchIndexApp extends AbstractNativeApplication {
handleEsSinkErrors(response, time);
}
private void handleSourceError(String context, String reason, long time) {
handleError("source", context, reason, time);
private void handleSourceError(String reason, long time) {
handleError("source", reason, time);
}
private void handleProcessorError(String context, String reason, long time) {
handleError("processor", context, reason, time);
private void handleProcessorError(String reason, long time) {
handleError("processor", reason, time);
}
private void handleError(String errType, String context, String reason, long time) {
private void handleError(String errType, String reason, long time) {
Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure();
failures.withAdditionalProperty("errorFrom", errType);
failures.withAdditionalProperty("context", context);
failures.withAdditionalProperty("lastFailedReason", reason);
failures.withAdditionalProperty("lastFailedAt", time);
jobData.setFailure(failures);
}
private void handleEsSinkError(String context, String reason, long time) {
handleError("sink", context, reason, time);
private void handleEsSinkError(String reason, long time) {
handleError("sink", reason, time);
}
private void handleJobError(String context, String reason, long time) {
handleError("job", context, reason, time);
private void handleJobError(String reason, long time) {
handleError("job", reason, time);
}
@SneakyThrows
@ -369,8 +349,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
builder.append("%n");
}
handleSourceError(
String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor),
String.format("Following Entities were not fetched Successfully : %s", builder),
String.format(
"SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s",
lastCursor, builder),
time);
}
}
@ -397,15 +378,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
if (!details.isEmpty()) {
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
String.format(
"[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ",
JsonUtils.pojoToJson(details, true)),
time);
}
}
@SneakyThrows
private void handleEsSinkErrors(BulkResponse response, long time) {
List<Map<String, Object>> details = new ArrayList<>();
for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) {
@ -426,8 +407,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
if (!details.isEmpty()) {
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)),
String.format(
"[EsWriter][BulkItemResponse] Got Following Error Responses: %s ", JsonUtils.pojoToJson(details, true)),
time);
}
}

View File

@ -27,7 +27,6 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
@ -76,16 +75,18 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
String errMsg =
StringBuilder errMsg = new StringBuilder();
errMsg.append(
String.format(
"[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %s",
this.lastFailedCursor,
batchSize,
result.getData().size(),
result.getErrors().size(),
JsonUtils.pojoToJson(result.getErrors()));
LOG.error(errMsg);
throw new SourceException(errMsg);
"[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %n",
this.lastFailedCursor, batchSize, result.getData().size(), result.getErrors().size()));
for (int i = 0; i < result.getErrors().size(); i++) {
errMsg.append(String.format("%s. EntityError :- %s", i, result.getErrors().get(i)));
errMsg.append("%n");
}
String error = errMsg.toString();
LOG.error(error);
throw new SourceException(error);
}
LOG.debug(