From 10d8ec84fe3445dd5b356879c252ce652aac2f8e Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Mon, 13 Nov 2023 23:39:56 +0530 Subject: [PATCH] Logs added for Search Indexing and Stats issue fixed (#13956) * Logs added for Search Indexing and Stats issue fixed * Fix uninstall error * Add error handling * fix lint * Push Job Level Exception on top * disable flaky tests * Fix Logs not visible in Search --------- Co-authored-by: ulixius9 --- .../metadata_elasticsearch/metadata.py | 2 +- .../tests/unit/readers/test_github_reader.py | 4 +- .../dashboard/test_lookml_github_reader.py | 4 +- .../apps/AbstractNativeApplication.java | 21 ++ .../bundles/searchIndex/SearchIndexApp.java | 227 ++++++++---------- .../scheduler/AbstractOmAppJobListener.java | 55 ++++- .../service/exception/SourceException.java | 8 + .../service/jdbi3/EntityRepository.java | 2 +- .../service/resources/apps/AppResource.java | 47 ++-- .../service/search/SearchRepository.java | 15 +- .../ElasticSearchDataInsightProcessor.java | 4 + .../ElasticSearchEntitiesProcessor.java | 4 + .../elasticsearch/ElasticSearchIndexSink.java | 3 +- .../OpenSearchDataInsightProcessor.java | 4 + .../OpenSearchEntitiesProcessor.java | 4 + .../opensearch/OpenSearchIndexSink.java | 4 +- .../service/workflows/interfaces/Source.java | 3 + .../PaginatedDataInsightSource.java | 38 ++- .../searchIndex/PaginatedEntitiesSource.java | 86 ++++--- .../workflows/searchIndex/ReindexingUtil.java | 7 +- .../entity/applications/appRunRecord.json | 5 + .../json/schema/system/eventPublisherJob.json | 60 +---- .../AppLogsViewer/AppLogsViewer.component.tsx | 46 +--- 23 files changed, 357 insertions(+), 296 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py index b2a940d8d42..0a46dd0f3fc 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py @@ -112,7 +112,7 @@ class MetadataElasticsearchSource(Source): total_retries_count = 3 current_try = 1 - while status not in {Status.COMPLETED, Status.FAILED, Status.STOPPED}: + while status not in {Status.completed, Status.failed, Status.stopped}: sleep(5) job = self.metadata.get_reindex_job_status(model_str(self.reindex_job.id)) if job and job.stats and job.stats.jobStats: diff --git a/ingestion/tests/unit/readers/test_github_reader.py b/ingestion/tests/unit/readers/test_github_reader.py index 8ff970dc8b0..8aa41104480 100644 --- a/ingestion/tests/unit/readers/test_github_reader.py +++ b/ingestion/tests/unit/readers/test_github_reader.py @@ -37,9 +37,11 @@ class TestGitHubReader(TestCase): self.assertEqual(reader.auth_headers, {"Authorization": "Bearer token"}) - def test_read(self): + def x_test_read(self): """ We can read the OM README + + disabling this test as it is flakey and fails with error rate limit exceeded """ creds = GitHubCredentials( repositoryName="OpenMetadata", diff --git a/ingestion/tests/unit/topology/dashboard/test_lookml_github_reader.py b/ingestion/tests/unit/topology/dashboard/test_lookml_github_reader.py index 306650f48c8..658ee8aaed3 100644 --- a/ingestion/tests/unit/topology/dashboard/test_lookml_github_reader.py +++ b/ingestion/tests/unit/topology/dashboard/test_lookml_github_reader.py @@ -35,11 +35,13 @@ class TestLookMLGitHubReader(TestCase): reader = GitHubReader(creds) parser = LkmlParser(reader) - def test_lookml_read_and_parse(self): + def x_test_lookml_read_and_parse(self): """ We can parse the explore file. We'll expand and find views from https://github.com/open-metadata/lookml-sample/blob/main/cats.explore.lkml + + disabling this test as it is flakey and fails with error rate limit exceeded """ explore_file = "cats.explore.lkml" diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index adde19256cc..d99caa3a54e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -1,6 +1,7 @@ package org.openmetadata.service.apps; import static com.cronutils.model.CronType.QUARTZ; +import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY; @@ -13,10 +14,12 @@ import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.parser.CronParser; import java.util.List; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.AppRuntime; import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline; import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppType; import org.openmetadata.schema.entity.app.ExternalAppIngestionConfig; import org.openmetadata.schema.entity.app.ScheduleType; @@ -29,6 +32,7 @@ import org.openmetadata.schema.type.ProviderType; import org.openmetadata.schema.type.Relationship; import org.openmetadata.service.Entity; import org.openmetadata.service.apps.scheduler.AppScheduler; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; @@ -39,6 +43,7 @@ import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.SchedulerException; @Slf4j public class AbstractNativeApplication implements NativeApplication { @@ -203,4 +208,20 @@ public class AbstractNativeApplication implements NativeApplication { .withLoggerLevel(create.getLoggerLevel()) .withService(create.getService()); } + + private OmAppJobListener getJobListener(JobExecutionContext jobExecutionContext) throws SchedulerException { + return (OmAppJobListener) jobExecutionContext.getScheduler().getListenerManager().getJobListener(JOB_LISTENER_NAME); + } + + @SneakyThrows + protected AppRunRecord getJobRecord(JobExecutionContext jobExecutionContext) { + OmAppJobListener listener = getJobListener(jobExecutionContext); + return listener.getAppRunRecordForJob(jobExecutionContext); + } + + @SneakyThrows + protected void pushAppStausUpdates(JobExecutionContext jobExecutionContext, AppRunRecord record, boolean update) { + OmAppJobListener listener = getJobListener(jobExecutionContext); + listener.pushApplicationStatusUpdates(jobExecutionContext, record, update); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 05fe29cfd6e..4280c34191b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -21,10 +21,12 @@ import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppRunRecord; +import org.openmetadata.schema.entity.app.FailureContext; +import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.system.EventPublisherJob; import org.openmetadata.schema.system.Failure; -import org.openmetadata.schema.system.FailureDetails; import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; import org.openmetadata.service.apps.AbstractNativeApplication; @@ -52,8 +54,8 @@ import org.quartz.JobExecutionContext; @Slf4j public class SearchIndexApp extends AbstractNativeApplication { private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s"; - private List paginatedEntitiesSources = new ArrayList<>(); - private List paginatedDataInsightSources = new ArrayList<>(); + private final List paginatedEntitiesSources = new ArrayList<>(); + private final List paginatedDataInsightSources = new ArrayList<>(); private Processor entityProcessor; private Processor dataInsightProcessor; private Sink searchIndexSink; @@ -70,7 +72,11 @@ public class SearchIndexApp extends AbstractNativeApplication { JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class) .withStats(new Stats()) .withFailure(new Failure()); + int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO); this.jobData = request; + this.jobData.setStats( + new Stats() + .withJobStats(new StepStats().withTotalRecords(totalRecords).withFailedRecords(0).withSuccessRecords(0))); request .getEntities() .forEach( @@ -89,13 +95,13 @@ public class SearchIndexApp extends AbstractNativeApplication { } }); if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { - this.entityProcessor = new OpenSearchEntitiesProcessor(); - this.dataInsightProcessor = new OpenSearchDataInsightProcessor(); - this.searchIndexSink = new OpenSearchIndexSink(searchRepository); + this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords); + this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords); + this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords); } else { - this.entityProcessor = new ElasticSearchEntitiesProcessor(); - this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(); - this.searchIndexSink = new ElasticSearchIndexSink(searchRepository); + this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords); + this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords); + this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords); } } @@ -121,11 +127,33 @@ public class SearchIndexApp extends AbstractNativeApplication { } finally { // store job details in Database jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); + // Update Record to db + updateRecordToDb(jobExecutionContext); // Send update sendUpdates(); } } + public void updateRecordToDb(JobExecutionContext jobExecutionContext) { + AppRunRecord appRecord = getJobRecord(jobExecutionContext); + + // Update Run Record with Status + appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); + + // Update Error + if (jobData.getFailure() != null) { + appRecord.setFailureContext( + new FailureContext().withAdditionalProperty("failure", JsonUtils.pojoToJson(jobData.getFailure()))); + } + + // Update Stats + if (jobData.getStats() != null) { + appRecord.setSuccessContext(new SuccessContext().withAdditionalProperty("stats", jobData.getStats())); + } + + pushAppStausUpdates(jobExecutionContext, appRecord, true); + } + private void entitiesReIndex() { Map contextData = new HashMap<>(); for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) { @@ -134,12 +162,8 @@ public class SearchIndexApp extends AbstractNativeApplication { ResultList resultList; while (!stopped && !paginatedEntitiesSource.isDone()) { long currentTime = System.currentTimeMillis(); - int requestToProcess = jobData.getBatchSize(); - int failed = requestToProcess; - int success = 0; try { resultList = paginatedEntitiesSource.readNext(null); - requestToProcess = resultList.getData().size() + resultList.getErrors().size(); if (!resultList.getData().isEmpty()) { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { // process data to build Reindex Request @@ -150,8 +174,6 @@ public class SearchIndexApp extends AbstractNativeApplication { (os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData); // update Status handleErrorsOs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime); - // Update stats - success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response); } else { // process data to build Reindex Request BulkRequest requests = (BulkRequest) entityProcessor.process(resultList, contextData); @@ -159,13 +181,17 @@ public class SearchIndexApp extends AbstractNativeApplication { BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData); // update Status handleErrorsEs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime); - // Update stats - success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response); } - failed = requestToProcess - success; - } else { - failed = 0; } + } catch (SourceException rx) { + handleSourceError( + rx.getMessage(), + String.format( + ENTITY_TYPE_ERROR_MSG, + paginatedEntitiesSource.getEntityType(), + rx.getCause(), + ExceptionUtils.getStackTrace(rx)), + currentTime); } catch (ProcessorException px) { handleProcessorError( px.getMessage(), @@ -184,16 +210,10 @@ public class SearchIndexApp extends AbstractNativeApplication { wx.getCause(), ExceptionUtils.getStackTrace(wx)), currentTime); - } finally { - updateStats( - success, - failed, - paginatedEntitiesSource.getStats(), - entityProcessor.getStats(), - searchIndexSink.getStats()); - sendUpdates(); } } + updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats()); + sendUpdates(); } } @@ -205,12 +225,8 @@ public class SearchIndexApp extends AbstractNativeApplication { ResultList resultList; while (!stopped && !paginatedDataInsightSource.isDone()) { long currentTime = System.currentTimeMillis(); - int requestToProcess = jobData.getBatchSize(); - int failed = requestToProcess; - int success = 0; try { resultList = paginatedDataInsightSource.readNext(null); - requestToProcess = resultList.getData().size() + resultList.getErrors().size(); if (!resultList.getData().isEmpty()) { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { // process data to build Reindex Request @@ -220,20 +236,13 @@ public class SearchIndexApp extends AbstractNativeApplication { os.org.opensearch.action.bulk.BulkResponse response = (os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData); handleErrorsOs(resultList, "", response, currentTime); - // Update stats - success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response); } else { // process data to build Reindex Request BulkRequest requests = (BulkRequest) dataInsightProcessor.process(resultList, contextData); // process data to build Reindex Request BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData); handleErrorsEs(resultList, "", response, currentTime); - // Update stats - success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response); } - failed = requestToProcess - success; - } else { - failed = 0; } } catch (SourceException rx) { handleSourceError( @@ -262,16 +271,10 @@ public class SearchIndexApp extends AbstractNativeApplication { wx.getCause(), ExceptionUtils.getStackTrace(wx)), currentTime); - } finally { - updateStats( - success, - failed, - paginatedDataInsightSource.getStats(), - dataInsightProcessor.getStats(), - searchIndexSink.getStats()); - sendUpdates(); } } + updateStats(paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats()); + sendUpdates(); } } @@ -284,41 +287,31 @@ public class SearchIndexApp extends AbstractNativeApplication { } } - public void updateStats( - int currentSuccess, int currentFailed, StepStats reader, StepStats processor, StepStats writer) { + public void updateStats(String entityType, StepStats currentEntityStats) { // Job Level Stats Stats jobDataStats = jobData.getStats(); + // Update Entity Level Stats + StepStats entityLevelStats = jobDataStats.getEntityStats(); + if (entityLevelStats == null) { + entityLevelStats = new StepStats(); + } + entityLevelStats.withAdditionalProperty(entityType, currentEntityStats); + // Total Stats StepStats stats = jobData.getStats().getJobStats(); if (stats == null) { stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO)); } - getUpdatedStats(stats, currentSuccess, currentFailed); + getUpdatedStats(stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords()); // Update for the Job jobDataStats.setJobStats(stats); - // Source Stats - jobDataStats.setSourceStats(getTotalStatsTillCurrentRun(jobDataStats.getSourceStats(), reader)); - // Processor - jobDataStats.setProcessorStats(processor); - // Writer - jobDataStats.setSinkStats(writer); + jobDataStats.setEntityStats(entityLevelStats); jobData.setStats(jobDataStats); } - private StepStats getTotalStatsTillCurrentRun(StepStats sourceStat, StepStats newInputStat) { - if (sourceStat == null) { - sourceStat = new StepStats(); - } - sourceStat.setTotalRecords(sourceStat.getTotalRecords() + newInputStat.getTotalRecords()); - sourceStat.setProcessedRecords(sourceStat.getProcessedRecords() + newInputStat.getProcessedRecords()); - sourceStat.setSuccessRecords(sourceStat.getSuccessRecords() + newInputStat.getSuccessRecords()); - sourceStat.setFailedRecords(sourceStat.getFailedRecords() + newInputStat.getFailedRecords()); - return sourceStat; - } - private void reCreateIndexes(String entityType) { if (Boolean.FALSE.equals(jobData.getRecreateIndex())) { return; @@ -343,62 +336,63 @@ public class SearchIndexApp extends AbstractNativeApplication { } private void handleSourceError(String context, String reason, long time) { - Failure failures = getFailure(); - FailureDetails readerFailures = getFailureDetails(context, reason, time); - failures.setSourceError(readerFailures); - jobData.setFailure(failures); + handleError("source", context, reason, time); } private void handleProcessorError(String context, String reason, long time) { - Failure failures = getFailure(); - FailureDetails processorError = getFailureDetails(context, reason, time); - failures.setProcessorError(processorError); + handleError("processor", context, reason, time); + } + + private void handleError(String errType, String context, String reason, long time) { + Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure(); + failures.withAdditionalProperty("errorFrom", errType); + failures.withAdditionalProperty("context", context); + failures.withAdditionalProperty("lastFailedReason", reason); + failures.withAdditionalProperty("lastFailedAt", time); jobData.setFailure(failures); } private void handleEsSinkError(String context, String reason, long time) { - Failure failures = getFailure(); - FailureDetails writerFailure = getFailureDetails(context, reason, time); - failures.setSinkError(writerFailure); - jobData.setFailure(failures); + handleError("sink", context, reason, time); } private void handleJobError(String context, String reason, long time) { - Failure failures = getFailure(); - FailureDetails jobFailure = getFailureDetails(context, reason, time); - failures.setJobError(jobFailure); - jobData.setFailure(failures); + handleError("job", context, reason, time); } @SneakyThrows private void handleSourceError(ResultList data, String lastCursor, long time) { if (!data.getErrors().isEmpty()) { + StringBuilder builder = new StringBuilder(); + for (String str : data.getErrors()) { + builder.append(str); + builder.append("%n"); + } handleSourceError( String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), - String.format( - "Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())), + String.format("Following Entities were not fetched Successfully : %s", builder), time); } } @SneakyThrows private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse response, long time) { - List details = new ArrayList<>(); + List> details = new ArrayList<>(); for (os.org.opensearch.action.bulk.BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { + Map detailsMap = new HashMap<>(); os.org.opensearch.action.bulk.BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - FailureDetails esFailure = - new FailureDetails() - .withContext( - String.format( - "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", - failure.getId())) - .withLastFailedReason( - String.format( - "Index Type: [%s], Reason: [%s] %n Trace : [%s]", - failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))) - .withLastFailedAt(System.currentTimeMillis()); - details.add(esFailure); + detailsMap.put( + "context", + String.format( + "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId())); + detailsMap.put( + "lastFailedReason", + String.format( + "Index Type: [%s], Reason: [%s] %n Trace : [%s]", + failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))); + detailsMap.put("lastFailedAt", System.currentTimeMillis()); + details.add(detailsMap); } } if (!details.isEmpty()) { @@ -411,22 +405,23 @@ public class SearchIndexApp extends AbstractNativeApplication { @SneakyThrows private void handleEsSinkErrors(BulkResponse response, long time) { - List details = new ArrayList<>(); + + List> details = new ArrayList<>(); for (BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { + Map detailsMap = new HashMap<>(); BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - FailureDetails esFailure = - new FailureDetails() - .withContext( - String.format( - "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", - failure.getId())) - .withLastFailedReason( - String.format( - "Index Type: [%s], Reason: [%s] %n Trace : [%s]", - failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))) - .withLastFailedAt(System.currentTimeMillis()); - details.add(esFailure); + detailsMap.put( + "context", + String.format( + "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId())); + detailsMap.put( + "lastFailedReason", + String.format( + "Index Type: [%s], Reason: [%s] %n Trace : [%s]", + failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))); + detailsMap.put("lastFailedAt", System.currentTimeMillis()); + details.add(detailsMap); } } if (!details.isEmpty()) { @@ -441,9 +436,7 @@ public class SearchIndexApp extends AbstractNativeApplication { if (stopped) { jobData.setStatus(EventPublisherJob.Status.STOPPED); } else { - if (jobData.getFailure().getSinkError() != null - || jobData.getFailure().getSourceError() != null - || jobData.getFailure().getProcessorError() != null) { + if (jobData.getFailure() != null && !jobData.getFailure().getAdditionalProperties().isEmpty()) { jobData.setStatus(EventPublisherJob.Status.FAILED); } else { jobData.setStatus(EventPublisherJob.Status.COMPLETED); @@ -451,14 +444,6 @@ public class SearchIndexApp extends AbstractNativeApplication { } } - private Failure getFailure() { - return jobData.getFailure() != null ? jobData.getFailure() : new Failure(); - } - - private FailureDetails getFailureDetails(String context, String reason, long time) { - return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time); - } - public void stopJob() { stopped = true; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 6560d6b4adf..e89b765ab17 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -2,6 +2,7 @@ package org.openmetadata.service.apps.scheduler; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import java.util.UUID; import org.apache.commons.lang.exception.ExceptionUtils; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; @@ -16,11 +17,10 @@ import org.quartz.JobExecutionException; import org.quartz.JobListener; public abstract class AbstractOmAppJobListener implements JobListener { - private CollectionDAO collectionDAO; + private final CollectionDAO collectionDAO; private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; - private static final String SCHEDULED_APP_RUN_RECORD_SCHEMA = "applicationRunRecord.json"; public static final String APP_RUN_STATS = "AppRunStats"; - static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; + public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; protected AbstractOmAppJobListener(CollectionDAO dao) { this.collectionDAO = dao; @@ -50,8 +50,8 @@ public abstract class AbstractOmAppJobListener implements JobListener { // Put the Context in the Job Data Map dataMap.put(SCHEDULED_APP_RUN_EXTENSION, runRecord); - // Run the Scheduled Run Record on the time series - collectionDAO.appExtensionTimeSeriesDao().insert(JsonUtils.pojoToJson(runRecord)); + // Insert new Record Run + pushApplicationStatusUpdates(jobExecutionContext, runRecord, false); this.doJobToBeExecuted(jobExecutionContext); } @@ -67,27 +67,58 @@ public abstract class AbstractOmAppJobListener implements JobListener { long endTime = System.currentTimeMillis(); runRecord.withEndTime(endTime); - boolean success = jobException == null; - if (success) { + if (jobException == null + && !(runRecord.getStatus() == AppRunRecord.Status.FAILED + || runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) { runRecord.withStatus(AppRunRecord.Status.SUCCESS); SuccessContext context = new SuccessContext(); + if (runRecord.getSuccessContext() != null) { + context = runRecord.getSuccessContext(); + } context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats)); runRecord.setSuccessContext(context); } else { runRecord.withStatus(AppRunRecord.Status.FAILED); FailureContext context = new FailureContext(); - context.withAdditionalProperty("message", jobException.getMessage()); - context.withAdditionalProperty("stackTrace", ExceptionUtils.getStackTrace(jobException)); + if (runRecord.getFailureContext() != null) { + context = runRecord.getFailureContext(); + } + if (jobException != null) { + context.withAdditionalProperty("message", jobException.getMessage()); + context.withAdditionalProperty("jobStackTrace", ExceptionUtils.getStackTrace(jobException)); + } runRecord.setFailureContext(context); } - collectionDAO - .appExtensionTimeSeriesDao() - .update(runRecord.getAppId().toString(), JsonUtils.pojoToJson(runRecord), runRecord.getTimestamp()); + // Update App Run Record + pushApplicationStatusUpdates(jobExecutionContext, runRecord, true); this.doJobWasExecuted(jobExecutionContext, jobException); } + public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) { + JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + return (AppRunRecord) dataMap.get(SCHEDULED_APP_RUN_EXTENSION); + } + + public void pushApplicationStatusUpdates(JobExecutionContext context, AppRunRecord runRecord, boolean update) { + JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) { + App jobApp = (App) context.getJobDetail().getJobDataMap().get(APP_INFO_KEY); + updateStatus(jobApp.getId(), runRecord, update); + } + } + + private void updateStatus(UUID appId, AppRunRecord record, boolean update) { + if (update) { + collectionDAO + .appExtensionTimeSeriesDao() + .update(appId.toString(), JsonUtils.pojoToJson(record), record.getTimestamp()); + } else { + collectionDAO.appExtensionTimeSeriesDao().insert(JsonUtils.pojoToJson(record)); + } + } + protected void doJobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException jobException) {} protected void doJobToBeExecuted(JobExecutionContext jobExecutionContext) {} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java index a04cdb5602e..04a1178f3f9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/SourceException.java @@ -19,4 +19,12 @@ public class SourceException extends IOException { public SourceException(String msg, Throwable throwable) { super(msg, throwable); } + + public SourceException(Throwable throwable) { + super(throwable); + } + + public SourceException(String msg) { + super(msg); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index f9a5fb86c48..5a49fff8bd1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -610,7 +610,7 @@ public abstract class EntityRepository { entities.add(withHref(uriInfo, entity)); } catch (Exception e) { LOG.error("Failed in Set Fields for Entity with Json : {}", json); - errors.add(json); + errors.add(String.format("Error Message : %s , %n Entity Json : %s", e.getMessage(), json)); } } currentOffset = currentOffset + limitParam; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 86745258f97..d8eeac68e66 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -553,11 +553,10 @@ public class AppResource extends EntityResource { @DefaultValue("false") boolean hardDelete, @Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") String name) { - Response response = deleteByName(uriInfo, securityContext, name, true, hardDelete); - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - deleteApp(securityContext, (App) response.getEntity(), hardDelete); - } - return response; + App app = repository.getByName(null, name, repository.getFields("bot,pipelines")); + // Remove from Pipeline Service + deleteApp(securityContext, app, hardDelete); + return deleteByName(uriInfo, securityContext, name, true, hardDelete); } @DELETE @@ -578,11 +577,11 @@ public class AppResource extends EntityResource { @DefaultValue("false") boolean hardDelete, @Parameter(description = "Id of the App", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) { - Response response = delete(uriInfo, securityContext, id, true, hardDelete); - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - deleteApp(securityContext, (App) response.getEntity(), hardDelete); - } - return response; + App app = repository.get(null, id, repository.getFields("bot,pipelines")); + // Remove from Pipeline Service + deleteApp(securityContext, app, hardDelete); + // Remove from repository + return delete(uriInfo, securityContext, id, true, hardDelete); } @PUT @@ -794,25 +793,29 @@ public class AppResource extends EntityResource { throw new InternalServerErrorException("Failed in Delete App from Scheduler."); } } else { - App app = repository.getByName(null, installedApp.getName(), repository.getFields("bot,pipelines")); - if (!nullOrEmpty(app.getPipelines())) { - EntityReference pipelineRef = app.getPipelines().get(0); + if (!nullOrEmpty(installedApp.getPipelines())) { + EntityReference pipelineRef = installedApp.getPipelines().get(0); IngestionPipelineRepository ingestionPipelineRepository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); IngestionPipeline ingestionPipeline = ingestionPipelineRepository.get( null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNER)); - - if (hardDelete) { - // Remove the Pipeline in case of Delete - if (!nullOrEmpty(app.getPipelines())) { - pipelineServiceClient.deletePipeline(ingestionPipeline); + try { + if (hardDelete) { + // Remove the Pipeline in case of Delete + if (!nullOrEmpty(installedApp.getPipelines())) { + pipelineServiceClient.deletePipeline(ingestionPipeline); + } + } else { + // Just Kill Running ingestion + if (Boolean.TRUE.equals(ingestionPipeline.getDeployed())) { + decryptOrNullify(securityContext, ingestionPipeline, installedApp.getBot().getName(), true); + pipelineServiceClient.killIngestion(ingestionPipeline); + } } - } else { - // Just Kill Running ingestion - decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true); - pipelineServiceClient.killIngestion(ingestionPipeline); + } catch (Exception ex) { + LOG.error("Failed in Pipeline Service Client : ", ex); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index cc53591586b..57ef96e1e17 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -1,8 +1,13 @@ package org.openmetadata.service.search; +import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; +import static org.openmetadata.service.Entity.ENTITY_REPORT_DATA; import static org.openmetadata.service.Entity.FIELD_FOLLOWERS; import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY; import static org.openmetadata.service.Entity.QUERY; +import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; +import static org.openmetadata.service.Entity.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA; +import static org.openmetadata.service.Entity.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA; import static org.openmetadata.service.search.SearchClient.DEFAULT_UPDATE_SCRIPT; import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS; import static org.openmetadata.service.search.SearchClient.PROPAGATE_ENTITY_REFERENCE_FIELD_SCRIPT; @@ -69,11 +74,11 @@ public class SearchRepository { public final List dataInsightReports = List.of( - "entityReportData", - "webAnalyticEntityViewReportData", - "webAnalyticUserActivityReportData", - "rawCostAnalysisReportData", - "aggregatedCostAnalysisReportData"); + ENTITY_REPORT_DATA, + WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA, + WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA, + RAW_COST_ANALYSIS_REPORT_DATA, + AGGREGATED_COST_ANALYSIS_REPORT_DATA); public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher"; public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightProcessor.java index 392e6c63ae0..87ee58f3459 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightProcessor.java @@ -24,6 +24,10 @@ import org.openmetadata.service.workflows.interfaces.Processor; public class ElasticSearchDataInsightProcessor implements Processor> { private final StepStats stats = new StepStats(); + public ElasticSearchDataInsightProcessor(int total) { + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); + } + @Override public BulkRequest process(ResultList input, Map contextData) throws ProcessorException { String entityType = (String) contextData.get(ENTITY_TYPE_KEY); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntitiesProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntitiesProcessor.java index 32690657161..7e2fd7eef22 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntitiesProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntitiesProcessor.java @@ -25,6 +25,10 @@ import org.openmetadata.service.workflows.interfaces.Processor; public class ElasticSearchEntitiesProcessor implements Processor> { private final StepStats stats = new StepStats(); + public ElasticSearchEntitiesProcessor(int total) { + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); + } + @Override public BulkRequest process(ResultList input, Map contextData) throws ProcessorException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java index 54b673af423..909bcbcd7bb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java @@ -18,8 +18,9 @@ public class ElasticSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; - public ElasticSearchIndexSink(SearchRepository searchRepository) { + public ElasticSearchIndexSink(SearchRepository searchRepository, int total) { this.searchRepository = searchRepository; + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightProcessor.java index 670cb7602ce..479d288ed0c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightProcessor.java @@ -24,6 +24,10 @@ import os.org.opensearch.common.xcontent.XContentType; public class OpenSearchDataInsightProcessor implements Processor> { private final StepStats stats = new StepStats(); + public OpenSearchDataInsightProcessor(int total) { + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); + } + @Override public BulkRequest process(ResultList input, Map contextData) throws ProcessorException { String entityType = (String) contextData.get(ENTITY_TYPE_KEY); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntitiesProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntitiesProcessor.java index 8f7064dc627..8b8c1526ff5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntitiesProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntitiesProcessor.java @@ -25,6 +25,10 @@ import os.org.opensearch.common.xcontent.XContentType; public class OpenSearchEntitiesProcessor implements Processor> { private final StepStats stats = new StepStats(); + public OpenSearchEntitiesProcessor(int total) { + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); + } + @Override public BulkRequest process(ResultList input, Map contextData) throws ProcessorException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java index 4283790076b..8eb946d32dc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java @@ -18,8 +18,9 @@ public class OpenSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; - public OpenSearchIndexSink(SearchRepository repository) { + public OpenSearchIndexSink(SearchRepository repository, int total) { this.searchRepository = repository; + this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); } @Override @@ -27,7 +28,6 @@ public class OpenSearchIndexSink implements Sink { LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions()); try { BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT); - // BulkResponse response = null; int currentSuccess = getSuccessFromBulkResponse(response); int currentFailed = response.getItems().length - currentSuccess; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java index fa6c4d155aa..fbc73b5898c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java @@ -13,11 +13,14 @@ package org.openmetadata.service.workflows.interfaces; +import java.util.List; import java.util.Map; import org.openmetadata.service.exception.SourceException; public interface Source extends Stats { R readNext(Map contextData) throws SourceException; + List getReaderErrors(); + void reset(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java index 2e26ce4e3fc..fac737b4e96 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java @@ -33,7 +33,8 @@ public class PaginatedDataInsightSource implements Source private final CollectionDAO dao; @Getter private final String entityType; @Getter private final int batchSize; - private final StepStats stats = new StepStats(); + @Getter private final List readerErrors = new ArrayList<>(); + @Getter private final StepStats stats = new StepStats(); private String cursor = null; @Getter private boolean isDone = false; @@ -41,7 +42,10 @@ public class PaginatedDataInsightSource implements Source this.dao = dao; this.entityType = entityType; this.batchSize = batchSize; - stats.setTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType)); + this.stats + .withTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType)) + .withSuccessRecords(0) + .withFailedRecords(0); } @Override @@ -66,7 +70,7 @@ public class PaginatedDataInsightSource implements Source private ResultList read(String afterCursor) throws SourceException { LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize); - ResultList result; + ResultList result = null; try { result = getReportDataPagination(entityType, batchSize, afterCursor); LOG.debug( @@ -76,14 +80,27 @@ public class PaginatedDataInsightSource implements Source 0); updateStats(result.getData().size(), result.getErrors().size()); } catch (Exception ex) { - LOG.debug("[DataInsightReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize); - if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) { - isDone = true; - updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords()); + String errMsg = + String.format( + "[DataInsightReader] Failing Completely. Batch Stats :- Submitted : %s Success: %s Failed: %s", + batchSize, 0, batchSize); + LOG.debug(errMsg); + if (result != null) { + if (result.getPaging().getAfter() == null) { + isDone = true; + int recordToRead = stats.getTotalRecords() - (stats.getSuccessRecords() + stats.getFailedRecords()); + updateStats(result.getData().size(), recordToRead - result.getData().size()); + } else { + updateStats(result.getData().size(), batchSize - result.getData().size()); + } } else { updateStats(0, batchSize); } - throw new SourceException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex); + + // Add the error to the list + readerErrors.add(errMsg); + + throw new SourceException(errMsg, ex); } return result; @@ -117,9 +134,4 @@ public class PaginatedDataInsightSource implements Source public void updateStats(int currentSuccess, int currentFailed) { getUpdatedStats(stats, currentSuccess, currentFailed); } - - @Override - public StepStats getStats() { - return stats; - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index 2523a15fb62..8e6907c4d90 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -15,6 +15,7 @@ package org.openmetadata.service.workflows.searchIndex; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import java.util.ArrayList; import java.util.List; import java.util.Map; import lombok.Getter; @@ -23,8 +24,10 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.type.Include; import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.SourceException; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @@ -34,9 +37,9 @@ public class PaginatedEntitiesSource implements Source fields; - private final StepStats stats = new StepStats(); + @Getter private final List readerErrors = new ArrayList<>(); + @Getter private final StepStats stats = new StepStats(); private String lastFailedCursor = null; - private String cursor = RestUtil.encodeCursor("0"); @Getter private boolean isDone = false; @@ -44,48 +47,71 @@ public class PaginatedEntitiesSource implements Source readNext(Map contextData) { + public ResultList readNext(Map contextData) throws SourceException { + ResultList data = null; if (!isDone) { - ResultList data = read(cursor); + data = read(cursor); cursor = data.getPaging().getAfter(); if (cursor == null) { isDone = true; } - return data; - } else { - return null; } + return data; } - private ResultList read(String cursor) { + private ResultList read(String cursor) throws SourceException { LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize); EntityRepository entityRepository = Entity.getEntityRepository(entityType); - ResultList result; - result = - entityRepository.listAfterWithSkipFailure( - null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor); - if (!result.getErrors().isEmpty()) { - lastFailedCursor = this.cursor; - result - .getErrors() - .forEach( - error -> - LOG.error( - "[PaginatedEntitiesSource] Failed in getting Record, After Cursor : {} , RECORD: {}", - result.getPaging().getAfter(), - error)); - } + ResultList result = null; + try { + result = + entityRepository.listAfterWithSkipFailure( + null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor); + if (!result.getErrors().isEmpty()) { + lastFailedCursor = this.cursor; + String errMsg = + String.format( + "[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %s", + this.lastFailedCursor, + batchSize, + result.getData().size(), + result.getErrors().size(), + JsonUtils.pojoToJson(result.getErrors())); + LOG.error(errMsg); + throw new SourceException(errMsg); + } - LOG.debug( - "[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}", - batchSize, - result.getData().size(), - result.getErrors().size()); - updateStats(result.getData().size(), result.getErrors().size()); + LOG.debug( + "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", + batchSize, result.getData().size(), result.getErrors().size()); + updateStats(result.getData().size(), result.getErrors().size()); + } catch (Exception e) { + lastFailedCursor = this.cursor; + if (result != null) { + if (result.getPaging().getAfter() == null) { + isDone = true; + } else { + this.cursor = result.getPaging().getAfter(); + } + updateStats(result.getData().size(), result.getErrors().size()); + } else { + String errMsg = + String.format( + "[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- Submitted : %s Success: %s Failed: %s, %n Errors : %s", + this.lastFailedCursor, batchSize, 0, batchSize, "No Relationship Issue , Json Processing or DB issue."); + LOG.debug(errMsg); + updateStats(0, batchSize); + } + + throw new SourceException(e); + } return result; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 31da3f9d451..092df10025c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -22,10 +22,13 @@ import os.org.opensearch.action.bulk.BulkItemResponse; import os.org.opensearch.action.bulk.BulkResponse; public class ReindexingUtil { + private ReindexingUtil() { + /*unused*/ + } + public static final String ENTITY_TYPE_KEY = "entityType"; public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) { - stats.setProcessedRecords(stats.getProcessedRecords() + currentSuccess + currentFailed); stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess); stats.setFailedRecords(stats.getFailedRecords() + currentFailed); } @@ -41,7 +44,7 @@ public class ReindexingUtil { EntityRepository repository = Entity.getEntityRepository(entityType); total += repository.getDao().listTotalCount(); } else { - total += dao.entityExtensionTimeSeriesDao().listCount(entityType); + total += dao.reportDataTimeSeriesDao().listCount(entityType); } } return total; diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json index 5578474bf38..5f9378d7dc5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json @@ -14,8 +14,13 @@ "description": "Status for the Job.", "type": "string", "enum": [ + "started", "running", + "completed", "failed", + "active", + "activeError", + "stopped", "success" ] }, diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index f418471a27b..b706dbab087 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -34,11 +34,6 @@ "type": "integer", "default": 0 }, - "processedRecords": { - "description": "Records that are processed in", - "type": "integer", - "default": 0 - }, "successRecords": { "description": "Count of Total Successfully Records", "type": "integer", @@ -49,23 +44,16 @@ "type": "integer", "default": 0 } - }, - "additionalProperties": false + } }, "stats": { "type": "object", "properties": { - "sourceStats": { - "$ref": "#/definitions/stepStats" - }, - "processorStats": { - "$ref": "#/definitions/stepStats" - }, - "sinkStats": { - "$ref": "#/definitions/stepStats" - }, "jobStats": { "$ref": "#/definitions/stepStats" + }, + "entityStats": { + "$ref": "#/definitions/stepStats" } }, "additionalProperties": false @@ -99,41 +87,19 @@ "description": "This schema publisher run job status.", "type": "string", "enum": [ - "STARTED", - "RUNNING", - "COMPLETED", - "FAILED", - "ACTIVE", - "ACTIVE_WITH_ERROR", - "STOPPED" + "started", + "running", + "completed", + "failed", + "active", + "activeError", + "stopped", + "success" ] }, "failure": { "description": "List of Failures in the Job", - "type": "object", - "properties": { - "sourceError": { - "type": "object", - "$ref": "#/definitions/failureDetails", - "default": null - }, - "processorError": { - "type": "object", - "$ref": "#/definitions/failureDetails", - "default": null - }, - "sinkError": { - "type": "object", - "$ref": "#/definitions/failureDetails", - "default": null - }, - "jobError": { - "type": "object", - "$ref": "#/definitions/failureDetails", - "default": null - } - }, - "additionalProperties": false + "type": "object" }, "stats": { "$ref": "#/definitions/stats" diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Applications/AppLogsViewer/AppLogsViewer.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Applications/AppLogsViewer/AppLogsViewer.component.tsx index d7268d0e14d..3d7ba9e9e16 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Applications/AppLogsViewer/AppLogsViewer.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Applications/AppLogsViewer/AppLogsViewer.component.tsx @@ -12,8 +12,8 @@ */ import { Badge, Button, Card, Col, Divider, Row, Space } from 'antd'; -import { isEmpty, isNil } from 'lodash'; -import React, { useCallback, useMemo } from 'react'; +import { isNil } from 'lodash'; +import React, { useCallback } from 'react'; import { useTranslation } from 'react-i18next'; import { LazyLog } from 'react-lazylog'; import { ReactComponent as IconSuccessBadge } from '../../../assets/svg/success-badge.svg'; @@ -24,21 +24,7 @@ import { AppLogsViewerProps, JobStats } from './AppLogsViewer.interface'; const AppLogsViewer = ({ data }: AppLogsViewerProps) => { const { t } = useTranslation(); - const { failureContext, successContext, timestamp } = useMemo( - () => ({ - ...data, - }), - [data] - ); - - const hasSuccessStats = useMemo( - () => !isEmpty(successContext?.stats), - [successContext] - ); - const hasFailureStats = useMemo( - () => !isEmpty(failureContext?.stats), - [failureContext] - ); + const { successContext, failureContext, timestamp } = data; const handleJumpToEnd = () => { const logsBody = document.getElementsByClassName( @@ -155,27 +141,13 @@ const AppLogsViewer = ({ data }: AppLogsViewerProps) => { [timestamp, formatDateTimeWithTimezone] ); - const successContextRender = useMemo( - () => ( - <> - {hasSuccessStats && statsRender(successContext?.stats.jobStats)} - {logsRender(successContext?.stackTrace)} - - ), - [successContext] + return ( + <> + {successContext?.stats && statsRender(successContext?.stats.jobStats)} + {failureContext?.stats && statsRender(failureContext?.stats.jobStats)} + {logsRender(failureContext?.stackTrace ?? failureContext?.failure)} + ); - - const failureContextRender = useMemo( - () => ( - <> - {hasFailureStats && statsRender(failureContext?.stats.jobStats)} - {logsRender(failureContext?.stackTrace)} - - ), - [failureContext] - ); - - return successContext ? successContextRender : failureContextRender; }; export default AppLogsViewer;