Add insert for search reindex rather than update (#22382)

* Add insert for search reindex rather than update

* Fix java checkstyle
This commit is contained in:
Sriharsha Chintalapani 2025-07-21 01:54:09 -07:00 committed by GitHub
parent 0c2eb140fc
commit dd805f844e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 275 additions and 18 deletions

View File

@ -107,7 +107,17 @@ public class ElasticSearchBulkSink implements BulkSink {
for (var item : response.getItems()) {
if (item.isFailed()) {
failures++;
LOG.warn("Failed to index document: {}", item.getFailureMessage());
String failureMessage = item.getFailureMessage();
// Log document_missing_exception differently as it indicates a race condition
if (failureMessage != null
&& failureMessage.contains("document_missing_exception")) {
LOG.warn(
"Document missing error for {}: {} - This may occur during concurrent reindexing",
item.getId(),
failureMessage);
} else {
LOG.warn("Failed to index document {}: {}", item.getId(), failureMessage);
}
}
}
int successes = numberOfActions - failures;
@ -179,6 +189,8 @@ public class ElasticSearchBulkSink implements BulkSink {
throw new IllegalArgumentException("Entity type is required in context data");
}
Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
if (indexMapping == null) {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
@ -196,7 +208,7 @@ public class ElasticSearchBulkSink implements BulkSink {
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
for (EntityInterface entity : entityInterfaces) {
addEntity(entity, indexName);
addEntity(entity, indexName, recreateIndex);
}
}
} catch (Exception e) {
@ -215,19 +227,25 @@ public class ElasticSearchBulkSink implements BulkSink {
}
}
private void addEntity(EntityInterface entity, String indexName) {
/**
 * Queues a single entity on the bulk processor for indexing into {@code indexName}.
 *
 * <p>The entity is transformed into its search-index document and serialized to JSON. When
 * {@code recreateIndex} is {@code true} the document is queued as a plain {@link IndexRequest};
 * otherwise it is queued as an {@link UpdateRequest} with doc-as-upsert enabled.
 *
 * @param entity entity to index; its id is used as the document id
 * @param indexName name of the target index
 * @param recreateIndex {@code true} to insert with an index request; {@code false} or {@code
 *     null} to fall back to the upsert path
 */
private void addEntity(EntityInterface entity, String indexName, Boolean recreateIndex) {
  // Build the search index document using the proper transformation
  String entityType = Entity.getEntityTypeFromObject(entity);
  Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
  String json = JsonUtils.pojoToJson(searchIndexDoc);
  String documentId = entity.getId().toString();

  // The flag arrives as a boxed Boolean read from the context map; Map.getOrDefault returns null
  // when the key is explicitly mapped to null, so compare null-safely instead of unboxing.
  if (Boolean.TRUE.equals(recreateIndex)) {
    // Use IndexRequest for fresh indexing to avoid document_missing_exception
    IndexRequest indexRequest =
        new IndexRequest(indexName).id(documentId).source(json, XContentType.JSON);
    bulkProcessor.add(indexRequest);
  } else {
    // Use UpdateRequest with upsert for regular updates
    UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
    updateRequest.doc(json, XContentType.JSON);
    updateRequest.docAsUpsert(true);
    // Add to bulk processor - it handles everything including size limits
    bulkProcessor.add(updateRequest);
  }
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
String json = JsonUtils.pojoToJson(entity);

View File

@ -107,7 +107,17 @@ public class OpenSearchBulkSink implements BulkSink {
for (var item : response.getItems()) {
if (item.isFailed()) {
failures++;
LOG.warn("Failed to index document: {}", item.getFailureMessage());
String failureMessage = item.getFailureMessage();
// Log document_missing_exception differently as it indicates a race condition
if (failureMessage != null
&& failureMessage.contains("document_missing_exception")) {
LOG.warn(
"Document missing error for {}: {} - This may occur during concurrent reindexing",
item.getId(),
failureMessage);
} else {
LOG.warn("Failed to index document {}: {}", item.getId(), failureMessage);
}
}
}
int successes = numberOfActions - failures;
@ -179,6 +189,8 @@ public class OpenSearchBulkSink implements BulkSink {
throw new IllegalArgumentException("Entity type is required in context data");
}
Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
if (indexMapping == null) {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
@ -196,7 +208,7 @@ public class OpenSearchBulkSink implements BulkSink {
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
for (EntityInterface entity : entityInterfaces) {
addEntity(entity, indexName);
addEntity(entity, indexName, recreateIndex);
}
}
} catch (Exception e) {
@ -215,19 +227,25 @@ public class OpenSearchBulkSink implements BulkSink {
}
}
private void addEntity(EntityInterface entity, String indexName) {
/**
 * Queues a single entity on the bulk processor for indexing into {@code indexName}.
 *
 * <p>The entity is transformed into its search-index document and serialized to JSON. When
 * {@code recreateIndex} is {@code true} the document is queued as a plain {@link IndexRequest};
 * otherwise it is queued as an {@link UpdateRequest} with doc-as-upsert enabled.
 *
 * @param entity entity to index; its id is used as the document id
 * @param indexName name of the target index
 * @param recreateIndex {@code true} to insert with an index request; {@code false} or {@code
 *     null} to fall back to the upsert path
 */
private void addEntity(EntityInterface entity, String indexName, Boolean recreateIndex) {
  // Build the search index document using the proper transformation
  String entityType = Entity.getEntityTypeFromObject(entity);
  Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
  String json = JsonUtils.pojoToJson(searchIndexDoc);
  String documentId = entity.getId().toString();

  // The flag arrives as a boxed Boolean read from the context map; Map.getOrDefault returns null
  // when the key is explicitly mapped to null, so compare null-safely instead of unboxing.
  if (Boolean.TRUE.equals(recreateIndex)) {
    // Use IndexRequest for fresh indexing to avoid document_missing_exception
    IndexRequest indexRequest =
        new IndexRequest(indexName).id(documentId).source(json, XContentType.JSON);
    bulkProcessor.add(indexRequest);
  } else {
    // Use UpdateRequest with upsert for regular updates
    UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
    updateRequest.doc(json, XContentType.JSON);
    updateRequest.docAsUpsert(true);
    // Add to bulk processor - it handles everything including size limits
    bulkProcessor.add(updateRequest);
  }
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
String json = JsonUtils.pojoToJson(entity);

View File

@ -1050,6 +1050,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
ResultList<?> entities = task.entities();
Map<String, Object> contextData = new HashMap<>();
contextData.put(ENTITY_TYPE_KEY, entityType);
contextData.put("recreateIndex", jobData.getRecreateIndex());
try {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {

View File

@ -0,0 +1,83 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.lenient;
import es.org.elasticsearch.client.RestHighLevelClient;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
/**
 * Smoke tests for {@link ElasticSearchBulkSink}: construction, initial statistics, runtime
 * reconfiguration, and how the {@code recreateIndex} flag is read from a context-data map.
 */
@ExtendWith(MockitoExtension.class)
class ElasticSearchBulkSinkSimpleTest {

  private static final String RECREATE_INDEX_KEY = "recreateIndex";

  @Mock private SearchRepository searchRepository;
  @Mock private ElasticSearchClient searchClient;
  @Mock private RestHighLevelClient restHighLevelClient;
  @Mock private IndexMapping indexMapping;

  private ElasticSearchBulkSink sink;

  /** Wires the mocked repository/client chain and builds a fresh sink before every test. */
  @BeforeEach
  void setUp() {
    // Lenient stubs: not every test touches every collaborator.
    lenient().when(searchRepository.getSearchClient()).thenReturn(searchClient);
    lenient().when(searchClient.getClient()).thenReturn(restHighLevelClient);
    lenient().when(searchRepository.getClusterAlias()).thenReturn("default");
    lenient().when(indexMapping.getIndexName("default")).thenReturn("test_index");
    lenient().when(searchRepository.getIndexMapping("table")).thenReturn(indexMapping);

    sink = new ElasticSearchBulkSink(searchRepository, 10, 2, 1000000L);
  }

  /** The constructor arguments are reflected by the batch-size and concurrency getters. */
  @Test
  void testSinkCreation() {
    assertNotNull(sink);
    assertEquals(10, sink.getBatchSize());
    assertEquals(2, sink.getConcurrentRequests());
  }

  /** A freshly created sink reports zeroed counters. */
  @Test
  void testGetStats() {
    StepStats initialStats = sink.getStats();

    assertNotNull(initialStats);
    assertEquals(0, initialStats.getTotalRecords());
    assertEquals(0, initialStats.getSuccessRecords());
    assertEquals(0, initialStats.getFailedRecords());
  }

  /** Batch size and concurrency can be changed after construction. */
  @Test
  void testUpdateConfiguration() {
    sink.updateBatchSize(20);
    assertEquals(20, sink.getBatchSize());

    sink.updateConcurrentRequests(5);
    assertEquals(5, sink.getConcurrentRequests());
  }

  /** The flag lookup yields the stored value, and {@code false} when the key is absent. */
  @Test
  void testContextDataHandling() {
    Map<String, Object> context = new HashMap<>();
    context.put("entityType", "table");

    context.put(RECREATE_INDEX_KEY, true);
    assertEquals(true, readRecreateIndex(context));

    context.put(RECREATE_INDEX_KEY, false);
    assertEquals(false, readRecreateIndex(context));

    context.remove(RECREATE_INDEX_KEY);
    assertEquals(false, readRecreateIndex(context));
  }

  /** Reads the flag with the same lookup expression this test is checking. */
  private static Boolean readRecreateIndex(Map<String, Object> context) {
    return (Boolean) context.getOrDefault(RECREATE_INDEX_KEY, false);
  }
}

View File

@ -0,0 +1,84 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.lenient;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.opensearch.OpenSearchClient;
import os.org.opensearch.client.RestHighLevelClient;
/**
 * Smoke tests for {@link OpenSearchBulkSink}: construction, initial statistics, runtime
 * reconfiguration, and how the {@code recreateIndex} flag is read from a context-data map.
 */
@ExtendWith(MockitoExtension.class)
class OpenSearchBulkSinkSimpleTest {

  private static final String RECREATE_INDEX_KEY = "recreateIndex";

  @Mock private SearchRepository searchRepository;
  @Mock private OpenSearchClient searchClient;
  @Mock private RestHighLevelClient restHighLevelClient;
  @Mock private IndexMapping indexMapping;

  private OpenSearchBulkSink sink;

  /** Wires the mocked repository/client chain and builds a fresh sink before every test. */
  @BeforeEach
  void setUp() {
    // Lenient stubs: not every test touches every collaborator.
    lenient().when(searchRepository.getSearchClient()).thenReturn(searchClient);
    lenient().when(searchClient.getClient()).thenReturn(restHighLevelClient);
    lenient().when(searchRepository.getClusterAlias()).thenReturn("default");
    lenient().when(indexMapping.getIndexName("default")).thenReturn("test_index");
    lenient().when(searchRepository.getIndexMapping("table")).thenReturn(indexMapping);

    // Sink under test
    sink = new OpenSearchBulkSink(searchRepository, 10, 2, 1000000L);
  }

  /** The constructor arguments are reflected by the batch-size and concurrency getters. */
  @Test
  void testSinkCreation() {
    assertNotNull(sink);
    assertEquals(10, sink.getBatchSize());
    assertEquals(2, sink.getConcurrentRequests());
  }

  /** A freshly created sink reports zeroed counters. */
  @Test
  void testGetStats() {
    StepStats initialStats = sink.getStats();

    assertNotNull(initialStats);
    assertEquals(0, initialStats.getTotalRecords());
    assertEquals(0, initialStats.getSuccessRecords());
    assertEquals(0, initialStats.getFailedRecords());
  }

  /** Batch size and concurrency can be changed after construction. */
  @Test
  void testUpdateConfiguration() {
    sink.updateBatchSize(20);
    assertEquals(20, sink.getBatchSize());

    sink.updateConcurrentRequests(5);
    assertEquals(5, sink.getConcurrentRequests());
  }

  /** The flag lookup yields the stored value, and {@code false} when the key is absent. */
  @Test
  void testContextDataHandling() {
    Map<String, Object> context = new HashMap<>();
    context.put("entityType", "table");

    context.put(RECREATE_INDEX_KEY, true);
    assertEquals(true, readRecreateIndex(context));

    context.put(RECREATE_INDEX_KEY, false);
    assertEquals(false, readRecreateIndex(context));

    context.remove(RECREATE_INDEX_KEY);
    assertEquals(false, readRecreateIndex(context));
  }

  /** Reads the flag with the same lookup expression this test is checking. */
  private static Boolean readRecreateIndex(Map<String, Object> context) {
    return (Boolean) context.getOrDefault(RECREATE_INDEX_KEY, false);
  }
}

View File

@ -27,6 +27,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
@ -154,6 +155,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
Map<String, Object> contextData = new HashMap<>();
contextData.put("entityType", "table");
contextData.put("recreateIndex", false);
lenient().doNothing().when(mockSink).write(eq(entities), eq(contextData));
@ -209,6 +211,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
Map<String, Object> contextData = new HashMap<>();
contextData.put("entityType", "table");
contextData.put("recreateIndex", false);
lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), eq(contextData));
@ -256,6 +259,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
Map<String, Object> contextData = new HashMap<>();
contextData.put("entityType", "user");
contextData.put("recreateIndex", false);
lenient().doNothing().when(mockSink).write(eq(entities), eq(contextData));
@ -466,6 +470,55 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
assertEquals(expectedFailures, tableStats.getFailedRecords());
}
/**
 * Runs {@code processTask} reflectively on an app configured with {@code recreateIndex = true}
 * and checks that the context map handed to the sink carries the entity type and the flag.
 */
@Test
void testProcessingWithRecreateIndexTrue() throws Exception {
  // Job configuration with the recreate flag switched on
  EventPublisherJob jobConfig =
      new EventPublisherJob()
          .withEntities(Set.of("table"))
          .withBatchSize(5)
          .withPayLoadSize(1000000L)
          .withMaxConcurrentRequests(10)
          .withRecreateIndex(true)
          .withStats(new Stats());
  Object appConfiguration = JsonUtils.convertValue(jobConfig, Object.class);
  App app = new App().withName("SearchIndexingApplication").withAppConfiguration(appConfiguration);

  searchIndexApp.init(app);
  injectMockSink();

  // Two references to the same mocked entity make up the batch
  EntityInterface entity = mock(EntityInterface.class);
  lenient().when(entity.getId()).thenReturn(UUID.randomUUID());
  List<EntityInterface> entities = List.of(entity, entity);
  ResultList<EntityInterface> batch = new ResultList<>(entities, null, null, 2);

  // Record whatever context map the app passes to the sink
  ArgumentCaptor<Map<String, Object>> sinkContext = ArgumentCaptor.forClass(Map.class);
  lenient().doNothing().when(mockSink).write(eq(entities), sinkContext.capture());

  SearchIndexApp.IndexingTask<EntityInterface> indexingTask =
      new SearchIndexApp.IndexingTask<>("table", batch, 0);

  // processTask is looked up and invoked through reflection
  assertDoesNotThrow(
      () -> {
        var processTask =
            SearchIndexApp.class.getDeclaredMethod(
                "processTask", SearchIndexApp.IndexingTask.class, JobExecutionContext.class);
        processTask.setAccessible(true);
        processTask.invoke(searchIndexApp, indexingTask, jobExecutionContext);
      });

  // The sink must have received both the entity type and the recreate flag
  Map<String, Object> passedContext = sinkContext.getValue();
  assertNotNull(passedContext);
  assertEquals("table", passedContext.get("entityType"));
  assertEquals(true, passedContext.get("recreateIndex"));
}
@Test
void testAutoTuneConfiguration() {
EventPublisherJob autoTuneJobData =