Improvements: SearchReindexApp, recreate indexes should swap immediately after indexing is done (#23749)

* Improvements: SearchReindexApp, recreate indexes should swap immediately after indexing is done

* Fix search index with alias configured
This commit is contained in:
Sriharsha Chintalapani 2025-10-07 00:11:23 -04:00 committed by GitHub
parent 9ba65ac0d2
commit c96e39fb5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 585 additions and 85 deletions

View File

@ -374,6 +374,21 @@ public class SearchIndexApp extends AbstractNativeApplication {
return Optional.empty(); return Optional.empty();
} }
private void finalizeEntityIndex(String entityType, boolean success) {
if (recreateIndexHandler == null
|| recreateContext == null
|| !Boolean.TRUE.equals(jobData.getRecreateIndex())) {
return;
}
try {
recreateIndexHandler.finalizeEntityReindex(recreateContext, entityType, success);
LOG.info("Finalized index for entity '{}' with success={}", entityType, success);
} catch (Exception ex) {
LOG.error("Failed to finalize index for entity '{}'", entityType, ex);
}
}
private void finalizeRecreateIndexes(boolean success) { private void finalizeRecreateIndexes(boolean success) {
if (recreateIndexHandler == null || recreateContext == null) { if (recreateIndexHandler == null || recreateContext == null) {
return; return;
@ -878,27 +893,52 @@ public class SearchIndexApp extends AbstractNativeApplication {
int loadPerThread = calculateNumberOfThreads(totalEntityRecords); int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
if (totalEntityRecords > 0) { if (totalEntityRecords > 0) {
submitBatchTasks(entityType, loadPerThread, producerLatch); // Create entity-specific latch to wait for all batches of this entity
CountDownLatch entityLatch = new CountDownLatch(loadPerThread);
submitBatchTasks(entityType, loadPerThread, producerLatch, entityLatch);
// Wait for all batches of this entity to complete
try {
entityLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for entity '{}' batches to complete", entityType);
finalizeEntityIndex(entityType, false);
return;
}
} }
if (jobLogger != null) { if (jobLogger != null) {
jobLogger.markEntityCompleted(entityType); jobLogger.markEntityCompleted(entityType);
} }
// Finalize index for this entity immediately after all its batches complete
finalizeEntityIndex(entityType, true);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e); LOG.error("Error processing entity type {}", entityType, e);
// Cleanup staged index on failure
finalizeEntityIndex(entityType, false);
} }
} }
private void submitBatchTasks( private void submitBatchTasks(
String entityType, int loadPerThread, CountDownLatch producerLatch) { String entityType,
int loadPerThread,
CountDownLatch producerLatch,
CountDownLatch entityLatch) {
for (int i = 0; i < loadPerThread; i++) { for (int i = 0; i < loadPerThread; i++) {
LOG.debug("Submitting virtual thread producer task for batch {}/{}", i + 1, loadPerThread); LOG.debug("Submitting virtual thread producer task for batch {}/{}", i + 1, loadPerThread);
int currentOffset = i * batchSize.get(); int currentOffset = i * batchSize.get();
producerExecutor.submit(() -> processBatch(entityType, currentOffset, producerLatch)); producerExecutor.submit(
() -> processBatch(entityType, currentOffset, producerLatch, entityLatch));
} }
} }
private void processBatch(String entityType, int currentOffset, CountDownLatch producerLatch) { private void processBatch(
String entityType,
int currentOffset,
CountDownLatch producerLatch,
CountDownLatch entityLatch) {
try { try {
if (shouldSkipBatch()) { if (shouldSkipBatch()) {
return; return;
@ -917,6 +957,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
} finally { } finally {
LOG.debug("Virtual thread completed batch, remaining: {}", producerLatch.getCount() - 1); LOG.debug("Virtual thread completed batch, remaining: {}", producerLatch.getCount() - 1);
producerLatch.countDown(); producerLatch.countDown();
if (entityLatch != null) {
entityLatch.countDown();
}
} }
} }

View File

@ -87,6 +87,41 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
return context; return context;
} }
@Override
public void finalizeEntityReindex(ReindexContext context, String entityType, boolean success) {
if (context == null || entityType == null) {
return;
}
// Check if already finalized
if (context.isFinalized(entityType)) {
LOG.debug("Entity '{}' already finalized, skipping", entityType);
return;
}
String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null);
String activeIndex = context.getOriginalIndex(entityType).orElse(null);
String stagedIndex = context.getStagedIndex(entityType).orElse(null);
if (canonicalIndex == null || stagedIndex == null) {
LOG.debug("Skipping finalization for entity '{}' - missing index information", entityType);
return;
}
SearchRepository searchRepository = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepository.getSearchClient();
if (success) {
promoteIndexForEntity(
context, entityType, canonicalIndex, activeIndex, stagedIndex, searchClient);
} else {
cleanupStagedIndexForEntity(entityType, stagedIndex, searchClient);
}
// Mark as finalized
context.markFinalized(entityType);
}
@Override @Override
public void finalizeReindex(ReindexContext context, boolean success) { public void finalizeReindex(ReindexContext context, boolean success) {
if (context == null || context.isEmpty()) { if (context == null || context.isEmpty()) {
@ -97,6 +132,13 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
SearchClient<?> searchClient = searchRepository.getSearchClient(); SearchClient<?> searchClient = searchRepository.getSearchClient();
for (String entityType : context.getEntities()) { for (String entityType : context.getEntities()) {
// Skip if already finalized per-entity
if (context.isFinalized(entityType)) {
LOG.debug(
"Entity '{}' already finalized per-entity, skipping in batch finalization", entityType);
continue;
}
String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null); String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null);
String activeIndex = context.getOriginalIndex(entityType).orElse(null); String activeIndex = context.getOriginalIndex(entityType).orElse(null);
String stagedIndex = context.getStagedIndex(entityType).orElse(null); String stagedIndex = context.getStagedIndex(entityType).orElse(null);
@ -106,86 +148,111 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
} }
if (success) { if (success) {
try { promoteIndexForEntity(
Set<String> aliasesToAttach = new HashSet<>(); context, entityType, canonicalIndex, activeIndex, stagedIndex, searchClient);
aliasesToAttach.addAll(context.getExistingAliases(entityType));
context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add);
aliasesToAttach.add(canonicalIndex);
List<String> parentAliases = context.getParentAliases(entityType);
if (parentAliases != null) {
parentAliases.stream()
.filter(alias -> alias != null && !alias.isBlank())
.forEach(aliasesToAttach::add);
}
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());
for (String alias : aliasesToAttach) {
Set<String> targets = searchClient.getIndicesByAlias(alias);
for (String target : targets) {
if (target.equals(stagedIndex)) {
continue;
}
boolean belongsToEntity =
target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_");
if (!belongsToEntity) {
LOG.debug(
"Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.",
alias,
target,
entityType);
continue;
}
searchClient.removeAliases(target, Set.of(alias));
LOG.info(
"Removed alias '{}' from index '{}' during promotion for entity '{}'.",
alias,
target,
entityType);
if (searchClient.indexExists(target)) {
searchClient.deleteIndex(target);
LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType);
}
}
}
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
searchClient.deleteIndex(activeIndex);
LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType);
}
if (!aliasesToAttach.isEmpty()) {
searchClient.addAliases(stagedIndex, aliasesToAttach);
}
LOG.info(
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
stagedIndex,
entityType,
aliasesToAttach);
} catch (Exception ex) {
LOG.error(
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
}
} else { } else {
try { cleanupStagedIndexForEntity(entityType, stagedIndex, searchClient);
if (searchClient.indexExists(stagedIndex)) { }
searchClient.deleteIndex(stagedIndex);
LOG.info( // Mark as finalized
"Deleted staged index '{}' after unsuccessful reindex for entity '{}'.", context.markFinalized(entityType);
stagedIndex, }
entityType); }
private void promoteIndexForEntity(
ReindexContext context,
String entityType,
String canonicalIndex,
String activeIndex,
String stagedIndex,
SearchClient<?> searchClient) {
try {
Set<String> aliasesToAttach = new HashSet<>();
aliasesToAttach.addAll(context.getExistingAliases(entityType));
context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add);
// Add canonical index name as an alias so queries using the full index name still work
// But only if no index exists with that name
if (!searchClient.indexExists(canonicalIndex)) {
aliasesToAttach.add(canonicalIndex);
}
List<String> parentAliases = context.getParentAliases(entityType);
if (parentAliases != null) {
parentAliases.stream()
.filter(alias -> alias != null && !alias.isBlank())
.forEach(aliasesToAttach::add);
}
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());
for (String alias : aliasesToAttach) {
Set<String> targets = searchClient.getIndicesByAlias(alias);
for (String target : targets) {
if (target.equals(stagedIndex)) {
continue;
}
boolean belongsToEntity =
target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_");
if (!belongsToEntity) {
LOG.debug(
"Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.",
alias,
target,
entityType);
continue;
}
searchClient.removeAliases(target, Set.of(alias));
LOG.info(
"Removed alias '{}' from index '{}' during promotion for entity '{}'.",
alias,
target,
entityType);
if (searchClient.indexExists(target)) {
searchClient.deleteIndex(target);
LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType);
} }
} catch (Exception ex) {
LOG.warn(
"Failed to delete staged index '{}' for entity '{}' after failure.",
stagedIndex,
entityType,
ex);
} }
} }
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
searchClient.deleteIndex(activeIndex);
LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType);
}
if (!aliasesToAttach.isEmpty()) {
searchClient.addAliases(stagedIndex, aliasesToAttach);
}
LOG.info(
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
stagedIndex,
entityType,
aliasesToAttach);
} catch (Exception ex) {
LOG.error(
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
}
}
private void cleanupStagedIndexForEntity(
String entityType, String stagedIndex, SearchClient<?> searchClient) {
try {
if (searchClient.indexExists(stagedIndex)) {
searchClient.deleteIndex(stagedIndex);
LOG.info(
"Deleted staged index '{}' after unsuccessful reindex for entity '{}'.",
stagedIndex,
entityType);
}
} catch (Exception ex) {
LOG.warn(
"Failed to delete staged index '{}' for entity '{}' after failure.",
stagedIndex,
entityType,
ex);
} }
} }

View File

@ -17,6 +17,16 @@ public interface RecreateIndexHandler {
default void finalizeReindex(ReindexContext context, boolean success) {} default void finalizeReindex(ReindexContext context, boolean success) {}
/**
* Finalize reindex for a specific entity. This allows per-entity index swapping
* to reduce memory footprint during reindexing.
*
* @param context The reindex context containing staged index information
* @param entityType The entity type to finalize
* @param success Whether the reindexing was successful for this entity
*/
default void finalizeEntityReindex(ReindexContext context, String entityType, boolean success) {}
class ReindexContext { class ReindexContext {
private final Map<String, String> canonicalIndexByEntity = new HashMap<>(); private final Map<String, String> canonicalIndexByEntity = new HashMap<>();
private final Map<String, String> originalIndexByEntity = new HashMap<>(); private final Map<String, String> originalIndexByEntity = new HashMap<>();
@ -24,6 +34,7 @@ public interface RecreateIndexHandler {
private final Map<String, Set<String>> existingAliasesByEntity = new HashMap<>(); private final Map<String, Set<String>> existingAliasesByEntity = new HashMap<>();
private final Map<String, String> canonicalAliasByEntity = new HashMap<>(); private final Map<String, String> canonicalAliasByEntity = new HashMap<>();
private final Map<String, List<String>> parentAliasesByEntity = new HashMap<>(); private final Map<String, List<String>> parentAliasesByEntity = new HashMap<>();
private final Set<String> finalizedEntities = new HashSet<>();
public void add( public void add(
String entity, String entity,
@ -42,6 +53,14 @@ public interface RecreateIndexHandler {
parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of()); parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of());
} }
public synchronized void markFinalized(String entity) {
finalizedEntities.add(entity);
}
public synchronized boolean isFinalized(String entity) {
return finalizedEntities.contains(entity);
}
public Optional<String> getCanonicalIndex(String entity) { public Optional<String> getCanonicalIndex(String entity) {
return Optional.ofNullable(canonicalIndexByEntity.get(entity)); return Optional.ofNullable(canonicalIndexByEntity.get(entity));
} }

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.apps.bundles.searchIndex;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -322,6 +323,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
.withName("SearchIndexingApplication") .withName("SearchIndexingApplication")
.withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class)); .withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class));
searchIndexApp.init(testApp); searchIndexApp.init(testApp);
searchIndexApp.getJobData().setStatus(EventPublisherJob.Status.RUNNING);
injectMockSink(); injectMockSink();
List<EntityError> entityErrors = List<EntityError> entityErrors =
@ -352,11 +354,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
List<EntityInterface> entities = List.of(mockEntity, mockEntity); List<EntityInterface> entities = List.of(mockEntity, mockEntity);
ResultList<EntityInterface> resultList = new ResultList<>(entities, null, null, 2); ResultList<EntityInterface> resultList = new ResultList<>(entities, null, null, 2);
Map<String, Object> contextData = new HashMap<>(); lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), any(Map.class));
contextData.put("entityType", "table");
contextData.put("recreateIndex", false);
lenient().doThrow(searchIndexException).when(mockSink).write(eq(entities), eq(contextData));
SearchIndexApp.IndexingTask<EntityInterface> task = SearchIndexApp.IndexingTask<EntityInterface> task =
new SearchIndexApp.IndexingTask<>("table", resultList, 0); new SearchIndexApp.IndexingTask<>("table", resultList, 0);
@ -1044,6 +1042,379 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
assertEquals(1000000L, jobData.getPayLoadSize()); assertEquals(1000000L, jobData.getPayLoadSize());
} }
@Test
void testPerEntityIndexFinalization() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
String clusterAlias = searchRepo.getClusterAlias();
// Create test indexes
String stagedIndex = "test_table_search_index_rebuild_" + System.currentTimeMillis();
String oldIndex = "test_table_search_index_old_" + System.currentTimeMillis();
String canonicalIndex = "test_table_search_index";
try {
// Setup: Create the old index with aliases
searchClient.createIndex(oldIndex, "{}");
searchClient.addAliases(oldIndex, Set.of("test_table", canonicalIndex, "test_all"));
// Create the staged index
searchClient.createIndex(stagedIndex, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"test_table",
canonicalIndex,
oldIndex,
stagedIndex,
Set.of("test_table", canonicalIndex, "test_all"),
"test_table",
List.of("test_all"));
DefaultRecreateHandler handler = new DefaultRecreateHandler();
// Finalize just the table entity
handler.finalizeEntityReindex(context, "test_table", true);
// Verify the staged index was promoted
assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted");
assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist");
Set<String> stagedAliases = searchClient.getAliases(stagedIndex);
assertTrue(stagedAliases.contains("test_table"));
assertTrue(stagedAliases.contains(canonicalIndex));
assertTrue(stagedAliases.contains("test_all"));
// Verify the entity is marked as finalized
assertTrue(context.isFinalized("test_table"));
} finally {
// Cleanup
if (searchClient.indexExists(stagedIndex)) {
searchClient.deleteIndex(stagedIndex);
}
if (searchClient.indexExists(oldIndex)) {
searchClient.deleteIndex(oldIndex);
}
}
}
@Test
void testFinalizedEntitiesNotReprocessed() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
long timestamp = System.currentTimeMillis();
String tableStaged = "test_table_staged_" + timestamp;
String tableOld = "test_table_old_" + timestamp;
String userStaged = "test_user_staged_" + timestamp;
String userOld = "test_user_old_" + timestamp;
try {
// Create test indexes
searchClient.createIndex(tableOld, "{}");
searchClient.addAliases(tableOld, Set.of("test_table_alias"));
searchClient.createIndex(tableStaged, "{}");
searchClient.createIndex(userOld, "{}");
searchClient.addAliases(userOld, Set.of("test_user_alias"));
searchClient.createIndex(userStaged, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"test_table",
"test_table_index",
tableOld,
tableStaged,
Set.of("test_table_alias"),
"test_table",
List.of());
context.add(
"test_user",
"test_user_index",
userOld,
userStaged,
Set.of("test_user_alias"),
"test_user",
List.of());
DefaultRecreateHandler handler = new DefaultRecreateHandler();
// Finalize table entity first
handler.finalizeEntityReindex(context, "test_table", true);
assertTrue(context.isFinalized("test_table"));
assertFalse(searchClient.indexExists(tableOld), "Table old index should be deleted");
// Now call batch finalization (should skip table, only process user)
handler.finalizeReindex(context, true);
// Both should be finalized
assertTrue(context.isFinalized("test_table"));
assertTrue(context.isFinalized("test_user"));
// User should be processed
assertFalse(searchClient.indexExists(userOld), "User old index should be deleted");
assertTrue(searchClient.indexExists(userStaged), "User staged index should exist");
} finally {
// Cleanup
for (String index : List.of(tableStaged, tableOld, userStaged, userOld)) {
if (searchClient.indexExists(index)) {
searchClient.deleteIndex(index);
}
}
}
}
@Test
void testEntityFinalizationOnSuccess() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
long timestamp = System.currentTimeMillis();
String stagedIndex = "test_dashboard_staged_" + timestamp;
String oldIndex = "test_dashboard_old_" + timestamp;
try {
// Create indexes
searchClient.createIndex(oldIndex, "{}");
searchClient.addAliases(oldIndex, Set.of("test_dashboard", "test_all"));
searchClient.createIndex(stagedIndex, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"test_dashboard",
"test_dashboard_index",
oldIndex,
stagedIndex,
Set.of("test_dashboard", "test_all"),
"test_dashboard",
List.of("test_all"));
DefaultRecreateHandler handler = new DefaultRecreateHandler();
handler.finalizeEntityReindex(context, "test_dashboard", true);
// On success, old index should be deleted and aliases moved
assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted");
assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist");
Set<String> aliases = searchClient.getAliases(stagedIndex);
assertTrue(aliases.contains("test_dashboard"));
assertTrue(aliases.contains("test_all"));
assertTrue(context.isFinalized("test_dashboard"));
} finally {
// Cleanup
for (String index : List.of(stagedIndex, oldIndex)) {
if (searchClient.indexExists(index)) {
searchClient.deleteIndex(index);
}
}
}
}
@Test
void testEntityFinalizationOnFailure() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
long timestamp = System.currentTimeMillis();
String stagedIndex = "test_pipeline_staged_" + timestamp;
String oldIndex = "test_pipeline_old_" + timestamp;
try {
// Create indexes
searchClient.createIndex(oldIndex, "{}");
searchClient.addAliases(oldIndex, Set.of("test_pipeline", "test_all"));
searchClient.createIndex(stagedIndex, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"test_pipeline",
"test_pipeline_index",
oldIndex,
stagedIndex,
Set.of("test_pipeline", "test_all"),
"test_pipeline",
List.of("test_all"));
DefaultRecreateHandler handler = new DefaultRecreateHandler();
handler.finalizeEntityReindex(context, "test_pipeline", false);
// On failure, staged index should be deleted, old index should remain
assertFalse(searchClient.indexExists(stagedIndex), "Staged index should be deleted");
assertTrue(searchClient.indexExists(oldIndex), "Old index should remain");
Set<String> aliases = searchClient.getAliases(oldIndex);
assertTrue(aliases.contains("test_pipeline"));
assertTrue(aliases.contains("test_all"));
assertTrue(context.isFinalized("test_pipeline"));
} finally {
// Cleanup
for (String index : List.of(stagedIndex, oldIndex)) {
if (searchClient.indexExists(index)) {
searchClient.deleteIndex(index);
}
}
}
}
@Test
void testPerEntityFinalizationWithClusterAlias() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
String clusterAlias = searchRepo.getClusterAlias();
long timestamp = System.currentTimeMillis();
String canonicalIndexName =
clusterAlias.isEmpty()
? "test_cluster_table_index"
: clusterAlias + "_test_cluster_table_index";
String oldIndex = canonicalIndexName + "_old_" + timestamp;
String stagedIndex = canonicalIndexName + "_rebuild_" + timestamp;
try {
// Create old index with canonical name as alias
searchClient.createIndex(oldIndex, "{}");
if (!clusterAlias.isEmpty()) {
// Add canonical index name as alias (simulating existing setup)
searchClient.addAliases(oldIndex, Set.of(canonicalIndexName, "test_cluster_alias"));
} else {
searchClient.addAliases(oldIndex, Set.of("test_cluster_alias"));
}
// Create staged index
searchClient.createIndex(stagedIndex, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
Set<String> existingAliases =
clusterAlias.isEmpty()
? Set.of("test_cluster_alias")
: Set.of(canonicalIndexName, "test_cluster_alias");
context.add(
"test_cluster_table",
canonicalIndexName,
oldIndex,
stagedIndex,
existingAliases,
"test_cluster_table",
List.of());
DefaultRecreateHandler handler = new DefaultRecreateHandler();
// Finalize the entity
handler.finalizeEntityReindex(context, "test_cluster_table", true);
// Verify old index is deleted
assertFalse(searchClient.indexExists(oldIndex), "Old index should be deleted");
assertTrue(searchClient.indexExists(stagedIndex), "Staged index should exist");
// Verify aliases are attached to staged index
Set<String> stagedAliases = searchClient.getAliases(stagedIndex);
assertTrue(stagedAliases.contains("test_cluster_alias"), "Should have test_cluster_alias");
// Verify canonical index name works as alias (if cluster alias is configured)
if (!clusterAlias.isEmpty()) {
assertTrue(
stagedAliases.contains(canonicalIndexName),
"Should have canonical index name as alias: " + canonicalIndexName);
}
assertTrue(context.isFinalized("test_cluster_table"));
} finally {
// Cleanup
for (String index : List.of(oldIndex, stagedIndex)) {
if (searchClient.indexExists(index)) {
searchClient.deleteIndex(index);
}
}
}
}
@Test
void testMultipleEntitiesWithPerEntityFinalization() {
SearchRepository searchRepo = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepo.getSearchClient();
long timestamp = System.currentTimeMillis();
String tableStaged = "test_multi_table_staged_" + timestamp;
String tableOld = "test_multi_table_old_" + timestamp;
String userStaged = "test_multi_user_staged_" + timestamp;
String userOld = "test_multi_user_old_" + timestamp;
String dashStaged = "test_multi_dash_staged_" + timestamp;
String dashOld = "test_multi_dash_old_" + timestamp;
try {
// Setup three entities
searchClient.createIndex(tableOld, "{}");
searchClient.addAliases(tableOld, Set.of("test_table", "test_all"));
searchClient.createIndex(tableStaged, "{}");
searchClient.createIndex(userOld, "{}");
searchClient.addAliases(userOld, Set.of("test_user", "test_all"));
searchClient.createIndex(userStaged, "{}");
searchClient.createIndex(dashOld, "{}");
searchClient.addAliases(dashOld, Set.of("test_dashboard", "test_all"));
searchClient.createIndex(dashStaged, "{}");
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"test_table",
"test_table_index",
tableOld,
tableStaged,
Set.of("test_table", "test_all"),
"test_table",
List.of("test_all"));
context.add(
"test_user",
"test_user_index",
userOld,
userStaged,
Set.of("test_user", "test_all"),
"test_user",
List.of("test_all"));
context.add(
"test_dashboard",
"test_dashboard_index",
dashOld,
dashStaged,
Set.of("test_dashboard", "test_all"),
"test_dashboard",
List.of("test_all"));
DefaultRecreateHandler handler = new DefaultRecreateHandler();
// Simulate processing entities one by one
handler.finalizeEntityReindex(context, "test_table", true);
assertFalse(searchClient.indexExists(tableOld), "Table old index should be deleted");
assertTrue(context.isFinalized("test_table"));
handler.finalizeEntityReindex(context, "test_user", true);
assertFalse(searchClient.indexExists(userOld), "User old index should be deleted");
assertTrue(context.isFinalized("test_user"));
// Dashboard fails
handler.finalizeEntityReindex(context, "test_dashboard", false);
assertFalse(
searchClient.indexExists(dashStaged), "Dashboard staged should be deleted on failure");
assertTrue(searchClient.indexExists(dashOld), "Dashboard old should remain on failure");
assertTrue(context.isFinalized("test_dashboard"));
// Verify final state - successful entities have staged indexes
assertTrue(searchClient.indexExists(tableStaged));
assertTrue(searchClient.indexExists(userStaged));
assertFalse(searchClient.indexExists(dashStaged));
} finally {
// Cleanup
for (String index :
List.of(tableStaged, tableOld, userStaged, userOld, dashStaged, dashOld)) {
if (searchClient.indexExists(index)) {
searchClient.deleteIndex(index);
}
}
}
}
private static class AliasState { private static class AliasState {
final Map<String, Set<String>> indexAliases = new HashMap<>(); final Map<String, Set<String>> indexAliases = new HashMap<>();
final Set<String> deletedIndices = new HashSet<>(); final Set<String> deletedIndices = new HashSet<>();