fix(elasticsearch): fix regression system-update elasticsearch-setup (#15107)

This commit is contained in:
david-leifker 2025-10-24 22:28:05 -05:00 committed by GitHub
parent 080233054b
commit 8038dc9c45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 2471 additions and 168 deletions

View File

@ -72,6 +72,7 @@ dependencies {
implementation externalDependency.slf4jApi
compileOnly externalDependency.lombok
implementation externalDependency.picocli
implementation externalDependency.resilience4j
implementation externalDependency.parquet
implementation externalDependency.protobuf
implementation externalDependency.springBeans

View File

@ -4,10 +4,12 @@ import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.system.elasticsearch.util.IndexUtils;
import com.linkedin.datahub.upgrade.system.elasticsearch.util.UsageEventIndexUtils;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -28,28 +30,32 @@ public class CreateUsageEventIndicesStep implements UpgradeStep {
return 3;
}
/**
 * Decides whether this step should be skipped.
 *
 * <p>The step is skipped exactly when platform analytics is disabled in the configuration; in
 * that case an informational message is logged.
 *
 * @param context the upgrade context (not consulted)
 * @return {@code true} when analytics is disabled, {@code false} otherwise
 */
@Override
public boolean skip(UpgradeContext context) {
  if (configurationProvider.getPlatformAnalytics().isEnabled()) {
    return false;
  }
  log.info("DataHub analytics is disabled, skipping usage event index setup");
  return true;
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
boolean analyticsEnabled = configurationProvider.getPlatformAnalytics().isEnabled();
String indexPrefix = configurationProvider.getElasticSearch().getIndex().getPrefix();
// Handle null prefix by converting to empty string
if (indexPrefix == null) {
indexPrefix = "";
}
if (!analyticsEnabled) {
log.info("DataHub analytics is disabled, skipping usage event index setup");
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
}
boolean useOpenSearch = esComponents.getSearchClient().getEngineType().isOpenSearch();
int numShards = esComponents.getIndexBuilder().getNumShards();
int numReplicas = esComponents.getIndexBuilder().getNumReplicas();
if (useOpenSearch) {
setupOpenSearchUsageEvents(indexPrefix, numShards, numReplicas);
setupOpenSearchUsageEvents(indexPrefix, numShards, numReplicas, context.opContext());
} else {
setupElasticsearchUsageEvents(indexPrefix, numShards, numReplicas);
}
@ -64,10 +70,10 @@ public class CreateUsageEventIndicesStep implements UpgradeStep {
private void setupElasticsearchUsageEvents(String prefix, int numShards, int numReplicas)
throws Exception {
String separator = prefix.isEmpty() ? "" : "_";
String prefixedPolicy = prefix + separator + "datahub_usage_event_policy";
String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template";
String prefixedDataStream = prefix + separator + "datahub_usage_event";
String prefixedPolicy = IndexUtils.createPrefixedName(prefix, "datahub_usage_event_policy");
String prefixedTemplate =
IndexUtils.createPrefixedName(prefix, "datahub_usage_event_index_template");
String prefixedDataStream = IndexUtils.createPrefixedName(prefix, "datahub_usage_event");
// Create ILM policy
UsageEventIndexUtils.createIlmPolicy(esComponents, prefixedPolicy);
@ -80,21 +86,36 @@ public class CreateUsageEventIndicesStep implements UpgradeStep {
UsageEventIndexUtils.createDataStream(esComponents, prefixedDataStream);
}
private void setupOpenSearchUsageEvents(String prefix, int numShards, int numReplicas)
private void setupOpenSearchUsageEvents(
String prefix, int numShards, int numReplicas, OperationContext operationContext)
throws Exception {
String separator = prefix.isEmpty() ? "" : "_";
String prefixedPolicy = prefix + separator + "datahub_usage_event_policy";
String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template";
String prefixedIndex = prefix + separator + "datahub_usage_event-000001";
String prefixedPolicy = IndexUtils.createPrefixedName(prefix, "datahub_usage_event_policy");
String prefixedTemplate =
IndexUtils.createPrefixedName(prefix, "datahub_usage_event_index_template");
String prefixedIndex = IndexUtils.createPrefixedName(prefix, "datahub_usage_event-000001");
// Create ISM policy
UsageEventIndexUtils.createIsmPolicy(esComponents, prefixedPolicy, prefix);
// Create ISM policy (both AWS and self-hosted OpenSearch use the same format)
boolean policyCreated =
UsageEventIndexUtils.createIsmPolicy(
esComponents, prefixedPolicy, prefix, operationContext);
log.info("ISM policy creation result: {}", policyCreated);
// Create index template
UsageEventIndexUtils.createOpenSearchIndexTemplate(
esComponents, prefixedTemplate, numShards, numReplicas, prefix);
if (policyCreated) {
log.info("ISM policy created successfully, proceeding with template and index creation");
// Create initial index
UsageEventIndexUtils.createOpenSearchIndex(esComponents, prefixedIndex, prefix);
// Create index template (both AWS and self-hosted OpenSearch use the same format and
// endpoint)
log.info("Creating index template: {}", prefixedTemplate);
UsageEventIndexUtils.createOpenSearchIndexTemplate(
esComponents, prefixedTemplate, numShards, numReplicas, prefix);
// Create initial numbered index (both AWS and self-hosted OpenSearch use the same approach)
log.info("Creating initial index: {}", prefixedIndex);
UsageEventIndexUtils.createOpenSearchIndex(esComponents, prefixedIndex, prefix);
} else {
log.warn(
"ISM policy creation failed or is not supported. Skipping template and index creation to avoid configuration issues.");
log.info("Usage event tracking will not be available without proper policy configuration.");
}
}
}

View File

@ -1,12 +1,22 @@
package com.linkedin.datahub.upgrade.system.elasticsearch.util;
import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.utils.elasticsearch.SearchClientShim;
import com.linkedin.metadata.utils.elasticsearch.responses.RawResponse;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -17,10 +27,24 @@ import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.client.GetAliasesResponse;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
@Slf4j
public class IndexUtils {
/**
 * Default retry configuration for upgrade operations: up to 5 attempts with a fixed 2 second wait
 * between attempts.
 *
 * <p>An attempt is retried when it throws any exception or when it returns anything other than
 * {@code true}. Without the result predicate, an operation that reports failure by returning
 * {@code false} would never be retried and {@code failAfterMaxAttempts} would have no effect.
 */
private static final RetryConfig DEFAULT_RETRY_CONFIG =
    RetryConfig.<Boolean>custom()
        .maxAttempts(5)
        .waitDuration(Duration.ofSeconds(2))
        .retryOnException(e -> true) // Retry on any exception
        .retryOnResult(succeeded -> !Boolean.TRUE.equals(succeeded)) // Retry on a false result
        // When the result is still false after the last attempt, hand that result back to the
        // caller instead of throwing MaxRetriesExceededException
        .failAfterMaxAttempts(false)
        .build();

/** Retry registry for upgrade operations, backed by {@link #DEFAULT_RETRY_CONFIG}. */
private static final RetryRegistry RETRY_REGISTRY = RetryRegistry.of(DEFAULT_RETRY_CONFIG);
public static final String INDEX_BLOCKS_WRITE_SETTING = "index.blocks.write";
public static final int INDEX_BLOCKS_WRITE_RETRY = 4;
public static final int INDEX_BLOCKS_WRITE_WAIT_SECONDS = 10;
@ -95,4 +119,248 @@ public class IndexUtils {
return finalIndexName;
}
/**
 * Reads a top-level numeric field from a JSON document and returns it as an {@code int}.
 *
 * <p>The JSON text is parsed with the ObjectMapper supplied by the given OperationContext. Any
 * failure while parsing or looking up the field is logged at WARN level and reported through the
 * {@code -1} sentinel rather than an exception.
 *
 * @param operationContext the operation context providing the ObjectMapper
 * @param json the JSON string to parse
 * @param key the name of the field to read
 * @return the field's value as an int, or -1 when the field is missing, is not a number, or the
 *     document cannot be processed
 */
static int extractJsonValue(OperationContext operationContext, String json, String key) {
  int extracted = -1;
  try {
    JsonNode field = operationContext.getObjectMapper().readTree(json).get(key);
    if (field != null && field.isNumber()) {
      extracted = field.asInt();
    }
  } catch (Exception e) {
    log.warn("Error extracting JSON value for key {}: {}", key, e.getMessage());
  }
  return extracted;
}
/**
 * Builds the name of a usage event resource, optionally qualified by an index prefix.
 *
 * <p>A {@code null} or empty prefix leaves the resource name untouched. Any other prefix is joined
 * to the resource name with a single underscore.
 *
 * @param prefix the index prefix (e.g., "prod", "dev", an empty string, or {@code null})
 * @param resourceName the base resource name (e.g., "datahub_usage_event_policy")
 * @return the resource name, prefixed when a prefix is present (e.g.,
 *     "prod_datahub_usage_event_policy" or "datahub_usage_event_policy")
 */
public static String createPrefixedName(String prefix, String resourceName) {
  boolean hasPrefix = prefix != null && !prefix.isEmpty();
  return hasPrefix ? prefix + "_" + resourceName : resourceName;
}
/**
 * Reads a classpath resource fully and decodes it as UTF-8 text.
 *
 * <p>The resource is looked up relative to {@code IndexUtils.class}; the stream is read in one
 * call and closed before returning.
 *
 * @param resourcePath the path to the resource file (e.g.,
 *     "/index/usage-event/elasticsearch_policy.json")
 * @return the contents of the resource decoded as UTF-8
 * @throws IOException if the resource cannot be found or read
 */
static String loadResourceAsString(String resourcePath) throws IOException {
  try (InputStream resource = IndexUtils.class.getResourceAsStream(resourcePath)) {
    if (resource != null) {
      byte[] contents = resource.readAllBytes();
      return new String(contents, StandardCharsets.UTF_8);
    }
    throw new IOException("Resource not found: " + resourcePath);
  }
}
/**
 * Runs an operation under a Resilience4j {@link Retry} until it succeeds or the attempts are
 * exhausted.
 *
 * <p>An attempt counts as failed when it throws any exception or when it returns {@code false};
 * in both cases the operation is attempted again after a fixed wait. Despite the method name, the
 * wait between attempts is constant ({@code initialDelayMs}), not exponentially increasing.
 *
 * @param maxAttempts the maximum number of attempts, including the first one
 * @param initialDelayMs the wait between consecutive attempts, in milliseconds
 * @param operation the operation to retry
 * @return true if an attempt returned true, false if every attempt threw or returned false
 */
public static boolean retryWithBackoff(
    int maxAttempts, long initialDelayMs, RetryableOperation operation) {
  RetryConfig customConfig =
      RetryConfig.<Boolean>custom()
          .maxAttempts(maxAttempts)
          .waitDuration(Duration.ofMillis(initialDelayMs))
          .retryOnException(e -> true) // Retry on any exception
          // Retry when the operation reports failure by returning false; without this predicate
          // a false result is returned immediately and never retried
          .retryOnResult(succeeded -> !Boolean.TRUE.equals(succeeded))
          // If the result is still false after the last attempt, return it instead of throwing
          // MaxRetriesExceededException
          .failAfterMaxAttempts(false)
          .build();

  // A standalone Retry is enough here; no registry is needed for a one-off configuration
  Retry retry = Retry.of("upgrade-operation", customConfig);

  try {
    return retry.executeSupplier(
        () -> {
          try {
            return operation.execute();
          } catch (RuntimeException e) {
            throw e; // already unchecked, avoid double wrapping
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
  } catch (Exception e) {
    // Pass the exception to the logger so the cause of the final failure is not lost
    log.error("All {} attempts failed", maxAttempts, e);
    return false;
  }
}
/**
 * Executes an operation with retry logic using the default RetryConfig.
 *
 * <p>This method uses the default retry configuration (5 attempts, 2 second delay) through the
 * shared retry registry. It is a convenience method for common retry scenarios.
 *
 * @param operation the operation to retry
 * @return the operation's result if an attempt completes without throwing, false if the final
 *     attempt throws
 */
public static boolean retryWithDefaultConfig(RetryableOperation operation) {
  Retry retry = RETRY_REGISTRY.retry("default-upgrade-operation");

  try {
    return retry.executeSupplier(
        () -> {
          try {
            return operation.execute();
          } catch (RuntimeException e) {
            throw e; // already unchecked, avoid double wrapping
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
  } catch (Exception e) {
    // Pass the exception to the logger so the cause of the final failure is not lost
    log.error("All attempts failed with default config", e);
    return false;
  }
}
/**
 * Detects if the OpenSearch instance is AWS OpenSearch Service based on the configured host.
 *
 * <p>The host is read from the search client's shim configuration and is treated as AWS
 * OpenSearch Service when it contains "amazonaws.com" (which also covers hosts containing
 * "es.amazonaws.com"). If the host is not set, or reading it throws, the method returns false.
 *
 * @param esComponents the Elasticsearch components factory providing search client access
 * @return true if the instance appears to be AWS OpenSearch Service, false otherwise
 */
public static boolean isAwsOpenSearchService(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents) {
  try {
    // Get the host from the search client configuration
    String host = esComponents.getSearchClient().getShimConfiguration().getHost();
    return host != null && host.contains("amazonaws.com");
  } catch (Exception e) {
    log.debug("Could not determine host for AWS OpenSearch detection: {}", e.getMessage());
    return false;
  }
}
/**
 * Functional interface for retryable operations.
 *
 * <p>Used as the unit of work for {@code retryWithBackoff} and {@code retryWithDefaultConfig}.
 * Checked exceptions thrown by {@link #execute()} are wrapped in a RuntimeException by those
 * helpers.
 */
@FunctionalInterface
public interface RetryableOperation {
/**
 * Runs one attempt of the operation.
 *
 * @return a boolean outcome that the retry helpers hand back to their caller
 * @throws Exception if the attempt fails
 */
boolean execute() throws Exception;
}
/**
 * Logs the endpoint at INFO level and issues a GET against it through the search client's
 * low-level request API.
 *
 * @param esComponents the Elasticsearch components factory providing search client access
 * @param endpoint the endpoint to GET (e.g., "/_plugins/_ism/policies/policy_name")
 * @return the raw response from the request
 * @throws IOException if the request fails
 */
public static RawResponse performGetRequest(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents, String endpoint)
    throws IOException {
  log.info("GET => {}", endpoint);
  final Request getRequest = new Request("GET", endpoint);
  final RawResponse rawResponse =
      esComponents.getSearchClient().performLowLevelRequest(getRequest);
  return rawResponse;
}
/**
 * Logs the endpoint at INFO level and issues a PUT with a JSON body against it through the
 * search client's low-level request API.
 *
 * @param esComponents the Elasticsearch components factory providing search client access
 * @param endpoint the endpoint to PUT to (e.g., "/_plugins/_ism/policies/policy_name")
 * @param jsonBody the JSON body to send with the request
 * @return the raw response from the request
 * @throws IOException if the request fails
 */
public static RawResponse performPutRequest(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String endpoint,
    String jsonBody)
    throws IOException {
  log.info("PUT => {}", endpoint);
  final Request putRequest = new Request("PUT", endpoint);
  putRequest.setJsonEntity(jsonBody);
  final RawResponse rawResponse =
      esComponents.getSearchClient().performLowLevelRequest(putRequest);
  return rawResponse;
}
/**
 * Logs the endpoint at INFO level and issues a POST with a JSON body against it through the
 * search client's low-level request API.
 *
 * @param esComponents the Elasticsearch components factory providing search client access
 * @param endpoint the endpoint to POST to (e.g., "/_search")
 * @param jsonBody the JSON body to send with the request
 * @return the raw response from the request
 * @throws IOException if the request fails
 */
public static RawResponse performPostRequest(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String endpoint,
    String jsonBody)
    throws IOException {
  log.info("POST => {}", endpoint);
  final Request postRequest = new Request("POST", endpoint);
  postRequest.setJsonEntity(jsonBody);
  final RawResponse rawResponse =
      esComponents.getSearchClient().performLowLevelRequest(postRequest);
  return rawResponse;
}
/**
 * Issues a PUT with a JSON body against an endpoint that carries a query string.
 *
 * <p>The query string is appended verbatim to the endpoint (no separator or encoding is added),
 * the resulting target is logged at INFO level, and the request is sent through the search
 * client's low-level request API.
 *
 * @param esComponents the Elasticsearch components factory providing search client access
 * @param endpoint the base endpoint to PUT to (e.g., "/_plugins/_ism/policies/policy_name")
 * @param queryParams the query parameters to append (e.g., "?if_seq_no=123&if_primary_term=456")
 * @param jsonBody the JSON body to send with the request
 * @return the raw response from the request
 * @throws IOException if the request fails
 */
public static RawResponse performPutRequestWithParams(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String endpoint,
    String queryParams,
    String jsonBody)
    throws IOException {
  final String target = endpoint + queryParams;
  log.info("PUT => {}", target);
  final Request putRequest = new Request("PUT", target);
  putRequest.setJsonEntity(jsonBody);
  final RawResponse rawResponse =
      esComponents.getSearchClient().performLowLevelRequest(putRequest);
  return rawResponse;
}
}

View File

@ -2,13 +2,13 @@ package com.linkedin.datahub.upgrade.system.elasticsearch.util;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.utils.elasticsearch.responses.RawResponse;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
@ -31,10 +31,10 @@ public class UsageEventIndexUtils {
*
* <p>This method creates an ILM policy that manages the lifecycle of usage event indices,
* including rollover and retention policies. The policy is loaded from the resource file {@code
* /index/usage-event/policy.json} and applied to the specified policy name.
* /index/usage-event/elasticsearch_policy.json} and applied to the specified policy name.
*
* <p>The method uses the low-level REST client to make a PUT request to the {@code
* _ilm/policy/{policyName}} endpoint, which provides upsert behavior (creates if not exists,
* /_ilm/policy/{policyName}} endpoint, which provides upsert behavior (creates if not exists,
* updates if exists).
*
* @param esComponents the Elasticsearch components factory providing search client access
@ -47,23 +47,47 @@ public class UsageEventIndexUtils {
String policyName)
throws IOException {
try {
String policyJson = loadResourceAsString("/index/usage-event/policy.json");
String policyJson =
IndexUtils.loadResourceAsString("/index/usage-event/elasticsearch_policy.json");
// Use the low-level client to make the PUT request to _ilm/policy endpoint
String endpoint = "_ilm/policy/" + policyName;
String endpoint = "/_ilm/policy/" + policyName;
Request request = new Request("PUT", endpoint);
request.setJsonEntity(policyJson);
// Use retry logic for policy creation
boolean success =
IndexUtils.retryWithBackoff(
5,
2000,
() -> {
try {
RawResponse response =
IndexUtils.performPutRequest(esComponents, endpoint, policyJson);
RawResponse response = esComponents.getSearchClient().performLowLevelRequest(request);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 || statusCode == 201) {
log.info("Successfully created ILM policy: {}", policyName);
return true;
} else if (statusCode == 409) {
log.info("ILM policy {} already exists", policyName);
return true; // Consider this a success since policy exists
} else {
log.error("ILM policy creation returned status: {}", statusCode);
return false;
}
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 409) {
log.info("ILM policy {} already exists", policyName);
return true;
} else {
throw e;
}
}
});
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201) {
log.info("Successfully created ILM policy: {}", policyName);
} else {
log.error(
"ILM policy creation returned status: {}", response.getStatusLine().getStatusCode());
if (!success) {
throw new IOException("Failed to create ILM policy after retries: " + policyName);
}
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 409) {
log.info("ILM policy {} already exists", policyName);
@ -74,58 +98,271 @@ public class UsageEventIndexUtils {
}
/**
* Creates an Index State Management (ISM) policy for AWS OpenSearch usage events.
* Creates or updates an Index State Management (ISM) policy for AWS OpenSearch usage events.
*
* <p>This method creates an ISM policy that manages the lifecycle of usage event indices in AWS
* OpenSearch environments. The policy is loaded from the resource file {@code
* /index/usage-event/aws_es_ism_policy.json} and applied to the specified policy name.
* <p>This method follows the same pattern as the Docker setup script: first checking if the
* policy exists, then either updating the existing policy or creating a new one. The policy is
* loaded from the resource file {@code /index/usage-event/opensearch_policy.json} and applied to
* the specified policy name.
*
* <p>The method uses the low-level REST client to make a PUT request to the {@code
* _plugins/_ism/policies/{policyName}} endpoint, which is specific to AWS OpenSearch. The policy
* <p>The method uses the low-level REST client to make requests to the {@code
* /_plugins/_ism/policies/{policyName}} endpoint, which is specific to AWS OpenSearch. The policy
* template includes placeholder replacement for the index prefix.
*
* <p>ISM policies in OpenSearch provide similar functionality to ILM policies in Elasticsearch,
* including rollover conditions, state transitions, and retention policies.
*
* <p>This method handles the following scenarios:
*
* <ul>
* <li>Policy exists (200): Updates the existing policy using optimistic concurrency control
* <li>Policy doesn't exist (404): Creates a new policy
* <li>ISM not supported (400): Returns false gracefully
* </ul>
*
* @param esComponents the Elasticsearch components factory providing search client access
* @param policyName the name of the ISM policy to create (e.g., "datahub_usage_event_policy")
* @param policyName the name of the ISM policy to create/update (e.g.,
* "datahub_usage_event_policy")
* @param prefix the index prefix to apply to policy configurations (e.g., "prod_")
* @return true if the policy was successfully created, updated, or already exists, false if
* policy operation failed due to unsupported features or other errors
* @throws IOException if there's an error reading the policy template or making the request
* @throws ResponseException if the request fails with a non-409 status code
*/
public static void createIsmPolicy(
public static boolean createIsmPolicy(
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
String policyName,
String prefix)
String prefix,
OperationContext operationContext)
throws IOException {
try {
String policyJson = loadResourceAsString("/index/usage-event/aws_es_ism_policy.json");
// Replace placeholders
policyJson = policyJson.replace("PREFIX", prefix);
log.debug("Creating ISM policy: {}", policyName);
// For AWS OpenSearch, we need to use the low-level REST client for ISM policies
// since the high-level client doesn't support the _plugins/_ism/policies endpoint
String endpoint = "_plugins/_ism/policies/" + policyName;
String policyJson = loadPolicyTemplate(prefix);
String endpoint = "/_plugins/_ism/policies/" + policyName;
// Use the low-level client to make the PUT request
Request request = new Request("PUT", endpoint);
request.setJsonEntity(policyJson);
// Use retry logic for the entire policy creation operation (like the Docker script)
boolean success =
IndexUtils.retryWithBackoff(
5,
2000,
() -> {
try {
RawResponse getResponse = IndexUtils.performGetRequest(esComponents, endpoint);
return handleGetResponse(
getResponse,
esComponents,
policyName,
prefix,
endpoint,
policyJson,
operationContext);
} catch (ResponseException e) {
return handleResponseException(
e, esComponents, policyName, prefix, endpoint, policyJson, operationContext);
}
});
RawResponse response = esComponents.getSearchClient().performLowLevelRequest(request);
return success;
} catch (Exception e) {
log.error("Unexpected error creating ISM policy {}: {}", policyName, e.getMessage(), e);
return false;
}
}
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201) {
/**
 * Routes the outcome of the ISM policy existence check (the GET on the policy endpoint).
 *
 * <p>Status 200 delegates to {@code handleExistingPolicy} and status 404 delegates to {@code
 * createNewPolicy}. Any other status is logged together with the response body and raised as a
 * RuntimeException whose message marks it as retryable.
 *
 * @param getResponse the response from the GET request
 * @param esComponents the Elasticsearch components factory
 * @param policyName the name of the ISM policy
 * @param prefix the prefix for index patterns
 * @param endpoint the API endpoint
 * @param policyJson the policy JSON to create
 * @param operationContext the operation context for JSON parsing
 * @return the result of the delegated update or create call
 * @throws RuntimeException if the status is neither 200 nor 404
 */
private static boolean handleGetResponse(
    RawResponse getResponse,
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String policyName,
    String prefix,
    String endpoint,
    String policyJson,
    OperationContext operationContext) {
  final int status = getResponse.getStatusLine().getStatusCode();
  switch (status) {
    case 200:
      return handleExistingPolicy(esComponents, policyName, prefix, operationContext);
    case 404:
      return createNewPolicy(esComponents, endpoint, policyJson, policyName);
    default:
      // Anything else is surfaced as an exception (like the Docker script, these are retryable)
      final String body = extractResponseBody(getResponse);
      log.warn(
          "Failed to check ISM policy existence. Status: {}. Response: {}. Will retry.",
          status,
          body);
      throw new RuntimeException("Retryable error: " + status + " - " + body);
  }
}
/**
 * Routes a ResponseException raised by the ISM policy existence check.
 *
 * <p>The status code and body are read from the response carried by the exception. Status 200
 * delegates to {@code handleExistingPolicy} and status 404 delegates to {@code createNewPolicy}.
 * Any other status is logged with the response body and raised as a RuntimeException whose
 * message marks it as retryable.
 *
 * @param e the ResponseException
 * @param esComponents the Elasticsearch components factory
 * @param policyName the name of the ISM policy
 * @param prefix the prefix for index patterns
 * @param endpoint the API endpoint
 * @param policyJson the policy JSON to create
 * @param operationContext the operation context for JSON parsing
 * @return the result of the delegated update or create call
 * @throws RuntimeException if the status is neither 200 nor 404
 */
private static boolean handleResponseException(
    ResponseException e,
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String policyName,
    String prefix,
    String endpoint,
    String policyJson,
    OperationContext operationContext) {
  final int status = e.getResponse().getStatusLine().getStatusCode();
  // The body is read up front, before branching on the status
  final String body = extractResponseBody(e.getResponse());

  switch (status) {
    case 200:
      log.info("ISM policy {} already exists (from exception), updating it", policyName);
      return handleExistingPolicy(esComponents, policyName, prefix, operationContext);
    case 404:
      log.info("ISM policy {} doesn't exist (from exception), creating it", policyName);
      return createNewPolicy(esComponents, endpoint, policyJson, policyName);
    default:
      // All other errors are treated as retryable (including 400 with .opendistro-ism-config)
      log.warn(
          "ISM policy operation failed with status: {}. Response: {}. Will retry.", status, body);
      throw new RuntimeException("Retryable error: " + status + " - " + body);
  }
}
/**
 * Attempts to update an ISM policy that is already present.
 *
 * <p>The update is best effort: if {@code updateIsmPolicy} throws, the failure is logged at WARN
 * level and otherwise ignored, because the policy exists either way.
 *
 * @param esComponents the Elasticsearch components factory
 * @param policyName the name of the ISM policy
 * @param prefix the prefix for index patterns
 * @param operationContext the operation context for JSON parsing
 * @return always {@code true}
 */
private static boolean handleExistingPolicy(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String policyName,
    String prefix,
    OperationContext operationContext) {
  log.info("ISM policy {} already exists, updating it", policyName);
  try {
    updateIsmPolicy(esComponents, policyName, prefix, operationContext);
  } catch (Exception updateFailure) {
    log.warn(
        "Failed to update existing ISM policy {} (non-fatal): {}",
        policyName,
        updateFailure.getMessage());
  }
  // Success in both cases: the policy exists even when the update did not go through
  return true;
}
/**
* Creates a new ISM policy.
*
* @param esComponents the Elasticsearch components factory
* @param endpoint the API endpoint
* @param policyJson the policy JSON to create
* @param policyName the name of the ISM policy
* @return true if successful, false otherwise
*/
private static boolean createNewPolicy(
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
String endpoint,
String policyJson,
String policyName) {
log.info("ISM policy {} doesn't exist, creating it", policyName);
try {
RawResponse createResponse = IndexUtils.performPutRequest(esComponents, endpoint, policyJson);
int createStatusCode = createResponse.getStatusLine().getStatusCode();
if (createStatusCode == 200 || createStatusCode == 201) {
log.info("Successfully created ISM policy: {}", policyName);
} else {
log.warn(
"ISM policy creation returned status: {}", response.getStatusLine().getStatusCode());
return true;
}
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 409) {
if (createStatusCode == 409) {
log.info("ISM policy {} already exists", policyName);
} else {
throw e;
return true; // Consider this a success since policy exists
}
log.error("ISM policy creation returned status: {}", createStatusCode);
return false;
} catch (IOException e) {
log.error("Failed to create ISM policy {}: {}", policyName, e.getMessage());
return false;
}
}
/**
 * Reads the OpenSearch ISM policy template from the classpath and substitutes the index prefix.
 *
 * <p>Every occurrence of the literal {@code PREFIX} in the template is replaced with the given
 * prefix.
 *
 * @param prefix the prefix to apply to policy configurations
 * @return the policy JSON with the prefix applied
 * @throws IOException if there's an error reading the policy template
 */
private static String loadPolicyTemplate(String prefix) throws IOException {
  String template = IndexUtils.loadResourceAsString("/index/usage-event/opensearch_policy.json");
  return template.replace("PREFIX", prefix);
}
/**
 * Extracts the response body from a RawResponse as UTF-8 text.
 *
 * <p>The entity's content stream is closed once it has been read.
 *
 * @param response the RawResponse
 * @return the response body as a string, "No response body" when the response has no entity, or
 *     an error description if reading the body fails
 */
private static String extractResponseBody(RawResponse response) {
  if (response.getEntity() == null) {
    return "No response body";
  }
  // try-with-resources so the content stream is always released
  try (java.io.InputStream content = response.getEntity().getContent()) {
    return new String(content.readAllBytes(), StandardCharsets.UTF_8);
  } catch (IOException e) {
    return "Error reading response body: " + e.getMessage();
  }
}
/**
 * Extracts the response body from a Response as UTF-8 text.
 *
 * <p>The entity's content stream is closed once it has been read.
 *
 * @param response the Response
 * @return the response body as a string, "No response body" when the response has no entity, or
 *     an error description if reading the body fails
 */
private static String extractResponseBody(Response response) {
  if (response.getEntity() == null) {
    return "No response body";
  }
  // try-with-resources so the content stream is always released
  try (java.io.InputStream content = response.getEntity().getContent()) {
    return new String(content.readAllBytes(), StandardCharsets.UTF_8);
  } catch (IOException e) {
    return "Error reading response body: " + e.getMessage();
  }
}
@ -134,7 +371,7 @@ public class UsageEventIndexUtils {
*
* <p>This method creates an index template that defines the structure and settings for usage
* event indices in standard Elasticsearch environments. The template is loaded from the resource
* file {@code /index/usage-event/index_template.json} and configured with the specified
* file {@code /index/usage-event/elasticsearch_template.json} and configured with the specified
* parameters.
*
* <p>The template includes:
@ -147,7 +384,7 @@ public class UsageEventIndexUtils {
* </ul>
*
* <p>The method uses the low-level REST client to make a PUT request to the {@code
* _index_template/{templateName}} endpoint, which provides upsert behavior.
* /_index_template/{templateName}} endpoint, which provides upsert behavior.
*
* @param esComponents the Elasticsearch components factory providing search client access
* @param templateName the name of the index template to create (e.g.,
@ -168,19 +405,17 @@ public class UsageEventIndexUtils {
String prefix)
throws IOException {
try {
String templateJson = loadResourceAsString("/index/usage-event/index_template.json");
String templateJson =
IndexUtils.loadResourceAsString("/index/usage-event/elasticsearch_template.json");
// Replace placeholders
templateJson = templateJson.replace("PREFIX", prefix);
templateJson = templateJson.replace("DUE_SHARDS", String.valueOf(numShards));
templateJson = templateJson.replace("DUE_REPLICAS", String.valueOf(numReplicas));
// Use the low-level client for index templates
String endpoint = "_index_template/" + templateName;
String endpoint = "/_index_template/" + templateName;
Request request = new Request("PUT", endpoint);
request.setJsonEntity(templateJson);
RawResponse response = esComponents.getSearchClient().performLowLevelRequest(request);
RawResponse response = IndexUtils.performPutRequest(esComponents, endpoint, templateJson);
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201) {
@ -204,7 +439,7 @@ public class UsageEventIndexUtils {
*
* <p>This method creates an index template that defines the structure and settings for usage
* event indices in AWS OpenSearch environments. The template is loaded from the resource file
* {@code /index/usage-event/aws_es_index_template.json} and configured with the specified
* {@code /index/usage-event/opensearch_template.json} and configured with the specified
* parameters.
*
* <p>The template includes:
@ -217,7 +452,7 @@ public class UsageEventIndexUtils {
* </ul>
*
* <p>The method uses the low-level REST client to make a PUT request to the {@code
* _template/{templateName}} endpoint, which is the legacy template API used by AWS OpenSearch.
* /_template/{templateName}} endpoint, which is the legacy template API used by AWS OpenSearch.
* This provides upsert behavior.
*
* @param esComponents the Elasticsearch components factory providing search client access
@ -237,24 +472,25 @@ public class UsageEventIndexUtils {
String prefix)
throws IOException {
try {
String templateJson = loadResourceAsString("/index/usage-event/aws_es_index_template.json");
String templateJson;
String endpoint;
// Both AWS OpenSearch Service and self-hosted OpenSearch use the same endpoint and template
// format
templateJson = IndexUtils.loadResourceAsString("/index/usage-event/opensearch_template.json");
endpoint = "/_index_template/" + templateName;
// Replace placeholders
templateJson = templateJson.replace("PREFIX", prefix);
templateJson = templateJson.replace("DUE_SHARDS", String.valueOf(numShards));
templateJson = templateJson.replace("DUE_REPLICAS", String.valueOf(numReplicas));
// For AWS OpenSearch, we need to use the _template endpoint instead of _index_template
String endpoint = "_template/" + templateName;
// Use the low-level client to make the PUT request
Request request = new Request("PUT", endpoint);
request.setJsonEntity(templateJson);
RawResponse response = esComponents.getSearchClient().performLowLevelRequest(request);
RawResponse response = IndexUtils.performPutRequest(esComponents, endpoint, templateJson);
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201) {
log.info("Successfully created OpenSearch index template: {}", templateName);
log.info("Successfully created/updated OpenSearch index template: {}", templateName);
} else {
log.warn(
"OpenSearch index template creation returned status: {}",
@ -263,6 +499,12 @@ public class UsageEventIndexUtils {
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 409) {
log.info("OpenSearch index template {} already exists", templateName);
} else if (e.getResponse().getStatusLine().getStatusCode() == 400) {
// Handle 400 Bad Request - this may indicate template format issues or unsupported features
log.warn(
"Index template creation failed with 400 Bad Request. This may indicate an issue with the template format or unsupported features. Template: {}",
templateName);
throw e;
} else {
throw e;
}
@ -328,6 +570,38 @@ public class UsageEventIndexUtils {
}
}
/**
 * Creates an index and registers a write alias for it in one create-index call.
 *
 * <p>The request body only carries an {@code aliases} section that marks {@code aliasName} with
 * {@code is_write_index=true}, so index and alias are submitted together rather than in two
 * separate requests. The outcome is logged: info when the cluster acknowledges the creation,
 * warn when it does not.
 *
 * @param esComponents the Elasticsearch/OpenSearch components factory
 * @param indexName the name of the index to create
 * @param aliasName the name of the alias to assign with is_write_index=true
 * @throws IOException if there's an error creating the index
 */
private static void createIndexWithWriteAlias(
    BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
    String indexName,
    String aliasName)
    throws IOException {
  String aliasesBody =
      String.format("{\"aliases\":{\"%s\":{\"is_write_index\":true}}}", aliasName);

  CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
  createRequest.source(aliasesBody, XContentType.JSON);

  CreateIndexResponse createResponse =
      esComponents.getSearchClient().createIndex(createRequest, RequestOptions.DEFAULT);

  // Guard clause: an unacknowledged creation is only reported, never thrown.
  if (!createResponse.isAcknowledged()) {
    log.warn("Index creation not acknowledged: {}", indexName);
    return;
  }
  log.info("Successfully created index: {} with write alias: {}", indexName, aliasName);
}
/**
* Creates an initial index for AWS OpenSearch usage events.
*
@ -335,9 +609,8 @@ public class UsageEventIndexUtils {
* OpenSearch environments. Unlike Elasticsearch data streams, OpenSearch uses numbered indices
* (e.g., "datahub_usage_event-000001") with aliases for rollover management.
*
* <p>The method loads the index configuration from the resource file {@code
* /index/usage-event/aws_es_index.json} and applies the specified prefix. The configuration
* includes alias settings that enable ISM policy rollover.
* <p>The method creates the index together with its write alias ({@code prefix +
* "datahub_usage_event"}) in a single request; no index configuration is loaded from a resource
* file.
*
* <p>The created index includes:
*
@ -368,23 +641,13 @@ public class UsageEventIndexUtils {
esComponents.getSearchClient().indexExists(getRequest, RequestOptions.DEFAULT);
if (!exists) {
String indexJson = loadResourceAsString("/index/usage-event/aws_es_index.json");
// Replace PREFIX placeholder with actual prefix
indexJson = indexJson.replace("PREFIX", prefix);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.source(indexJson, XContentType.JSON);
CreateIndexResponse response =
esComponents.getSearchClient().createIndex(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Successfully created OpenSearch index: {}", indexName);
} else {
log.warn("OpenSearch index creation not acknowledged: {}", indexName);
}
// Create index with alias in a single request (common syntax for both Elasticsearch and
// OpenSearch)
String aliasName = prefix + "datahub_usage_event";
log.info("Creating new OpenSearch index: {} with alias: {}", indexName, aliasName);
createIndexWithWriteAlias(esComponents, indexName, aliasName);
} else {
log.info("OpenSearch index {} already exists", indexName);
log.info("OpenSearch index {} already exists - skipping creation", indexName);
}
} catch (OpenSearchStatusException e) {
if (e.getMessage().contains("resource_already_exists_exception")
@ -397,24 +660,92 @@ public class UsageEventIndexUtils {
}
/**
* Loads a resource file as a UTF-8 encoded string.
* Updates an existing ISM policy for AWS OpenSearch usage events.
*
* <p>This utility method reads a resource file from the classpath and returns its contents as a
* string. It's used internally by other methods to load JSON templates and configuration files.
* <p>This method updates an existing ISM policy using optimistic concurrency control with
* sequence numbers and primary terms. It first retrieves the current policy to get the sequence
* number and primary term, then updates the policy with the new configuration.
*
* <p>The method uses the class loader to locate the resource and reads it using a buffered input
* stream for efficient memory usage.
* <p>The method is non-fatal - if the policy cannot be updated (e.g., due to concurrent
* modifications), it logs a warning but does not throw an exception. This matches the behavior of
* the Docker script.
*
* @param resourcePath the path to the resource file (e.g., "/index/usage-event/policy.json")
* @return the contents of the resource file as a UTF-8 encoded string
* @throws IOException if the resource cannot be found or read
* @param esComponents the Elasticsearch components factory providing search client access
* @param policyName the name of the ISM policy to update (e.g., "datahub_usage_event_policy")
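* @param prefix the index prefix to apply to policy configurations (e.g., "prod_")
* @param operationContext the operation context passed to {@code IndexUtils.extractJsonValue} when
*     reading {@code _seq_no} and {@code _primary_term} from the existing policy response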
* @throws IOException if there's an error reading the policy template or making the request
*/
private static String loadResourceAsString(String resourcePath) throws IOException {
try (InputStream inputStream = UsageEventIndexUtils.class.getResourceAsStream(resourcePath)) {
if (inputStream == null) {
throw new IOException("Resource not found: " + resourcePath);
public static void updateIsmPolicy(
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
String policyName,
String prefix,
OperationContext operationContext)
throws IOException {
try {
String endpoint = "/_plugins/_ism/policies/" + policyName;
// Get existing policy to retrieve sequence number and primary term
RawResponse getResponse = IndexUtils.performGetRequest(esComponents, endpoint);
if (getResponse.getStatusLine().getStatusCode() != 200) {
log.warn("Could not get ISM policy {} for update. Ignoring.", policyName);
return;
}
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
String responseBody =
new String(getResponse.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
// Parse sequence number and primary term from response
// The response format is: {"policy_id": "...", "_seq_no": 123, "_primary_term": 456,
// "policy": {...}}
int seqNo = IndexUtils.extractJsonValue(operationContext, responseBody, "_seq_no");
int primaryTerm =
IndexUtils.extractJsonValue(operationContext, responseBody, "_primary_term");
if (seqNo == -1 || primaryTerm == -1) {
log.warn(
"Could not extract sequence number or primary term from ISM policy {}. Skipping update.",
policyName);
return;
}
// Load new policy configuration
String policyJson =
IndexUtils.loadResourceAsString("/index/usage-event/opensearch_policy.json");
policyJson = policyJson.replace("PREFIX", prefix);
// Update policy with optimistic concurrency control
String queryParams = "?if_seq_no=" + seqNo + "&if_primary_term=" + primaryTerm;
RawResponse updateResponse =
IndexUtils.performPutRequestWithParams(esComponents, endpoint, queryParams, policyJson);
if (updateResponse.getStatusLine().getStatusCode() == 200
|| updateResponse.getStatusLine().getStatusCode() == 201) {
log.info("Successfully updated ISM policy: {}", policyName);
} else {
log.warn(
"Failed to update ISM policy {} after retries (non-fatal). Status: {}",
policyName,
updateResponse.getStatusLine().getStatusCode());
}
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 409) {
log.warn("ISM policy {} was modified concurrently. Skipping update.", policyName);
} else if (e.getResponse().getStatusLine().getStatusCode() == 400) {
log.warn(
"Failed to update ISM policy {} (non-fatal). This may indicate that ISM policies are not supported in this environment. Status: {}",
policyName,
e.getResponse().getStatusLine().getStatusCode());
} else {
log.warn(
"Failed to update ISM policy {} (non-fatal). Status: {}",
policyName,
e.getResponse().getStatusLine().getStatusCode());
}
} catch (Exception e) {
log.warn(
"Unexpected error updating ISM policy {} (non-fatal): {}", policyName, e.getMessage());
}
}
}

View File

@ -0,0 +1,13 @@
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_age": "7d"
}
}
}
}
}
}

View File

@ -0,0 +1,31 @@
{
"index_patterns": ["*PREFIXdatahub_usage_event*"],
"data_stream": { },
"priority": 500,
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"type": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"userAgent": {
"type": "keyword"
},
"browserId": {
"type": "keyword"
}
}
},
"settings": {
"index.lifecycle.name": "PREFIXdatahub_usage_event_policy",
"index.number_of_shards": DUE_SHARDS,
"index.number_of_replicas": DUE_REPLICAS
}
}
}

View File

@ -0,0 +1,43 @@
{
"policy": {
"policy_id": "PREFIXdatahub_usage_event_policy",
"description": "Datahub Usage Event Policy",
"default_state": "Rollover",
"schema_version": 4,
"states": [
{
"name": "Rollover",
"actions": [
{
"rollover": {
"min_size": "5gb"
}
}
],
"transitions": [
{
"state_name": "ReadOnly",
"conditions": {
"min_index_age": "14d"
}
}
]
},
{
"name": "ReadOnly",
"actions": [
{
"read_only": {}
}
],
"transitions": []
}
],
"ism_template": {
"index_patterns": [
"PREFIXdatahub_usage_event-*"
],
"priority": 100
}
}
}

View File

@ -0,0 +1,30 @@
{
"index_patterns": ["PREFIXdatahub_usage_event-*"],
"priority": 500,
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"type": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"userAgent": {
"type": "keyword"
},
"browserId": {
"type": "keyword"
}
}
},
"settings": {
"index.opendistro.index_state_management.rollover_alias": "PREFIXdatahub_usage_event",
"index.number_of_shards": DUE_SHARDS,
"index.number_of_replicas": DUE_REPLICAS
}
}
}

View File

@ -24,6 +24,8 @@ import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -68,7 +70,24 @@ public class CreateUsageEventIndicesStepTest {
step = new CreateUsageEventIndicesStep(esComponents, configurationProvider);
}
/**
* Sets the {@code ENABLE_SYSTEM_UPDATE_DUE} system property to {@code "true"} once, before any
* test method of this class runs.
*/
@BeforeClass
public void setup() {
System.setProperty("ENABLE_SYSTEM_UPDATE_DUE", "true");
}
/**
* Clears the {@code ENABLE_SYSTEM_UPDATE_DUE} system property after all test methods of this
* class have run, so the JVM-wide setting does not remain set for later test classes.
*/
@AfterClass
public void cleanup() {
System.clearProperty("ENABLE_SYSTEM_UPDATE_DUE");
}
private void setupMockClientResponses() throws IOException {
// Mock ShimConfiguration for AWS detection
SearchClientShim.ShimConfiguration shimConfig =
Mockito.mock(SearchClientShim.ShimConfiguration.class);
Mockito.when(searchClient.getShimConfiguration()).thenReturn(shimConfig);
Mockito.when(shimConfig.getHost())
.thenReturn("localhost"); // Non-AWS host for self-hosted OpenSearch
// Mock RawResponse for low-level requests (ILM/ISM policies, index templates)
Mockito.when(rawResponse.getStatusLine())
.thenReturn(
@ -125,21 +144,39 @@ public class CreateUsageEventIndicesStepTest {
}
@Test
public void testExecutable_AnalyticsDisabled() throws Exception {
public void testSkip_AnalyticsDisabled() throws Exception {
// Arrange
Mockito.when(platformAnalytics.isEnabled()).thenReturn(false);
// Act
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
UpgradeStepResult result = executable.apply(upgradeContext);
boolean shouldSkip = step.skip(upgradeContext);
// Assert
Assert.assertNotNull(result);
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
Assert.assertTrue(shouldSkip);
Mockito.verify(platformAnalytics).isEnabled();
}
// Verify that no Elasticsearch operations were called
Mockito.verify(searchClient, Mockito.never()).getEngineType();
/** With platform analytics enabled, {@code skip} must report that the step should run. */
@Test
public void testSkip_AnalyticsEnabled() throws Exception {
  // Given: analytics is switched on
  Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);

  // When / Then: the step is not skipped
  Assert.assertFalse(step.skip(upgradeContext));

  // And the decision was based on the analytics flag
  Mockito.verify(platformAnalytics).isEnabled();
}
/** A failure while reading the analytics flag must propagate out of {@code skip}. */
@Test
public void testSkip_ConfigurationProviderException() throws Exception {
  // Given: reading the analytics flag blows up
  RuntimeException configFailure = new RuntimeException("Config error");
  Mockito.when(platformAnalytics.isEnabled()).thenThrow(configFailure);

  // When / Then: the exception is not swallowed by skip()
  Assert.assertThrows(RuntimeException.class, () -> step.skip(upgradeContext));

  Mockito.verify(platformAnalytics).isEnabled();
}
@Test
@ -220,26 +257,12 @@ public class CreateUsageEventIndicesStepTest {
Assert.assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testExecutable_ConfigurationProviderException() throws Exception {
// Arrange
Mockito.when(platformAnalytics.isEnabled()).thenThrow(new RuntimeException("Config error"));
// Act
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
UpgradeStepResult result = executable.apply(upgradeContext);
// Assert
Assert.assertNotNull(result);
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
Assert.assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
@Test
public void testExecutable_EngineTypeException() throws Exception {
// Arrange
Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);
Mockito.when(searchClient.getEngineType()).thenThrow(new RuntimeException("Engine type error"));
// Throw exception in a method that executable() actually calls
Mockito.when(esComponents.getSearchClient().getEngineType())
.thenThrow(new RuntimeException("Engine type error"));
// Act
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
@ -271,17 +294,21 @@ public class CreateUsageEventIndicesStepTest {
Mockito.verify(index).getPrefix();
// Verify that the low-level requests were made with correct names (no underscore prefix)
Mockito.verify(searchClient, Mockito.atLeast(2)).performLowLevelRequest(Mockito.any());
// Verify specific endpoint calls were made
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request -> request.getEndpoint().equals("_ilm/policy/datahub_usage_event_policy")));
request ->
request.getEndpoint().equals("/_ilm/policy/datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_index_template/datahub_usage_event_index_template")));
.equals("/_index_template/datahub_usage_event_index_template")));
}
@Test
@ -304,17 +331,21 @@ public class CreateUsageEventIndicesStepTest {
Mockito.verify(index).getPrefix();
// Verify that the low-level requests were made with correct names (no underscore prefix)
Mockito.verify(searchClient, Mockito.atLeast(2)).performLowLevelRequest(Mockito.any());
// Verify specific endpoint calls were made
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request -> request.getEndpoint().equals("_ilm/policy/datahub_usage_event_policy")));
request ->
request.getEndpoint().equals("/_ilm/policy/datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_index_template/datahub_usage_event_index_template")));
.equals("/_index_template/datahub_usage_event_index_template")));
}
@Test
@ -341,14 +372,14 @@ public class CreateUsageEventIndicesStepTest {
.performLowLevelRequest(
Mockito.argThat(
request ->
request.getEndpoint().equals("_ilm/policy/prod_datahub_usage_event_policy")));
request.getEndpoint().equals("/_ilm/policy/prod_datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_index_template/prod_datahub_usage_event_index_template")));
.equals("/_index_template/prod_datahub_usage_event_index_template")));
}
@Test
@ -378,7 +409,8 @@ public class CreateUsageEventIndicesStepTest {
request ->
request
.getEndpoint()
.equals("_ilm/policy/kbcpyv7ss3-staging-test_datahub_usage_event_policy")));
.equals(
"/_ilm/policy/kbcpyv7ss3-staging-test_datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
@ -386,7 +418,7 @@ public class CreateUsageEventIndicesStepTest {
request
.getEndpoint()
.equals(
"_index_template/kbcpyv7ss3-staging-test_datahub_usage_event_index_template")));
"/_index_template/kbcpyv7ss3-staging-test_datahub_usage_event_index_template")));
}
@Test
@ -410,18 +442,21 @@ public class CreateUsageEventIndicesStepTest {
Mockito.verify(index).getPrefix();
// Verify that the low-level requests were made with correct names (no underscore prefix)
// Note: createIsmPolicy makes 2 calls - one for creation and one for update attempt
Mockito.verify(searchClient, Mockito.atLeast(1))
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("/_plugins/_ism/policies/datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_plugins/_ism/policies/datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request.getEndpoint().equals("_template/datahub_usage_event_index_template")));
.equals("/_index_template/datahub_usage_event_index_template")));
}
@Test
@ -445,20 +480,21 @@ public class CreateUsageEventIndicesStepTest {
Mockito.verify(index).getPrefix();
// Verify that the low-level requests were made with correct names (with underscore prefix)
// Note: createIsmPolicy makes 2 calls - one for creation and one for update attempt
Mockito.verify(searchClient, Mockito.atLeast(1))
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("/_plugins/_ism/policies/prod_datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_plugins/_ism/policies/prod_datahub_usage_event_policy")));
Mockito.verify(searchClient)
.performLowLevelRequest(
Mockito.argThat(
request ->
request
.getEndpoint()
.equals("_template/prod_datahub_usage_event_index_template")));
.equals("/_index_template/prod_datahub_usage_event_index_template")));
}
@Test

View File

@ -0,0 +1,314 @@
package com.linkedin.datahub.upgrade.system.elasticsearch.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.utils.elasticsearch.SearchClientShim;
import com.linkedin.metadata.utils.elasticsearch.responses.RawResponse;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class IndexUtilsTest {
@Mock private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents;
@Mock private SearchClientShim searchClient;
@Mock private RawResponse rawResponse;
@Mock private ResponseException responseException;
@Mock private OperationContext operationContext;
@Mock private ObjectMapper objectMapper;
@BeforeMethod
public void setUp() {
MockitoAnnotations.openMocks(this);
Mockito.when(esComponents.getSearchClient()).thenReturn(searchClient);
Mockito.when(operationContext.getObjectMapper()).thenReturn(objectMapper);
}
@Test
public void testCreatePrefixedName_WithPrefix() {
// Arrange
String prefix = "test";
String resourceName = "index";
// Act
String result = IndexUtils.createPrefixedName(prefix, resourceName);
// Assert
Assert.assertEquals(result, "test_index");
}
@Test
public void testCreatePrefixedName_EmptyPrefix() {
// Arrange
String prefix = "";
String resourceName = "index";
// Act
String result = IndexUtils.createPrefixedName(prefix, resourceName);
// Assert
Assert.assertEquals(result, "index");
}
@Test
public void testCreatePrefixedName_NullPrefix() {
// Arrange
String prefix = null;
String resourceName = "index";
// Act
String result = IndexUtils.createPrefixedName(prefix, resourceName);
// Assert
Assert.assertEquals(result, "index");
}
@Test
public void testIsAwsOpenSearchService_AwsHost() {
// Arrange
Mockito.when(searchClient.getShimConfiguration())
.thenReturn(Mockito.mock(SearchClientShim.ShimConfiguration.class));
Mockito.when(searchClient.getShimConfiguration().getHost())
.thenReturn(
"vpc-usw2-staging-shared-xg5onkhkidjnmvfmjqi35vp7vu.us-west-2.es.amazonaws.com");
// Act
boolean result = IndexUtils.isAwsOpenSearchService(esComponents);
// Assert
Assert.assertTrue(result);
}
@Test
public void testIsAwsOpenSearchService_LocalHost() {
// Arrange
Mockito.when(searchClient.getShimConfiguration())
.thenReturn(Mockito.mock(SearchClientShim.ShimConfiguration.class));
Mockito.when(searchClient.getShimConfiguration().getHost()).thenReturn("localhost:9200");
// Act
boolean result = IndexUtils.isAwsOpenSearchService(esComponents);
// Assert
Assert.assertFalse(result);
}
@Test
public void testIsAwsOpenSearchService_ElasticCloudHost() {
// Arrange
Mockito.when(searchClient.getShimConfiguration())
.thenReturn(Mockito.mock(SearchClientShim.ShimConfiguration.class));
Mockito.when(searchClient.getShimConfiguration().getHost())
.thenReturn("my-cluster.es.us-central1.gcp.cloud.es.io");
// Act
boolean result = IndexUtils.isAwsOpenSearchService(esComponents);
// Assert
Assert.assertFalse(result);
}
@Test
public void testPerformGetRequest_Success() throws IOException {
// Arrange
String endpoint = "/_cluster/health";
Mockito.when(searchClient.performLowLevelRequest(Mockito.any(Request.class)))
.thenReturn(rawResponse);
// Act
RawResponse result = IndexUtils.performGetRequest(esComponents, endpoint);
// Assert
Assert.assertNotNull(result);
Mockito.verify(searchClient).performLowLevelRequest(Mockito.any(Request.class));
}
@Test
public void testPerformPutRequest_Success() throws IOException {
// Arrange
String endpoint = "/_index_template/test";
String jsonBody = "{\"template\": {\"mappings\": {}}}";
Mockito.when(searchClient.performLowLevelRequest(Mockito.any(Request.class)))
.thenReturn(rawResponse);
// Act
RawResponse result = IndexUtils.performPutRequest(esComponents, endpoint, jsonBody);
// Assert
Assert.assertNotNull(result);
Mockito.verify(searchClient).performLowLevelRequest(Mockito.any(Request.class));
}
@Test
public void testPerformPostRequest_Success() throws IOException {
// Arrange
String endpoint = "/_bulk";
String jsonBody = "{\"index\": {}}";
Mockito.when(searchClient.performLowLevelRequest(Mockito.any(Request.class)))
.thenReturn(rawResponse);
// Act
RawResponse result = IndexUtils.performPostRequest(esComponents, endpoint, jsonBody);
// Assert
Assert.assertNotNull(result);
Mockito.verify(searchClient).performLowLevelRequest(Mockito.any(Request.class));
}
@Test
public void testPerformPutRequestWithParams_Success() throws IOException {
// Arrange
String endpoint = "/_index_template/test";
String queryParams = "create=true";
String jsonBody = "{\"template\": {\"mappings\": {}}}";
Mockito.when(searchClient.performLowLevelRequest(Mockito.any(Request.class)))
.thenReturn(rawResponse);
// Act
RawResponse result =
IndexUtils.performPutRequestWithParams(esComponents, endpoint, queryParams, jsonBody);
// Assert
Assert.assertNotNull(result);
Mockito.verify(searchClient).performLowLevelRequest(Mockito.any(Request.class));
}
@Test
public void testRetryWithBackoff_Success() {
// Arrange
IndexUtils.RetryableOperation operation = () -> true;
// Act
boolean result = IndexUtils.retryWithBackoff(3, 100, operation);
// Assert
Assert.assertTrue(result);
}
@Test
public void testRetryWithBackoff_Failure() {
// Arrange
IndexUtils.RetryableOperation operation =
() -> {
throw new RuntimeException("Test error");
};
// Act
boolean result = IndexUtils.retryWithBackoff(3, 100, operation);
// Assert
Assert.assertFalse(result);
}
@Test
public void testRetryWithDefaultConfig_Success() {
// Arrange
IndexUtils.RetryableOperation operation = () -> true;
// Act
boolean result = IndexUtils.retryWithDefaultConfig(operation);
// Assert
Assert.assertTrue(result);
}
@Test
public void testRetryWithDefaultConfig_Failure() {
// Arrange
IndexUtils.RetryableOperation operation =
() -> {
throw new RuntimeException("Test error");
};
// Act
boolean result = IndexUtils.retryWithDefaultConfig(operation);
// Assert
Assert.assertFalse(result);
}
@Test
public void testExtractJsonValue_Success() throws Exception {
// Arrange
String json = "{\"_seq_no\": 123, \"_primary_term\": 456}";
com.fasterxml.jackson.databind.JsonNode jsonNode =
Mockito.mock(com.fasterxml.jackson.databind.JsonNode.class);
com.fasterxml.jackson.databind.JsonNode valueNode =
Mockito.mock(com.fasterxml.jackson.databind.JsonNode.class);
Mockito.when(objectMapper.readTree(json)).thenReturn(jsonNode);
Mockito.when(jsonNode.get("_seq_no")).thenReturn(valueNode);
Mockito.when(valueNode.isNumber()).thenReturn(true);
Mockito.when(valueNode.asInt()).thenReturn(123);
// Act
int result = IndexUtils.extractJsonValue(operationContext, json, "_seq_no");
// Assert
Assert.assertEquals(result, 123);
}
@Test
public void testExtractJsonValue_KeyNotFound() throws Exception {
// Arrange
String json = "{\"_primary_term\": 456}";
com.fasterxml.jackson.databind.JsonNode jsonNode =
Mockito.mock(com.fasterxml.jackson.databind.JsonNode.class);
Mockito.when(objectMapper.readTree(json)).thenReturn(jsonNode);
Mockito.when(jsonNode.get("_seq_no")).thenReturn(null);
// Act
int result = IndexUtils.extractJsonValue(operationContext, json, "_seq_no");
// Assert
Assert.assertEquals(result, -1);
}
@Test
public void testExtractJsonValue_InvalidJson() throws Exception {
// Arrange
String json = "invalid json";
Mockito.when(objectMapper.readTree(json)).thenThrow(new RuntimeException("Invalid JSON"));
// Act
int result = IndexUtils.extractJsonValue(operationContext, json, "_seq_no");
// Assert
Assert.assertEquals(result, -1);
}
@Test
public void testLoadResourceAsString_Success() throws IOException {
// Arrange
String resourcePath = "/index/usage-event/opensearch_policy.json";
// Act & Assert
// This test verifies the method doesn't throw an exception
// The actual content depends on the resource file
try {
String result = IndexUtils.loadResourceAsString(resourcePath);
Assert.assertNotNull(result);
} catch (IOException e) {
// Resource might not exist in test environment, which is acceptable
Assert.assertTrue(
e.getMessage().contains("resource") || e.getMessage().contains("not found"));
}
}
@Test(expectedExceptions = IOException.class)
public void testLoadResourceAsString_NonExistentResource() throws IOException {
// Arrange
String resourcePath = "/non/existent/resource.json";
// Act
IndexUtils.loadResourceAsString(resourcePath);
}
}