Fix Search Index Contention (#18605)

* Fix Search Index Contention

* Update searchIndexingAppConfig.json

* Missing Error Logs and Stats

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
(cherry picked from commit 9a5dc61ca7316e7f89052d95490ec451b80c5e7e)
Mohit Yadav 2024-11-12 20:36:23 +05:30 committed by mohitdeuex
parent e04d637860
commit 42b4ea8de8
11 changed files with 179 additions and 113 deletions
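For orientation before the per-file diffs: the reworked SearchIndexApp below replaces the per-entity reader futures with a producer/consumer pipeline. A configurable number of producer threads page entities into a shared LinkedBlockingQueue, a configurable number of consumer threads drain that queue into the search sink, and once a CountDownLatch confirms every producer has finished, sendPoisonPills enqueues one sentinel task per consumer to shut them down. The following is a minimal standalone sketch of that pattern only; the class name and Task record are illustrative and are not code from this commit.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerSketch {
  // Sentinel task: consumers exit when they dequeue it.
  record Task(int offset, List<String> batch) {}
  private static final Task POISON_PILL = new Task(-1, List.of());

  public static void main(String[] args) throws InterruptedException {
    int producers = 2;
    int consumers = 2;
    BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    CountDownLatch producerLatch = new CountDownLatch(producers);
    ExecutorService producerPool = Executors.newFixedThreadPool(producers);
    ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);

    for (int p = 0; p < producers; p++) {
      int id = p;
      producerPool.submit(() -> {
        try {
          // Each producer reads "pages" and enqueues them as tasks.
          for (int offset = 0; offset < 3; offset++) {
            queue.put(new Task(offset, List.of("entity-" + id + "-" + offset)));
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          producerLatch.countDown();
        }
      });
    }

    for (int c = 0; c < consumers; c++) {
      consumerPool.submit(() -> {
        try {
          while (true) {
            Task task = queue.take();
            if (task == POISON_PILL) {
              break; // no more work will arrive
            }
            System.out.println("indexing batch at offset " + task.offset());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    // Wait for all producers, then unblock each consumer exactly once.
    producerLatch.await();
    for (int c = 0; c < consumers; c++) {
      queue.put(POISON_PILL);
    }
    producerPool.shutdown();
    consumerPool.shutdown();
    consumerPool.awaitTermination(10, TimeUnit.SECONDS);
  }
}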

View File

@@ -0,0 +1,5 @@
-- Remove SearchIndexing for api Service, collection and endpoint
DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE from installed_apps where name = 'SearchIndexingApplication';
DELETE from apps_marketplace where name = 'SearchIndexingApplication';

View File

@@ -0,0 +1,5 @@
-- Remove SearchIndexing for api Service, collection and endpoint
DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE from installed_apps where name = 'SearchIndexingApplication';
DELETE from apps_marketplace where name = 'SearchIndexingApplication';

View File

@@ -1,5 +1,6 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.API_COLLCECTION;
import static org.openmetadata.service.Entity.API_ENDPOINT;
import static org.openmetadata.service.Entity.API_SERVICE;
@@ -46,18 +47,14 @@ import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +62,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.analytics.ReportData;
@@ -154,22 +150,16 @@ public class SearchIndexApp extends AbstractNativeApplication {
TEST_CASE_RESULT);
// Constants to replace magic numbers
private static final int DEFAULT_PAYLOAD_SIZE = 100;
private static final int DEFAULT_BATCH_SIZE = 5;
private static final int DEFAULT_MAX_RETRIES = 1000;
private static final int DEFAULT_TIMEOUT = 10000;
private static final int MAX_CONSUMERS = 10;
private BulkSink searchIndexSink;
@Getter private EventPublisherJob jobData;
private final Object jobDataLock = new Object();
private volatile boolean stopped = false;
private ExecutorService producerExecutor;
private ExecutorService consumerExecutor;
private ExecutorService entityReaderExecutor;
private ExecutorService producerExecutor;
private final BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>();
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(DEFAULT_BATCH_SIZE);
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
@@ -245,20 +235,20 @@ public class SearchIndexApp extends AbstractNativeApplication {
new OpenSearchIndexSink(
searchRepository.getSearchClient(),
jobData.getPayLoadSize(),
DEFAULT_PAYLOAD_SIZE,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_RETRIES,
DEFAULT_TIMEOUT);
jobData.getMaxConcurrentRequests(),
jobData.getMaxRetries(),
jobData.getInitialBackoff(),
jobData.getMaxBackoff());
LOG.info("Initialized OpenSearchIndexSink.");
} else {
this.searchIndexSink =
new ElasticSearchIndexSink(
searchRepository.getSearchClient(),
jobData.getPayLoadSize(),
DEFAULT_PAYLOAD_SIZE,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_RETRIES,
DEFAULT_TIMEOUT);
jobData.getMaxConcurrentRequests(),
jobData.getMaxRetries(),
jobData.getInitialBackoff(),
jobData.getMaxBackoff());
LOG.info("Initialized ElasticSearchIndexSink.");
}
}
@@ -288,56 +278,68 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
int numProducers = jobData.getEntities().size();
int numConsumers = calculateNumberOfConsumers();
int numProducers = jobData.getProducerThreads();
int numConsumers = jobData.getConsumerThreads();
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
producerExecutor = Executors.newFixedThreadPool(numProducers);
consumerExecutor = Executors.newFixedThreadPool(numConsumers);
entityReaderExecutor = Executors.newCachedThreadPool();
CountDownLatch producerLatch = new CountDownLatch(numProducers);
producerExecutor = Executors.newFixedThreadPool(numProducers);
try {
for (String entityType : jobData.getEntities()) {
producerExecutor.submit(
() -> {
try {
reCreateIndexes(entityType);
processEntityType(entityType);
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
} finally {
producerLatch.countDown();
}
});
}
for (int i = 0; i < numConsumers; i++) {
consumerExecutor.submit(
() -> {
try {
consumeTasks(jobExecutionContext);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Consumer thread interrupted.");
}
});
}
producerLatch.await();
sendPoisonPills(numConsumers);
processEntityReindex(jobExecutionContext);
} catch (Exception e) {
LOG.error("Error during reindexing process.", e);
throw e;
} finally {
shutdownExecutor(producerExecutor, "ProducerExecutor", 1, TimeUnit.HOURS);
shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
shutdownExecutor(entityReaderExecutor, "ReaderExecutor", 20, TimeUnit.SECONDS);
}
}
private int calculateNumberOfConsumers() {
return Math.min(Runtime.getRuntime().availableProcessors(), MAX_CONSUMERS);
private void processEntityReindex(JobExecutionContext jobExecutionContext)
throws InterruptedException {
int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
for (String entityType : jobData.getEntities()) {
try {
reCreateIndexes(entityType);
int totalEntityRecords = getTotalEntityRecords(entityType);
Source<?> source = createSource(entityType);
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
if (totalEntityRecords > 0) {
for (int i = 0; i < noOfThreads; i++) {
int currentOffset = i * batchSize.get();
producerExecutor.submit(
() -> {
try {
processReadTask(entityType, source, currentOffset);
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
} finally {
producerLatch.countDown();
}
});
}
}
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
}
}
for (int i = 0; i < numConsumers; i++) {
consumerExecutor.submit(
() -> {
try {
consumeTasks(jobExecutionContext);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Consumer thread interrupted.");
}
});
}
producerLatch.await();
sendPoisonPills(numConsumers);
}
private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
@@ -548,29 +550,42 @@ public class SearchIndexApp extends AbstractNativeApplication {
private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
String entityType = task.entityType();
List<?> entities = task.entities();
ResultList<?> entities = task.entities();
Map<String, Object> contextData = new HashMap<>();
contextData.put(ENTITY_TYPE_KEY, entityType);
try {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
@SuppressWarnings("unchecked")
List<EntityInterface> entityList = (List<EntityInterface>) entities;
List<EntityInterface> entityList = (List<EntityInterface>) entities.getData();
searchIndexSink.write(entityList, contextData);
} else {
@SuppressWarnings("unchecked")
List<EntityTimeSeriesInterface> entityList = (List<EntityTimeSeriesInterface>) entities;
List<EntityTimeSeriesInterface> entityList =
(List<EntityTimeSeriesInterface>) entities.getData();
searchIndexSink.write(entityList, contextData);
}
// After successful write, create a new StepStats for the current batch
StepStats currentEntityStats = new StepStats();
currentEntityStats.setSuccessRecords(entities.size());
currentEntityStats.setFailedRecords(0);
currentEntityStats.setSuccessRecords(entities.getData().size());
currentEntityStats.setFailedRecords(entities.getErrors().size());
// Do NOT set Total Records here
// Update statistics in a thread-safe manner
synchronized (jobDataLock) {
if (!entities.getErrors().isEmpty()) {
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
jobData.setFailure(
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.READER)
.withSubmittedCount(batchSize.get())
.withSuccessCount(entities.getData().size())
.withFailedCount(entities.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(entities.getErrors()));
}
updateStats(entityType, currentEntityStats);
}
@@ -584,7 +599,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
StepStats failedEntityStats = new StepStats();
failedEntityStats.setSuccessRecords(0);
failedEntityStats.setFailedRecords(entities.size());
failedEntityStats.setFailedRecords(entities.getData().size());
updateStats(entityType, failedEntityStats);
}
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
@@ -601,14 +616,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
PaginatedEntitiesSource paginatedSource =
new PaginatedEntitiesSource(entityType, batchSize.get(), fields);
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
if (!nullOrEmpty(jobData.getAfterCursor())) {
paginatedSource.getCursor().set(jobData.getAfterCursor());
}
source = paginatedSource;
} else {
PaginatedEntityTimeSeriesSource paginatedSource =
new PaginatedEntityTimeSeriesSource(entityType, batchSize.get(), fields);
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
if (!nullOrEmpty(jobData.getAfterCursor())) {
paginatedSource.getCursor().set(jobData.getAfterCursor());
}
source = paginatedSource;
@@ -617,17 +632,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
return source;
}
private void processEntityType(String entityType)
throws InterruptedException, ExecutionException {
int totalEntityRecords = getTotalEntityRecords(entityType);
if (totalEntityRecords > 0) {
Source<?> source = createSource(entityType);
int loadPerThread = calculateLoadPerThread(totalEntityRecords);
List<Future<?>> futures = submitReaderTasks(entityType, source, loadPerThread);
for (Future<?> future : futures) {
future.get();
}
private int getTotalLatchCount(Set<String> entities) {
int totalCount = 0;
for (String entityType : entities) {
int totalEntityRecords = getTotalEntityRecords(entityType);
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
totalCount += noOfThreads;
}
return totalCount;
}
private int getTotalEntityRecords(String entityType) {
@@ -636,24 +648,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
.getTotalRecords();
}
private List<Future<?>> submitReaderTasks(
String entityType, Source<?> source, int loadPerThread) {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < loadPerThread; i++) {
int currentOffset = i * batchSize.get();
Future<?> future =
entityReaderExecutor.submit(() -> processReadTask(entityType, source, currentOffset));
futures.add(future);
}
return futures;
}
private void processReadTask(String entityType, Source<?> source, int offset) {
try {
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
if (resultList != null) {
List<?> entities = extractEntities(entityType, resultList);
if (entities != null && !entities.isEmpty()) {
ResultList<?> entities = extractEntities(entityType, resultList);
if (!nullOrEmpty(entities.getData())) {
LOG.info(
"Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset);
createIndexingTask(entityType, entities, offset);
@@ -664,16 +664,29 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.warn("Reader thread interrupted for entityType: {}", entityType);
} catch (SearchIndexException e) {
LOG.error("Error while reading source for entityType: {}", entityType, e);
synchronized (jobDataLock) {
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
jobData.setFailure(e.getIndexingError());
int remainingRecords = getRemainingRecordsToProcess(entityType);
if (remainingRecords - batchSize.get() <= 0) {
updateStats(
entityType,
new StepStats().withSuccessRecords(0).withFailedRecords(remainingRecords));
} else {
updateStats(
entityType, new StepStats().withSuccessRecords(0).withFailedRecords(batchSize.get()));
}
}
}
}
private void createIndexingTask(String entityType, List<?> entities, int offset)
private void createIndexingTask(String entityType, ResultList<?> entities, int offset)
throws InterruptedException {
IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
taskQueue.put(task);
}
private synchronized int calculateLoadPerThread(int totalEntityRecords) {
private synchronized int calculateNumberOfThreads(int totalEntityRecords) {
int mod = totalEntityRecords % batchSize.get();
if (mod == 0) {
return totalEntityRecords / batchSize.get();
@@ -683,16 +696,26 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
@SuppressWarnings("unchecked")
private List<?> extractEntities(String entityType, Object resultList) {
private ResultList<?> extractEntities(String entityType, Object resultList) {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
return ((ResultList<? extends EntityInterface>) resultList).getData();
return ((ResultList<? extends EntityInterface>) resultList);
} else {
return ((ResultList<? extends EntityTimeSeriesInterface>) resultList).getData();
return ((ResultList<? extends EntityTimeSeriesInterface>) resultList);
}
}
private record IndexingTask<T>(String entityType, List<T> entities, int currentEntityOffset) {
private synchronized int getRemainingRecordsToProcess(String entityType) {
StepStats entityStats =
((StepStats)
searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));
return entityStats.getTotalRecords()
- entityStats.getFailedRecords()
- entityStats.getSuccessRecords();
}
private record IndexingTask<T>(
String entityType, ResultList<T> entities, int currentEntityOffset) {
public static final IndexingTask<?> POISON_PILL =
new IndexingTask<>(null, Collections.emptyList(), -1);
new IndexingTask<>(null, new ResultList<>(), -1);
}
}
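The producer fan-out in processEntityReindex and the calculateNumberOfThreads helper above boil down to ceiling division: totalEntityRecords is split into batchSize-sized pages, one reader task per page, each starting at offset i * batchSize. A small self-contained illustration of that arithmetic (the class name and values are made up for the example, not taken from the commit):

public class OffsetMathSketch {
  // Ceiling division, mirroring calculateNumberOfThreads in the diff above.
  static int numberOfThreads(int totalRecords, int batchSize) {
    int mod = totalRecords % batchSize;
    return mod == 0 ? totalRecords / batchSize : totalRecords / batchSize + 1;
  }

  public static void main(String[] args) {
    int totalRecords = 1043;
    int batchSize = 100;
    int threads = numberOfThreads(totalRecords, batchSize); // 11 reader tasks
    for (int i = 0; i < threads; i++) {
      int offset = i * batchSize; // 0, 100, ..., 1000; the last page holds 43 records
      System.out.printf("reader %d starts at offset %d%n", i, offset);
    }
  }
}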

View File

@@ -278,10 +278,15 @@ public class OpenMetadataOperations implements Callable<Integer> {
description = "Flag to determine if indexes should be recreated.")
boolean recreateIndexes,
@Option(
names = {"--num-threads"},
names = {"--producer-threads"},
defaultValue = "10",
description = "Number of threads to use for processing.")
int numThreads,
int producerThreads,
@Option(
names = {"--consumer-threads"},
defaultValue = "10",
description = "Number of threads to use for processing.")
int consumerThreads,
@Option(
names = {"--back-off"},
defaultValue = "1000",
@@ -320,7 +325,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
batchSize,
payloadSize,
recreateIndexes,
numThreads,
producerThreads,
consumerThreads,
backOff,
maxBackOff,
maxRequests,
@@ -336,7 +342,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
int batchSize,
long payloadSize,
boolean recreateIndexes,
int numThreads,
int producerThreads,
int consumerThreads,
int backOff,
int maxBackOff,
int maxRequests,
@@ -354,7 +361,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
.withBatchSize(batchSize)
.withPayLoadSize(payloadSize)
.withRecreateIndex(recreateIndexes)
.withNumberOfThreads(numThreads)
.withProducerThreads(producerThreads)
.withConsumerThreads(consumerThreads)
.withInitialBackoff(backOff)
.withMaxBackoff(maxBackOff)
.withMaxConcurrentRequests(maxRequests)

View File

@@ -46,7 +46,8 @@
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"numberOfThreads": 5,
"producerThreads": 10,
"consumerThreads": 10,
"maxConcurrentRequests": 100,
"maxRetries": 3,
"initialBackoff": 1000,

View File

@@ -60,6 +60,12 @@
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"producerThreads": 10,
"consumerThreads": 10,
"maxConcurrentRequests": 100,
"maxRetries": 3,
"initialBackoff": 1000,
"maxBackoff": 10000,
"searchIndexMappingLanguage": "EN"
}
}
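The sink settings above (maxRetries, initialBackoff and maxBackoff, presumably in milliseconds, plus maxConcurrentRequests) are the values now handed to the OpenSearch/ElasticSearch sinks earlier in this commit in place of the old hard-coded DEFAULT_MAX_RETRIES and DEFAULT_TIMEOUT constants. A common reading of an initial/max backoff pair is capped exponential backoff between retries; the sketch below shows only that interpretation and is not the sinks' actual retry loop.

public class BackoffSketch {
  // Capped exponential backoff: 1000 ms, 2000 ms, 4000 ms, ... up to maxBackoff.
  static long backoffForAttempt(int attempt, long initialBackoffMs, long maxBackoffMs) {
    long delay = initialBackoffMs << Math.min(attempt, 30); // cap the shift to avoid overflow
    return Math.min(delay, maxBackoffMs);
  }

  public static void main(String[] args) {
    long initial = 1000;  // "initialBackoff" from the config above
    long max = 10000;     // "maxBackoff"
    int maxRetries = 3;   // "maxRetries"
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      System.out.printf(
          "retry %d would wait %d ms%n", attempt + 1, backoffForAttempt(attempt, initial, max));
    }
  }
}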

View File

@@ -44,11 +44,17 @@
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"numberOfThreads": {
"title": "Number of Threads",
"producerThreads": {
"title": "Number of Producer Threads",
"description": "Number of threads to use for reindexing",
"type": "integer",
"default": 5
"default": 10
},
"consumerThreads": {
"title": "Number of Consumer Threads",
"description": "Number of threads to use for reindexing",
"type": "integer",
"default": 10
},
"maxConcurrentRequests": {
"title": "Max Concurrent Requests",
@@ -80,4 +86,4 @@
}
},
"additionalProperties": false
}
}

View File

@@ -108,11 +108,17 @@
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"numberOfThreads": {
"title": "Number of Threads",
"description": "Number of threads to use for reindexing",
"producerThreads": {
"title": "Number of Producer Threads to use",
"description": "Number of producer threads to use for reindexing",
"type": "integer",
"default": 5
"default": 10
},
"consumerThreads": {
"title": "Number of Consumer Threads to use",
"description": "Number of consumer threads to use for reindexing",
"type": "integer",
"default": 10
},
"maxConcurrentRequests": {
"title": "Max Concurrent Requests",

View File

@@ -16,8 +16,14 @@
"type": "integer",
"default": 104857600
},
"numberOfThreads": {
"title": "Number of Threads",
"producerThreads": {
"title": "Number of Producer Threads",
"description": "Number of threads to use for reindexing",
"type": "integer",
"default": 5
},
"consumerThreads": {
"title": "Number of Consumer Threads",
"description": "Number of threads to use for reindexing",
"type": "integer",
"default": 5