From ec6ff78b80927fd5e4a832ca8819ddb92ce24f73 Mon Sep 17 00:00:00 2001
From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Date: Thu, 6 Apr 2023 10:50:08 +0530
Subject: [PATCH] fixes for Reindex (#10948)

* fixes for Reindex

* review comments
---
 ...eaderException.java => SinkException.java} |   4 +-
 ...terException.java => SourceException.java} |   4 +-
 .../service/workflows/interfaces/Sink.java    |   4 +-
 .../service/workflows/interfaces/Source.java  |   4 +-
 .../searchIndex/EsDataInsightProcessor.java   |   2 +-
 ...ndexWriter.java => EsSearchIndexSink.java} |  19 +-
 ...r.java => PaginatedDataInsightSource.java} |  22 +-
 ...ader.java => PaginatedEntitiesSource.java} |  33 ++-
 .../workflows/searchIndex/ReindexingUtil.java |  18 +-
 .../searchIndex/SearchIndexWorkflow.java      | 249 ++++++++++--------
 .../json/schema/system/eventPublisherJob.json |  14 +-
 11 files changed, 221 insertions(+), 152 deletions(-)
 rename openmetadata-service/src/main/java/org/openmetadata/service/exception/{ReaderException.java => SinkException.java} (86%)
 rename openmetadata-service/src/main/java/org/openmetadata/service/exception/{WriterException.java => SourceException.java} (86%)
 rename openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/{EsSearchIndexWriter.java => EsSearchIndexSink.java} (76%)
 rename openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/{PaginatedDataInsightReader.java => PaginatedDataInsightSource.java} (84%)
 rename openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/{PaginatedEntitiesReader.java => PaginatedEntitiesSource.java} (70%)

diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/ReaderException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SinkException.java
similarity index 86%
rename from openmetadata-service/src/main/java/org/openmetadata/service/exception/ReaderException.java
rename to openmetadata-service/src/main/java/org/openmetadata/service/exception/SinkException.java
index fd2488886b6..b37c218b1f3 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/exception/ReaderException.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SinkException.java
@@ -15,8 +15,8 @@ package org.openmetadata.service.exception;
 
 import java.io.IOException;
 
-public class ReaderException extends IOException {
-  public ReaderException(String msg, Throwable throwable) {
+public class SinkException extends IOException {
+  public SinkException(String msg, Throwable throwable) {
     super(msg, throwable);
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/WriterException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java
similarity index 86%
rename from openmetadata-service/src/main/java/org/openmetadata/service/exception/WriterException.java
rename to openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java
index cf38695f75e..a04cdb5602e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/exception/WriterException.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java
@@ -15,8 +15,8 @@ package org.openmetadata.service.exception;
 
 import java.io.IOException;
 
-public class WriterException extends IOException {
-  public WriterException(String msg, Throwable throwable) {
+public class SourceException extends IOException {
+  public SourceException(String msg, Throwable throwable) {
     super(msg, throwable);
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Sink.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Sink.java
index 22bc8057ab0..7ac30be9b45 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Sink.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Sink.java
@@ -14,8 +14,8 @@ package org.openmetadata.service.workflows.interfaces;
 
 import java.util.Map;
-import org.openmetadata.service.exception.WriterException;
+import org.openmetadata.service.exception.SinkException;
 
 public interface Sink<I, O> extends Stats {
-  O write(I data, Map<String, Object> contextData) throws WriterException;
+  O write(I data, Map<String, Object> contextData) throws SinkException;
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java
index 2da5b01c1e1..fa6c4d155aa 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java
@@ -14,10 +14,10 @@ package org.openmetadata.service.workflows.interfaces;
 
 import java.util.Map;
-import org.openmetadata.service.exception.ReaderException;
+import org.openmetadata.service.exception.SourceException;
 
 public interface Source<R> extends Stats {
-  R readNext(Map<String, Object> contextData) throws ReaderException;
+  R readNext(Map<String, Object> contextData) throws SourceException;
 
   void reset();
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsDataInsightProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsDataInsightProcessor.java
index 603dc27c0e0..b8552051a95 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsDataInsightProcessor.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsDataInsightProcessor.java
@@ -41,7 +41,7 @@ public class EsDataInsightProcessor implements Processor<BulkRequest, ResultList<ReportData>> {
   public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException {
     String entityType = (String) contextData.get(ENTITY_TYPE_KEY);
     if (CommonUtil.nullOrEmpty(entityType)) {
-      throw new IllegalArgumentException("[EsEntitiesProcessor] entityType cannot be null or empty.");
+      throw new IllegalArgumentException("[EsDataInsightProcessor] entityType cannot be null or empty.");
     }
 
     LOG.debug(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexWriter.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexSink.java
similarity index 76%
rename from openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexWriter.java
rename to openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexSink.java
index fcb282ba448..c7a468bf0a3 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexWriter.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/EsSearchIndexSink.java
@@ -16,7 +16,6 @@ package org.openmetadata.service.workflows.searchIndex;
 
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse;
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
-import java.io.IOException;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -24,21 +23,21 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.openmetadata.schema.system.StepStats;
-import org.openmetadata.service.exception.WriterException;
+import org.openmetadata.service.exception.SinkException;
 import org.openmetadata.service.workflows.interfaces.Sink;
 
 @Slf4j
-public class EsSearchIndexWriter implements Sink<BulkRequest, BulkResponse> {
+public class EsSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
   private final StepStats stats = new StepStats();
   private final RestHighLevelClient client;
 
-  EsSearchIndexWriter(RestHighLevelClient client) {
+  EsSearchIndexSink(RestHighLevelClient client) {
     this.client = client;
   }
 
   @Override
-  public BulkResponse write(BulkRequest data, Map<String, Object> contextData) throws WriterException {
-    LOG.debug("[EsSearchIndexWriter] Processing a Batch of Size: {}", data.numberOfActions());
+  public BulkResponse write(BulkRequest data, Map<String, Object> contextData) throws SinkException {
+    LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
     try {
       BulkResponse response = client.bulk(data, RequestOptions.DEFAULT);
       int currentSuccess = getSuccessFromBulkResponse(response);
@@ -46,21 +45,21 @@ public class EsSearchIndexWriter implements Sink<BulkRequest, BulkResponse> {
 
       // Update Stats
       LOG.debug(
-          "[EsSearchIndexWriter] Batch Stats :- Submitted : {} Success: {} Failed: {}",
+          "[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
           data.numberOfActions(),
           currentSuccess,
           currentFailed);
       updateStats(currentSuccess, currentFailed);
       return response;
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.debug(
-          "[EsSearchIndexWriter] Batch Stats :- Submitted : {} Success: {} Failed: {}",
+          "[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
           data.numberOfActions(),
           0,
           data.numberOfActions());
       updateStats(0, data.numberOfActions());
-      throw new WriterException("[EsSearchIndexWriter] Batch encountered Exception. Failing Completely", e);
+      throw new SinkException("[EsSearchIndexSink] Batch encountered Exception. Failing Completely", e);
     }
   }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightReader.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
similarity index 84%
rename from openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightReader.java
rename to openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
index dff07c50473..5fc029deb6e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightReader.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
@@ -22,14 +22,14 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.schema.analytics.ReportData;
 import org.openmetadata.schema.system.StepStats;
-import org.openmetadata.service.exception.ReaderException;
+import org.openmetadata.service.exception.SourceException;
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.util.RestUtil;
 import org.openmetadata.service.util.ResultList;
 import org.openmetadata.service.workflows.interfaces.Source;
 
 @Slf4j
-public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
+public class PaginatedDataInsightSource implements Source<ResultList<ReportData>> {
   private final CollectionDAO dao;
   @Getter private final String entityType;
   @Getter private final int batchSize;
@@ -37,14 +37,15 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
   private String cursor = null;
   @Getter private boolean isDone = false;
 
-  public PaginatedDataInsightReader(CollectionDAO dao, String entityType, int batchSize) {
+  public PaginatedDataInsightSource(CollectionDAO dao, String entityType, int batchSize) {
     this.dao = dao;
     this.entityType = entityType;
     this.batchSize = batchSize;
+    stats.setTotalRecords(dao.entityExtensionTimeSeriesDao().listCount(entityType));
   }
 
   @Override
-  public ResultList<ReportData> readNext(Map<String, Object> contextData) throws ReaderException {
+  public ResultList<ReportData> readNext(Map<String, Object> contextData) throws SourceException {
     if (!isDone) {
       ResultList<ReportData> data = read(cursor);
       cursor = data.getPaging().getAfter();
@@ -63,7 +64,7 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
     isDone = false;
   }
 
-  private ResultList<ReportData> read(String afterCursor) throws ReaderException {
+  private ResultList<ReportData> read(String afterCursor) throws SourceException {
     LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize);
     ResultList<ReportData> result;
     try {
@@ -76,8 +77,13 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
       updateStats(result.getData().size(), result.getErrors().size());
     } catch (Exception ex) {
       LOG.debug("[DataInsightReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
-      updateStats(0, batchSize);
-      throw new ReaderException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex);
+      if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) {
+        isDone = true;
+        updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords());
+      } else {
+        updateStats(0, batchSize);
+      }
+      throw new SourceException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex);
     }
 
     return result;
@@ -104,7 +110,7 @@ public class PaginatedDataInsightReader implements Source<ResultList<ReportData>> {
     for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) {
       reportDataList.add(reportDataRow.getReportData());
     }
-    return new ResultList<>(reportDataList, beforeCursor, afterCursor, total);
+    return new ResultList<>(reportDataList, new ArrayList<>(), beforeCursor, afterCursor, total);
   }
 
   @Override
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesReader.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
similarity index 70%
rename from openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesReader.java
rename to openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
index 396c6f627a7..c8bb64103ec 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesReader.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java
@@ -24,14 +24,14 @@ import org.openmetadata.schema.EntityInterface;
 import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.schema.type.Include;
 import org.openmetadata.service.Entity;
-import org.openmetadata.service.exception.ReaderException;
+import org.openmetadata.service.exception.SourceException;
 import org.openmetadata.service.jdbi3.EntityRepository;
 import org.openmetadata.service.jdbi3.ListFilter;
 import org.openmetadata.service.util.ResultList;
 import org.openmetadata.service.workflows.interfaces.Source;
 
 @Slf4j
-public class PaginatedEntitiesReader implements Source<ResultList<? extends EntityInterface>> {
+public class PaginatedEntitiesSource implements Source<ResultList<? extends EntityInterface>> {
   @Getter private final int batchSize;
   @Getter private final String entityType;
   @Getter private final List<String> fields;
@@ -39,14 +39,15 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends EntityInterface>> {
   private String cursor = null;
   @Getter private boolean isDone = false;
 
-  PaginatedEntitiesReader(String entityType, int batchSize, List<String> fields) {
+  PaginatedEntitiesSource(String entityType, int batchSize, List<String> fields) {
     this.entityType = entityType;
     this.batchSize = batchSize;
     this.fields = fields;
+    this.stats.setTotalRecords(Entity.getEntityRepository(entityType).dao.listTotalCount());
   }
 
   @Override
-  public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) throws ReaderException {
+  public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) throws SourceException {
     if (!isDone) {
       ResultList<? extends EntityInterface> data = read(cursor);
       cursor = data.getPaging().getAfter();
@@ -59,10 +60,10 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends EntityInterface>> {
     isDone = false;
   }
 
-  private ResultList<? extends EntityInterface> read(String cursor) throws ReaderException {
-    LOG.debug("[EntitiesReader] Fetching a Batch of Size: {} ", batchSize);
+  private ResultList<? extends EntityInterface> read(String cursor) throws SourceException {
+    LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
     EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
-    ResultList<? extends EntityInterface> result;
+    ResultList<? extends EntityInterface> result = null;
     try {
       result =
           entityRepository.listAfterWithSkipFailure(
@@ -70,20 +71,28 @@ public class PaginatedEntitiesReader implements Source<ResultList<? extends EntityInterface>> {
       if (result.getErrors().size() > 0) {
         result
             .getErrors()
-            .forEach((error) -> LOG.error("[EntitiesReader] Failed in getting Record, RECORD: {}", error.toString()));
+            .forEach(
+                (error) ->
+                    LOG.error("[PaginatedEntitiesSource] Failed in getting Record, RECORD: {}", error.toString()));
       }
 
       LOG.debug(
-          "[EntitiesReader] Batch Stats :- Submitted : {} Success: {} Failed: {}",
+          "[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}",
           batchSize,
           result.getData().size(),
           result.getErrors().size());
       updateStats(result.getData().size(), result.getErrors().size());
     } catch (IOException e) {
-      LOG.debug("[EntitiesReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
-      updateStats(0, batchSize);
-      throw new ReaderException("[EntitiesReader] Batch encountered Exception. Failing Completely.", e);
+      LOG.debug(
+          "[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize);
+      if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) {
+        isDone = true;
+        updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords());
+      } else {
+        updateStats(0, batchSize);
+      }
+      throw new SourceException("[PaginatedEntitiesSource] Batch encountered Exception. Failing Completely.", e);
     }
 
     return result;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java
index 908d6348aec..c522ff50588 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java
@@ -19,15 +19,16 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
+import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.jdbi3.EntityRepository;
 
 public class ReindexingUtil {
   public static final String ENTITY_TYPE_KEY = "entityType";
 
   public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
-    stats.setTotalRecords(stats.getTotalRecords() + currentSuccess + currentFailed);
-    stats.setTotalSuccessRecords(stats.getTotalSuccessRecords() + currentSuccess);
-    stats.setTotalFailedRecords(stats.getTotalFailedRecords() + currentFailed);
+    stats.setProcessedRecords(stats.getProcessedRecords() + currentSuccess + currentFailed);
+    stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);
+    stats.setFailedRecords(stats.getFailedRecords() + currentFailed);
   }
 
   public static boolean isDataInsightIndex(String entityType) {
@@ -36,11 +37,15 @@ public class ReindexingUtil {
         || entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA);
   }
 
-  public static int getTotalRequestToProcess(Set<String> entities) {
+  public static int getTotalRequestToProcess(Set<String> entities, CollectionDAO dao) {
     int total = 0;
     for (String entityType : entities) {
-      EntityRepository<?> repository = Entity.getEntityRepository(entityType);
-      total += repository.dao.listTotalCount();
+      if (!isDataInsightIndex(entityType)) {
+        EntityRepository<?> repository = Entity.getEntityRepository(entityType);
+        total += repository.dao.listTotalCount();
+      } else {
+        total += dao.entityExtensionTimeSeriesDao().listCount(entityType);
+      }
     }
     return total;
   }
@@ -51,7 +56,6 @@ public class ReindexingUtil {
       if (!bulkItemResponse.isFailed()) {
         success++;
       }
-      ;
     }
     return success;
   }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
index f248d6c5f90..f020788e030 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java
@@ -18,6 +18,7 @@ import static org.openmetadata.service.util.ReIndexingHandler.REINDEXING_JOB_EXT
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse;
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
+import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -44,8 +45,8 @@ import org.openmetadata.schema.system.Stats;
 import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
 import org.openmetadata.service.exception.ProcessorException;
-import org.openmetadata.service.exception.ReaderException;
-import org.openmetadata.service.exception.WriterException;
+import org.openmetadata.service.exception.SinkException;
+import org.openmetadata.service.exception.SourceException;
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.socket.WebSocketManager;
 import org.openmetadata.service.util.JsonUtils;
@@ -54,11 +55,11 @@ import org.openmetadata.service.util.ResultList;
 
 @Slf4j
 public class SearchIndexWorkflow implements Runnable {
-  private final List<PaginatedEntitiesReader> entitiesReaders = new ArrayList<>();
-  private final List<PaginatedDataInsightReader> dataInsightReaders = new ArrayList<>();
+  private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
+  private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
   private final EsEntitiesProcessor entitiesProcessor;
   private final EsDataInsightProcessor dataInsightProcessor;
-  private final EsSearchIndexWriter writer;
+  private final EsSearchIndexSink searchIndexSink;
   private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
   @Getter private final EventPublisherJob jobData;
   private final CollectionDAO dao;
@@ -78,146 +79,181 @@ public class SearchIndexWorkflow implements Runnable {
               List<String> fields =
                   new ArrayList<>(
                       Objects.requireNonNull(getIndexFields(entityType, jobData.getSearchIndexMappingLanguage())));
-              entitiesReaders.add(new PaginatedEntitiesReader(entityType, jobData.getBatchSize(), fields));
+              paginatedEntitiesSources.add(new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields));
             } else {
-              dataInsightReaders.add(new PaginatedDataInsightReader(dao, entityType, jobData.getBatchSize()));
+              paginatedDataInsightSources.add(
+                  new PaginatedDataInsightSource(dao, entityType, jobData.getBatchSize()));
             }
           });
     this.entitiesProcessor = new EsEntitiesProcessor();
     this.dataInsightProcessor = new EsDataInsightProcessor();
-    this.writer = new EsSearchIndexWriter(client);
+    this.searchIndexSink = new EsSearchIndexSink(client);
     this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
   }
 
   @SneakyThrows
   public void run() {
-    LOG.info("Executing Reindexing Job with JobData : {}", jobData);
-    // Update Job Status
-    jobData.setStatus(EventPublisherJob.Status.RUNNING);
-    // Run ReIndexing
-    entitiesReIndexer();
-    dataInsightReindexer();
-    // Mark Job as Completed
-    updateJobStatus();
-    jobData.setEndTime(System.currentTimeMillis());
-    // store job details in Database
-    updateRecordToDb();
-    ReIndexingHandler.getInstance().removeCompletedJob(jobData.getId());
+    try {
+      LOG.info("Executing Reindexing Job with JobData : {}", jobData);
+      // Update Job Status
+      jobData.setStatus(EventPublisherJob.Status.RUNNING);
+      // Run ReIndexing
+      entitiesReIndex();
+      dataInsightReindex();
+      // Mark Job as Completed
+      updateJobStatus();
+      jobData.setEndTime(System.currentTimeMillis());
+    } catch (Exception ex) {
+      String error =
+          String.format(
+              "Reindexing Job Has Encountered an Exception. \n Job Data: %s, \n Stack : %s ",
              jobData.toString(), ExceptionUtils.getStackTrace(ex));
+      LOG.error(error);
+      jobData.setStatus(EventPublisherJob.Status.FAILED);
+      handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
+    } finally {
+      // store job details in Database
+      updateRecordToDb();
+      // Send update
+      sendUpdates();
+      // Remove list from active jobs
+      ReIndexingHandler.getInstance().removeCompletedJob(jobData.getId());
+    }
   }
 
-  private void entitiesReIndexer() {
+  private void entitiesReIndex() {
     Map<String, Object> contextData = new HashMap<>();
-    for (PaginatedEntitiesReader reader : entitiesReaders) {
-      reCreateIndexes(reader.getEntityType());
-      contextData.put(ENTITY_TYPE_KEY, reader.getEntityType());
+    for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
+      reCreateIndexes(paginatedEntitiesSource.getEntityType());
+      contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType());
       ResultList<? extends EntityInterface> resultList;
-      while (!reader.isDone()) {
+      while (!paginatedEntitiesSource.isDone()) {
         long currentTime = System.currentTimeMillis();
         int requestToProcess = jobData.getBatchSize();
         int failed = requestToProcess;
         int success = 0;
         try {
-          resultList = reader.readNext(null);
+          resultList = paginatedEntitiesSource.readNext(null);
           requestToProcess = resultList.getData().size() + resultList.getErrors().size();
-          // process data to build Reindex Request
-          BulkRequest requests = entitiesProcessor.process(resultList, contextData);
-          // write the data to ElasticSearch
-          BulkResponse response = writer.write(requests, contextData);
-          // update Status
-          handleErrors(resultList, response, currentTime);
-          // Update stats
-          success = getSuccessFromBulkResponse(response);
-          failed = requestToProcess - success;
-        } catch (ReaderException rx) {
-          handleReaderError(
+          if (resultList.getData().size() > 0) {
+            // process data to build Reindex Request
+            BulkRequest requests = entitiesProcessor.process(resultList, contextData);
+            // write the data to ElasticSearch
+            BulkResponse response = searchIndexSink.write(requests, contextData);
+            // update Status
+            handleErrors(resultList, response, currentTime);
+            // Update stats
+            success = getSuccessFromBulkResponse(response);
+            failed = requestToProcess - success;
+          } else {
+            failed = 0;
+          }
+        } catch (SourceException rx) {
+          handleSourceError(
               rx.getMessage(),
-              String.format("Cause: %s \n Stack: %s", rx.getCause(), ExceptionUtils.getStackTrace(rx)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedEntitiesSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
               currentTime);
         } catch (ProcessorException px) {
           handleProcessorError(
               px.getMessage(),
-              String.format("Cause: %s \n Stack: %s", px.getCause(), ExceptionUtils.getStackTrace(px)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedEntitiesSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
               currentTime);
-        } catch (WriterException wx) {
-          handleEsError(
+        } catch (SinkException wx) {
+          handleEsSinkError(
               wx.getMessage(),
-              String.format("Cause: %s \n Stack: %s", wx.getCause(), ExceptionUtils.getStackTrace(wx)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedEntitiesSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
               currentTime);
         } finally {
-          updateStats(success, failed, reader.getStats(), entitiesProcessor.getStats(), writer.getStats());
-          try {
-            WebSocketManager.getInstance()
-                .sendToOne(
-                    jobData.getStartedBy(),
-                    WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL,
-                    JsonUtils.pojoToJson(jobData));
-          } catch (JsonProcessingException ex) {
-            LOG.error("Failed to send updated stats with WebSocker", ex);
-          }
+          updateStats(
+              success,
+              failed,
+              paginatedEntitiesSource.getStats(),
+              entitiesProcessor.getStats(),
+              searchIndexSink.getStats());
+          sendUpdates();
         }
       }
     }
   }
 
-  private void dataInsightReindexer() {
+  private void dataInsightReindex() {
     Map<String, Object> contextData = new HashMap<>();
-    for (PaginatedDataInsightReader dataInsightReader : dataInsightReaders) {
-      reCreateIndexes(dataInsightReader.getEntityType());
-      contextData.put(ENTITY_TYPE_KEY, dataInsightReader.getEntityType());
+    for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
+      reCreateIndexes(paginatedDataInsightSource.getEntityType());
+      contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType());
       ResultList<ReportData> resultList;
-      while (!dataInsightReader.isDone()) {
+      while (!paginatedDataInsightSource.isDone()) {
         long currentTime = System.currentTimeMillis();
         int requestToProcess = jobData.getBatchSize();
         int failed = requestToProcess;
         int success = 0;
         try {
-          resultList = dataInsightReader.readNext(null);
+          resultList = paginatedDataInsightSource.readNext(null);
           requestToProcess = resultList.getData().size() + resultList.getErrors().size();
-          // process data to build Reindex Request
-          BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
-          // write the data to ElasticSearch
-          BulkResponse response = writer.write(requests, contextData);
-          // update Status
-          handleErrors(resultList, response, currentTime);
-          // Update stats
-          success = getSuccessFromBulkResponse(response);
-          failed = requestToProcess - success;
-        } catch (ReaderException rx) {
-          handleReaderError(
+          if (resultList.getData().size() > 0) {
+            // process data to build Reindex Request
+            BulkRequest requests = dataInsightProcessor.process(resultList, contextData);
+            // write the data to ElasticSearch
+            BulkResponse response = searchIndexSink.write(requests, contextData);
+            // update Status
+            handleErrors(resultList, response, currentTime);
+            // Update stats
+            success = getSuccessFromBulkResponse(response);
+            failed = requestToProcess - success;
+          } else {
+            failed = 0;
+          }
+        } catch (SourceException rx) {
+          handleSourceError(
              rx.getMessage(),
-              String.format("Cause: %s \n Stack: %s", rx.getCause(), ExceptionUtils.getStackTrace(rx)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedDataInsightSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace(rx)),
              currentTime);
         } catch (ProcessorException px) {
           handleProcessorError(
              px.getMessage(),
-              String.format("Cause: %s \n Stack: %s", px.getCause(), ExceptionUtils.getStackTrace(px)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedDataInsightSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace(px)),
              currentTime);
-        } catch (WriterException wx) {
-          handleEsError(
+        } catch (SinkException wx) {
+          handleEsSinkError(
              wx.getMessage(),
-              String.format("Cause: %s \n Stack: %s", wx.getCause(), ExceptionUtils.getStackTrace(wx)),
+              String.format(
+                  "EntityType: %s \n Cause: %s \n Stack: %s",
+                  paginatedDataInsightSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace(wx)),
              currentTime);
         } finally {
-          updateStats(
-              success, failed, dataInsightReader.getStats(), dataInsightProcessor.getStats(), writer.getStats());
-          try {
-            WebSocketManager.getInstance()
-                .sendToOne(
-                    jobData.getStartedBy(),
-                    WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL,
-                    JsonUtils.pojoToJson(jobData));
-          } catch (JsonProcessingException ex) {
-            LOG.error("Failed to send updated stats with WebSocker", ex);
-          }
+          updateStats(
+              success,
+              failed,
+              paginatedDataInsightSource.getStats(),
+              dataInsightProcessor.getStats(),
+              searchIndexSink.getStats());
+          sendUpdates();
         }
       }
     }
   }
 
+  private void sendUpdates() {
+    try {
+      WebSocketManager.getInstance()
+          .sendToOne(
+              jobData.getStartedBy(), WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData));
+    } catch (JsonProcessingException ex) {
+      LOG.error("Failed to send updated stats with WebSocket", ex);
+    }
+  }
+
   public void updateStats(
       int currentSuccess, int currentFailed, StepStats reader, StepStats processor, StepStats writer) {
     // Job Level Stats
@@ -226,21 +262,19 @@ public class SearchIndexWorkflow implements Runnable {
     // Total Stats
     StepStats stats = jobData.getStats().getJobStats();
     if (stats == null) {
-      stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities()));
+      stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), dao));
     }
-    stats.setTotalSuccessRecords(stats.getTotalSuccessRecords() + currentSuccess);
-    stats.setTotalFailedRecords(stats.getTotalFailedRecords() + currentFailed);
+    getUpdatedStats(stats, currentSuccess, currentFailed);
+
     // Update for the Job
     jobDataStats.setJobStats(stats);
-
     // Reader Stats
     jobDataStats.setSourceStats(reader);
-
     // Processor
     jobDataStats.setProcessorStats(processor);
-
     // Writer
     jobDataStats.setSinkStats(writer);
+
     jobData.setStats(jobDataStats);
   }
 
@@ -268,11 +302,11 @@ public class SearchIndexWorkflow implements Runnable {
   }
 
   private void handleErrors(ResultList<?> data, BulkResponse response, long time) {
-    handleReaderError(data, time);
-    handleEsErrors(response, time);
+    handleSourceError(data, time);
+    handleEsSinkErrors(response, time);
   }
 
-  private void handleReaderError(String context, String reason, long time) {
+  private void handleSourceError(String context, String reason, long time) {
     Failure failures = getFailure();
     FailureDetails readerFailures = getFailureDetails(context, reason, time);
     failures.setSourceError(readerFailures);
@@ -286,18 +320,25 @@ public class SearchIndexWorkflow implements Runnable {
     jobData.setFailure(failures);
   }
 
-  private void handleEsError(String context, String reason, long time) {
+  private void handleEsSinkError(String context, String reason, long time) {
     Failure failures = getFailure();
     FailureDetails writerFailure = getFailureDetails(context, reason, time);
-    failures.setProcessorError(writerFailure);
+    failures.setSinkError(writerFailure);
+    jobData.setFailure(failures);
+  }
+
+  private void handleJobError(String context, String reason, long time) {
+    Failure failures = getFailure();
+    FailureDetails jobFailure = getFailureDetails(context, reason, time);
+    failures.setJobError(jobFailure);
     jobData.setFailure(failures);
   }
 
   @SneakyThrows
-  private void handleReaderError(ResultList<?> data, long time) {
+  private void handleSourceError(ResultList<?> data, long time) {
     if (data.getErrors().size() > 0) {
-      handleReaderError(
-          "ReaderContext: Encountered Error While Reading Data",
+      handleSourceError(
+          "SourceContext: Encountered Error While Reading Data",
           String.format(
               "Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())),
           time);
@@ -305,7 +346,7 @@ public class SearchIndexWorkflow implements Runnable {
   }
 
   @SneakyThrows
-  private void handleEsErrors(BulkResponse response, long time) {
+  private void handleEsSinkErrors(BulkResponse response, long time) {
     List<FailureDetails> details = new ArrayList<>();
     for (BulkItemResponse bulkItemResponse : response) {
       if (bulkItemResponse.isFailed()) {
@@ -325,7 +366,7 @@ public class SearchIndexWorkflow implements Runnable {
       }
     }
     if (details.size() > 0) {
-      handleEsError(
+      handleEsSinkError(
           "[EsWriter] BulkResponseItems",
           String.format("[BulkItemResponse] Got Following Error Responses: \n %s ", JsonUtils.pojoToJson(details)),
           time);
diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
index f489c288f68..7a84e771266 100644
--- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
+++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json
@@ -34,12 +34,17 @@
       "type": "integer",
       "default": 0
     },
-    "totalSuccessRecords": {
+    "processedRecords": {
+      "description": "Count of Total Processed Records",
+      "type": "integer",
+      "default": 0
+    },
+    "successRecords": {
       "description": "Count of Total Successfully Records",
       "type": "integer",
       "default": 0
     },
-    "totalFailedRecords": {
+    "failedRecords": {
       "description": "Count of Total Failed Records",
       "type": "integer",
       "default": 0
@@ -140,6 +145,11 @@
       "type": "object",
       "$ref": "#/definitions/failureDetails",
       "default": null
+    },
+    "jobError": {
+      "type": "object",
+      "$ref": "#/definitions/failureDetails",
+      "default": null
     }
   },
   "additionalProperties": false
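
For context on the renames: the patch settles on a source → processor → sink vocabulary for the reindexing pipeline. Below is a minimal sketch of how the three pieces compose, assuming simplified stand-ins for the real interfaces (the actual `Source`/`Sink` also extend `Stats` and throw the `SourceException`/`SinkException` introduced here, and the batch type is `ResultList` rather than `List`). The driver loop mirrors `SearchIndexWorkflow.entitiesReIndex`, including the new guard that skips empty pages instead of sending them to Elasticsearch.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-ins for org.openmetadata.service.workflows.interfaces.*;
// illustration only, not the code added by this patch.
interface Source<R> {
  R readNext(Map<String, Object> contextData) throws Exception;

  boolean isDone();
}

interface Processor<R, P> {
  P process(R input, Map<String, Object> contextData) throws Exception;
}

interface Sink<P, O> {
  O write(P data, Map<String, Object> contextData) throws Exception;
}

final class PipelineSketch {
  // Drains one source, mirroring the per-batch loop in SearchIndexWorkflow:
  // read a page, skip it when empty, otherwise process and write it, and
  // never let one bad batch abort the whole job.
  static <R, P, O> void drain(
      Source<List<R>> source,
      Processor<List<R>, P> processor,
      Sink<P, O> sink,
      Map<String, Object> contextData) {
    while (!source.isDone()) {
      try {
        List<R> batch = source.readNext(contextData);
        if (!batch.isEmpty()) { // guard added by this patch
          P request = processor.process(batch, contextData);
          sink.write(request, contextData);
        }
      } catch (Exception e) {
        // the real workflow routes this to handleSourceError /
        // handleProcessorError / handleEsSinkError and continues
      }
    }
  }
}
```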
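The stats semantics also change: `totalRecords` is now computed once up front (`listTotalCount` for entities, `entityExtensionTimeSeriesDao().listCount` for data-insight rows), while `getUpdatedStats` accumulates `processedRecords`, `successRecords`, and `failedRecords` per batch. A hypothetical mirror of the regenerated `StepStats` POJO makes the arithmetic concrete:

```java
// Hypothetical StepStats-like holder, for illustration only; the real class
// is generated from eventPublisherJob.json as changed by this patch.
final class StepStatsSketch {
  int totalRecords; // fixed once, e.g. repository.dao.listTotalCount()
  int processedRecords; // accumulates success + failed per batch
  int successRecords;
  int failedRecords;

  // Same arithmetic as ReindexingUtil.getUpdatedStats after this patch.
  void update(int currentSuccess, int currentFailed) {
    processedRecords += currentSuccess + currentFailed;
    successRecords += currentSuccess;
    failedRecords += currentFailed;
  }

  // The paginated sources use this remainder to cap the failure count of the
  // final batch, so processedRecords never overshoots totalRecords.
  int remaining() {
    return totalRecords - processedRecords;
  }
}
```

For example, with totalRecords = 250 and batchSize = 100, a read failure on the third page records the remaining 50 (not 100) as failed and marks the source done, which is the `totalRecords - processedRecords <= batchSize` branch added to both paginated sources.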