diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index a4a1988c80c..969417714e0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -216,6 +216,7 @@ public final class Entity { // Time series entities public static final String ENTITY_REPORT_DATA = "entityReportData"; public static final String TEST_CASE_RESOLUTION_STATUS = "testCaseResolutionStatus"; + public static final String TEST_CASE_RESULT = "testCaseResult"; public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "webAnalyticEntityViewReportData"; public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index 4b6a7ad00e4..89cac4e91ac 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -1,8 +1,10 @@ package org.openmetadata.service.apps.bundles.insights; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; +import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; +import static org.openmetadata.service.socket.WebSocketManager.DATA_INSIGHTS_JOB_BROADCAST_CHANNEL; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities; import es.org.elasticsearch.client.RestClient; import java.io.IOException; @@ -312,7 +314,10 @@ public class DataInsightsApp extends AbstractNativeApplication { if (stats == null) { stats = new StepStats() - .withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO)); + .withTotalRecords( + getInitialStatsForEntities(jobData.getEntities()) + .getJobStats() + .getTotalRecords()); } stats.setTotalRecords( @@ -339,7 +344,7 @@ public class DataInsightsApp extends AbstractNativeApplication { jobData.setStats(jobDataStats); } - public void updateRecordToDb(JobExecutionContext jobExecutionContext) { + public void updateRecordToDbAndNotify(JobExecutionContext jobExecutionContext) { AppRunRecord appRecord = getJobRecord(jobExecutionContext); // Update Run Record with Status @@ -357,6 +362,12 @@ public class DataInsightsApp extends AbstractNativeApplication { new SuccessContext().withAdditionalProperty("stats", jobData.getStats())); } + if (WebSocketManager.getInstance() != null) { + WebSocketManager.getInstance() + .broadCastMessageToAll( + DATA_INSIGHTS_JOB_BROADCAST_CHANNEL, JsonUtils.pojoToJson(appRecord)); + } + pushAppStatusUpdates(jobExecutionContext, appRecord, true); } @@ -364,13 +375,12 @@ public class DataInsightsApp extends AbstractNativeApplication { try { // store job details in Database jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); + jobExecutionContext + .getJobDetail() + .getJobDataMap() + .put(WEBSOCKET_STATUS_CHANNEL, DATA_INSIGHTS_JOB_BROADCAST_CHANNEL); // Update Record to db - 
updateRecordToDb(jobExecutionContext); - if (WebSocketManager.getInstance() != null) { - WebSocketManager.getInstance() - .broadCastMessageToAll( - WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData)); - } + updateRecordToDbAndNotify(jobExecutionContext); } catch (Exception ex) { LOG.error("Failed to send updated stats with WebSocket", ex); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java index 902a2b93722..0486d1bb781 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java @@ -105,7 +105,7 @@ public class DataInsightsReportApp extends AbstractNativeApplication { throws SearchIndexException { PaginatedEntitiesSource teamReader = new PaginatedEntitiesSource(TEAM, 10, List.of("name", "email", "users")); - while (!teamReader.isDone()) { + while (!teamReader.isDone().get()) { ResultList resultList = (ResultList) teamReader.readNext(null); for (Team team : resultList.getData()) { Set emails = new HashSet<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java index 6f301a683b1..08378c7bcde 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java @@ -102,7 +102,7 @@ public class CostAnalysisWorkflow { new PaginatedEntitiesSource(Entity.DATABASE_SERVICE, batchSize, List.of("*")); int total = 0; - while (!databaseServices.isDone()) { + while (!databaseServices.isDone().get()) { ResultList resultList = filterDatabaseServices(databaseServices.readNext(null)); if (!resultList.getData().isEmpty()) { @@ -158,7 +158,7 @@ public class CostAnalysisWorkflow { Optional initialProcessorError = Optional.empty(); - while (!source.isDone()) { + while (!source.isDone().get()) { try { ResultList resultList = source.readNext(null); List costAnalysisTableData = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index a4302fdae0d..2a46a3947bb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -5,7 +5,7 @@ import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.get import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; -import static 
org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities; import java.util.ArrayList; import java.util.Collections; @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.system.IndexingError; +import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.type.Include; import org.openmetadata.service.apps.bundles.insights.DataInsightsApp; @@ -97,7 +98,8 @@ public class DataAssetsWorkflow { } private void initialize() { - int totalRecords = getTotalRequestToProcess(entityTypes, collectionDAO); + Stats stats = getInitialStatsForEntities(entityTypes); + int totalRecords = stats.getJobStats().getTotalRecords(); entityTypes.forEach( entityType -> { @@ -145,7 +147,7 @@ public class DataAssetsWorkflow { contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType())); contextData.put(ENTITY_TYPE_KEY, source.getEntityType()); - while (!source.isDone()) { + while (!source.isDone().get()) { try { processEntity(source.readNext(null), contextData, source); } catch (SearchIndexException ex) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataQuality/DataQualityWorkflow.java @@ -0,0 +1 @@ + diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java index 12e6f91c1dd..ac77f9ea98b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java @@ -167,7 +167,7 @@ public class WebAnalyticsWorkflow { throws SearchIndexException { Optional error = Optional.empty(); - while (!source.isDone()) { + while (!source.isDone().get()) { ResultList resultList = source.readNext(null); try { if (!resultList.getData().isEmpty()) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java index 049a73468cb..c741d4830c3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java @@ -6,8 +6,8 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getU import java.util.ArrayList; import 
java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; import org.openmetadata.schema.analytics.WebAnalyticEventData; @@ -17,31 +17,33 @@ import org.openmetadata.schema.system.StepStats; import org.openmetadata.service.Entity; import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils; import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.WebAnalyticEventRepository; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @Slf4j +@Getter public class PaginatedWebAnalyticEventDataSource implements Source> { - @Getter private final String name; - @Getter private final int batchSize; - @Getter private final Long startTs; - @Getter private final Long endTs; - @Getter private final int totalRecords; - @Getter private final String entityType = Entity.WEB_ANALYTIC_EVENT; + private final String name; + private final int batchSize; + private final Long startTs; + private final Long endTs; + private final int totalRecords; + private final String entityType = Entity.WEB_ANALYTIC_EVENT; @Getter private final WebAnalyticEventRepository repository = (WebAnalyticEventRepository) Entity.getEntityRepository(entityType); - @Getter private final String eventType = WebAnalyticEventType.PAGE_VIEW.toString(); - @Getter private final List readerErrors = new ArrayList<>(); - @Getter private final StepStats stats = new StepStats(); - @Getter private String lastFailedCursor = null; - @Setter private String cursor = RestUtil.encodeCursor("0"); - @Getter private boolean isDone = false; + private final String eventType = WebAnalyticEventType.PAGE_VIEW.toString(); + private final List readerErrors = new ArrayList<>(); + private final StepStats stats = new StepStats(); + private String lastFailedCursor = null; + private final AtomicReference cursor = new AtomicReference<>(RestUtil.encodeCursor("0")); + private final AtomicReference isDone = new AtomicReference<>(false); public PaginatedWebAnalyticEventDataSource(int batchSize, Long startTs, Long endTs) { this.batchSize = batchSize; @@ -59,16 +61,43 @@ public class PaginatedWebAnalyticEventDataSource public ResultList readNext(Map contextData) throws SearchIndexException { ResultList data = null; - if (!isDone) { - data = read(cursor); - cursor = data.getPaging().getAfter(); - if (cursor == null) { - isDone = true; + if (Boolean.FALSE.equals(isDone.get())) { + data = read(cursor.get()); + cursor.set(data.getPaging().getAfter()); + if (cursor.get() == null) { + isDone.set(true); } } return data; } + @Override + public ResultList readWithCursor(String currentCursor) + throws SearchIndexException { + LOG.debug("[PaginatedEntityTimeSeriesSource] Fetching a Batch of Size: {} ", batchSize); + ResultList result; + try { + result = + repository.listWebAnalyticEventDataWithOffset( + currentCursor, eventType, batchSize, startTs, endTs, false, true); + LOG.debug( + "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", + batchSize, result.getData().size(), result.getErrors().size()); + } catch (Exception e) { + IndexingError indexingError = + new IndexingError() + .withErrorSource(READER) + .withSuccessCount(0) + .withMessage( + "Issues in Reading A Batch For Entities. 
No Relationship Issue , Json Processing or DB issue.") + .withLastFailedCursor(lastFailedCursor) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + LOG.debug(indexingError.getMessage()); + throw new SearchIndexException(indexingError); + } + return result; + } + private ResultList read(String cursor) throws SearchIndexException { LOG.debug("[PaginatedEntityTimeSeriesSource] Fetching a Batch of Size: {} ", batchSize); ResultList result; @@ -77,11 +106,11 @@ public class PaginatedWebAnalyticEventDataSource repository.listWebAnalyticEventDataWithOffset( cursor, eventType, batchSize, startTs, endTs, false, true); if (!result.getErrors().isEmpty()) { - lastFailedCursor = this.cursor; + lastFailedCursor = this.cursor.get(); if (result.getPaging().getAfter() == null) { - isDone = true; + isDone.set(true); } else { - this.cursor = result.getPaging().getAfter(); + this.cursor.set(result.getPaging().getAfter()); } return result; } @@ -89,20 +118,20 @@ public class PaginatedWebAnalyticEventDataSource "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", batchSize, result.getData().size(), result.getErrors().size()); } catch (Exception e) { - lastFailedCursor = this.cursor; + lastFailedCursor = this.cursor.get(); int remainingRecords = stats.getTotalRecords() - stats.getFailedRecords() - stats.getSuccessRecords(); int submittedRecords; if (remainingRecords - batchSize <= 0) { submittedRecords = remainingRecords; updateStats(0, remainingRecords); - this.cursor = null; - this.isDone = true; + this.cursor.set(null); + this.isDone.set(true); } else { submittedRecords = batchSize; String decodedCursor = RestUtil.decodeCursor(cursor); - this.cursor = - RestUtil.encodeCursor(String.valueOf(Integer.parseInt(decodedCursor) + batchSize)); + this.cursor.set( + RestUtil.encodeCursor(String.valueOf(Integer.parseInt(decodedCursor) + batchSize))); updateStats(0, batchSize); } IndexingError indexingError = @@ -123,12 +152,22 @@ public class PaginatedWebAnalyticEventDataSource @Override public void reset() { - cursor = null; - isDone = false; + cursor.set(null); + isDone.set(false); + } + + @Override + public ListFilter getFilter() { + return null; } @Override public void updateStats(int currentSuccess, int currentFailed) { getUpdatedStats(stats, currentSuccess, currentFailed); } + + @Override + public AtomicReference isDone() { + return isDone; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/BulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/BulkSink.java new file mode 100644 index 00000000000..5491493cf49 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/BulkSink.java @@ -0,0 +1,16 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.openmetadata.schema.system.StepStats; + +public interface BulkSink { + void write(List entities, Map contextData) throws Exception; + + void updateStats(int currentSuccess, int currentFailed); + + StepStats getStats(); + + void close() throws IOException; +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java new file mode 100644 index 00000000000..979e07e0044 --- /dev/null +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java @@ -0,0 +1,265 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; + +import es.org.elasticsearch.ElasticsearchException; +import es.org.elasticsearch.action.DocWriteRequest; +import es.org.elasticsearch.action.bulk.BulkRequest; +import es.org.elasticsearch.action.bulk.BulkResponse; +import es.org.elasticsearch.action.update.UpdateRequest; +import es.org.elasticsearch.client.RequestOptions; +import es.org.elasticsearch.rest.RestStatus; +import es.org.elasticsearch.xcontent.XContentType; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.internal.util.ExceptionUtils; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; +import org.openmetadata.schema.system.IndexingError; +import org.openmetadata.schema.system.StepStats; +import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.models.IndexMapping; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class ElasticSearchIndexSink implements BulkSink, Closeable { + private final StepStats stats = new StepStats(); + private final SearchClient client; + private final long maxPayloadSizeInBytes; + private final int maxRetries; + private final long initialBackoffMillis; + private final long maxBackoffMillis; + private final Semaphore semaphore; + + public ElasticSearchIndexSink( + SearchClient client, + long maxPayloadSizeInBytes, + int maxConcurrentRequests, + int maxRetries, + long initialBackoffMillis, + long maxBackoffMillis) { + this.client = client; + this.maxPayloadSizeInBytes = maxPayloadSizeInBytes; + this.maxRetries = maxRetries; + this.initialBackoffMillis = initialBackoffMillis; + this.maxBackoffMillis = maxBackoffMillis; + this.semaphore = new Semaphore(maxConcurrentRequests); + } + + @Override + public void write(List entities, Map contextData) throws SearchIndexException { + String entityType = (String) contextData.get("entityType"); + LOG.debug( + "[ElasticSearchIndexSink] Processing {} entities of type {}", entities.size(), entityType); + + List entityErrorList = new ArrayList<>(); + List> requests = new ArrayList<>(); + long currentBatchSize = 0L; + + for (Object entity : entities) { + try { + DocWriteRequest request = convertEntityToRequest(entity, entityType); + long requestSize = estimateRequestSizeInBytes(request); + + if (currentBatchSize + requestSize > maxPayloadSizeInBytes) { + // Flush current batch + sendBulkRequest(requests, entityErrorList); + requests.clear(); + currentBatchSize = 0L; + } + + requests.add(request); + currentBatchSize += requestSize; + + } catch (Exception e) { + entityErrorList.add( + new EntityError() + .withMessage("Failed to convert entity to request: " + e.getMessage()) + 
.withEntity(entity.toString())); + LOG.error("Error converting entity to request", e); + } + } + + if (!requests.isEmpty()) { + sendBulkRequest(requests, entityErrorList); + } + + int totalEntities = entities.size(); + int failedEntities = entityErrorList.size(); + int successfulEntities = totalEntities - failedEntities; + updateStats(successfulEntities, failedEntities); + + if (!entityErrorList.isEmpty()) { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(totalEntities) + .withSuccessCount(successfulEntities) + .withFailedCount(failedEntities) + .withMessage(String.format("Issues in Sink to Elasticsearch: %s", entityErrorList)) + .withFailedEntities(entityErrorList)); + } + } + + private void sendBulkRequest(List> requests, List entityErrorList) + throws SearchIndexException { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(requests); + + int attempt = 0; + long backoffMillis = initialBackoffMillis; + + while (attempt <= maxRetries) { + try { + semaphore.acquire(); + try { + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + break; // Success, exit retry loop + } finally { + semaphore.release(); + } + } catch (IOException e) { + if (isRetriableException(e)) { + attempt++; + LOG.warn( + "Bulk request failed with retriable exception, retrying attempt {}/{}", + attempt, + maxRetries); + sleepWithBackoff(backoffMillis); + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } else { + LOG.error("Bulk request failed with non-retriable exception", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request interrupted", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } catch (ElasticsearchException e) { + if (isRetriableStatusCode(e.status())) { + attempt++; + LOG.warn( + "Bulk request failed with status {}, retrying attempt {}/{}", + e.status(), + attempt, + maxRetries); + sleepWithBackoff(backoffMillis); + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } else { + LOG.error("Bulk request failed with non-retriable status {}", e.status(), e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } + } + } + + if (attempt > maxRetries) { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requests.size()) + .withSuccessCount(0) + .withFailedCount(requests.size()) + .withMessage("Exceeded maximum retries for bulk request")); + } + } + + private boolean isRetriableException(Exception e) { + return e instanceof IOException; + } + + private boolean isRetriableStatusCode(RestStatus status) { + return status == RestStatus.TOO_MANY_REQUESTS || status == RestStatus.SERVICE_UNAVAILABLE; + } + + private void sleepWithBackoff(long millis) { + try { + Thread.sleep(millis + ThreadLocalRandom.current().nextLong(0, millis)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Sleep interrupted during backoff", e); + } + } + + private IndexingError createIndexingError(int requestCount, Exception e) { + return new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to Elasticsearch: %s", e.getMessage())) + 
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + } + + private DocWriteRequest convertEntityToRequest(Object entity, String entityType) { + if (entity instanceof EntityInterface) { + return getEntityInterfaceRequest(entityType, (EntityInterface) entity); + } else if (entity instanceof EntityTimeSeriesInterface) { + return getEntityTimeSeriesInterfaceReqeust(entityType, (EntityTimeSeriesInterface) entity); + } else { + throw new IllegalArgumentException("Unknown entity type: " + entity.getClass()); + } + } + + private UpdateRequest getEntityInterfaceRequest(String entityType, EntityInterface entity) { + IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); + UpdateRequest updateRequest = + new UpdateRequest( + indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), + entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)) + .buildSearchIndexDoc()), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } + + private UpdateRequest getEntityTimeSeriesInterfaceReqeust( + String entityType, EntityTimeSeriesInterface entity) { + IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); + UpdateRequest updateRequest = + new UpdateRequest( + indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), + entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull( + Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } + + private long estimateRequestSizeInBytes(DocWriteRequest request) { + return new BulkRequest().add(request).estimatedSizeInBytes(); + } + + @Override + public void updateStats(int currentSuccess, int currentFailed) { + getUpdatedStats(stats, currentSuccess, currentFailed); + } + + @Override + public StepStats getStats() { + return stats; + } + + @Override + public void close() throws IOException { + client.close(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java new file mode 100644 index 00000000000..3c3ca23f162 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java @@ -0,0 +1,271 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.internal.util.ExceptionUtils; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; +import org.openmetadata.schema.system.IndexingError; +import org.openmetadata.schema.system.StepStats; +import org.openmetadata.service.Entity; +import 
org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.models.IndexMapping; +import org.openmetadata.service.util.JsonUtils; +import os.org.opensearch.OpenSearchException; +import os.org.opensearch.action.DocWriteRequest; +import os.org.opensearch.action.bulk.BulkRequest; +import os.org.opensearch.action.bulk.BulkResponse; +import os.org.opensearch.action.update.UpdateRequest; +import os.org.opensearch.client.RequestOptions; +import os.org.opensearch.common.xcontent.XContentType; +import os.org.opensearch.rest.RestStatus; + +@Slf4j +public class OpenSearchIndexSink implements BulkSink, Closeable { + + private final StepStats stats = new StepStats(); + private final SearchClient client; + private final long maxPayloadSizeInBytes; + private final int maxRetries; + private final long initialBackoffMillis; + private final long maxBackoffMillis; + private final Semaphore semaphore; + + public OpenSearchIndexSink( + SearchClient client, + long maxPayloadSizeInBytes, + int maxConcurrentRequests, + int maxRetries, + long initialBackoffMillis, + long maxBackoffMillis) { + this.client = client; + this.maxPayloadSizeInBytes = maxPayloadSizeInBytes; + this.maxRetries = maxRetries; + this.initialBackoffMillis = initialBackoffMillis; + this.maxBackoffMillis = maxBackoffMillis; + this.semaphore = new Semaphore(maxConcurrentRequests); + } + + @Override + public void write(List entities, Map contextData) throws SearchIndexException { + String entityType = (String) contextData.get("entityType"); + LOG.debug( + "[OpenSearchIndexSink] Processing {} entities of type {}", entities.size(), entityType); + + List entityErrorList = new ArrayList<>(); + List> requests = new ArrayList<>(); + long currentBatchSize = 0L; + + // Convert entities to DocWriteRequests + for (Object entity : entities) { + try { + DocWriteRequest request = convertEntityToRequest(entity, entityType); + long requestSize = estimateRequestSizeInBytes(request); + + if (currentBatchSize + requestSize > maxPayloadSizeInBytes) { + // Flush current batch + sendBulkRequest(requests, entityErrorList); + requests.clear(); + currentBatchSize = 0L; + } + + requests.add(request); + currentBatchSize += requestSize; + + } catch (Exception e) { + entityErrorList.add( + new EntityError() + .withMessage("Failed to convert entity to request: " + e.getMessage()) + .withEntity(entity.toString())); + LOG.error("Error converting entity to request", e); + } + } + + // Send any remaining requests + if (!requests.isEmpty()) { + sendBulkRequest(requests, entityErrorList); + } + + // Update stats + int totalEntities = entities.size(); + int failedEntities = entityErrorList.size(); + int successfulEntities = totalEntities - failedEntities; + updateStats(successfulEntities, failedEntities); + + // Handle errors + if (!entityErrorList.isEmpty()) { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(totalEntities) + .withSuccessCount(successfulEntities) + .withFailedCount(failedEntities) + .withMessage(String.format("Issues in Sink to OpenSearch: %s", entityErrorList)) + .withFailedEntities(entityErrorList)); + } + } + + private void sendBulkRequest(List> requests, List entityErrorList) + throws SearchIndexException { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(requests); + + int attempt = 0; + long backoffMillis = initialBackoffMillis; + + while (attempt <= maxRetries) { + try { + semaphore.acquire(); + try { 
+ BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + break; // Success, exit retry loop + } finally { + semaphore.release(); + } + } catch (IOException e) { + if (isRetriableException(e)) { + attempt++; + LOG.warn( + "Bulk request failed with retriable exception, retrying attempt {}/{}", + attempt, + maxRetries); + sleepWithBackoff(backoffMillis); + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } else { + LOG.error("Bulk request failed with non-retriable exception", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request interrupted", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } catch (OpenSearchException e) { + if (isRetriableStatusCode(e.status())) { + attempt++; + LOG.warn( + "Bulk request failed with status {}, retrying attempt {}/{}", + e.status(), + attempt, + maxRetries); + sleepWithBackoff(backoffMillis); + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } else { + LOG.error("Bulk request failed with non-retriable status {}", e.status(), e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); + } + } + } + + if (attempt > maxRetries) { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requests.size()) + .withSuccessCount(0) + .withFailedCount(requests.size()) + .withMessage("Exceeded maximum retries for bulk request")); + } + } + + private boolean isRetriableException(Exception e) { + return e instanceof IOException; + } + + private boolean isRetriableStatusCode(RestStatus status) { + return status == RestStatus.TOO_MANY_REQUESTS || status == RestStatus.SERVICE_UNAVAILABLE; + } + + private void sleepWithBackoff(long millis) { + try { + Thread.sleep(millis + ThreadLocalRandom.current().nextLong(0, millis)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Sleep interrupted during backoff", e); + } + } + + private IndexingError createIndexingError(int requestCount, Exception e) { + return new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to OpenSearch: %s", e.getMessage())) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + } + + private DocWriteRequest convertEntityToRequest(Object entity, String entityType) { + if (entity instanceof EntityInterface) { + return getEntityInterfaceRequest((EntityInterface) entity, entityType); + } else if (entity instanceof EntityTimeSeriesInterface) { + return getEntityTimeSeriesInterfaceReqeust(entityType, (EntityTimeSeriesInterface) entity); + } else { + throw new IllegalArgumentException("Unknown entity type: " + entity.getClass()); + } + } + + private DocWriteRequest getEntityInterfaceRequest(EntityInterface entity, String entityType) { + IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); + UpdateRequest updateRequest = + new UpdateRequest( + indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), + entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)) + .buildSearchIndexDoc()), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } + 
+ private UpdateRequest getEntityTimeSeriesInterfaceReqeust( + String entityType, EntityTimeSeriesInterface entity) { + IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); + UpdateRequest updateRequest = + new UpdateRequest( + indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), + entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull( + Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } + + private long estimateRequestSizeInBytes(DocWriteRequest request) { + return new BulkRequest().add(request).estimatedSizeInBytes(); + } + + @Override + public void updateStats(int currentSuccess, int currentFailed) { + getUpdatedStats(stats, currentSuccess, currentFailed); + } + + @Override + public StepStats getStats() { + return stats; + } + + @Override + public void close() throws IOException { + // Close resources if needed + client.close(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index a6c1ccdf8a2..8af69e78fee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -1,25 +1,70 @@ package org.openmetadata.service.apps.bundles.searchIndex; -import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; +import static org.openmetadata.service.Entity.API_COLLCECTION; +import static org.openmetadata.service.Entity.API_ENDPOINT; +import static org.openmetadata.service.Entity.API_SERVICE; +import static org.openmetadata.service.Entity.CHART; +import static org.openmetadata.service.Entity.CLASSIFICATION; +import static org.openmetadata.service.Entity.CONTAINER; +import static org.openmetadata.service.Entity.DASHBOARD; +import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL; +import static org.openmetadata.service.Entity.DASHBOARD_SERVICE; +import static org.openmetadata.service.Entity.DATABASE; +import static org.openmetadata.service.Entity.DATABASE_SCHEMA; +import static org.openmetadata.service.Entity.DATABASE_SERVICE; +import static org.openmetadata.service.Entity.DOMAIN; +import static org.openmetadata.service.Entity.ENTITY_REPORT_DATA; +import static org.openmetadata.service.Entity.GLOSSARY; +import static org.openmetadata.service.Entity.GLOSSARY_TERM; +import static org.openmetadata.service.Entity.INGESTION_PIPELINE; +import static org.openmetadata.service.Entity.MESSAGING_SERVICE; +import static org.openmetadata.service.Entity.METRICS; +import static org.openmetadata.service.Entity.MLMODEL; +import static org.openmetadata.service.Entity.MLMODEL_SERVICE; +import static org.openmetadata.service.Entity.PIPELINE; +import static org.openmetadata.service.Entity.PIPELINE_SERVICE; +import static org.openmetadata.service.Entity.QUERY; +import static org.openmetadata.service.Entity.SEARCH_INDEX; +import static org.openmetadata.service.Entity.SEARCH_SERVICE; +import static org.openmetadata.service.Entity.STORAGE_SERVICE; +import static org.openmetadata.service.Entity.STORED_PROCEDURE; +import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.Entity.TAG; +import static 
org.openmetadata.service.Entity.TEAM; +import static org.openmetadata.service.Entity.TEST_CASE; +import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS; +import static org.openmetadata.service.Entity.TEST_CASE_RESULT; +import static org.openmetadata.service.Entity.TEST_SUITE; +import static org.openmetadata.service.Entity.TOPIC; +import static org.openmetadata.service.Entity.USER; +import static org.openmetadata.service.Entity.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA; +import static org.openmetadata.service.Entity.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; +import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY; +import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityTimeSeriesInterface; @@ -33,71 +78,71 @@ import org.openmetadata.schema.system.EventPublisherJob; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; +import org.openmetadata.service.Entity; import org.openmetadata.service.apps.AbstractNativeApplication; import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; +import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.search.SearchRepository; -import org.openmetadata.service.search.elasticsearch.ElasticSearchEntitiesProcessor; -import org.openmetadata.service.search.elasticsearch.ElasticSearchEntityTimeSeriesProcessor; -import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink; import org.openmetadata.service.search.models.IndexMapping; -import org.openmetadata.service.search.opensearch.OpenSearchEntitiesProcessor; -import org.openmetadata.service.search.opensearch.OpenSearchEntityTimeSeriesProcessor; -import org.openmetadata.service.search.opensearch.OpenSearchIndexSink; import org.openmetadata.service.socket.WebSocketManager; +import 
org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.ResultList; -import org.openmetadata.service.workflows.interfaces.Processor; -import org.openmetadata.service.workflows.interfaces.Sink; import org.openmetadata.service.workflows.interfaces.Source; import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource; import org.quartz.JobExecutionContext; @Slf4j -@SuppressWarnings("unused") public class SearchIndexApp extends AbstractNativeApplication { private static final String ALL = "all"; - private static final Set ALL_ENTITIES = + public static final Set ALL_ENTITIES = Set.of( - "table", - "dashboard", - "topic", - "pipeline", - "ingestionPipeline", - "searchIndex", - "user", - "team", - "glossary", - "glossaryTerm", - "mlmodel", - "tag", - "classification", - "query", - "container", - "database", - "databaseSchema", - "testCase", - "testSuite", - "chart", - "dashboardDataModel", - "databaseService", - "messagingService", - "dashboardService", - "pipelineService", - "mlmodelService", - "searchService", - "entityReportData", - "webAnalyticEntityViewReportData", - "webAnalyticUserActivityReportData", - "domain", - "storedProcedure", - "storageService", - "testCaseResolutionStatus", - "apiService", - "apiEndpoint", - "apiCollection"); + TABLE, + DASHBOARD, + TOPIC, + PIPELINE, + INGESTION_PIPELINE, + SEARCH_INDEX, + USER, + TEAM, + GLOSSARY, + GLOSSARY_TERM, + MLMODEL, + TAG, + CLASSIFICATION, + QUERY, + CONTAINER, + DATABASE, + DATABASE_SCHEMA, + TEST_CASE, + TEST_SUITE, + CHART, + DASHBOARD_DATA_MODEL, + DATABASE_SERVICE, + MESSAGING_SERVICE, + DASHBOARD_SERVICE, + PIPELINE_SERVICE, + MLMODEL_SERVICE, + SEARCH_SERVICE, + ENTITY_REPORT_DATA, + WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA, + WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA, + DOMAIN, + STORED_PROCEDURE, + STORAGE_SERVICE, + TEST_CASE_RESOLUTION_STATUS, + TEST_CASE_RESULT, + API_SERVICE, + API_ENDPOINT, + API_COLLCECTION, + METRICS); + public static final Set TIME_SERIES_ENTITIES = Set.of( ReportData.ReportDataType.ENTITY_REPORT_DATA.value(), @@ -105,16 +150,26 @@ public class SearchIndexApp extends AbstractNativeApplication { ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA.value(), ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA.value(), ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA.value(), - "testCaseResolutionStatus"); - private final List paginatedSources = new ArrayList<>(); - private Processor entityProcessor; - private Processor entityTimeSeriesProcessor; - private Sink searchIndexSink; + TEST_CASE_RESOLUTION_STATUS, + TEST_CASE_RESULT); - @Getter EventPublisherJob jobData; - private final Object jobDataLock = new Object(); // Dedicated final lock object + // Constants to replace magic numbers + private static final int DEFAULT_PAYLOAD_SIZE = 100; + private static final int DEFAULT_BATCH_SIZE = 5; + private static final int DEFAULT_MAX_RETRIES = 1000; + private static final int DEFAULT_TIMEOUT = 10000; + private static final int MAX_CONSUMERS = 10; + private BulkSink searchIndexSink; + + @Getter private EventPublisherJob jobData; + private final Object jobDataLock = new Object(); private volatile boolean stopped = false; - @Getter private volatile ExecutorService executorService; + private ExecutorService producerExecutor; + private ExecutorService 
consumerExecutor; + private ExecutorService entityReaderExecutor; + private final BlockingQueue> taskQueue = new LinkedBlockingQueue<>(); + private final AtomicReference searchIndexStats = new AtomicReference<>(); + private final AtomicReference batchSize = new AtomicReference<>(DEFAULT_BATCH_SIZE); public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) { super(collectionDAO, searchRepository); @@ -123,102 +178,92 @@ public class SearchIndexApp extends AbstractNativeApplication { @Override public void init(App app) { super.init(app); - // request for reindexing EventPublisherJob request = JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class) .withStats(new Stats()); + if (request.getEntities().contains(ALL)) { request.setEntities(ALL_ENTITIES); } + jobData = request; + LOG.info("Initialized SearchIndexApp with entities: {}", jobData.getEntities()); } @Override public void startApp(JobExecutionContext jobExecutionContext) { try { - initializeJob(); - LOG.info("Executing Reindexing Job with JobData : {}", jobData); - jobData.setStatus(EventPublisherJob.Status.RUNNING); + initializeJob(jobExecutionContext); String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - if (!runType.equals(ON_DEMAND_JOB)) { + if (!ON_DEMAND_JOB.equals(runType)) { jobData.setRecreateIndex(false); } + performReindex(jobExecutionContext); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleJobFailure(ex); } catch (Exception ex) { - IndexingError indexingError = - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage( - String.format( - "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ", - jobData.toString(), ExceptionUtils.getStackTrace(ex))); - LOG.error(indexingError.getMessage()); - jobData.setStatus(EventPublisherJob.Status.RUNNING); - jobData.setFailure(indexingError); + handleJobFailure(ex); } finally { sendUpdates(jobExecutionContext); } } + /** + * Cleans up stale jobs from previous runs. 
+ */ private void cleanUpStaleJobsFromRuns() { try { collectionDAO .appExtensionTimeSeriesDao() .markStaleEntriesStopped(getApp().getId().toString()); + LOG.debug("Cleaned up stale jobs."); } catch (Exception ex) { - LOG.error("Failed in Marking Stale Entries Stopped."); + LOG.error("Failed in marking stale entries as stopped.", ex); } } - private void initializeJob() { - // Remove any Stale Jobs + private void initializeJob(JobExecutionContext jobExecutionContext) { cleanUpStaleJobsFromRuns(); - int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO); - this.jobData.setStats( - new Stats() - .withJobStats( - new StepStats() - .withTotalRecords(totalRecords) - .withFailedRecords(0) - .withSuccessRecords(0))); - jobData - .getEntities() - .forEach( - entityType -> { - if (!TIME_SERIES_ENTITIES.contains(entityType)) { - List fields = List.of("*"); - PaginatedEntitiesSource source = - new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields); - if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) { - source.setCursor(jobData.getAfterCursor()); - } - paginatedSources.add(source); - } else { - PaginatedEntityTimeSeriesSource source = - new PaginatedEntityTimeSeriesSource( - entityType, jobData.getBatchSize(), List.of("*")); - if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) { - source.setCursor(jobData.getAfterCursor()); - } - paginatedSources.add(source); - } - }); - if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { - this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords); - this.entityTimeSeriesProcessor = new OpenSearchEntityTimeSeriesProcessor(totalRecords); + LOG.info("Executing Reindexing Job with JobData: {}", jobData); + batchSize.set(jobData.getBatchSize()); + jobData.setStatus(EventPublisherJob.Status.RUNNING); + + LOG.debug("Initializing job statistics."); + searchIndexStats.set(initializeTotalRecords(jobData.getEntities())); + jobData.setStats(searchIndexStats.get()); + sendUpdates(jobExecutionContext); + + ElasticSearchConfiguration.SearchType searchType = searchRepository.getSearchType(); + LOG.info("Initializing searchIndexSink with search type: {}", searchType); + + if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { this.searchIndexSink = - new OpenSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize()); + new OpenSearchIndexSink( + searchRepository.getSearchClient(), + jobData.getPayLoadSize(), + DEFAULT_PAYLOAD_SIZE, + DEFAULT_BATCH_SIZE, + DEFAULT_MAX_RETRIES, + DEFAULT_TIMEOUT); + LOG.info("Initialized OpenSearchIndexSink."); } else { - this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords); - this.entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords); this.searchIndexSink = - new ElasticSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize()); + new ElasticSearchIndexSink( + searchRepository.getSearchClient(), + jobData.getPayLoadSize(), + DEFAULT_PAYLOAD_SIZE, + DEFAULT_BATCH_SIZE, + DEFAULT_MAX_RETRIES, + DEFAULT_TIMEOUT); + LOG.info("Initialized ElasticSearchIndexSink."); } } - public void updateRecordToDb(JobExecutionContext jobExecutionContext) { + public void updateRecordToDbAndNotify(JobExecutionContext jobExecutionContext) { AppRunRecord appRecord = getJobRecord(jobExecutionContext); appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); @@ -231,257 +276,423 @@ public class SearchIndexApp extends AbstractNativeApplication { new 
SuccessContext().withAdditionalProperty("stats", jobData.getStats())); } + if (WebSocketManager.getInstance() != null) { + WebSocketManager.getInstance() + .broadCastMessageToAll( + SEARCH_INDEX_JOB_BROADCAST_CHANNEL, JsonUtils.pojoToJson(appRecord)); + LOG.debug("Broad-casted job updates via WebSocket."); + } + pushAppStatusUpdates(jobExecutionContext, appRecord, true); + LOG.debug("Updated AppRunRecord in DB: {}", appRecord); } - private void performReindex(JobExecutionContext jobExecutionContext) { - if (jobData.getStats() == null) { - jobData.setStats(new Stats()); - } + private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException { + int numProducers = jobData.getEntities().size(); + int numConsumers = calculateNumberOfConsumers(); + LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers); - if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) { - int numThreads = - Math.min(paginatedSources.size(), Runtime.getRuntime().availableProcessors()); - this.executorService = Executors.newFixedThreadPool(numThreads); - LOG.debug("Initialized new ExecutorService with {} threads.", numThreads); - } - List> futures = new ArrayList<>(); - - for (Source paginatedSource : paginatedSources) { - Future future = - executorService.submit( - () -> { - String entityType = paginatedSource.getEntityType(); - Map contextData = new HashMap<>(); - contextData.put(ENTITY_TYPE_KEY, entityType); - try { - reCreateIndexes(entityType); - contextData.put(ENTITY_TYPE_KEY, entityType); - - while (!paginatedSource.isDone()) { - Object resultList = paginatedSource.readNext(null); - if (resultList == null) { - break; - } - - if (!TIME_SERIES_ENTITIES.contains(entityType)) { - List entityNames = - getEntityNameFromEntity( - (ResultList) resultList, entityType); - contextData.put(ENTITY_NAME_LIST_KEY, entityNames); - processEntity( - (ResultList) resultList, - contextData, - paginatedSource); - } else { - List entityNames = - getEntityNameFromEntityTimeSeries( - (ResultList) resultList, - entityType); - contextData.put(ENTITY_NAME_LIST_KEY, entityNames); - processEntityTimeSeries( - (ResultList) resultList, - contextData, - paginatedSource); - } - synchronized (jobDataLock) { - updateStats(entityType, paginatedSource.getStats()); - } - sendUpdates(jobExecutionContext); - } - - } catch (SearchIndexException e) { - synchronized (jobDataLock) { - jobData.setStatus(EventPublisherJob.Status.RUNNING); - jobData.setFailure(e.getIndexingError()); - paginatedSource.updateStats( - e.getIndexingError().getSuccessCount(), - e.getIndexingError().getFailedCount()); - updateStats(entityType, paginatedSource.getStats()); - } - sendUpdates(jobExecutionContext); - } catch (Exception e) { - synchronized (jobDataLock) { - jobData.setStatus(EventPublisherJob.Status.FAILED); - jobData.setFailure( - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage(e.getMessage())); - } - sendUpdates(jobExecutionContext); - LOG.error("Unexpected error during reindexing for entity {}", entityType, e); - } - }); - - futures.add(future); - } - - executorService.shutdown(); + producerExecutor = Executors.newFixedThreadPool(numProducers); + consumerExecutor = Executors.newFixedThreadPool(numConsumers); + entityReaderExecutor = Executors.newCachedThreadPool(); + CountDownLatch producerLatch = new CountDownLatch(numProducers); try { - boolean allTasksCompleted = executorService.awaitTermination(1, TimeUnit.HOURS); - 
if (!allTasksCompleted) { - LOG.warn("Reindexing tasks did not complete within the expected time."); - executorService.shutdownNow(); + for (String entityType : jobData.getEntities()) { + producerExecutor.submit( + () -> { + try { + reCreateIndexes(entityType); + processEntityType(entityType); + } catch (Exception e) { + LOG.error("Error processing entity type {}", entityType, e); + } finally { + producerLatch.countDown(); + } + }); } - } catch (InterruptedException e) { - LOG.error("Reindexing was interrupted", e); - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - for (Future future : futures) { + for (int i = 0; i < numConsumers; i++) { + consumerExecutor.submit( + () -> { + try { + consumeTasks(jobExecutionContext); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Consumer thread interrupted."); + } + }); + } + + producerLatch.await(); + sendPoisonPills(numConsumers); + } catch (Exception e) { + LOG.error("Error during reindexing process.", e); + throw e; + } finally { + shutdownExecutor(producerExecutor, "ProducerExecutor", 1, TimeUnit.HOURS); + shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS); + shutdownExecutor(entityReaderExecutor, "ReaderExecutor", 20, TimeUnit.SECONDS); + } + } + + private int calculateNumberOfConsumers() { + return Math.min(Runtime.getRuntime().availableProcessors(), MAX_CONSUMERS); + } + + private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException { + while (true) { + IndexingTask task = taskQueue.take(); + LOG.info( + "Consuming Indexing Task for entityType: {}, entity offset : {}", + task.entityType(), + task.currentEntityOffset()); + if (task == IndexingTask.POISON_PILL) { + LOG.debug("Received POISON_PILL. Consumer thread terminating."); + break; + } + processTask(task, jobExecutionContext); + } + } + + /** + * Sends POISON_PILLs to signal consumer threads to terminate. + * + * @param numConsumers The number of consumers to signal. + * @throws InterruptedException If the thread is interrupted while waiting. + */ + private void sendPoisonPills(int numConsumers) throws InterruptedException { + for (int i = 0; i < numConsumers; i++) { + taskQueue.put(IndexingTask.POISON_PILL); + } + LOG.debug("Sent {} POISON_PILLs to consumers.", numConsumers); + } + + /** + * Shuts down an executor service gracefully. + * + * @param executor The executor service to shut down. + * @param name The name of the executor for logging. + * @param timeout The timeout duration. + * @param unit The time unit of the timeout. 
+ */ + private void shutdownExecutor( + ExecutorService executor, String name, long timeout, TimeUnit unit) { + if (executor != null && !executor.isShutdown()) { + executor.shutdown(); try { - future.get(); + if (!executor.awaitTermination(timeout, unit)) { + executor.shutdownNow(); + LOG.warn("{} did not terminate within the specified timeout.", name); + } else { + LOG.info("{} terminated successfully.", name); + } } catch (InterruptedException e) { - LOG.error("Task was interrupted", e); + LOG.error("Interrupted while waiting for {} to terminate.", name, e); + executor.shutdownNow(); Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - LOG.error("Exception in reindexing task", e.getCause()); } } } - private List getEntityNameFromEntity( - ResultList resultList, String entityType) { - return resultList.getData().stream() - .map(entity -> String.format("%s %s", entityType, entity.getId())) - .toList(); + private void handleJobFailure(Exception ex) { + IndexingError indexingError = + new IndexingError() + .withErrorSource(IndexingError.ErrorSource.JOB) + .withMessage( + String.format( + "Reindexing Job Has Encountered an Exception.%nJob Data: %s,%nStack: %s", + jobData.toString(), ExceptionUtils.getStackTrace(ex))); + LOG.error(indexingError.getMessage(), ex); + jobData.setStatus(EventPublisherJob.Status.FAILED); + jobData.setFailure(indexingError); } - private List getEntityNameFromEntityTimeSeries( - ResultList resultList, String entityType) { - return resultList.getData().stream() - .map(entity -> String.format("%s %s", entityType, entity.getId())) - .toList(); - } - - private void processEntity( - ResultList resultList, - Map contextData, - Source paginatedSource) - throws SearchIndexException { - if (!resultList.getData().isEmpty()) { - searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData); - if (!resultList.getErrors().isEmpty()) { - throw new SearchIndexException( - new IndexingError() - .withErrorSource(READER) - .withLastFailedCursor(paginatedSource.getLastFailedCursor()) - .withSubmittedCount(paginatedSource.getBatchSize()) - .withSuccessCount(resultList.getData().size()) - .withFailedCount(resultList.getErrors().size()) - .withMessage( - "Issues in Reading A Batch For Entities. 
Check Errors Corresponding to Entities.") - .withFailedEntities(resultList.getErrors())); - } - paginatedSource.updateStats(resultList.getData().size(), 0); + public synchronized void updateStats(String entityType, StepStats currentEntityStats) { + Stats jobDataStats = jobData.getStats(); + if (jobDataStats.getEntityStats() == null) { + jobDataStats.setEntityStats(new StepStats()); } + + StepStats existingEntityStats = + (StepStats) jobDataStats.getEntityStats().getAdditionalProperties().get(entityType); + if (existingEntityStats == null) { + jobDataStats.getEntityStats().getAdditionalProperties().put(entityType, currentEntityStats); + LOG.debug("Initialized StepStats for entityType '{}': {}", entityType, currentEntityStats); + } else { + accumulateStepStats(existingEntityStats, currentEntityStats); + LOG.debug( + "Accumulated StepStats for entityType '{}': Success - {}, Failed - {}", + entityType, + existingEntityStats.getSuccessRecords(), + existingEntityStats.getFailedRecords()); + } + + StepStats jobStats = jobDataStats.getJobStats(); + if (jobStats == null) { + jobStats = new StepStats(); + jobDataStats.setJobStats(jobStats); + } + + accumulateStepStats(jobStats, currentEntityStats); + LOG.debug( + "Updated jobStats: Success - {}, Failed - {}", + jobStats.getSuccessRecords(), + jobStats.getFailedRecords()); + + jobData.setStats(jobDataStats); } - private void processEntityTimeSeries( - ResultList resultList, - Map contextData, - Source paginatedSource) - throws SearchIndexException { - if (!resultList.getData().isEmpty()) { - searchIndexSink.write( - entityTimeSeriesProcessor.process(resultList, contextData), contextData); - if (!resultList.getErrors().isEmpty()) { - throw new SearchIndexException( - new IndexingError() - .withErrorSource(READER) - .withLastFailedCursor(paginatedSource.getLastFailedCursor()) - .withSubmittedCount(paginatedSource.getBatchSize()) - .withSuccessCount(resultList.getData().size()) - .withFailedCount(resultList.getErrors().size()) - .withMessage( - "Issues in Reading A Batch For Entities. 
Check Errors Corresponding to Entities.") - .withFailedEntities(resultList.getErrors())); + private void accumulateStepStats(StepStats target, StepStats source) { + if (target == null || source == null) { + return; + } + target.setTotalRecords(target.getTotalRecords() + source.getTotalRecords()); + target.setSuccessRecords(target.getSuccessRecords() + source.getSuccessRecords()); + target.setFailedRecords(target.getFailedRecords() + source.getFailedRecords()); + } + + public synchronized Stats initializeTotalRecords(Set entities) { + Stats jobDataStats = jobData.getStats(); + if (jobDataStats.getEntityStats() == null) { + jobDataStats.setEntityStats(new StepStats()); + LOG.debug("Initialized entityStats map."); + } + + int total = 0; + for (String entityType : entities) { + int entityTotal = getEntityTotal(entityType); + total += entityTotal; + + StepStats entityStats = new StepStats(); + entityStats.setTotalRecords(entityTotal); + entityStats.setSuccessRecords(0); + entityStats.setFailedRecords(0); + + jobDataStats.getEntityStats().getAdditionalProperties().put(entityType, entityStats); + LOG.debug("Set Total Records for entityType '{}': {}", entityType, entityTotal); + } + + StepStats jobStats = jobDataStats.getJobStats(); + if (jobStats == null) { + jobStats = new StepStats(); + jobDataStats.setJobStats(jobStats); + LOG.debug("Initialized jobStats."); + } + jobStats.setTotalRecords(total); + LOG.debug("Set job-level Total Records: {}", jobStats.getTotalRecords()); + + jobData.setStats(jobDataStats); + return jobDataStats; + } + + private int getEntityTotal(String entityType) { + try { + if (!TIME_SERIES_ENTITIES.contains(entityType)) { + EntityRepository repository = Entity.getEntityRepository(entityType); + return repository.getDao().listTotalCount(); + } else { + EntityTimeSeriesRepository repository; + ListFilter listFilter = new ListFilter(null); + if (isDataInsightIndex(entityType)) { + listFilter.addQueryParam("entityFQNHash", FullyQualifiedName.buildHash(entityType)); + repository = Entity.getEntityTimeSeriesRepository(Entity.ENTITY_REPORT_DATA); + } else { + repository = Entity.getEntityTimeSeriesRepository(entityType); + } + return repository.getTimeSeriesDao().listCount(listFilter); } - paginatedSource.updateStats(resultList.getData().size(), 0); + } catch (Exception e) { + LOG.debug("Error while getting total entities to index for '{}'", entityType, e); + return 0; } } private void sendUpdates(JobExecutionContext jobExecutionContext) { try { jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); - updateRecordToDb(jobExecutionContext); - if (WebSocketManager.getInstance() != null) { - WebSocketManager.getInstance() - .broadCastMessageToAll( - WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData)); - } + jobExecutionContext + .getJobDetail() + .getJobDataMap() + .put(WEBSOCKET_STATUS_CHANNEL, SEARCH_INDEX_JOB_BROADCAST_CHANNEL); + updateRecordToDbAndNotify(jobExecutionContext); } catch (Exception ex) { LOG.error("Failed to send updated stats with WebSocket", ex); } } - public void updateStats(String entityType, StepStats currentEntityStats) { - Stats jobDataStats = jobData.getStats(); - - StepStats entityLevelStats = jobDataStats.getEntityStats(); - if (entityLevelStats == null) { - entityLevelStats = - new StepStats().withTotalRecords(null).withFailedRecords(null).withSuccessRecords(null); - } - entityLevelStats.withAdditionalProperty(entityType, currentEntityStats); - - StepStats stats = 
jobData.getStats().getJobStats(); - if (stats == null) { - stats = - new StepStats() - .withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO)); - } - - stats.setSuccessRecords( - entityLevelStats.getAdditionalProperties().values().stream() - .map(s -> (StepStats) s) - .mapToInt(StepStats::getSuccessRecords) - .sum()); - stats.setFailedRecords( - entityLevelStats.getAdditionalProperties().values().stream() - .map(s -> (StepStats) s) - .mapToInt(StepStats::getFailedRecords) - .sum()); - - jobDataStats.setJobStats(stats); - jobDataStats.setEntityStats(entityLevelStats); - - jobData.setStats(jobDataStats); - } - - private void reCreateIndexes(String entityType) { + private void reCreateIndexes(String entityType) throws SearchIndexException { if (Boolean.FALSE.equals(jobData.getRecreateIndex())) { + LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType); return; } - IndexMapping indexType = searchRepository.getIndexMapping(entityType); - searchRepository.deleteIndex(indexType); - searchRepository.createIndex(indexType); + try { + IndexMapping indexType = searchRepository.getIndexMapping(entityType); + searchRepository.deleteIndex(indexType); + searchRepository.createIndex(indexType); + LOG.info("Recreated index for entityType '{}'.", entityType); + } catch (Exception e) { + LOG.error("Failed to recreate index for entityType '{}'.", entityType, e); + throw new SearchIndexException(e); + } } + @SuppressWarnings("unused") public void stopJob() { LOG.info("Stopping reindexing job."); stopped = true; - if (executorService != null && !executorService.isShutdown()) { - List awaitingTasks = executorService.shutdownNow(); - LOG.info( - "ExecutorService has been shutdown. Awaiting termination. {} tasks were awaiting execution.", - awaitingTasks.size()); + shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS); + shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS); + } - try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - LOG.warn("ExecutorService did not terminate within the specified timeout."); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for ExecutorService to terminate.", e); - List stillAwaitingTasks = executorService.shutdownNow(); // Force shutdown - LOG.info( - "Forced shutdown initiated due to interruption. 
{} tasks were awaiting execution.", - stillAwaitingTasks.size()); - Thread.currentThread().interrupt(); // Restore interrupt status + private void processTask(IndexingTask task, JobExecutionContext jobExecutionContext) { + String entityType = task.entityType(); + List entities = task.entities(); + Map contextData = new HashMap<>(); + contextData.put(ENTITY_TYPE_KEY, entityType); + + try { + if (!TIME_SERIES_ENTITIES.contains(entityType)) { + @SuppressWarnings("unchecked") + List entityList = (List) entities; + searchIndexSink.write(entityList, contextData); + } else { + @SuppressWarnings("unchecked") + List entityList = (List) entities; + searchIndexSink.write(entityList, contextData); + } + + // After successful write, create a new StepStats for the current batch + StepStats currentEntityStats = new StepStats(); + currentEntityStats.setSuccessRecords(entities.size()); + currentEntityStats.setFailedRecords(0); + // Do NOT set Total Records here + + // Update statistics in a thread-safe manner + synchronized (jobDataLock) { + updateStats(entityType, currentEntityStats); + } + + } catch (Exception e) { + synchronized (jobDataLock) { + jobData.setStatus(EventPublisherJob.Status.FAILED); + jobData.setFailure( + new IndexingError() + .withErrorSource(IndexingError.ErrorSource.JOB) + .withMessage(e.getMessage())); + + StepStats failedEntityStats = new StepStats(); + failedEntityStats.setSuccessRecords(0); + failedEntityStats.setFailedRecords(entities.size()); + updateStats(entityType, failedEntityStats); + } + LOG.error("Unexpected error during processing task for entity {}", entityType, e); + } finally { + sendUpdates(jobExecutionContext); + } + } + + @NotNull + private Source createSource(String entityType) { + List fields = List.of("*"); + Source source; + + if (!TIME_SERIES_ENTITIES.contains(entityType)) { + PaginatedEntitiesSource paginatedSource = + new PaginatedEntitiesSource(entityType, batchSize.get(), fields); + if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) { + paginatedSource.getCursor().set(jobData.getAfterCursor()); + } + source = paginatedSource; + } else { + PaginatedEntityTimeSeriesSource paginatedSource = + new PaginatedEntityTimeSeriesSource(entityType, batchSize.get(), fields); + if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) { + paginatedSource.getCursor().set(jobData.getAfterCursor()); + } + source = paginatedSource; + } + + return source; + } + + private void processEntityType(String entityType) + throws InterruptedException, ExecutionException { + int totalEntityRecords = getTotalEntityRecords(entityType); + if (totalEntityRecords > 0) { + Source source = createSource(entityType); + int loadPerThread = calculateLoadPerThread(totalEntityRecords); + List> futures = submitReaderTasks(entityType, source, loadPerThread); + for (Future future : futures) { + future.get(); } } } + + private int getTotalEntityRecords(String entityType) { + return ((StepStats) + searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType)) + .getTotalRecords(); + } + + private List> submitReaderTasks( + String entityType, Source source, int loadPerThread) { + List> futures = new ArrayList<>(); + for (int i = 0; i < loadPerThread; i++) { + int currentOffset = i * batchSize.get(); + Future future = + entityReaderExecutor.submit(() -> processReadTask(entityType, source, currentOffset)); + futures.add(future); + } + return futures; + } + + private void processReadTask(String entityType, Source source, int offset) { + try { + Object resultList = 
source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset))); + if (resultList != null) { + List entities = extractEntities(entityType, resultList); + if (entities != null && !entities.isEmpty()) { + LOG.info( + "Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset); + createIndexingTask(entityType, entities, offset); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Reader thread interrupted for entityType: {}", entityType); + } catch (SearchIndexException e) { + LOG.error("Error while reading source for entityType: {}", entityType, e); + } + } + + private void createIndexingTask(String entityType, List entities, int offset) + throws InterruptedException { + IndexingTask task = new IndexingTask<>(entityType, entities, offset); + taskQueue.put(task); + } + + private synchronized int calculateLoadPerThread(int totalEntityRecords) { + int mod = totalEntityRecords % batchSize.get(); + if (mod == 0) { + return totalEntityRecords / batchSize.get(); + } else { + return (totalEntityRecords / batchSize.get()) + 1; + } + } + + @SuppressWarnings("unchecked") + private List extractEntities(String entityType, Object resultList) { + if (!TIME_SERIES_ENTITIES.contains(entityType)) { + return ((ResultList) resultList).getData(); + } else { + return ((ResultList) resultList).getData(); + } + } + + private record IndexingTask(String entityType, List entities, int currentEntityOffset) { + public static final IndexingTask POISON_PILL = + new IndexingTask<>(null, Collections.emptyList(), -1); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/Stats.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/Stats.java new file mode 100644 index 00000000000..b78f9739c6c --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/Stats.java @@ -0,0 +1,26 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import org.openmetadata.schema.system.StepStats; + +@Getter +public class Stats { + private final StepStats jobStats = new StepStats(); + private final ConcurrentHashMap entityStats = new ConcurrentHashMap<>(); + + public void updateEntityStats(String entityType, StepStats stats) { + entityStats.merge( + entityType, + stats, + (existingStats, newStats) -> { + existingStats.setSuccessRecords( + existingStats.getSuccessRecords() + newStats.getSuccessRecords()); + existingStats.setFailedRecords( + existingStats.getFailedRecords() + newStats.getFailedRecords()); + existingStats.setTotalRecords( + existingStats.getTotalRecords() + newStats.getTotalRecords()); + return existingStats; + }); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 2e3968133e6..8d256f4167f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -1,5 +1,6 @@ package org.openmetadata.service.apps.scheduler; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; import java.util.HashMap; @@ -15,6 +16,7 @@ 
import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.socket.WebSocketManager; import org.openmetadata.service.util.JsonUtils; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; @@ -26,6 +28,8 @@ public abstract class AbstractOmAppJobListener implements JobListener { private final CollectionDAO collectionDAO; private final AppRepository repository; private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; + public static final String WEBSOCKET_STATUS_CHANNEL = "WebsocketStatusUpdateExtension"; + public static final String APP_RUN_STATS = "AppRunStats"; public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; @@ -120,6 +124,14 @@ public abstract class AbstractOmAppJobListener implements JobListener { runRecord.setFailureContext(context); } + // Push Update on WebSocket + String webSocketChannelName = + (String) jobExecutionContext.getJobDetail().getJobDataMap().get(WEBSOCKET_STATUS_CHANNEL); + if (!nullOrEmpty(webSocketChannelName) && WebSocketManager.getInstance() != null) { + WebSocketManager.getInstance() + .broadCastMessageToAll(webSocketChannelName, JsonUtils.pojoToJson(runRecord)); + } + // Update App Run Record pushApplicationStatusUpdates(jobExecutionContext, runRecord, true); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 8bab5f64f8a..3121754facb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -4254,7 +4254,7 @@ public interface CollectionDAO { @ConnectionAwareSqlUpdate( value = - "UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running' AND extension = 'status'", + "UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.status')) = 'running' AND extension = 'status'", connectionType = MYSQL) @ConnectionAwareSqlUpdate( value = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 5f30663a2eb..ff77f9a2daa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -80,9 +80,9 @@ import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; @Slf4j public class SearchRepository { - @Getter private final SearchClient searchClient; + private volatile SearchClient searchClient; - private Map entityIndexMap; + @Getter private Map entityIndexMap; private final String language; @@ -119,6 +119,17 @@ public class SearchRepository { loadIndexMappings(); } + public SearchClient getSearchClient() { + if (searchClient == null) { + synchronized (SearchRepository.class) { + if (searchClient == null) { + searchClient = buildSearchClient(elasticSearchConfiguration); + } + } + } + return searchClient; + } + private void loadIndexMappings() { Set entities; entityIndexMap = new HashMap<>(); 
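The getSearchClient() added above is classic double-checked locking: the field is made volatile so the unsynchronized first read is safe under the Java memory model, and the second null check inside the synchronized block prevents building the client twice. A minimal, self-contained sketch of the same idiom follows; LazyClientHolder, its nested SearchClient interface, and buildSearchClient are placeholders for the OpenMetadata types (the real code locks on SearchRepository.class and builds the client from elasticSearchConfiguration).

```java
// Minimal sketch of the double-checked locking idiom used by getSearchClient().
// The volatile modifier is essential: without it, a thread could observe a
// partially constructed client through the unsynchronized fast-path read.
public class LazyClientHolder {

  interface SearchClient {}

  private volatile SearchClient searchClient;

  public SearchClient getSearchClient() {
    SearchClient local = searchClient;   // single volatile read on the fast path
    if (local == null) {
      synchronized (this) {              // the hunk locks SearchRepository.class instead
        local = searchClient;
        if (local == null) {             // second check: another thread may have won
          local = buildSearchClient();
          searchClient = local;          // publish the fully constructed instance
        }
      }
    }
    return local;
  }

  private SearchClient buildSearchClient() {
    return new SearchClient() {};        // placeholder for the real client factory
  }
}
```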
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 7cd8164e378..ecfec907eb1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -1831,13 +1831,7 @@ public class ElasticSearchClient implements SearchClient { /** */ @Override - public void close() { - try { - this.client.close(); - } catch (Exception e) { - LOG.error("Failed to close elastic search", e); - } - } + public void close() {} @SneakyThrows private void deleteEntityFromElasticSearch(DeleteRequest deleteRequest) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 87286ce5e4e..c4a92656afe 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -1846,13 +1846,7 @@ public class OpenSearchClient implements SearchClient { /** */ @Override - public void close() { - try { - this.client.close(); - } catch (Exception e) { - LOG.error("Failed to close open search", e); - } - } + public void close() {} @Override public BulkResponse bulk(BulkRequest data, RequestOptions options) throws IOException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java index 75aa8eddb62..77e9a8336f3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java @@ -24,7 +24,8 @@ public class WebSocketManager { @Getter private final SocketIoServer socketIoServer; public static final String FEED_BROADCAST_CHANNEL = "activityFeed"; public static final String TASK_BROADCAST_CHANNEL = "taskChannel"; - public static final String JOB_STATUS_BROADCAST_CHANNEL = "jobStatus"; + public static final String SEARCH_INDEX_JOB_BROADCAST_CHANNEL = "searchIndexJobStatus"; + public static final String DATA_INSIGHTS_JOB_BROADCAST_CHANNEL = "dataInsightsJobStatus"; public static final String MENTION_CHANNEL = "mentionChannel"; public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel"; public static final String CSV_EXPORT_CHANNEL = "csvExportChannel"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index d22937d7d48..c631699a715 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -264,28 +264,67 @@ public class OpenMetadataOperations implements Callable { public Integer reIndex( @Option( names = {"-b", "--batch-size"}, - defaultValue = "100") + defaultValue = "100", + description = "Number of records to process in each batch.") int batchSize, @Option( names = {"-p", "--payload-size"}, - defaultValue = 
"104857600") + defaultValue = "104857600", + description = "Maximum size of the payload in bytes.") long payloadSize, @Option( names = {"--recreate-indexes"}, - defaultValue = "true") - boolean recreateIndexes) { + defaultValue = "true", + description = "Flag to determine if indexes should be recreated.") + boolean recreateIndexes, + @Option( + names = {"--num-threads"}, + defaultValue = "10", + description = "Number of threads to use for processing.") + int numThreads, + @Option( + names = {"--back-off"}, + defaultValue = "1000", + description = "Back-off time in milliseconds for retries.") + int backOff, + @Option( + names = {"--max-back-off"}, + defaultValue = "10000", + description = "Max Back-off time in milliseconds for retries.") + int maxBackOff, + @Option( + names = {"--max-requests"}, + defaultValue = "100", + description = "Maximum number of concurrent search requests.") + int maxRequests, + @Option( + names = {"--retries"}, + defaultValue = "3", + description = "Maximum number of retries for failed search requests.") + int retries) { try { + LOG.info( + "Running Reindexing with Batch Size: {}, Payload Size: {}, Recreate-Index: {}", + batchSize, + payloadSize, + recreateIndexes); parseConfig(); CollectionRegistry.initialize(); ApplicationHandler.initialize(config); - // load seed data so that repositories are initialized CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null, null, true); ApplicationHandler.initialize(config); - // creates the default search index application AppScheduler.initialize(config, collectionDAO, searchRepository); - String appName = "SearchIndexingApplication"; - return executeSearchReindexApp(appName, batchSize, payloadSize, recreateIndexes); + return executeSearchReindexApp( + appName, + batchSize, + payloadSize, + recreateIndexes, + numThreads, + backOff, + maxBackOff, + maxRequests, + retries); } catch (Exception e) { LOG.error("Failed to reindex due to ", e); return 1; @@ -293,7 +332,15 @@ public class OpenMetadataOperations implements Callable { } private int executeSearchReindexApp( - String appName, int batchSize, long payloadSize, boolean recreateIndexes) { + String appName, + int batchSize, + long payloadSize, + boolean recreateIndexes, + int numThreads, + int backOff, + int maxBackOff, + int maxRequests, + int retries) { AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); App originalSearchIndexApp = appRepository.getByName(null, appName, appRepository.getFields("id")); @@ -307,9 +354,14 @@ public class OpenMetadataOperations implements Callable { .withBatchSize(batchSize) .withPayLoadSize(payloadSize) .withRecreateIndex(recreateIndexes) + .withNumberOfThreads(numThreads) + .withInitialBackoff(backOff) + .withMaxBackoff(maxBackOff) + .withMaxConcurrentRequests(maxRequests) + .withMaxRetries(retries) .withEntities(Set.of("all")); - // Update the search index app with the new batch size, payload size and recreate index flag + // Update the search index app with the new configurations App updatedSearchIndexApp = JsonUtils.deepCopy(originalSearchIndexApp, App.class); updatedSearchIndexApp.withAppConfiguration(updatedJob); JsonPatch patch = JsonUtils.getJsonPatch(originalSearchIndexApp, updatedSearchIndexApp); @@ -322,7 +374,7 @@ public class OpenMetadataOperations implements Callable { int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp, currentTime); - // Repatch with original + // Re-patch with original configuration JsonPatch repatch = 
JsonUtils.getJsonPatch(updatedSearchIndexApp, originalSearchIndexApp); appRepository.patch(null, originalSearchIndexApp.getId(), "admin", repatch); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java index ca59c04d14f..f95329b5aee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/interfaces/Source.java @@ -13,13 +13,18 @@ package org.openmetadata.service.workflows.interfaces; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.jdbi3.ListFilter; public interface Source extends Stats { R readNext(Map contextData) throws SearchIndexException; + R readWithCursor(String currentCursor) throws SearchIndexException; + List getReaderErrors(); void reset(); @@ -30,5 +35,13 @@ public interface Source extends Stats { String getLastFailedCursor(); - boolean isDone(); + default List getFields() { + return new ArrayList<>(); + } + + ListFilter getFilter(); + + AtomicReference isDone(); + + AtomicReference getCursor(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index f4cab9af6b9..234d079ee35 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -19,8 +19,8 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getU import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; import org.openmetadata.schema.EntityInterface; @@ -36,17 +36,18 @@ import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @Slf4j +@Getter public class PaginatedEntitiesSource implements Source> { - @Getter private String name = "PaginatedEntitiesSource"; - @Getter private final int batchSize; - @Getter private final String entityType; - @Getter private final List fields; - @Getter private final List readerErrors = new ArrayList<>(); - @Getter private final StepStats stats = new StepStats(); - @Getter private ListFilter filter; - @Getter private String lastFailedCursor = null; - @Setter private String cursor = RestUtil.encodeCursor("0"); - @Getter private boolean isDone = false; + private String name = "PaginatedEntitiesSource"; + private final int batchSize; + private final String entityType; + private final List fields; + private final List readerErrors = new ArrayList<>(); + private final StepStats stats = new StepStats(); + private final ListFilter filter; + private String lastFailedCursor = null; + private final AtomicReference cursor = new AtomicReference<>(RestUtil.encodeCursor("0")); + private final AtomicReference isDone = new AtomicReference<>(false); public PaginatedEntitiesSource(String entityType, int batchSize, List 
fields) { this.entityType = entityType; @@ -80,11 +81,11 @@ public class PaginatedEntitiesSource implements Source readNext(Map contextData) throws SearchIndexException { ResultList data = null; - if (!isDone) { - data = read(cursor); - cursor = data.getPaging().getAfter(); - if (cursor == null) { - isDone = true; + if (Boolean.FALSE.equals(isDone.get())) { + data = read(cursor.get()); + cursor.set(data.getPaging().getAfter()); + if (cursor.get() == null) { + isDone.set(true); } } return data; @@ -99,35 +100,36 @@ public class PaginatedEntitiesSource implements Source readWithCursor(String currentCursor) + throws SearchIndexException { + LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize); + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + ResultList result; + try { + result = + entityRepository.listAfterWithSkipFailure( + null, Entity.getFields(entityType, fields), filter, batchSize, currentCursor); + LOG.debug( + "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", + batchSize, result.getData().size(), result.getErrors().size()); + + } catch (Exception e) { + IndexingError indexingError = + new IndexingError() + .withErrorSource(READER) + .withSuccessCount(0) + .withMessage( + "Issues in Reading A Batch For Entities. No Relationship Issue , Json Processing or DB issue.") + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + LOG.debug(indexingError.getMessage()); + throw new SearchIndexException(indexingError); + } + return result; + } + @Override public void reset() { - cursor = null; - isDone = false; + cursor.set(null); + isDone.set(Boolean.FALSE); + } + + @Override + public AtomicReference isDone() { + return isDone; } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java index d8a46a28db5..c86fec420a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java @@ -6,8 +6,8 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getU import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; import org.openmetadata.schema.EntityTimeSeriesInterface; @@ -23,16 +23,22 @@ import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Source; @Slf4j +@Getter public class PaginatedEntityTimeSeriesSource implements Source> { - @Getter private final int batchSize; - @Getter private final String entityType; - @Getter private final List fields; - @Getter private final List readerErrors = new ArrayList<>(); - @Getter private final StepStats stats = new StepStats(); - @Getter private String lastFailedCursor = null; - @Setter private String cursor = RestUtil.encodeCursor("0"); - @Getter private boolean isDone = false; + private final int batchSize; + private final String entityType; + private final List fields; + private final List readerErrors = new ArrayList<>(); + private final StepStats stats = new StepStats(); + private String 
lastFailedCursor = null; + + // Make cursor thread-safe using AtomicReference + private final AtomicReference cursor = new AtomicReference<>(RestUtil.encodeCursor("0")); + + private final AtomicReference isDone = new AtomicReference<>(false); + private Long startTs; + private Long endTs; public PaginatedEntityTimeSeriesSource(String entityType, int batchSize, List fields) { this.entityType = entityType; @@ -44,20 +50,67 @@ public class PaginatedEntityTimeSeriesSource .withFailedRecords(0); } + public PaginatedEntityTimeSeriesSource( + String entityType, int batchSize, List fields, Long startTs, Long endTs) { + this.entityType = entityType; + this.batchSize = batchSize; + this.fields = fields; + this.stats + .withTotalRecords(getEntityTimeSeriesRepository().getTimeSeriesDao().listCount(getFilter())) + .withSuccessRecords(0) + .withFailedRecords(0); + this.startTs = startTs; + this.endTs = endTs; + } + @Override public ResultList readNext(Map contextData) throws SearchIndexException { ResultList data = null; - if (!isDone) { - data = read(cursor); - cursor = data.getPaging().getAfter(); - if (cursor == null) { - isDone = true; + if (Boolean.FALSE.equals(isDone.get())) { + data = read(cursor.get()); // Use cursor.get() to retrieve the current value + cursor.set(data.getPaging().getAfter()); // Use cursor.set() to update the value + if (cursor.get() == null) { + isDone.set(true); } } return data; } + @Override + public ResultList readWithCursor(String currentCursor) + throws SearchIndexException { + LOG.debug("[PaginatedEntityTimeSeriesSource] Fetching a Batch of Size: {} ", batchSize); + EntityTimeSeriesRepository repository = + getEntityTimeSeriesRepository(); + ResultList result; + ListFilter filter = getFilter(); + try { + if (startTs != null && endTs != null) { + result = + repository.listWithOffset( + currentCursor, filter, batchSize, startTs, endTs, false, true); + } else { + result = repository.listWithOffset(currentCursor, filter, batchSize, true); + } + LOG.debug( + "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", + batchSize, result.getData().size(), result.getErrors().size()); + } catch (Exception e) { + IndexingError indexingError = + new IndexingError() + .withErrorSource(READER) + .withSuccessCount(0) + .withMessage( + "Issues in Reading A Batch For Entities. 
No Relationship Issue , Json Processing or DB issue.") + .withLastFailedCursor(lastFailedCursor) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + LOG.debug(indexingError.getMessage()); + throw new SearchIndexException(indexingError); + } + return result; + } + private ResultList read(String cursor) throws SearchIndexException { LOG.debug("[PaginatedEntityTimeSeriesSource] Fetching a Batch of Size: {} ", batchSize); @@ -66,13 +119,18 @@ public class PaginatedEntityTimeSeriesSource ResultList result; ListFilter filter = getFilter(); try { - result = repository.listWithOffset(cursor, filter, batchSize, true); + if (startTs != null && endTs != null) { + result = repository.listWithOffset(cursor, filter, batchSize, startTs, endTs, false, true); + } else { + result = repository.listWithOffset(cursor, filter, batchSize, true); + } + if (!result.getErrors().isEmpty()) { - lastFailedCursor = this.cursor; + lastFailedCursor = this.cursor.get(); if (result.getPaging().getAfter() == null) { - isDone = true; + isDone.set(true); } else { - this.cursor = result.getPaging().getAfter(); + this.cursor.set(result.getPaging().getAfter()); } return result; } @@ -80,20 +138,20 @@ public class PaginatedEntityTimeSeriesSource "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", batchSize, result.getData().size(), result.getErrors().size()); } catch (Exception e) { - lastFailedCursor = this.cursor; + lastFailedCursor = this.cursor.get(); int remainingRecords = stats.getTotalRecords() - stats.getFailedRecords() - stats.getSuccessRecords(); int submittedRecords; if (remainingRecords - batchSize <= 0) { submittedRecords = remainingRecords; updateStats(0, remainingRecords); - this.cursor = null; - this.isDone = true; + this.cursor.set(null); // Set cursor to null + this.isDone.set(true); } else { submittedRecords = batchSize; String decodedCursor = RestUtil.decodeCursor(cursor); - this.cursor = - RestUtil.encodeCursor(String.valueOf(Integer.parseInt(decodedCursor) + batchSize)); + this.cursor.set( + RestUtil.encodeCursor(String.valueOf(Integer.parseInt(decodedCursor) + batchSize))); updateStats(0, batchSize); } IndexingError indexingError = @@ -114,8 +172,8 @@ public class PaginatedEntityTimeSeriesSource @Override public void reset() { - cursor = null; - isDone = false; + cursor.set(null); + isDone.set(false); } @Override @@ -123,7 +181,7 @@ public class PaginatedEntityTimeSeriesSource getUpdatedStats(stats, currentSuccess, currentFailed); } - private ListFilter getFilter() { + public ListFilter getFilter() { ListFilter filter = new ListFilter(null); if (ReindexingUtil.isDataInsightIndex(entityType)) { filter.addQueryParam("entityFQNHash", FullyQualifiedName.buildHash(entityType)); @@ -139,4 +197,9 @@ public class PaginatedEntityTimeSeriesSource return Entity.getEntityTimeSeriesRepository(entityType); } } + + @Override + public AtomicReference isDone() { + return isDone; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 7799f4d5797..4fb479855aa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -28,10 +28,10 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import 
org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.system.EntityError; +import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.Entity; -import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; import org.openmetadata.service.jdbi3.ListFilter; @@ -59,14 +59,19 @@ public class ReindexingUtil { return Entity.getSearchRepository().getDataInsightReports().contains(entityType); } - public static int getTotalRequestToProcess(Set entities, CollectionDAO dao) { + public static Stats getInitialStatsForEntities(Set entities) { + Stats initialStats = new Stats(); + StepStats entityLevelStat = new StepStats(); int total = 0; for (String entityType : entities) { try { if (!TIME_SERIES_ENTITIES.contains(entityType)) { EntityRepository repository = Entity.getEntityRepository(entityType); - total += repository.getDao().listTotalCount(); + int entityCount = repository.getDao().listTotalCount(); + total += entityCount; + entityLevelStat.withAdditionalProperty( + entityType, new StepStats().withTotalRecords(entityCount)); } else { EntityTimeSeriesRepository repository; ListFilter listFilter = new ListFilter(null); @@ -76,13 +81,18 @@ public class ReindexingUtil { } else { repository = Entity.getEntityTimeSeriesRepository(entityType); } - total += repository.getTimeSeriesDao().listCount(listFilter); + int entityCount = repository.getTimeSeriesDao().listCount(listFilter); + total += entityCount; + entityLevelStat.withAdditionalProperty( + entityType, new StepStats().withTotalRecords(entityCount)); } } catch (Exception e) { LOG.debug("Error while getting total entities to index", e); } } - return total; + initialStats.setJobStats(new StepStats().withTotalRecords(total)); + initialStats.setEntityStats(entityLevelStat); + return initialStats; } public static int getSuccessFromBulkResponse(BulkResponse response) { diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json index fd461c6d65f..0909fbac2e9 100644 --- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json @@ -46,6 +46,11 @@ "recreateIndex": false, "batchSize": "100", "payLoadSize": 104857600, + "numberOfThreads": 5, + "maxConcurrentRequests": 100, + "maxRetries": 3, + "initialBackoff": 1000, + "maxBackoff": 10000, "searchIndexMappingLanguage": "EN" }, "appSchedule": { diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json index f05af337987..dfc5423e68a 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json @@ -44,6 +44,36 @@ "existingJavaType": "java.lang.Long", "default": 104857600 }, + "numberOfThreads": { + "title": "Number of Threads", + "description": "Number of threads to use for reindexing", + "type": "integer", + "default": 5 + }, + 
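The initialBackoff, maxBackoff, and maxRetries knobs added here (and mirrored in eventPublisherJob.json further down) describe a capped exponential backoff for failed search requests. How the sink actually consumes them is not part of this diff, so the following Java sketch only illustrates the retry shape implied by the defaults (1000 ms initial, 10000 ms cap, 3 retries); the jitter is a common-practice addition, not something the schema specifies.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative capped exponential backoff using the schema defaults
// (initialBackoff=1000, maxBackoff=10000, maxRetries=3). Not the actual
// sink implementation; that code is outside this diff.
public class BackoffSketch {

  interface BulkCall { void run() throws Exception; }

  static void runWithRetries(
      BulkCall call, int maxRetries, long initialBackoffMs, long maxBackoffMs)
      throws Exception {
    long backoff = initialBackoffMs;
    for (int attempt = 0; ; attempt++) {
      try {
        call.run();
        return;                                        // success, stop retrying
      } catch (Exception e) {
        if (attempt >= maxRetries) throw e;            // retries exhausted
        long jitter = ThreadLocalRandom.current().nextLong(backoff / 2 + 1);
        Thread.sleep(Math.min(backoff + jitter, maxBackoffMs));
        backoff = Math.min(backoff * 2, maxBackoffMs); // double, but cap the wait
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] failures = {2};                              // fail twice, then succeed
    runWithRetries(
        () -> {
          if (failures[0]-- > 0) throw new RuntimeException("transient 429");
          System.out.println("bulk request accepted");
        },
        3, 1000, 10000);
  }
}
```

Doubling with a cap keeps retry pressure bounded, while the jitter avoids synchronized retry storms when many consumer threads hit the same transient failure.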
"maxConcurrentRequests": { + "title": "Max Concurrent Requests", + "description": "Maximum number of concurrent requests to the search index", + "type": "integer", + "default": 100 + }, + "maxRetries": { + "title": "Max Retries", + "description": "Maximum number of retries for a failed request", + "type": "integer", + "default": 3 + }, + "initialBackoff": { + "title": "Initial Backoff Millis", + "description": "Initial backoff time in milliseconds", + "type": "integer", + "default": 1000 + }, + "maxBackoff": { + "title": "Max Backoff Millis", + "description": "Maximum backoff time in milliseconds", + "type": "integer", + "default": 10000 + }, "searchIndexMappingLanguage": { "description": "Recreate Indexes with updated Language", "$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage" diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index 4a4f8b123f7..6767c847238 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -99,7 +99,8 @@ }, "batchSize": { "description": "Maximum number of events sent in a batch (Default 10).", - "type": "integer" + "type": "integer", + "default": 100 }, "payLoadSize": { "description": "Payload size in bytes depending on config.", @@ -107,6 +108,36 @@ "existingJavaType": "java.lang.Long", "default": 104857600 }, + "numberOfThreads": { + "title": "Number of Threads", + "description": "Number of threads to use for reindexing", + "type": "integer", + "default": 5 + }, + "maxConcurrentRequests": { + "title": "Max Concurrent Requests", + "description": "Maximum number of concurrent requests to the search index", + "type": "integer", + "default": 100 + }, + "maxRetries": { + "title": "Max Retries", + "description": "Maximum number of retries for a failed request", + "type": "integer", + "default": 5 + }, + "initialBackoff": { + "title": "Initial Backoff Millis", + "description": "Initial backoff time in milliseconds", + "type": "integer", + "default": 1000 + }, + "maxBackoff": { + "title": "Max Backoff Millis", + "description": "Maximum backoff time in milliseconds", + "type": "integer", + "default": 10000 + }, "searchIndexMappingLanguage": { "description": "Recreate Indexes with updated Language", "$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage" diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsight.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsight.spec.ts index 00577f96aa0..88083914ad4 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsight.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsight.spec.ts @@ -169,7 +169,7 @@ test.describe('Data Insight Page', { tag: '@data-insight' }, () => { await redirectToHomePage(page); await kpiResponse; - + await page.waitForSelector('[data-testid="loader"]', { state: 'detached' }); expect(page.locator('[data-testid="kpi-widget"]')).toBeVisible(); diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsightSettings.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsightSettings.spec.ts index da15de86ca2..db7aeae50fc 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsightSettings.spec.ts +++ 
b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/DataInsightSettings.spec.ts @@ -215,4 +215,4 @@ test.describe.serial( }); } } -); \ No newline at end of file +); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.component.tsx index 126af27437b..3c440f2fa08 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.component.tsx @@ -40,7 +40,7 @@ import { JobStats, } from './AppLogsViewer.interface'; -const AppLogsViewer = ({ data }: AppLogsViewerProps) => { +const AppLogsViewer = ({ data, scrollHeight }: AppLogsViewerProps) => { const { t } = useTranslation(); const { successContext, failureContext, timestamp, status } = data; @@ -251,7 +251,7 @@ const AppLogsViewer = ({ data }: AppLogsViewerProps) => { dataSource={getEntityStatsData(entityStats)} pagination={false} rowKey="name" - scroll={{ y: 200 }} + scroll={scrollHeight ? { y: scrollHeight } : undefined} size="small" /> ); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.interface.ts index fbccbb5ccb5..bc9e51d7cb7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.interface.ts @@ -16,6 +16,7 @@ import { AppRunRecord } from '../../../../generated/entity/applications/appRunRe export interface AppLogsViewerProps { data: AppRunRecord; + scrollHeight?: number; } export interface TotalRecords { diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx index aecccc6167c..8cbdf24155c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx @@ -26,9 +26,11 @@ import { useTranslation } from 'react-i18next'; import { useHistory } from 'react-router-dom'; import { NO_DATA_PLACEHOLDER, + SOCKET_EVENTS, STATUS_LABEL, } from '../../../../constants/constants'; import { GlobalSettingOptions } from '../../../../constants/GlobalSettings.constants'; +import { useWebSocketConnector } from '../../../../context/WebSocketProvider/WebSocketProvider'; import { AppType } from '../../../../generated/entity/applications/app'; import { Status } from '../../../../generated/entity/applications/appRunRecord'; import { @@ -45,7 +47,9 @@ import { } from '../../../../utils/ApplicationUtils'; import { formatDateTime, + formatDuration, getEpochMillisForPastDays, + getIntervalInMilliseconds, } from '../../../../utils/date-time/DateTimeUtils'; import { getLogsViewerPath } from '../../../../utils/RouterUtils'; import { showErrorToast } from '../../../../utils/ToastUtils'; @@ -66,6 +70,7 @@ const AppRunsHistory = forwardRef( { appData, maxRecords, 
showPagination = true }: AppRunsHistoryProps, ref ) => { + const { socket } = useWebSocketConnector(); const { t } = useTranslation(); const { fqn } = useFqn(); const [isLoading, setIsLoading] = useState(true); @@ -122,7 +127,7 @@ const AppRunsHistory = forwardRef( return true; } - return record.status === Status.Running; + return false; }, []); const getActionButton = useCallback( @@ -174,6 +179,23 @@ const AppRunsHistory = forwardRef( {runType ?? NO_DATA_PLACEHOLDER} ), }, + { + title: t('label.duration'), + dataIndex: 'executionTime', + key: 'executionTime', + render: (_, record: AppRunRecordWithId) => { + if (record.startTime && record.endTime) { + const ms = getIntervalInMilliseconds( + record.startTime, + record.endTime + ); + + return formatDuration(ms); + } else { + return '-'; + } + }, + }, { title: t('label.status'), dataIndex: 'status', @@ -270,6 +292,25 @@ const AppRunsHistory = forwardRef( } as Paging); }; + const handleAppHistoryRecordUpdate = ( + updatedRecord: AppRunRecordWithId + ) => { + setAppRunsHistoryData((prev) => { + const updatedData = prev.map((item) => { + if ( + item.appId === updatedRecord.appId && + item.startTime === updatedRecord.startTime + ) { + return { ...updatedRecord, id: item.id }; + } + + return item; + }); + + return updatedData; + }); + }; + useImperativeHandle(ref, () => ({ refreshAppHistory() { fetchAppHistory(); @@ -280,6 +321,31 @@ const AppRunsHistory = forwardRef( fetchAppHistory(); }, [fqn, pageSize]); + useEffect(() => { + if (socket) { + socket.on(SOCKET_EVENTS.SEARCH_INDEX_JOB_BROADCAST_CHANNEL, (data) => { + if (data) { + const searchIndexJob = JSON.parse(data); + handleAppHistoryRecordUpdate(searchIndexJob); + } + }); + + socket.on(SOCKET_EVENTS.DATA_INSIGHTS_JOB_BROADCAST_CHANNEL, (data) => { + if (data) { + const dataInsightJob = JSON.parse(data); + handleAppHistoryRecordUpdate(dataInsightJob); + } + }); + } + + return () => { + if (socket) { + socket.off(SOCKET_EVENTS.SEARCH_INDEX_JOB_BROADCAST_CHANNEL); + socket.off(SOCKET_EVENTS.DATA_INSIGHTS_JOB_BROADCAST_CHANNEL); + } + }; + }, [socket]); + return ( @@ -289,7 +355,12 @@ const AppRunsHistory = forwardRef( data-testid="app-run-history-table" dataSource={appRunsHistoryData} expandable={{ - expandedRowRender: (record) => , + expandedRowRender: (record) => ( + + ), showExpandColumn: false, rowExpandable: (record) => !showLogAction(record), expandedRowKeys, diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppSchedule/AppSchedule.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppSchedule/AppSchedule.component.tsx index e36ccf6ca9c..4be06ce0249 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppSchedule/AppSchedule.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppSchedule/AppSchedule.component.tsx @@ -112,7 +112,11 @@ const AppSchedule = ({ const onAppTrigger = async () => { await onDemandTrigger(); - appRunsHistoryRef.current?.refreshAppHistory(); + + // Refresh the app history after 750ms to get the latest run as the run is triggered asynchronously + setTimeout(() => { + appRunsHistoryRef.current?.refreshAppHistory(); + }, 750); }; const appRunHistory = useMemo(() => { diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts b/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts index 7612ffaa8d9..fc4a82c9d55 100644 --- 
a/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts @@ -292,6 +292,8 @@ export const SOCKET_EVENTS = { MENTION_CHANNEL: 'mentionChannel', JOB_STATUS: 'jobStatus', CSV_EXPORT_CHANNEL: 'csvExportChannel', + SEARCH_INDEX_JOB_BROADCAST_CHANNEL: 'searchIndexJobStatus', + DATA_INSIGHTS_JOB_BROADCAST_CHANNEL: 'dataInsightsJobStatus', }; export const IN_PAGE_SEARCH_ROUTES: Record> = { diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json index 392fb78ec0a..039bf0539b6 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json @@ -16,6 +16,36 @@ "type": "integer", "default": 104857600 }, + "numberOfThreads": { + "title": "Number of Threads", + "description": "Number of threads to use for reindexing", + "type": "integer", + "default": 5 + }, + "maxConcurrentRequests": { + "title": "Max Concurrent Requests", + "description": "Maximum number of concurrent requests to the search index", + "type": "integer", + "default": 100 + }, + "maxRetries": { + "title": "Max Retries", + "description": "Maximum number of retries for a failed request", + "type": "integer", + "default": 5 + }, + "initialBackoff": { + "title": "Initial Backoff Millis", + "description": "Initial backoff time in milliseconds", + "type": "integer", + "default": 1000 + }, + "maxBackoff": { + "title": "Max Backoff Millis", + "description": "Maximum backoff time in milliseconds", + "type": "integer", + "default": 10000 + }, "entities": { "title": "Entities", "description": "List of entities that you need to reindex", diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationUtils.tsx b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationUtils.tsx index 574cac598e0..089cf46d136 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationUtils.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationUtils.tsx @@ -58,8 +58,19 @@ export const getStatusFromPipelineState = (status: PipelineState) => { }; export const getEntityStatsData = (data: EntityStats): EntityStatsData[] => { - return Object.keys(data).map((key) => ({ - name: upperFirst(key), - ...data[key as EntityTypeSearchIndex], - })); + const filteredRow = ['failedRecords', 'totalRecords', 'successRecords']; + + return Object.keys(data).reduce((acc, key) => { + if (filteredRow.includes(key)) { + return acc; + } + + return [ + ...acc, + { + name: upperFirst(key), + ...data[key as EntityTypeSearchIndex], + }, + ]; + }, [] as EntityStatsData[]); }; diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/TagClassBase.test.ts b/openmetadata-ui/src/main/resources/ui/src/utils/TagClassBase.test.ts index a393d968c22..396cce6ac06 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/TagClassBase.test.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/TagClassBase.test.ts @@ -16,7 +16,6 @@ import tagClassBase, { TagClassBase } from './TagClassBase'; jest.mock('../rest/searchAPI'); - jest.mock('./StringsUtils', () => ({ getEncodedFqn: jest.fn().mockReturnValue('test'), escapeESReservedCharacters: jest.fn().mockReturnValue('test'), diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/date-time/DateTimeUtils.ts 
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/date-time/DateTimeUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/date-time/DateTimeUtils.ts
index 677dd07158c..06c79f8dbd2 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/date-time/DateTimeUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/date-time/DateTimeUtils.ts
@@ -186,6 +186,18 @@ export const isValidDateFormat = (format: string) => {
   }
 };
 
+export const getIntervalInMilliseconds = (
+  startTime: number,
+  endTime: number
+) => {
+  const startDateTime = DateTime.fromMillis(startTime);
+  const endDateTime = DateTime.fromMillis(endTime);
+
+  const interval = endDateTime.diff(startDateTime);
+
+  return interval.milliseconds;
+};
+
 /**
  * Calculates the interval between two timestamps in milliseconds
  * and returns the result as a formatted string "X Days, Y Hours".
@@ -199,11 +211,12 @@ export const calculateInterval = (
   endTime: number
 ): string => {
   try {
-    const startDateTime = DateTime.fromMillis(startTime);
-    const endDateTime = DateTime.fromMillis(endTime);
+    const intervalInMilliseconds = getIntervalInMilliseconds(
+      startTime,
+      endTime
+    );
 
-    const interval = endDateTime.diff(startDateTime);
-    const duration = Duration.fromMillis(interval.milliseconds);
+    const duration = Duration.fromMillis(intervalInMilliseconds);
 
     const days = Math.floor(duration.as('days'));
     const hours = Math.floor(duration.as('hours')) % 24;
@@ -212,3 +225,63 @@
     return 'Invalid interval';
   }
 };
+
+const intervals: [string, number][] = [
+  ['Y', 31104000000], // 1000 * 60 * 60 * 24 * 30 * 12 (twelve 30-day months)
+  ['M', 2592000000], // 1000 * 60 * 60 * 24 * 30
+  ['d', 86400000], // 1000 * 60 * 60 * 24
+  ['h', 3600000], // 1000 * 60 * 60
+  ['m', 60000], // 1000 * 60
+  ['s', 1000], // 1000
+];
+
+/**
+ * Converts a given time in milliseconds to a human-readable format.
+ *
+ * @param milliseconds - The time duration in milliseconds to be converted.
+ * @returns A human-readable string representation of the time duration.
+ */
+export const convertMillisecondsToHumanReadableFormat = (
+  milliseconds: number
+): string => {
+  if (milliseconds <= 0) {
+    return '0s';
+  }
+
+  const result: string[] = [];
+  let remainingMilliseconds = milliseconds;
+
+  for (const [name, count] of intervals) {
+    if (remainingMilliseconds < count) {
+      continue; // Skip units larger than the remaining duration
+    }
+    const value = Math.floor(remainingMilliseconds / count);
+    remainingMilliseconds %= count;
+    result.push(`${value}${name}`);
+  }
+
+  return result.join(' ');
+};
+
+export const formatDuration = (ms: number) => {
+  const seconds = ms / 1000;
+  const minutes = seconds / 60;
+  const hours = minutes / 60;
+
+  const pluralize = (value: number, unit: string) =>
+    `${value.toFixed(2)} ${unit}${value !== 1 ? 's' : ''}`;
+
+  if (seconds < 60) {
+    return pluralize(seconds, 'second');
+  } else if (minutes < 60) {
+    return pluralize(minutes, 'minute');
+  } else {
+    return pluralize(hours, 'hour');
+  }
+};
+
+export const getStartOfDayInMillis = (timestamp: number) =>
+  DateTime.fromMillis(timestamp).toUTC().startOf('day').toMillis();
+
+export const getEndOfDayInMillis = (timestamp: number) =>
+  DateTime.fromMillis(timestamp).toUTC().endOf('day').toMillis();
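To make the new `DateTimeUtils` helpers concrete, the expected outputs below follow directly from the unit table and thresholds above:

```ts
getIntervalInMilliseconds(1_718_000_000_000, 1_718_000_090_000); // 90000

formatDuration(1_500); // '1.50 seconds'
formatDuration(90_000); // '1.50 minutes'
formatDuration(5_400_000); // '1.50 hours'

convertMillisecondsToHumanReadableFormat(0); // '0s'
convertMillisecondsToHumanReadableFormat(3_661_000); // '1h 1m 1s'
convertMillisecondsToHumanReadableFormat(90_061_000); // '1d 1h 1m 1s'
```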
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/mocks/ApplicationUtils.mock.ts b/openmetadata-ui/src/main/resources/ui/src/utils/mocks/ApplicationUtils.mock.ts
index 359f7c8cf87..37e48628df7 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/mocks/ApplicationUtils.mock.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/mocks/ApplicationUtils.mock.ts
@@ -193,6 +193,9 @@
     failedRecords: 0,
     successRecords: 4,
   },
+  totalRecords: 0,
+  successRecords: 0,
+  failedRecords: 0,
 };
 
 export const MOCK_APPLICATION_ENTITY_STATS_DATA = [