diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java index a454ca1216c..dc93f11b894 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java @@ -107,7 +107,17 @@ public class ElasticSearchBulkSink implements BulkSink { for (var item : response.getItems()) { if (item.isFailed()) { failures++; - LOG.warn("Failed to index document: {}", item.getFailureMessage()); + String failureMessage = item.getFailureMessage(); + // Log document_missing_exception differently as it indicates a race condition + if (failureMessage != null + && failureMessage.contains("document_missing_exception")) { + LOG.warn( + "Document missing error for {}: {} - This may occur during concurrent reindexing", + item.getId(), + failureMessage); + } else { + LOG.warn("Failed to index document {}: {}", item.getId(), failureMessage); + } } } int successes = numberOfActions - failures; @@ -179,6 +189,8 @@ public class ElasticSearchBulkSink implements BulkSink { throw new IllegalArgumentException("Entity type is required in context data"); } + Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + IndexMapping indexMapping = searchRepository.getIndexMapping(entityType); if (indexMapping == null) { LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType); @@ -196,7 +208,7 @@ public class ElasticSearchBulkSink implements BulkSink { } else { List entityInterfaces = (List) entities; for (EntityInterface entity : entityInterfaces) { - addEntity(entity, indexName); + addEntity(entity, indexName, recreateIndex); } } } catch (Exception e) { @@ -215,18 +227,24 @@ public class ElasticSearchBulkSink implements BulkSink { } } - private void addEntity(EntityInterface entity, String indexName) { + private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); String json = JsonUtils.pojoToJson(searchIndexDoc); - UpdateRequest updateRequest = new UpdateRequest(indexName, entity.getId().toString()); - updateRequest.doc(json, XContentType.JSON); - updateRequest.docAsUpsert(true); - - // Add to bulk processor - it handles everything including size limits - bulkProcessor.add(updateRequest); + if (recreateIndex) { + // Use IndexRequest for fresh indexing to avoid document_missing_exception + IndexRequest indexRequest = + new IndexRequest(indexName).id(entity.getId().toString()).source(json, XContentType.JSON); + bulkProcessor.add(indexRequest); + } else { + // Use UpdateRequest with upsert for regular updates + UpdateRequest updateRequest = new UpdateRequest(indexName, entity.getId().toString()); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + bulkProcessor.add(updateRequest); + } } private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java index 26c55ae1908..dd554db92e0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java @@ -107,7 +107,17 @@ public class OpenSearchBulkSink implements BulkSink { for (var item : response.getItems()) { if (item.isFailed()) { failures++; - LOG.warn("Failed to index document: {}", item.getFailureMessage()); + String failureMessage = item.getFailureMessage(); + // Log document_missing_exception differently as it indicates a race condition + if (failureMessage != null + && failureMessage.contains("document_missing_exception")) { + LOG.warn( + "Document missing error for {}: {} - This may occur during concurrent reindexing", + item.getId(), + failureMessage); + } else { + LOG.warn("Failed to index document {}: {}", item.getId(), failureMessage); + } } } int successes = numberOfActions - failures; @@ -179,6 +189,8 @@ public class OpenSearchBulkSink implements BulkSink { throw new IllegalArgumentException("Entity type is required in context data"); } + Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + IndexMapping indexMapping = searchRepository.getIndexMapping(entityType); if (indexMapping == null) { LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType); @@ -196,7 +208,7 @@ public class OpenSearchBulkSink implements BulkSink { } else { List entityInterfaces = (List) entities; for (EntityInterface entity : entityInterfaces) { - addEntity(entity, indexName); + addEntity(entity, indexName, recreateIndex); } } } catch (Exception e) { @@ -215,18 +227,24 @@ public class OpenSearchBulkSink implements BulkSink { } } - private void addEntity(EntityInterface entity, String indexName) { + private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); String json = JsonUtils.pojoToJson(searchIndexDoc); - UpdateRequest updateRequest = new UpdateRequest(indexName, entity.getId().toString()); - updateRequest.doc(json, XContentType.JSON); - updateRequest.docAsUpsert(true); - - // Add to bulk processor - it handles everything including size limits - bulkProcessor.add(updateRequest); + if (recreateIndex) { + // Use IndexRequest for fresh indexing to avoid document_missing_exception + IndexRequest indexRequest = + new IndexRequest(indexName).id(entity.getId().toString()).source(json, XContentType.JSON); + bulkProcessor.add(indexRequest); + } else { + // Use UpdateRequest with upsert for regular updates + UpdateRequest updateRequest = new UpdateRequest(indexName, entity.getId().toString()); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + bulkProcessor.add(updateRequest); + } } private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 676cd8fba9c..7b5b5819a8f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -1050,6 +1050,7 @@ public class SearchIndexApp extends AbstractNativeApplication { ResultList entities = task.entities(); Map contextData = new HashMap<>(); contextData.put(ENTITY_TYPE_KEY, entityType); + contextData.put("recreateIndex", jobData.getRecreateIndex()); try { if (!TIME_SERIES_ENTITIES.contains(entityType)) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java new file mode 100644 index 00000000000..758f0e35e52 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java @@ -0,0 +1,83 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.lenient; + +import es.org.elasticsearch.client.RestHighLevelClient; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.schema.system.StepStats; +import org.openmetadata.search.IndexMapping; +import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.search.elasticsearch.ElasticSearchClient; + +@ExtendWith(MockitoExtension.class) +class ElasticSearchBulkSinkSimpleTest { + + @Mock private SearchRepository searchRepository; + @Mock private ElasticSearchClient searchClient; + @Mock private RestHighLevelClient restHighLevelClient; + @Mock private IndexMapping indexMapping; + + private ElasticSearchBulkSink elasticSearchBulkSink; + + @BeforeEach + void setUp() { + lenient().when(searchRepository.getSearchClient()).thenReturn(searchClient); + lenient().when(searchClient.getClient()).thenReturn(restHighLevelClient); + lenient().when(searchRepository.getClusterAlias()).thenReturn("default"); + lenient().when(indexMapping.getIndexName("default")).thenReturn("test_index"); + lenient().when(searchRepository.getIndexMapping("table")).thenReturn(indexMapping); + + elasticSearchBulkSink = new ElasticSearchBulkSink(searchRepository, 10, 2, 1000000L); + } + + @Test + void testSinkCreation() { + assertNotNull(elasticSearchBulkSink); + assertEquals(10, elasticSearchBulkSink.getBatchSize()); + assertEquals(2, elasticSearchBulkSink.getConcurrentRequests()); + } + + @Test + void testGetStats() { + StepStats stats = elasticSearchBulkSink.getStats(); + assertNotNull(stats); + assertEquals(0, stats.getTotalRecords()); + assertEquals(0, stats.getSuccessRecords()); + assertEquals(0, stats.getFailedRecords()); + } + + @Test + void testUpdateConfiguration() { + elasticSearchBulkSink.updateBatchSize(20); + assertEquals(20, elasticSearchBulkSink.getBatchSize()); + + elasticSearchBulkSink.updateConcurrentRequests(5); + assertEquals(5, elasticSearchBulkSink.getConcurrentRequests()); + } + + @Test + void testContextDataHandling() { + Map contextData = new HashMap<>(); + contextData.put("entityType", "table"); + contextData.put("recreateIndex", true); + + Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(true, recreateIndex); + + contextData.put("recreateIndex", false); + recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(false, recreateIndex); + + contextData.remove("recreateIndex"); + recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(false, recreateIndex); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java new file mode 100644 index 00000000000..5a06ae708a5 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java @@ -0,0 +1,84 @@ +package org.openmetadata.service.apps.bundles.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.lenient; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.schema.system.StepStats; +import org.openmetadata.search.IndexMapping; +import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.search.opensearch.OpenSearchClient; +import os.org.opensearch.client.RestHighLevelClient; + +@ExtendWith(MockitoExtension.class) +class OpenSearchBulkSinkSimpleTest { + + @Mock private SearchRepository searchRepository; + @Mock private OpenSearchClient searchClient; + @Mock private RestHighLevelClient restHighLevelClient; + @Mock private IndexMapping indexMapping; + + private OpenSearchBulkSink openSearchBulkSink; + + @BeforeEach + void setUp() { + lenient().when(searchRepository.getSearchClient()).thenReturn(searchClient); + lenient().when(searchClient.getClient()).thenReturn(restHighLevelClient); + lenient().when(searchRepository.getClusterAlias()).thenReturn("default"); + lenient().when(indexMapping.getIndexName("default")).thenReturn("test_index"); + lenient().when(searchRepository.getIndexMapping("table")).thenReturn(indexMapping); + + // Create the sink + openSearchBulkSink = new OpenSearchBulkSink(searchRepository, 10, 2, 1000000L); + } + + @Test + void testSinkCreation() { + assertNotNull(openSearchBulkSink); + assertEquals(10, openSearchBulkSink.getBatchSize()); + assertEquals(2, openSearchBulkSink.getConcurrentRequests()); + } + + @Test + void testGetStats() { + StepStats stats = openSearchBulkSink.getStats(); + assertNotNull(stats); + assertEquals(0, stats.getTotalRecords()); + assertEquals(0, stats.getSuccessRecords()); + assertEquals(0, stats.getFailedRecords()); + } + + @Test + void testUpdateConfiguration() { + openSearchBulkSink.updateBatchSize(20); + assertEquals(20, openSearchBulkSink.getBatchSize()); + + openSearchBulkSink.updateConcurrentRequests(5); + assertEquals(5, openSearchBulkSink.getConcurrentRequests()); + } + + @Test + void testContextDataHandling() { + Map contextData = new HashMap<>(); + contextData.put("entityType", "table"); + contextData.put("recreateIndex", true); + + Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(true, recreateIndex); + + contextData.put("recreateIndex", false); + recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(false, recreateIndex); + + contextData.remove("recreateIndex"); + recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + assertEquals(false, recreateIndex); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java index 838128ab6c6..1d42471a59a 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; @@ -154,6 +155,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest { Map contextData = new HashMap<>(); contextData.put("entityType", "table"); + contextData.put("recreateIndex", false); lenient().doNothing().when(mockSink).write(eq(entities), eq(contextData)); @@ -209,6 +211,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest { Map contextData = new HashMap<>(); contextData.put("entityType", "table"); + contextData.put("recreateIndex", false); lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), eq(contextData)); @@ -256,6 +259,7 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest { Map contextData = new HashMap<>(); contextData.put("entityType", "user"); + contextData.put("recreateIndex", false); lenient().doNothing().when(mockSink).write(eq(entities), eq(contextData)); @@ -466,6 +470,55 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest { assertEquals(expectedFailures, tableStats.getFailedRecords()); } + @Test + void testProcessingWithRecreateIndexTrue() throws Exception { + // Create job data with recreateIndex = true + EventPublisherJob recreateIndexJobData = + new EventPublisherJob() + .withEntities(Set.of("table")) + .withBatchSize(5) + .withPayLoadSize(1000000L) + .withMaxConcurrentRequests(10) + .withRecreateIndex(true) // Set recreateIndex to true + .withStats(new Stats()); + + App testApp = + new App() + .withName("SearchIndexingApplication") + .withAppConfiguration(JsonUtils.convertValue(recreateIndexJobData, Object.class)); + searchIndexApp.init(testApp); + injectMockSink(); + + EntityInterface mockEntity = mock(EntityInterface.class); + lenient().when(mockEntity.getId()).thenReturn(UUID.randomUUID()); + + List entities = List.of(mockEntity, mockEntity); + ResultList resultList = new ResultList<>(entities, null, null, 2); + + // Capture the context data passed to the sink + ArgumentCaptor> contextCaptor = ArgumentCaptor.forClass(Map.class); + + lenient().doNothing().when(mockSink).write(eq(entities), contextCaptor.capture()); + + SearchIndexApp.IndexingTask task = + new SearchIndexApp.IndexingTask<>("table", resultList, 0); + + assertDoesNotThrow( + () -> { + var method = + SearchIndexApp.class.getDeclaredMethod( + "processTask", SearchIndexApp.IndexingTask.class, JobExecutionContext.class); + method.setAccessible(true); + method.invoke(searchIndexApp, task, jobExecutionContext); + }); + + // Verify that recreateIndex was passed in context data + Map capturedContext = contextCaptor.getValue(); + assertNotNull(capturedContext); + assertEquals("table", capturedContext.get("entityType")); + assertEquals(true, capturedContext.get("recreateIndex")); + } + @Test void testAutoTuneConfiguration() { EventPublisherJob autoTuneJobData =