diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index d151c311ce3..72a24629b3d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -374,6 +374,21 @@ public class SearchIndexApp extends AbstractNativeApplication { return Optional.empty(); } + private void finalizeEntityIndex(String entityType, boolean success) { + if (recreateIndexHandler == null + || recreateContext == null + || !Boolean.TRUE.equals(jobData.getRecreateIndex())) { + return; + } + + try { + recreateIndexHandler.finalizeEntityReindex(recreateContext, entityType, success); + LOG.info("Finalized index for entity '{}' with success={}", entityType, success); + } catch (Exception ex) { + LOG.error("Failed to finalize index for entity '{}'", entityType, ex); + } + } + private void finalizeRecreateIndexes(boolean success) { if (recreateIndexHandler == null || recreateContext == null) { return; @@ -878,27 +893,52 @@ public class SearchIndexApp extends AbstractNativeApplication { int loadPerThread = calculateNumberOfThreads(totalEntityRecords); if (totalEntityRecords > 0) { - submitBatchTasks(entityType, loadPerThread, producerLatch); + // Create entity-specific latch to wait for all batches of this entity + CountDownLatch entityLatch = new CountDownLatch(loadPerThread); + submitBatchTasks(entityType, loadPerThread, producerLatch, entityLatch); + + // Wait for all batches of this entity to complete + try { + entityLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for entity '{}' batches to complete", entityType); + finalizeEntityIndex(entityType, false); + return; + } } if (jobLogger != null) { jobLogger.markEntityCompleted(entityType); } + + // Finalize index for this entity immediately after all its batches complete + finalizeEntityIndex(entityType, true); } catch (Exception e) { LOG.error("Error processing entity type {}", entityType, e); + // Cleanup staged index on failure + finalizeEntityIndex(entityType, false); } } private void submitBatchTasks( - String entityType, int loadPerThread, CountDownLatch producerLatch) { + String entityType, + int loadPerThread, + CountDownLatch producerLatch, + CountDownLatch entityLatch) { for (int i = 0; i < loadPerThread; i++) { LOG.debug("Submitting virtual thread producer task for batch {}/{}", i + 1, loadPerThread); int currentOffset = i * batchSize.get(); - producerExecutor.submit(() -> processBatch(entityType, currentOffset, producerLatch)); + producerExecutor.submit( + () -> processBatch(entityType, currentOffset, producerLatch, entityLatch)); } } - private void processBatch(String entityType, int currentOffset, CountDownLatch producerLatch) { + private void processBatch( + String entityType, + int currentOffset, + CountDownLatch producerLatch, + CountDownLatch entityLatch) { try { if (shouldSkipBatch()) { return; @@ -917,6 +957,9 @@ public class SearchIndexApp extends AbstractNativeApplication { } finally { LOG.debug("Virtual thread completed batch, remaining: {}", producerLatch.getCount() - 1); producerLatch.countDown(); + if (entityLatch != null) { + entityLatch.countDown(); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index 974036af36c..d2e26e4d198 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -87,6 +87,41 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { return context; } + @Override + public void finalizeEntityReindex(ReindexContext context, String entityType, boolean success) { + if (context == null || entityType == null) { + return; + } + + // Check if already finalized + if (context.isFinalized(entityType)) { + LOG.debug("Entity '{}' already finalized, skipping", entityType); + return; + } + + String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null); + String activeIndex = context.getOriginalIndex(entityType).orElse(null); + String stagedIndex = context.getStagedIndex(entityType).orElse(null); + + if (canonicalIndex == null || stagedIndex == null) { + LOG.debug("Skipping finalization for entity '{}' - missing index information", entityType); + return; + } + + SearchRepository searchRepository = Entity.getSearchRepository(); + SearchClient searchClient = searchRepository.getSearchClient(); + + if (success) { + promoteIndexForEntity( + context, entityType, canonicalIndex, activeIndex, stagedIndex, searchClient); + } else { + cleanupStagedIndexForEntity(entityType, stagedIndex, searchClient); + } + + // Mark as finalized + context.markFinalized(entityType); + } + @Override public void finalizeReindex(ReindexContext context, boolean success) { if (context == null || context.isEmpty()) { @@ -97,6 +132,13 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { SearchClient searchClient = searchRepository.getSearchClient(); for (String entityType : context.getEntities()) { + // Skip if already finalized per-entity + if (context.isFinalized(entityType)) { + LOG.debug( + "Entity '{}' already finalized per-entity, skipping in batch finalization", entityType); + continue; + } + String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null); String activeIndex = context.getOriginalIndex(entityType).orElse(null); String stagedIndex = context.getStagedIndex(entityType).orElse(null); @@ -106,86 +148,111 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { } if (success) { - try { - Set aliasesToAttach = new HashSet<>(); - aliasesToAttach.addAll(context.getExistingAliases(entityType)); - context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add); - aliasesToAttach.add(canonicalIndex); - List parentAliases = context.getParentAliases(entityType); - if (parentAliases != null) { - parentAliases.stream() - .filter(alias -> alias != null && !alias.isBlank()) - .forEach(aliasesToAttach::add); - } - aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank()); - - for (String alias : aliasesToAttach) { - Set targets = searchClient.getIndicesByAlias(alias); - for (String target : targets) { - if (target.equals(stagedIndex)) { - continue; - } - - boolean belongsToEntity = - target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_"); - - if (!belongsToEntity) { - LOG.debug( - "Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.", - alias, - target, - entityType); - continue; - } - - searchClient.removeAliases(target, Set.of(alias)); - LOG.info( - "Removed alias '{}' from index '{}' during promotion for entity '{}'.", - alias, - target, - entityType); - - if (searchClient.indexExists(target)) { - searchClient.deleteIndex(target); - LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType); - } - } - } - - if (activeIndex != null && searchClient.indexExists(activeIndex)) { - searchClient.deleteIndex(activeIndex); - LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType); - } - - if (!aliasesToAttach.isEmpty()) { - searchClient.addAliases(stagedIndex, aliasesToAttach); - } - LOG.info( - "Promoted staged index '{}' to serve entity '{}' (aliases: {}).", - stagedIndex, - entityType, - aliasesToAttach); - } catch (Exception ex) { - LOG.error( - "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); - } + promoteIndexForEntity( + context, entityType, canonicalIndex, activeIndex, stagedIndex, searchClient); } else { - try { - if (searchClient.indexExists(stagedIndex)) { - searchClient.deleteIndex(stagedIndex); - LOG.info( - "Deleted staged index '{}' after unsuccessful reindex for entity '{}'.", - stagedIndex, - entityType); + cleanupStagedIndexForEntity(entityType, stagedIndex, searchClient); + } + + // Mark as finalized + context.markFinalized(entityType); + } + } + + private void promoteIndexForEntity( + ReindexContext context, + String entityType, + String canonicalIndex, + String activeIndex, + String stagedIndex, + SearchClient searchClient) { + try { + Set aliasesToAttach = new HashSet<>(); + aliasesToAttach.addAll(context.getExistingAliases(entityType)); + context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add); + + // Add canonical index name as an alias so queries using the full index name still work + // But only if no index exists with that name + if (!searchClient.indexExists(canonicalIndex)) { + aliasesToAttach.add(canonicalIndex); + } + + List parentAliases = context.getParentAliases(entityType); + if (parentAliases != null) { + parentAliases.stream() + .filter(alias -> alias != null && !alias.isBlank()) + .forEach(aliasesToAttach::add); + } + aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank()); + + for (String alias : aliasesToAttach) { + Set targets = searchClient.getIndicesByAlias(alias); + for (String target : targets) { + if (target.equals(stagedIndex)) { + continue; + } + + boolean belongsToEntity = + target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_"); + + if (!belongsToEntity) { + LOG.debug( + "Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.", + alias, + target, + entityType); + continue; + } + + searchClient.removeAliases(target, Set.of(alias)); + LOG.info( + "Removed alias '{}' from index '{}' during promotion for entity '{}'.", + alias, + target, + entityType); + + if (searchClient.indexExists(target)) { + searchClient.deleteIndex(target); + LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType); } - } catch (Exception ex) { - LOG.warn( - "Failed to delete staged index '{}' for entity '{}' after failure.", - stagedIndex, - entityType, - ex); } } + + if (activeIndex != null && searchClient.indexExists(activeIndex)) { + searchClient.deleteIndex(activeIndex); + LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType); + } + + if (!aliasesToAttach.isEmpty()) { + searchClient.addAliases(stagedIndex, aliasesToAttach); + } + LOG.info( + "Promoted staged index '{}' to serve entity '{}' (aliases: {}).", + stagedIndex, + entityType, + aliasesToAttach); + } catch (Exception ex) { + LOG.error( + "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); + } + } + + private void cleanupStagedIndexForEntity( + String entityType, String stagedIndex, SearchClient searchClient) { + try { + if (searchClient.indexExists(stagedIndex)) { + searchClient.deleteIndex(stagedIndex); + LOG.info( + "Deleted staged index '{}' after unsuccessful reindex for entity '{}'.", + stagedIndex, + entityType); + } + } catch (Exception ex) { + LOG.warn( + "Failed to delete staged index '{}' for entity '{}' after failure.", + stagedIndex, + entityType, + ex); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java index 5d746c21482..b792a923014 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java @@ -17,6 +17,16 @@ public interface RecreateIndexHandler { default void finalizeReindex(ReindexContext context, boolean success) {} + /** + * Finalize reindex for a specific entity. This allows per-entity index swapping + * to reduce memory footprint during reindexing. + * + * @param context The reindex context containing staged index information + * @param entityType The entity type to finalize + * @param success Whether the reindexing was successful for this entity + */ + default void finalizeEntityReindex(ReindexContext context, String entityType, boolean success) {} + class ReindexContext { private final Map canonicalIndexByEntity = new HashMap<>(); private final Map originalIndexByEntity = new HashMap<>(); @@ -24,6 +34,7 @@ public interface RecreateIndexHandler { private final Map> existingAliasesByEntity = new HashMap<>(); private final Map canonicalAliasByEntity = new HashMap<>(); private final Map> parentAliasesByEntity = new HashMap<>(); + private final Set finalizedEntities = new HashSet<>(); public void add( String entity, @@ -42,6 +53,14 @@ public interface RecreateIndexHandler { parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of()); } + public synchronized void markFinalized(String entity) { + finalizedEntities.add(entity); + } + + public synchronized boolean isFinalized(String entity) { + return finalizedEntities.contains(entity); + } + public Optional getCanonicalIndex(String entity) { return Optional.ofNullable(canonicalIndexByEntity.get(entity)); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java index 7089243ae20..d99c0ec787d 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java @@ -2,6 +2,7 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -322,6 +323,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { .withName("SearchIndexingApplication") .withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class)); searchIndexApp.init(testApp); + searchIndexApp.getJobData().setStatus(EventPublisherJob.Status.RUNNING); injectMockSink(); List entityErrors = @@ -352,11 +354,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { List entities = List.of(mockEntity, mockEntity); ResultList resultList = new ResultList<>(entities, null, null, 2); - Map contextData = new HashMap<>(); - contextData.put("entityType", "table"); - contextData.put("recreateIndex", false); - - lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), eq(contextData)); + lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), any(Map.class)); SearchIndexApp.IndexingTask task = new SearchIndexApp.IndexingTask<>("table", resultList, 0); @@ -1044,6 +1042,379 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { assertEquals(1000000L, jobData.getPayLoadSize()); } + @Test + void testPerEntityIndexFinalization() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + String clusterAlias = searchRepo.getClusterAlias(); + + // Create test indexes + String stagedIndex = "test_table_search_index_rebuild_" + System.currentTimeMillis(); + String oldIndex = "test_table_search_index_old_" + System.currentTimeMillis(); + String canonicalIndex = "test_table_search_index"; + + try { + // Setup: Create the old index with aliases + searchClient.createIndex(oldIndex, "{}"); + searchClient.addAliases(oldIndex, Set.of("test_table", canonicalIndex, "test_all")); + + // Create the staged index + searchClient.createIndex(stagedIndex, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + context.add( + "test_table", + canonicalIndex, + oldIndex, + stagedIndex, + Set.of("test_table", canonicalIndex, "test_all"), + "test_table", + List.of("test_all")); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + + // Finalize just the table entity + handler.finalizeEntityReindex(context, "test_table", true); + + // Verify the staged index was promoted + assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted"); + assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist"); + + Set stagedAliases = searchClient.getAliases(stagedIndex); + assertTrue(stagedAliases.contains("test_table")); + assertTrue(stagedAliases.contains(canonicalIndex)); + assertTrue(stagedAliases.contains("test_all")); + + // Verify the entity is marked as finalized + assertTrue(context.isFinalized("test_table")); + } finally { + // Cleanup + if (searchClient.indexExists(stagedIndex)) { + searchClient.deleteIndex(stagedIndex); + } + if (searchClient.indexExists(oldIndex)) { + searchClient.deleteIndex(oldIndex); + } + } + } + + @Test + void testFinalizedEntitiesNotReprocessed() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + + long timestamp = System.currentTimeMillis(); + String tableStaged = "test_table_staged_" + timestamp; + String tableOld = "test_table_old_" + timestamp; + String userStaged = "test_user_staged_" + timestamp; + String userOld = "test_user_old_" + timestamp; + + try { + // Create test indexes + searchClient.createIndex(tableOld, "{}"); + searchClient.addAliases(tableOld, Set.of("test_table_alias")); + searchClient.createIndex(tableStaged, "{}"); + + searchClient.createIndex(userOld, "{}"); + searchClient.addAliases(userOld, Set.of("test_user_alias")); + searchClient.createIndex(userStaged, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + context.add( + "test_table", + "test_table_index", + tableOld, + tableStaged, + Set.of("test_table_alias"), + "test_table", + List.of()); + context.add( + "test_user", + "test_user_index", + userOld, + userStaged, + Set.of("test_user_alias"), + "test_user", + List.of()); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + + // Finalize table entity first + handler.finalizeEntityReindex(context, "test_table", true); + assertTrue(context.isFinalized("test_table")); + assertFalse(searchClient.indexExists(tableOld), "Table old index should be deleted"); + + // Now call batch finalization (should skip table, only process user) + handler.finalizeReindex(context, true); + + // Both should be finalized + assertTrue(context.isFinalized("test_table")); + assertTrue(context.isFinalized("test_user")); + + // User should be processed + assertFalse(searchClient.indexExists(userOld), "User old index should be deleted"); + assertTrue(searchClient.indexExists(userStaged), "User staged index should exist"); + } finally { + // Cleanup + for (String index : List.of(tableStaged, tableOld, userStaged, userOld)) { + if (searchClient.indexExists(index)) { + searchClient.deleteIndex(index); + } + } + } + } + + @Test + void testEntityFinalizationOnSuccess() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + + long timestamp = System.currentTimeMillis(); + String stagedIndex = "test_dashboard_staged_" + timestamp; + String oldIndex = "test_dashboard_old_" + timestamp; + + try { + // Create indexes + searchClient.createIndex(oldIndex, "{}"); + searchClient.addAliases(oldIndex, Set.of("test_dashboard", "test_all")); + searchClient.createIndex(stagedIndex, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + context.add( + "test_dashboard", + "test_dashboard_index", + oldIndex, + stagedIndex, + Set.of("test_dashboard", "test_all"), + "test_dashboard", + List.of("test_all")); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + handler.finalizeEntityReindex(context, "test_dashboard", true); + + // On success, old index should be deleted and aliases moved + assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted"); + assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist"); + + Set aliases = searchClient.getAliases(stagedIndex); + assertTrue(aliases.contains("test_dashboard")); + assertTrue(aliases.contains("test_all")); + assertTrue(context.isFinalized("test_dashboard")); + } finally { + // Cleanup + for (String index : List.of(stagedIndex, oldIndex)) { + if (searchClient.indexExists(index)) { + searchClient.deleteIndex(index); + } + } + } + } + + @Test + void testEntityFinalizationOnFailure() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + + long timestamp = System.currentTimeMillis(); + String stagedIndex = "test_pipeline_staged_" + timestamp; + String oldIndex = "test_pipeline_old_" + timestamp; + + try { + // Create indexes + searchClient.createIndex(oldIndex, "{}"); + searchClient.addAliases(oldIndex, Set.of("test_pipeline", "test_all")); + searchClient.createIndex(stagedIndex, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + context.add( + "test_pipeline", + "test_pipeline_index", + oldIndex, + stagedIndex, + Set.of("test_pipeline", "test_all"), + "test_pipeline", + List.of("test_all")); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + handler.finalizeEntityReindex(context, "test_pipeline", false); + + // On failure, staged index should be deleted, old index should remain + assertFalse(searchClient.indexExists(stagedIndex), "Staged index should be deleted"); + assertTrue(searchClient.indexExists(oldIndex), "Old index should remain"); + + Set aliases = searchClient.getAliases(oldIndex); + assertTrue(aliases.contains("test_pipeline")); + assertTrue(aliases.contains("test_all")); + assertTrue(context.isFinalized("test_pipeline")); + } finally { + // Cleanup + for (String index : List.of(stagedIndex, oldIndex)) { + if (searchClient.indexExists(index)) { + searchClient.deleteIndex(index); + } + } + } + } + + @Test + void testPerEntityFinalizationWithClusterAlias() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + String clusterAlias = searchRepo.getClusterAlias(); + + long timestamp = System.currentTimeMillis(); + String canonicalIndexName = + clusterAlias.isEmpty() + ? "test_cluster_table_index" + : clusterAlias + "_test_cluster_table_index"; + String oldIndex = canonicalIndexName + "_old_" + timestamp; + String stagedIndex = canonicalIndexName + "_rebuild_" + timestamp; + + try { + // Create old index with canonical name as alias + searchClient.createIndex(oldIndex, "{}"); + if (!clusterAlias.isEmpty()) { + // Add canonical index name as alias (simulating existing setup) + searchClient.addAliases(oldIndex, Set.of(canonicalIndexName, "test_cluster_alias")); + } else { + searchClient.addAliases(oldIndex, Set.of("test_cluster_alias")); + } + + // Create staged index + searchClient.createIndex(stagedIndex, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + Set existingAliases = + clusterAlias.isEmpty() + ? Set.of("test_cluster_alias") + : Set.of(canonicalIndexName, "test_cluster_alias"); + + context.add( + "test_cluster_table", + canonicalIndexName, + oldIndex, + stagedIndex, + existingAliases, + "test_cluster_table", + List.of()); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + + // Finalize the entity + handler.finalizeEntityReindex(context, "test_cluster_table", true); + + // Verify old index is deleted + assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted"); + assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist"); + + // Verify aliases are attached to staged index + Set stagedAliases = searchClient.getAliases(stagedIndex); + assertTrue(stagedAliases.contains("test_cluster_alias"), "Should have test_cluster_alias"); + + // Verify canonical index name works as alias (if cluster alias is configured) + if (!clusterAlias.isEmpty()) { + assertTrue( + stagedAliases.contains(canonicalIndexName), + "Should have canonical index name as alias: " + canonicalIndexName); + } + + assertTrue(context.isFinalized("test_cluster_table")); + } finally { + // Cleanup + for (String index : List.of(oldIndex, stagedIndex)) { + if (searchClient.indexExists(index)) { + searchClient.deleteIndex(index); + } + } + } + } + + @Test + void testMultipleEntitiesWithPerEntityFinalization() { + SearchRepository searchRepo = Entity.getSearchRepository(); + SearchClient searchClient = searchRepo.getSearchClient(); + + long timestamp = System.currentTimeMillis(); + String tableStaged = "test_multi_table_staged_" + timestamp; + String tableOld = "test_multi_table_old_" + timestamp; + String userStaged = "test_multi_user_staged_" + timestamp; + String userOld = "test_multi_user_old_" + timestamp; + String dashStaged = "test_multi_dash_staged_" + timestamp; + String dashOld = "test_multi_dash_old_" + timestamp; + + try { + // Setup three entities + searchClient.createIndex(tableOld, "{}"); + searchClient.addAliases(tableOld, Set.of("test_table", "test_all")); + searchClient.createIndex(tableStaged, "{}"); + + searchClient.createIndex(userOld, "{}"); + searchClient.addAliases(userOld, Set.of("test_user", "test_all")); + searchClient.createIndex(userStaged, "{}"); + + searchClient.createIndex(dashOld, "{}"); + searchClient.addAliases(dashOld, Set.of("test_dashboard", "test_all")); + searchClient.createIndex(dashStaged, "{}"); + + RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + context.add( + "test_table", + "test_table_index", + tableOld, + tableStaged, + Set.of("test_table", "test_all"), + "test_table", + List.of("test_all")); + context.add( + "test_user", + "test_user_index", + userOld, + userStaged, + Set.of("test_user", "test_all"), + "test_user", + List.of("test_all")); + context.add( + "test_dashboard", + "test_dashboard_index", + dashOld, + dashStaged, + Set.of("test_dashboard", "test_all"), + "test_dashboard", + List.of("test_all")); + + DefaultRecreateHandler handler = new DefaultRecreateHandler(); + + // Simulate processing entities one by one + handler.finalizeEntityReindex(context, "test_table", true); + assertFalse(searchClient.indexExists(tableOld), "Table old index should be deleted"); + assertTrue(context.isFinalized("test_table")); + + handler.finalizeEntityReindex(context, "test_user", true); + assertFalse(searchClient.indexExists(userOld), "User old index should be deleted"); + assertTrue(context.isFinalized("test_user")); + + // Dashboard fails + handler.finalizeEntityReindex(context, "test_dashboard", false); + assertFalse( + searchClient.indexExists(dashStaged), "Dashboard staged should be deleted on failure"); + assertTrue(searchClient.indexExists(dashOld), "Dashboard old should remain on failure"); + assertTrue(context.isFinalized("test_dashboard")); + + // Verify final state - successful entities have staged indexes + assertTrue(searchClient.indexExists(tableStaged)); + assertTrue(searchClient.indexExists(userStaged)); + assertFalse(searchClient.indexExists(dashStaged)); + } finally { + // Cleanup + for (String index : + List.of(tableStaged, tableOld, userStaged, userOld, dashStaged, dashOld)) { + if (searchClient.indexExists(index)) { + searchClient.deleteIndex(index); + } + } + } + } + private static class AliasState { final Map> indexAliases = new HashMap<>(); final Set deletedIndices = new HashSet<>();