mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-01 19:25:56 +00:00
fix(systemUpdate): add configuration for opensearch clusters with zone awareness enabled (#13996)
This commit is contained in:
parent
b1354abcba
commit
602f178afd
@ -943,7 +943,9 @@ public class ESIndexBuilder {
|
||||
private Map<String, Object> setReindexOptimalSettings(String tempIndexName, int targetShards)
|
||||
throws IOException {
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
setIndexSetting(tempIndexName, "0", INDEX_NUMBER_OF_REPLICAS);
|
||||
if (elasticSearchConfiguration.getBuildIndices().isReindexOptimizationEnabled()) {
|
||||
setIndexSetting(tempIndexName, "0", INDEX_NUMBER_OF_REPLICAS);
|
||||
}
|
||||
setIndexSetting(tempIndexName, "-1", INDEX_REFRESH_INTERVAL);
|
||||
// these depend on jvm max heap...
|
||||
// flush_threshold_size: 512MB by def. Increasing to 1gb, if heap at least 16gb (this is more
|
||||
@ -985,7 +987,9 @@ public class ESIndexBuilder {
|
||||
Map<String, Object> reinfo)
|
||||
throws IOException {
|
||||
// set the original values
|
||||
setIndexSetting(tempIndexName, targetReplicas, INDEX_NUMBER_OF_REPLICAS);
|
||||
if (elasticSearchConfiguration.getBuildIndices().isReindexOptimizationEnabled()) {
|
||||
setIndexSetting(tempIndexName, targetReplicas, INDEX_NUMBER_OF_REPLICAS);
|
||||
}
|
||||
setIndexSetting(tempIndexName, refreshinterval, INDEX_REFRESH_INTERVAL);
|
||||
// reinfo could be emtpy (if reindex was already ongoing...)
|
||||
String setting = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE;
|
||||
|
||||
@ -17,6 +17,7 @@ import com.linkedin.metadata.version.GitVersion;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -112,6 +113,7 @@ public class ESIndexBuilderTest {
|
||||
when(buildIndicesConfig.getRetentionUnit()).thenReturn(ChronoUnit.DAYS.name());
|
||||
when(buildIndicesConfig.isAllowDocCountMismatch()).thenReturn(false);
|
||||
when(buildIndicesConfig.isCloneIndices()).thenReturn(false);
|
||||
when(buildIndicesConfig.isReindexOptimizationEnabled()).thenReturn(true);
|
||||
|
||||
indexBuilder =
|
||||
new ESIndexBuilder(
|
||||
@ -834,6 +836,116 @@ public class ESIndexBuilderTest {
|
||||
assertEquals(indexSettings.get("codec"), "zstd_no_dict");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReindexWithOptimizationDisabled() throws Exception {
|
||||
// Setup zone awareness enabled configuration
|
||||
when(buildIndicesConfig.isReindexOptimizationEnabled()).thenReturn(false);
|
||||
|
||||
// Create index builder with zone awareness enabled
|
||||
ESIndexBuilder optimizationDisabledIndexBuilder =
|
||||
new ESIndexBuilder(
|
||||
searchClient,
|
||||
NUM_SHARDS,
|
||||
NUM_REPLICAS,
|
||||
NUM_RETRIES,
|
||||
REFRESH_INTERVAL_SECONDS,
|
||||
new HashMap<>(),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
elasticSearchConfiguration,
|
||||
gitVersion);
|
||||
|
||||
// Setup index state that requires reindexing
|
||||
ReindexConfig indexState = mock(ReindexConfig.class);
|
||||
when(indexState.exists()).thenReturn(true);
|
||||
when(indexState.requiresApplyMappings()).thenReturn(true);
|
||||
when(indexState.requiresApplySettings()).thenReturn(true);
|
||||
when(indexState.requiresReindex()).thenReturn(true);
|
||||
when(indexState.name()).thenReturn(TEST_INDEX_NAME);
|
||||
when(indexState.targetMappings()).thenReturn(createTestMappings());
|
||||
|
||||
// Setup target settings with index structure
|
||||
Map<String, Object> indexSettings = new HashMap<>();
|
||||
indexSettings.put(ESIndexBuilder.NUMBER_OF_SHARDS, 6);
|
||||
indexSettings.put(ESIndexBuilder.NUMBER_OF_REPLICAS, 1);
|
||||
indexSettings.put(ESIndexBuilder.REFRESH_INTERVAL, "1s");
|
||||
Map<String, Object> targetSettings = new HashMap<>();
|
||||
targetSettings.put("index", indexSettings);
|
||||
when(indexState.targetSettings()).thenReturn(targetSettings);
|
||||
|
||||
// Mock index creation
|
||||
CreateIndexResponse createResponse = mock(CreateIndexResponse.class);
|
||||
when(createResponse.isAcknowledged()).thenReturn(true);
|
||||
when(indicesClient.create(any(CreateIndexRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(createResponse);
|
||||
|
||||
// Mock document count to be 0 to trigger REINDEXED_SKIPPED_0DOCS path
|
||||
CountResponse countResponse = mock(CountResponse.class);
|
||||
when(countResponse.getCount()).thenReturn(0L);
|
||||
when(searchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(countResponse);
|
||||
|
||||
// Mock TasksClient to avoid null pointer exception
|
||||
org.opensearch.client.TasksClient tasksClient = mock(org.opensearch.client.TasksClient.class);
|
||||
when(searchClient.tasks()).thenReturn(tasksClient);
|
||||
|
||||
// Mock task list response - return empty list (no previous tasks)
|
||||
org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse taskListResponse =
|
||||
mock(org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse.class);
|
||||
when(taskListResponse.getTasks()).thenReturn(new ArrayList<>());
|
||||
when(tasksClient.list(
|
||||
any(org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest.class), any()))
|
||||
.thenReturn(taskListResponse);
|
||||
|
||||
// Mock refresh response
|
||||
org.opensearch.action.admin.indices.refresh.RefreshResponse refreshResponse =
|
||||
mock(org.opensearch.action.admin.indices.refresh.RefreshResponse.class);
|
||||
when(indicesClient.refresh(any(), eq(RequestOptions.DEFAULT))).thenReturn(refreshResponse);
|
||||
|
||||
// Mock settings operations for reindex optimization
|
||||
GetSettingsResponse getSettingsResponse = mock(GetSettingsResponse.class);
|
||||
when(getSettingsResponse.getSetting(anyString(), eq("index.translog.flush_threshold_size")))
|
||||
.thenReturn("512mb");
|
||||
when(indicesClient.getSettings(any(GetSettingsRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(getSettingsResponse);
|
||||
|
||||
AcknowledgedResponse settingsUpdateResponse = mock(AcknowledgedResponse.class);
|
||||
when(settingsUpdateResponse.isAcknowledged()).thenReturn(true);
|
||||
when(indicesClient.putSettings(any(UpdateSettingsRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(settingsUpdateResponse);
|
||||
|
||||
// Mock alias operations for final rename
|
||||
GetAliasesResponse getAliasesResponse = mock(GetAliasesResponse.class);
|
||||
when(getAliasesResponse.getAliases()).thenReturn(Map.of());
|
||||
when(indicesClient.getAlias(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(getAliasesResponse);
|
||||
|
||||
AcknowledgedResponse aliasResponse = mock(AcknowledgedResponse.class);
|
||||
when(aliasResponse.isAcknowledged()).thenReturn(true);
|
||||
when(indicesClient.updateAliases(any(IndicesAliasesRequest.class), eq(RequestOptions.DEFAULT)))
|
||||
.thenReturn(aliasResponse);
|
||||
|
||||
// Execute the reindex
|
||||
ReindexResult result = optimizationDisabledIndexBuilder.buildIndex(indexState);
|
||||
|
||||
// Verify the result
|
||||
assertEquals(result, ReindexResult.REINDEXED_SKIPPED_0DOCS);
|
||||
|
||||
// Verify that replica settings were NOT modified during reindexing
|
||||
// When zone awareness is enabled, the number of replicas should not be set to 0
|
||||
verify(indicesClient, never())
|
||||
.putSettings(
|
||||
argThat(
|
||||
request ->
|
||||
request.indices().length == 1
|
||||
&& request.indices()[0].contains(TEST_INDEX_NAME + "_")
|
||||
&& // temp index name pattern
|
||||
request.settings().get("index.number_of_replicas") != null
|
||||
&& request.settings().get("index.number_of_replicas").equals("0")),
|
||||
eq(RequestOptions.DEFAULT));
|
||||
}
|
||||
|
||||
// Helper methods
|
||||
private Map<String, Object> createTestMappings() {
|
||||
return ImmutableMap.of(
|
||||
|
||||
@ -5,7 +5,6 @@ import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFI
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
|
||||
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
|
||||
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
|
||||
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexResult;
|
||||
@ -181,7 +180,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
new ElasticSearchConfiguration(),
|
||||
TEST_ES_SEARCH_CONFIG,
|
||||
gitVersion);
|
||||
// index one doc
|
||||
IndexRequest indexRequest =
|
||||
@ -223,7 +222,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
new ElasticSearchConfiguration(),
|
||||
TEST_ES_SEARCH_CONFIG,
|
||||
gitVersion);
|
||||
// reindex
|
||||
ReindexResult rr = changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of());
|
||||
@ -287,7 +286,7 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
new ElasticSearchConfiguration(),
|
||||
TEST_ES_SEARCH_CONFIG,
|
||||
gitVersion);
|
||||
// reindex
|
||||
ReindexResult rr = changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of());
|
||||
|
||||
@ -15,6 +15,7 @@ import com.linkedin.metadata.config.DataHubAppConfiguration;
|
||||
import com.linkedin.metadata.config.SystemMetadataServiceConfig;
|
||||
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
|
||||
import com.linkedin.metadata.config.graph.GraphServiceConfiguration;
|
||||
import com.linkedin.metadata.config.search.BuildIndicesConfiguration;
|
||||
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
|
||||
import com.linkedin.metadata.config.search.GraphQueryConfiguration;
|
||||
import com.linkedin.metadata.config.search.SearchConfiguration;
|
||||
@ -76,6 +77,8 @@ public class SearchTestUtils {
|
||||
});
|
||||
}
|
||||
})
|
||||
.buildIndices(
|
||||
BuildIndicesConfiguration.builder().reindexOptimizationEnabled(true).build())
|
||||
.build();
|
||||
|
||||
public static SystemMetadataServiceConfig TEST_SYSTEM_METADATA_SERVICE_CONFIG =
|
||||
|
||||
@ -1,12 +1,21 @@
|
||||
package com.linkedin.metadata.config.search;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Accessors(chain = true)
|
||||
@Builder(toBuilder = true)
|
||||
public class BuildIndicesConfiguration {
|
||||
|
||||
private boolean cloneIndices;
|
||||
private boolean allowDocCountMismatch;
|
||||
private String retentionUnit;
|
||||
private Long retentionValue;
|
||||
private boolean reindexOptimizationEnabled;
|
||||
}
|
||||
|
||||
@ -282,6 +282,7 @@ elasticsearch:
|
||||
cloneIndices: ${ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES:true}
|
||||
retentionUnit: ${ELASTICSEARCH_BUILD_INDICES_RETENTION_UNIT:DAYS}
|
||||
retentionValue: ${ELASTICSEARCH_BUILD_INDICES_RETENTION_VALUE:60}
|
||||
reindexOptimizationEnabled: ${ELASTICSEARCH_BUILD_INDICES_REINDEX_OPTIMIZATION_ENABLED:true} # Disable when Multi-AZ zone replication is set to required, will prevent index from setting zero replicas during reindexing
|
||||
search:
|
||||
maxTermBucketSize: ${ELASTICSEARCH_QUERY_MAX_TERM_BUCKET_SIZE:20}
|
||||
# Defines the behavior of quoted searches, do they apply weights or exclude results
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user