fixes for Reindex (#10948)

* fixes for Reindex

* review comments
Mohit Yadav 2023-04-06 10:50:08 +05:30 committed by GitHub
parent c53c7b680d
commit ec6ff78b80
11 changed files with 221 additions and 152 deletions

View File

@@ -15,8 +15,8 @@ package org.openmetadata.service.exception;
import java.io.IOException;
public class ReaderException extends IOException {
public ReaderException(String msg, Throwable throwable) {
public class SinkException extends IOException {
public SinkException(String msg, Throwable throwable) {
super(msg, throwable);
}
}

View File

@@ -15,8 +15,8 @@ package org.openmetadata.service.exception;
import java.io.IOException;
public class WriterException extends IOException {
public WriterException(String msg, Throwable throwable) {
public class SourceException extends IOException {
public SourceException(String msg, Throwable throwable) {
super(msg, throwable);
}
}

View File

@@ -14,8 +14,8 @@
package org.openmetadata.service.workflows.interfaces;
import java.util.Map;
import org.openmetadata.service.exception.WriterException;
import org.openmetadata.service.exception.SinkException;
public interface Sink<I, O> extends Stats {
O write(I data, Map<String, Object> contextData) throws WriterException;
O write(I data, Map<String, Object> contextData) throws SinkException;
}

View File

@@ -14,10 +14,10 @@
package org.openmetadata.service.workflows.interfaces;
import java.util.Map;
import org.openmetadata.service.exception.ReaderException;
import org.openmetadata.service.exception.SourceException;
public interface Source<R> extends Stats {
R readNext(Map<String, Object> contextData) throws ReaderException;
R readNext(Map<String, Object> contextData) throws SourceException;
void reset();
}
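The hunks above rename ReaderException/WriterException to SourceException/SinkException so the exception names line up with the Source and Sink workflow interfaces that throw them. A minimal sketch of how the renamed pieces fit together; the driver class, its wiring, and the omission of a Processor step are illustrative only, not part of this commit:

```java
// Illustrative driver over the renamed interfaces; the concrete Source/Sink
// implementations and the contextData contents are assumptions for the sketch.
import java.util.HashMap;
import java.util.Map;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.interfaces.Source;

public class ExampleIndexingLoop<R, O> {
  private final Source<R> source;
  private final Sink<R, O> sink;

  public ExampleIndexingLoop(Source<R> source, Sink<R, O> sink) {
    this.source = source;
    this.sink = sink;
  }

  public void runOnce(String entityType) {
    Map<String, Object> contextData = new HashMap<>();
    contextData.put("entityType", entityType); // same key as ReindexingUtil.ENTITY_TYPE_KEY
    try {
      // readNext now declares SourceException, write now declares SinkException
      R batch = source.readNext(contextData);
      sink.write(batch, contextData);
    } catch (SourceException e) {
      // reading the batch failed
    } catch (SinkException e) {
      // indexing the batch failed
    }
  }
}
```

A real pipeline also runs a Processor between the two steps, as SearchIndexWorkflow does further down in this commit.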

View File

@@ -41,7 +41,7 @@ public class EsDataInsightProcessor implements Processor<ResultList<ReportData>,
public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException {
String entityType = (String) contextData.get(ENTITY_TYPE_KEY);
if (CommonUtil.nullOrEmpty(entityType)) {
throw new IllegalArgumentException("[EsEntitiesProcessor] entityType cannot be null or empty.");
throw new IllegalArgumentException("[EsDataInsightProcessor] entityType cannot be null or empty.");
}
LOG.debug(

View File

@@ -16,7 +16,6 @@ package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -24,21 +23,21 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.exception.WriterException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.workflows.interfaces.Sink;
@Slf4j
public class EsSearchIndexWriter implements Sink<BulkRequest, BulkResponse> {
public class EsSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final RestHighLevelClient client;
EsSearchIndexWriter(RestHighLevelClient client) {
EsSearchIndexSink(RestHighLevelClient client) {
this.client = client;
}
@Override
public BulkResponse write(BulkRequest data, Map<String, Object> contextData) throws WriterException {
LOG.debug("[EsSearchIndexWriter] Processing a Batch of Size: {}", data.numberOfActions());
public BulkResponse write(BulkRequest data, Map<String, Object> contextData) throws SinkException {
LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
try {
BulkResponse response = client.bulk(data, RequestOptions.DEFAULT);
int currentSuccess = getSuccessFromBulkResponse(response);
@@ -46,21 +45,21 @@ public class EsSearchIndexWriter implements Sink<BulkRequest, BulkResponse> {
// Update Stats
LOG.debug(
"[EsSearchIndexWriter] Batch Stats :- Submitted : {} Success: {} Failed: {}",
"[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
data.numberOfActions(),
currentSuccess,
currentFailed);
updateStats(currentSuccess, currentFailed);
return response;
} catch (IOException e) {
} catch (Exception e) {
LOG.debug(
"[EsSearchIndexWriter] Batch Stats :- Submitted : {} Success: {} Failed: {}",
"[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
data.numberOfActions(),
0,
data.numberOfActions());
updateStats(0, data.numberOfActions());
throw new WriterException("[EsSearchIndexWriter] Batch encountered Exception. Failing Completely", e);
throw new SinkException("[EsSearchIndexSink] Batch encountered Exception. Failing Completely", e);
}
}
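EsSearchIndexWriter becomes EsSearchIndexSink, and its catch block widens from IOException to Exception, so any bulk failure is first counted against the batch's stats and then rethrown as a SinkException. A usage sketch, assuming a caller in the same package (the constructor is package-private); the index name and document payload are made up for illustration:

```java
// A minimal sketch, assuming it lives alongside EsSearchIndexSink.
package org.openmetadata.service.workflows.searchIndex;

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.service.exception.SinkException;

class EsSearchIndexSinkExample {
  static void indexOne(RestHighLevelClient client) {
    EsSearchIndexSink sink = new EsSearchIndexSink(client);
    BulkRequest request = new BulkRequest();
    request.add(
        new IndexRequest("table_search_index") // illustrative index name
            .id("example-id")
            .source(Map.of("name", "orders"))); // illustrative document
    try {
      BulkResponse response = sink.write(request, new HashMap<>());
      // success/failure counts are already folded into sink.getStats()
    } catch (SinkException e) {
      // the whole batch was recorded as failed before the exception was thrown
    }
  }
}
```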

View File

@@ -22,14 +22,14 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.exception.ReaderException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
@Slf4j
public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
public class PaginatedDataInsightSource implements Source<ResultList<ReportData>> {
private final CollectionDAO dao;
@Getter private final String entityType;
@Getter private final int batchSize;
@@ -37,14 +37,15 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>
private String cursor = null;
@Getter private boolean isDone = false;
public PaginatedDataInsightReader(CollectionDAO dao, String entityType, int batchSize) {
public PaginatedDataInsightSource(CollectionDAO dao, String entityType, int batchSize) {
this.dao = dao;
this.entityType = entityType;
this.batchSize = batchSize;
stats.setTotalRecords(dao.entityExtensionTimeSeriesDao().listCount(entityType));
}
@Override
public ResultList<ReportData> readNext(Map<String, Object> contextData) throws ReaderException {
public ResultList<ReportData> readNext(Map<String, Object> contextData) throws SourceException {
if (!isDone) {
ResultList<ReportData> data = read(cursor);
cursor = data.getPaging().getAfter();
@@ -63,7 +64,7 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>
isDone = false;
}
private ResultList<ReportData> read(String afterCursor) throws ReaderException {
private ResultList<ReportData> read(String afterCursor) throws SourceException {
LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize);
ResultList<ReportData> result;
try {
@@ -76,8 +77,13 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>
updateStats(result.getData().size(), result.getErrors().size());
} catch (Exception ex) {
LOG.debug("[DataInsightReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
updateStats(0, batchSize);
throw new ReaderException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex);
if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) {
isDone = true;
updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords());
} else {
updateStats(0, batchSize);
}
throw new SourceException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex);
}
return result;
@@ -104,7 +110,7 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>
for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) {
reportDataList.add(reportDataRow.getReportData());
}
return new ResultList<>(reportDataList, beforeCursor, afterCursor, total);
return new ResultList<>(reportDataList, new ArrayList<>(), beforeCursor, afterCursor, total);
}
@Override

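Beyond the rename to PaginatedDataInsightSource, the catch block now caps the failure count: when fewer than a full batch remains, the source marks itself done and only the remaining records are counted as failed, and the returned ResultList now carries an explicit empty error list. A sketch of that accounting pulled out into a standalone helper; the class and method names are illustrative, not part of the commit:

```java
// Hypothetical helper illustrating the failure accounting added to both
// paginated sources; names are illustrative, not part of the commit.
import org.openmetadata.schema.system.StepStats;

final class FailureAccounting {
  // How many records to mark failed when a batch blows up: on the last
  // (possibly partial) page, fail only what is actually left, not a full batch.
  static int failedForBrokenBatch(StepStats stats, int batchSize) {
    int remaining = stats.getTotalRecords() - stats.getProcessedRecords();
    return remaining <= batchSize ? remaining : batchSize;
  }
}
```

In the commit the same check also flips isDone, so the workflow's `while (!source.isDone())` loop terminates instead of retrying a failing final page forever.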
View File

@@ -24,14 +24,14 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.ReaderException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
@Slf4j
public class PaginatedEntitiesReader implements Source<ResultList<? extends EntityInterface>> {
public class PaginatedEntitiesSource implements Source<ResultList<? extends EntityInterface>> {
@Getter private final int batchSize;
@Getter private final String entityType;
@Getter private final List<String> fields;
@@ -39,14 +39,15 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends Enti
private String cursor = null;
@Getter private boolean isDone = false;
PaginatedEntitiesReader(String entityType, int batchSize, List<String> fields) {
PaginatedEntitiesSource(String entityType, int batchSize, List<String> fields) {
this.entityType = entityType;
this.batchSize = batchSize;
this.fields = fields;
this.stats.setTotalRecords(Entity.getEntityRepository(entityType).dao.listTotalCount());
}
@Override
public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) throws ReaderException {
public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) throws SourceException {
if (!isDone) {
ResultList<? extends EntityInterface> data = read(cursor);
cursor = data.getPaging().getAfter();
@@ -59,10 +60,10 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends Enti
}
}
private ResultList<? extends EntityInterface> read(String cursor) throws ReaderException {
LOG.debug("[EntitiesReader] Fetching a Batch of Size: {} ", batchSize);
private ResultList<? extends EntityInterface> read(String cursor) throws SourceException {
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result;
ResultList<? extends EntityInterface> result = null;
try {
result =
entityRepository.listAfterWithSkipFailure(
@@ -70,20 +71,28 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends Enti
if (result.getErrors().size() > 0) {
result
.getErrors()
.forEach((error) -> LOG.error("[EntitiesReader] Failed in getting Record, RECORD: {}", error.toString()));
.forEach(
(error) ->
LOG.error("[PaginatedEntitiesSource] Failed in getting Record, RECORD: {}", error.toString()));
}
LOG.debug(
"[EntitiesReader] Batch Stats :- Submitted : {} Success: {} Failed: {}",
"[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}",
batchSize,
result.getData().size(),
result.getErrors().size());
updateStats(result.getData().size(), result.getErrors().size());
} catch (IOException e) {
LOG.debug("[EntitiesReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
updateStats(0, batchSize);
throw new ReaderException("[EntitiesReader] Batch encountered Exception. Failing Completely.", e);
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) {
isDone = true;
updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords());
} else {
updateStats(0, batchSize);
}
throw new SourceException("[PaginatedEntitiesSource] Batch encountered Exception. Failing Completely.", e);
}
return result;

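PaginatedEntitiesSource gets the same treatment as the data-insight source: clearer log prefixes, a pre-initialised result, and the capped failure accounting on the last page. A minimal consumer sketch showing how such a source is drained until it reports done; the drain method and its empty-page handling are illustrative, and the package path of the source class is assumed:

```java
// Illustrative consumer: page through a PaginatedEntitiesSource until it
// reports isDone, handing each non-empty page to some handler (omitted here).
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; // package assumed

class EntitiesDrainExample {
  static void drain(PaginatedEntitiesSource source) {
    while (!source.isDone()) {
      try {
        ResultList<? extends EntityInterface> page = source.readNext(null);
        if (page.getData().size() > 0) {
          // hand the page to a processor/sink here
        }
      } catch (SourceException e) {
        // the source has already updated its stats and, on the last page,
        // flipped isDone so this loop can terminate
      }
    }
  }
}
```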
View File

@@ -19,15 +19,16 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
public class ReindexingUtil {
public static final String ENTITY_TYPE_KEY = "entityType";
public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
stats.setTotalRecords(stats.getTotalRecords() + currentSuccess + currentFailed);
stats.setTotalSuccessRecords(stats.getTotalSuccessRecords() + currentSuccess);
stats.setTotalFailedRecords(stats.getTotalFailedRecords() + currentFailed);
stats.setProcessedRecords(stats.getProcessedRecords() + currentSuccess + currentFailed);
stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);
stats.setFailedRecords(stats.getFailedRecords() + currentFailed);
}
public static boolean isDataInsightIndex(String entityType) {
@@ -36,11 +37,15 @@
|| entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA);
}
public static int getTotalRequestToProcess(Set<String> entities) {
public static int getTotalRequestToProcess(Set<String> entities, CollectionDAO dao) {
int total = 0;
for (String entityType : entities) {
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
total += repository.dao.listTotalCount();
if (!isDataInsightIndex(entityType)) {
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
total += repository.dao.listTotalCount();
} else {
total += dao.entityExtensionTimeSeriesDao().listCount(entityType);
}
}
return total;
}
@@ -51,7 +56,6 @@
if (!bulkItemResponse.isFailed()) {
success++;
}
;
}
return success;
}
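getUpdatedStats now accumulates the per-step processedRecords/successRecords/failedRecords counters instead of the old total* fields, and getTotalRequestToProcess takes the CollectionDAO so data-insight entity types, which live in the entity-extension time-series table rather than an entity repository, are counted from the right place. A small worked example of the accumulation; the wrapper class and the batch numbers are illustrative:

```java
// Worked example of the new per-step accumulation (values are illustrative).
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;

class StatsAccumulationExample {
  static StepStats example() {
    StepStats stepStats = new StepStats();             // counters default to 0
    ReindexingUtil.getUpdatedStats(stepStats, 95, 5);  // first batch: 95 ok, 5 failed
    ReindexingUtil.getUpdatedStats(stepStats, 100, 0); // second batch: all ok
    // processedRecords == 200, successRecords == 195, failedRecords == 5
    return stepStats;
  }
}
```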

View File

@@ -18,6 +18,7 @@ import static org.openmetadata.service.util.ReIndexingHandler.REINDEXING_JOB_EXT
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -44,8 +45,8 @@ import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.ProcessorException;
import org.openmetadata.service.exception.ReaderException;
import org.openmetadata.service.exception.WriterException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
@@ -54,11 +55,11 @@ import org.openmetadata.service.util.ResultList;
@Slf4j
public class SearchIndexWorkflow implements Runnable {
private final List<PaginatedEntitiesReader> entitiesReaders = new ArrayList<>();
private final List<PaginatedDataInsightReader> dataInsightReaders = new ArrayList<>();
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
private final EsEntitiesProcessor entitiesProcessor;
private final EsDataInsightProcessor dataInsightProcessor;
private final EsSearchIndexWriter writer;
private final EsSearchIndexSink searchIndexSink;
private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
@Getter private final EventPublisherJob jobData;
private final CollectionDAO dao;
@@ -78,146 +79,181 @@ public class SearchIndexWorkflow implements Runnable {
List<String> fields =
new ArrayList<>(
Objects.requireNonNull(getIndexFields(entityType, jobData.getSearchIndexMappingLanguage())));
entitiesReaders.add(new PaginatedEntitiesReader(entityType, jobData.getBatchSize(), fields));
paginatedEntitiesSources.add(new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields));
} else {
dataInsightReaders.add(new PaginatedDataInsightReader(dao, entityType, jobData.getBatchSize()));
paginatedDataInsightSources.add(
new PaginatedDataInsightSource(dao, entityType, jobData.getBatchSize()));
}
});
this.entitiesProcessor = new EsEntitiesProcessor();
this.dataInsightProcessor = new EsDataInsightProcessor();
this.writer = new EsSearchIndexWriter(client);
this.searchIndexSink = new EsSearchIndexSink(client);
this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
}
@SneakyThrows
public void run() {
LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING);
// Run ReIndexing
entitiesReIndexer();
dataInsightReindexer();
// Mark Job as Completed
updateJobStatus();
jobData.setEndTime(System.currentTimeMillis());
// store job details in Database
updateRecordToDb();
ReIndexingHandler.getInstance().removeCompletedJob(jobData.getId());
try {
LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING);
// Run ReIndexing
entitiesReIndex();
dataInsightReindex();
// Mark Job as Completed
updateJobStatus();
jobData.setEndTime(System.currentTimeMillis());
} catch (Exception ex) {
String error =
String.format(
"Reindexing Job Has Encountered an Exception. \n Job Data: %s, \n Stack : %s ",
jobData.toString(), ExceptionUtils.getStackTrace(ex));
LOG.error(error);
jobData.setStatus(EventPublisherJob.Status.FAILED);
handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
} finally {
// store job details in Database
updateRecordToDb();
// Send update
sendUpdates();
// Remove list from active jobs
ReIndexingHandler.getInstance().removeCompletedJob(jobData.getId());
}
}
private void entitiesReIndexer() {
private void entitiesReIndex() {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesReader reader : entitiesReaders) {
reCreateIndexes(reader.getEntityType());
contextData.put(ENTITY_TYPE_KEY, reader.getEntityType());
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
reCreateIndexes(paginatedEntitiesSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType());
ResultList<? extends EntityInterface> resultList;
while (!reader.isDone()) {
while (!paginatedEntitiesSource.isDone()) {
long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess;
int success = 0;
try {
resultList = reader.readNext(null);
resultList = paginatedEntitiesSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
// process data to build Reindex Request
BulkRequest requests = entitiesProcessor.process(resultList, contextData);
// write the data to ElasticSearch
BulkResponse response = writer.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
} catch (ReaderException rx) {
handleReaderError(
if (resultList.getData().size() > 0) {
// process data to build Reindex Request
BulkRequest requests = entitiesProcessor.process(resultList, contextData);
// write the data to ElasticSearch
BulkResponse response = searchIndexSink.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
} else {
failed = 0;
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format("Cause: %s \n Stack: %s", rx.getCause(), ExceptionUtils.getStackTrace(rx)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format("Cause: %s \n Stack: %s", px.getCause(), ExceptionUtils.getStackTrace(px)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
currentTime);
} catch (WriterException wx) {
handleEsError(
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format("Cause: %s \n Stack: %s", wx.getCause(), ExceptionUtils.getStackTrace(wx)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedEntitiesSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
currentTime);
} finally {
updateStats(success, failed, reader.getStats(), entitiesProcessor.getStats(), writer.getStats());
try {
WebSocketManager.getInstance()
.sendToOne(
jobData.getStartedBy(),
WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL,
JsonUtils.pojoToJson(jobData));
} catch (JsonProcessingException ex) {
LOG.error("Failed to send updated stats with WebSocker", ex);
}
updateStats(
success,
failed,
paginatedEntitiesSource.getStats(),
entitiesProcessor.getStats(),
searchIndexSink.getStats());
sendUpdates();
}
}
}
}
private void dataInsightReindexer() {
private void dataInsightReindex() {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedDataInsightReader dataInsightReader : dataInsightReaders) {
reCreateIndexes(dataInsightReader.getEntityType());
contextData.put(ENTITY_TYPE_KEY, dataInsightReader.getEntityType());
for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
reCreateIndexes(paginatedDataInsightSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType());
ResultList<ReportData> resultList;
while (!dataInsightReader.isDone()) {
while (!paginatedDataInsightSource.isDone()) {
long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess;
int success = 0;
try {
resultList = dataInsightReader.readNext(null);
resultList = paginatedDataInsightSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
// process data to build Reindex Request
BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
// write the data to ElasticSearch
BulkResponse response = writer.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
} catch (ReaderException rx) {
handleReaderError(
if (resultList.getData().size() > 0) {
// process data to build Reindex Request
BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
// write the data to ElasticSearch
BulkResponse response = searchIndexSink.write(requests, contextData);
// update Status
handleErrors(resultList, response, currentTime);
// Update stats
success = getSuccessFromBulkResponse(response);
failed = requestToProcess - success;
} else {
failed = 0;
}
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format("Cause: %s \n Stack: %s", rx.getCause(), ExceptionUtils.getStackTrace(rx)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
currentTime);
} catch (ProcessorException px) {
handleProcessorError(
px.getMessage(),
String.format("Cause: %s \n Stack: %s", px.getCause(), ExceptionUtils.getStackTrace(px)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
currentTime);
} catch (WriterException wx) {
handleEsError(
} catch (SinkException wx) {
handleEsSinkError(
wx.getMessage(),
String.format("Cause: %s \n Stack: %s", wx.getCause(), ExceptionUtils.getStackTrace(wx)),
String.format(
"EntityType: %s \n Cause: %s \n Stack: %s",
paginatedDataInsightSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
currentTime);
} finally {
updateStats(
success, failed, dataInsightReader.getStats(), dataInsightProcessor.getStats(), writer.getStats());
try {
WebSocketManager.getInstance()
.sendToOne(
jobData.getStartedBy(),
WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL,
JsonUtils.pojoToJson(jobData));
} catch (JsonProcessingException ex) {
LOG.error("Failed to send updated stats with WebSocker", ex);
}
success,
failed,
paginatedDataInsightSource.getStats(),
dataInsightProcessor.getStats(),
searchIndexSink.getStats());
sendUpdates();
}
}
}
}
private void sendUpdates() {
try {
WebSocketManager.getInstance()
.sendToOne(
jobData.getStartedBy(), WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData));
} catch (JsonProcessingException ex) {
LOG.error("Failed to send updated stats with WebSocket", ex);
}
}
public void updateStats(
int currentSuccess, int currentFailed, StepStats reader, StepStats processor, StepStats writer) {
// Job Level Stats
@ -226,21 +262,19 @@ public class SearchIndexWorkflow implements Runnable {
// Total Stats
StepStats stats = jobData.getStats().getJobStats();
if (stats == null) {
stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities()));
stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), dao));
}
stats.setTotalSuccessRecords(stats.getTotalSuccessRecords() + currentSuccess);
stats.setTotalFailedRecords(stats.getTotalFailedRecords() + currentFailed);
getUpdatedStats(stats, currentSuccess, currentFailed);
// Update for the Job
jobDataStats.setJobStats(stats);
// Reader Stats
jobDataStats.setSourceStats(reader);
// Processor
jobDataStats.setProcessorStats(processor);
// Writer
jobDataStats.setSinkStats(writer);
jobData.setStats(jobDataStats);
}
@ -268,11 +302,11 @@ public class SearchIndexWorkflow implements Runnable {
}
private void handleErrors(ResultList<?> data, BulkResponse response, long time) {
handleReaderError(data, time);
handleEsErrors(response, time);
handleSourceError(data, time);
handleEsSinkErrors(response, time);
}
private void handleReaderError(String context, String reason, long time) {
private void handleSourceError(String context, String reason, long time) {
Failure failures = getFailure();
FailureDetails readerFailures = getFailureDetails(context, reason, time);
failures.setSourceError(readerFailures);
@ -286,18 +320,25 @@ public class SearchIndexWorkflow implements Runnable {
jobData.setFailure(failures);
}
private void handleEsError(String context, String reason, long time) {
private void handleEsSinkError(String context, String reason, long time) {
Failure failures = getFailure();
FailureDetails writerFailure = getFailureDetails(context, reason, time);
failures.setProcessorError(writerFailure);
failures.setSinkError(writerFailure);
jobData.setFailure(failures);
}
private void handleJobError(String context, String reason, long time) {
Failure failures = getFailure();
FailureDetails jobFailure = getFailureDetails(context, reason, time);
failures.setJobError(jobFailure);
jobData.setFailure(failures);
}
@SneakyThrows
private void handleReaderError(ResultList<?> data, long time) {
private void handleSourceError(ResultList<?> data, long time) {
if (data.getErrors().size() > 0) {
handleReaderError(
"ReaderContext: Encountered Error While Reading Data",
handleSourceError(
"SourceContext: Encountered Error While Reading Data",
String.format(
"Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())),
time);
@ -305,7 +346,7 @@ public class SearchIndexWorkflow implements Runnable {
}
@SneakyThrows
private void handleEsErrors(BulkResponse response, long time) {
private void handleEsSinkErrors(BulkResponse response, long time) {
List<FailureDetails> details = new ArrayList<>();
for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) {
@ -325,7 +366,7 @@ public class SearchIndexWorkflow implements Runnable {
}
}
if (details.size() > 0) {
handleEsError(
handleEsSinkError(
"[EsWriter] BulkResponseItems",
String.format("[BulkItemResponse] Got Following Error Responses: \n %s ", JsonUtils.pojoToJson(details)),
time);

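In SearchIndexWorkflow, run() is now wrapped in try/catch/finally so an unexpected failure is recorded as a jobError, and the finally block always persists the job record, pushes a WebSocket update through the new sendUpdates(), and removes the job from the active list. Both reindex loops also skip the processor and sink when a page comes back empty, so an empty final page is no longer counted as a failed batch. The per-batch shape the two loops now share could be captured by a private helper like the sketch below; the helper itself is not part of the commit, it reuses the class's existing fields and methods, and the real code catches the three exceptions separately to route them to their handlers:

```java
// Hypothetical private helper (not part of this commit) capturing the batch
// shape shared by entitiesReIndex() and dataInsightReindex(); it reuses the
// class's own fields (jobData, entitiesProcessor, searchIndexSink) and helpers.
private void reindexFromSource(PaginatedEntitiesSource source, Map<String, Object> contextData) {
  while (!source.isDone()) {
    long currentTime = System.currentTimeMillis();
    int requestToProcess = jobData.getBatchSize();
    int failed = requestToProcess;
    int success = 0;
    try {
      ResultList<? extends EntityInterface> resultList = source.readNext(null);
      requestToProcess = resultList.getData().size() + resultList.getErrors().size();
      if (resultList.getData().size() > 0) {
        BulkRequest requests = entitiesProcessor.process(resultList, contextData);
        BulkResponse response = searchIndexSink.write(requests, contextData);
        handleErrors(resultList, response, currentTime);
        success = getSuccessFromBulkResponse(response);
        failed = requestToProcess - success;
      } else {
        failed = 0; // an empty page is not a failure
      }
    } catch (SourceException | ProcessorException | SinkException ex) {
      // the real loops catch these separately and route them to
      // handleSourceError / handleProcessorError / handleEsSinkError
    } finally {
      updateStats(success, failed, source.getStats(), entitiesProcessor.getStats(), searchIndexSink.getStats());
      sendUpdates(); // push the latest stats over the WebSocket job-status channel
    }
  }
}
```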
View File

@@ -34,12 +34,17 @@
"type": "integer",
"default": 0
},
"totalSuccessRecords": {
"processedRecords": {
"description": "Records that are processed in",
"type": "integer",
"default": 0
},
"successRecords": {
"description": "Count of Total Successfully Records",
"type": "integer",
"default": 0
},
"totalFailedRecords": {
"failedRecords": {
"description": "Count of Total Failed Records",
"type": "integer",
"default": 0
@@ -140,6 +145,11 @@
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
},
"jobError" : {
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
}
},
"additionalProperties": false