From 602f178afd64bf2e8fec787a996cc7353ebfc941 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Wed, 9 Jul 2025 17:05:20 -0500 Subject: [PATCH] fix(systemUpdate): add configuration for opensearch clusters with zone awareness enabled (#13996) --- .../indexbuilder/ESIndexBuilder.java | 8 +- .../indexbuilder/ESIndexBuilderTest.java | 112 ++++++++++++++++++ .../indexbuilder/IndexBuilderTestBase.java | 7 +- .../test/search/SearchTestUtils.java | 3 + .../search/BuildIndicesConfiguration.java | 9 ++ .../src/main/resources/application.yaml | 1 + 6 files changed, 134 insertions(+), 6 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java index 68c2be98a3..2cf6c8cd70 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java @@ -943,7 +943,9 @@ public class ESIndexBuilder { private Map setReindexOptimalSettings(String tempIndexName, int targetShards) throws IOException { Map res = new HashMap<>(); - setIndexSetting(tempIndexName, "0", INDEX_NUMBER_OF_REPLICAS); + if (elasticSearchConfiguration.getBuildIndices().isReindexOptimizationEnabled()) { + setIndexSetting(tempIndexName, "0", INDEX_NUMBER_OF_REPLICAS); + } setIndexSetting(tempIndexName, "-1", INDEX_REFRESH_INTERVAL); // these depend on jvm max heap... // flush_threshold_size: 512MB by def. Increasing to 1gb, if heap at least 16gb (this is more @@ -985,7 +987,9 @@ public class ESIndexBuilder { Map reinfo) throws IOException { // set the original values - setIndexSetting(tempIndexName, targetReplicas, INDEX_NUMBER_OF_REPLICAS); + if (elasticSearchConfiguration.getBuildIndices().isReindexOptimizationEnabled()) { + setIndexSetting(tempIndexName, targetReplicas, INDEX_NUMBER_OF_REPLICAS); + } setIndexSetting(tempIndexName, refreshinterval, INDEX_REFRESH_INTERVAL); // reinfo could be emtpy (if reindex was already ongoing...) String setting = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/ESIndexBuilderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/ESIndexBuilderTest.java index 95aeaeb4e8..ceeaf01f3d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/ESIndexBuilderTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/ESIndexBuilderTest.java @@ -17,6 +17,7 @@ import com.linkedin.metadata.version.GitVersion; import java.io.ByteArrayInputStream; import java.io.IOException; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -112,6 +113,7 @@ public class ESIndexBuilderTest { when(buildIndicesConfig.getRetentionUnit()).thenReturn(ChronoUnit.DAYS.name()); when(buildIndicesConfig.isAllowDocCountMismatch()).thenReturn(false); when(buildIndicesConfig.isCloneIndices()).thenReturn(false); + when(buildIndicesConfig.isReindexOptimizationEnabled()).thenReturn(true); indexBuilder = new ESIndexBuilder( @@ -834,6 +836,116 @@ public class ESIndexBuilderTest { assertEquals(indexSettings.get("codec"), "zstd_no_dict"); } + @Test + void testReindexWithOptimizationDisabled() throws Exception { + // Setup zone awareness enabled configuration + when(buildIndicesConfig.isReindexOptimizationEnabled()).thenReturn(false); + + // Create index builder with zone awareness enabled + ESIndexBuilder optimizationDisabledIndexBuilder = + new ESIndexBuilder( + searchClient, + NUM_SHARDS, + NUM_REPLICAS, + NUM_RETRIES, + REFRESH_INTERVAL_SECONDS, + new HashMap<>(), + true, + true, + true, + elasticSearchConfiguration, + gitVersion); + + // Setup index state that requires reindexing + ReindexConfig indexState = mock(ReindexConfig.class); + when(indexState.exists()).thenReturn(true); + when(indexState.requiresApplyMappings()).thenReturn(true); + when(indexState.requiresApplySettings()).thenReturn(true); + when(indexState.requiresReindex()).thenReturn(true); + when(indexState.name()).thenReturn(TEST_INDEX_NAME); + when(indexState.targetMappings()).thenReturn(createTestMappings()); + + // Setup target settings with index structure + Map indexSettings = new HashMap<>(); + indexSettings.put(ESIndexBuilder.NUMBER_OF_SHARDS, 6); + indexSettings.put(ESIndexBuilder.NUMBER_OF_REPLICAS, 1); + indexSettings.put(ESIndexBuilder.REFRESH_INTERVAL, "1s"); + Map targetSettings = new HashMap<>(); + targetSettings.put("index", indexSettings); + when(indexState.targetSettings()).thenReturn(targetSettings); + + // Mock index creation + CreateIndexResponse createResponse = mock(CreateIndexResponse.class); + when(createResponse.isAcknowledged()).thenReturn(true); + when(indicesClient.create(any(CreateIndexRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(createResponse); + + // Mock document count to be 0 to trigger REINDEXED_SKIPPED_0DOCS path + CountResponse countResponse = mock(CountResponse.class); + when(countResponse.getCount()).thenReturn(0L); + when(searchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(countResponse); + + // Mock TasksClient to avoid null pointer exception + org.opensearch.client.TasksClient tasksClient = mock(org.opensearch.client.TasksClient.class); + when(searchClient.tasks()).thenReturn(tasksClient); + + // Mock task list response - return empty list (no previous tasks) + org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse taskListResponse = + mock(org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse.class); + when(taskListResponse.getTasks()).thenReturn(new ArrayList<>()); + when(tasksClient.list( + any(org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest.class), any())) + .thenReturn(taskListResponse); + + // Mock refresh response + org.opensearch.action.admin.indices.refresh.RefreshResponse refreshResponse = + mock(org.opensearch.action.admin.indices.refresh.RefreshResponse.class); + when(indicesClient.refresh(any(), eq(RequestOptions.DEFAULT))).thenReturn(refreshResponse); + + // Mock settings operations for reindex optimization + GetSettingsResponse getSettingsResponse = mock(GetSettingsResponse.class); + when(getSettingsResponse.getSetting(anyString(), eq("index.translog.flush_threshold_size"))) + .thenReturn("512mb"); + when(indicesClient.getSettings(any(GetSettingsRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(getSettingsResponse); + + AcknowledgedResponse settingsUpdateResponse = mock(AcknowledgedResponse.class); + when(settingsUpdateResponse.isAcknowledged()).thenReturn(true); + when(indicesClient.putSettings(any(UpdateSettingsRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(settingsUpdateResponse); + + // Mock alias operations for final rename + GetAliasesResponse getAliasesResponse = mock(GetAliasesResponse.class); + when(getAliasesResponse.getAliases()).thenReturn(Map.of()); + when(indicesClient.getAlias(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(getAliasesResponse); + + AcknowledgedResponse aliasResponse = mock(AcknowledgedResponse.class); + when(aliasResponse.isAcknowledged()).thenReturn(true); + when(indicesClient.updateAliases(any(IndicesAliasesRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(aliasResponse); + + // Execute the reindex + ReindexResult result = optimizationDisabledIndexBuilder.buildIndex(indexState); + + // Verify the result + assertEquals(result, ReindexResult.REINDEXED_SKIPPED_0DOCS); + + // Verify that replica settings were NOT modified during reindexing + // When zone awareness is enabled, the number of replicas should not be set to 0 + verify(indicesClient, never()) + .putSettings( + argThat( + request -> + request.indices().length == 1 + && request.indices()[0].contains(TEST_INDEX_NAME + "_") + && // temp index name pattern + request.settings().get("index.number_of_replicas") != null + && request.settings().get("index.number_of_replicas").equals("0")), + eq(RequestOptions.DEFAULT)); + } + // Helper methods private Map createTestMappings() { return ImmutableMap.of( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java index 6e1c1a82f2..15d5eb2375 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java @@ -5,7 +5,6 @@ import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFI import static org.testng.Assert.*; import com.google.common.collect.ImmutableMap; -import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexResult; @@ -181,7 +180,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe true, false, false, - new ElasticSearchConfiguration(), + TEST_ES_SEARCH_CONFIG, gitVersion); // index one doc IndexRequest indexRequest = @@ -223,7 +222,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe true, false, false, - new ElasticSearchConfiguration(), + TEST_ES_SEARCH_CONFIG, gitVersion); // reindex ReindexResult rr = changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); @@ -287,7 +286,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe true, false, false, - new ElasticSearchConfiguration(), + TEST_ES_SEARCH_CONFIG, gitVersion); // reindex ReindexResult rr = changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); diff --git a/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java b/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java index e3346dcd75..f7f198b777 100644 --- a/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java +++ b/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java @@ -15,6 +15,7 @@ import com.linkedin.metadata.config.DataHubAppConfiguration; import com.linkedin.metadata.config.SystemMetadataServiceConfig; import com.linkedin.metadata.config.TimeseriesAspectServiceConfig; import com.linkedin.metadata.config.graph.GraphServiceConfiguration; +import com.linkedin.metadata.config.search.BuildIndicesConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.config.search.SearchConfiguration; @@ -76,6 +77,8 @@ public class SearchTestUtils { }); } }) + .buildIndices( + BuildIndicesConfiguration.builder().reindexOptimizationEnabled(true).build()) .build(); public static SystemMetadataServiceConfig TEST_SYSTEM_METADATA_SERVICE_CONFIG = diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BuildIndicesConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BuildIndicesConfiguration.java index 70ffa59ea4..4ad6532e3f 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BuildIndicesConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BuildIndicesConfiguration.java @@ -1,12 +1,21 @@ package com.linkedin.metadata.config.search; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; @Data +@NoArgsConstructor +@AllArgsConstructor +@Accessors(chain = true) +@Builder(toBuilder = true) public class BuildIndicesConfiguration { private boolean cloneIndices; private boolean allowDocCountMismatch; private String retentionUnit; private Long retentionValue; + private boolean reindexOptimizationEnabled; } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 8b5e41033f..7176824044 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -282,6 +282,7 @@ elasticsearch: cloneIndices: ${ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES:true} retentionUnit: ${ELASTICSEARCH_BUILD_INDICES_RETENTION_UNIT:DAYS} retentionValue: ${ELASTICSEARCH_BUILD_INDICES_RETENTION_VALUE:60} + reindexOptimizationEnabled: ${ELASTICSEARCH_BUILD_INDICES_REINDEX_OPTIMIZATION_ENABLED:true} # Disable when Multi-AZ zone replication is set to required, will prevent index from setting zero replicas during reindexing search: maxTermBucketSize: ${ELASTICSEARCH_QUERY_MAX_TERM_BUCKET_SIZE:20} # Defines the behavior of quoted searches, do they apply weights or exclude results