MINOR: Remove ILM from Data Insights Data Streams (#21019)

* Remove ILM from Data Insights Data Streams

* Put DataInsights healthcheck wihtin SearchValidation

(cherry picked from commit 2ac773f4df28eff541dc3ac0ad98f070ee49b549)
This commit is contained in:
IceS2 2025-04-29 14:48:58 +02:00 committed by OpenMetadata Release Bot
parent 67a389992a
commit 6022cc4bf2
9 changed files with 42 additions and 293 deletions

View File

@ -186,8 +186,6 @@ public class DataInsightsApp extends AbstractNativeApplication {
dataAssetIndex,
language,
dataAssetsConfig.getRetention());
} else {
searchInterface.updateLifecyclePolicy(dataAssetsConfig.getRetention());
}
}
} catch (IOException ex) {

View File

@ -10,8 +10,6 @@ import org.openmetadata.service.util.JsonUtils;
public interface DataInsightsSearchInterface {
String DATA_INSIGHTS_SEARCH_CONFIG_PATH = "/dataInsights/config.json";
void createLifecyclePolicy(String name, String policy) throws IOException;
void createComponentTemplate(String name, String template) throws IOException;
void createIndexTemplate(String name, String template) throws IOException;
@ -91,8 +89,6 @@ public interface DataInsightsSearchInterface {
void deleteDataAssetDataStream(String name) throws IOException;
void updateLifecyclePolicy(int retentionDays) throws IOException;
Boolean dataAssetDataStreamExists(String name) throws IOException;
String getClusterAlias();

View File

@ -1,84 +0,0 @@
package org.openmetadata.service.apps.bundles.insights.search;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import org.openmetadata.service.util.JsonUtils;
@Getter
public class IndexLifecyclePolicyConfig {
private final int retentionDays;
public enum SearchType {
OPENSEARCH("opensearch"),
ELASTICSEARCH("elasticsearch");
private final String value;
SearchType(String value) {
this.value = value;
}
public String toString() {
return value;
}
public static SearchType fromString(String value) {
for (SearchType type : SearchType.values()) {
if (type.value.equalsIgnoreCase(value)) {
return type;
}
}
throw new IllegalArgumentException("Unknown SearchType: " + value);
}
}
public IndexLifecyclePolicyConfig(String policyName, String policyStr, SearchType searchType) {
this.retentionDays = parseRetentionDays(policyName, policyStr, searchType);
}
public IndexLifecyclePolicyConfig(int retentionDays) {
this.retentionDays = retentionDays;
}
private Integer parseRetentionDays(String policyName, String policyStr, SearchType searchType) {
if (searchType.equals(SearchType.ELASTICSEARCH)) {
return parseElasticSearchRetentionDays(policyName, policyStr);
} else {
return parseOpenSearchRetentionDays(policyName, policyStr);
}
}
private Integer parseOpenSearchRetentionDays(String policyName, String policyStr) {
Map<String, Map<String, Object>> policyMap = JsonUtils.readOrConvertValue(policyStr, Map.class);
List<Map<String, Object>> states =
JsonUtils.readOrConvertValue(policyMap.get("policy").get("states"), List.class);
for (Map<String, Object> state : states) {
if (state.get("name").equals("warm")) {
List<Map<String, Object>> transitions =
JsonUtils.readOrConvertValue(state.get("transitions"), List.class);
Map<String, String> conditions =
JsonUtils.readOrConvertValue(transitions.get(0).get("conditions"), Map.class);
return parseRetentionStr(conditions.get("min_index_age"));
}
}
return null;
}
private int parseElasticSearchRetentionDays(String policyName, String policyStr) {
Map<String, Map<String, Map<String, Object>>> policyMap =
JsonUtils.readOrConvertValue(policyStr, Map.class);
Map<String, Object> phasesMap =
JsonUtils.readOrConvertValue(
policyMap.get(policyName).get("policy").get("phases"), Map.class);
Map<String, Object> deletePhase =
JsonUtils.readOrConvertValue(phasesMap.get("delete"), Map.class);
String retentionStr = (String) deletePhase.get("min_age");
return parseRetentionStr(retentionStr);
}
private int parseRetentionStr(String retentionStr) {
return Integer.parseInt(retentionStr.replaceAll("\\D", ""));
}
}

View File

@ -4,16 +4,13 @@ import es.org.elasticsearch.client.Request;
import es.org.elasticsearch.client.Response;
import es.org.elasticsearch.client.RestClient;
import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate;
import org.openmetadata.service.search.models.IndexMapping;
public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
private final String resourcePath = "/dataInsights/elasticsearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
private final String clusterAlias;
public ElasticSearchDataInsightsClient(RestClient client, String clusterAlias) {
@ -38,11 +35,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
return client.performRequest(request);
}
@Override
public void createLifecyclePolicy(String name, String policy) throws IOException {
performRequest("PUT", String.format("/_ilm/policy/%s", name), policy);
}
@Override
public void createComponentTemplate(String name, String template) throws IOException {
performRequest("PUT", String.format("/_component_template/%s", name), template);
@ -72,14 +64,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
String language,
int retentionDays)
throws IOException {
createLifecyclePolicy(
getStringWithClusterAlias(lifecyclePolicyName),
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
getStringWithClusterAlias("di-data-assets-settings"),
readResource(String.format("%s/indexSettingsTemplate.json", resourcePath)));
createComponentTemplate(
getStringWithClusterAlias("di-data-assets-mapping"),
buildMapping(
@ -94,38 +78,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
createDataStream(name);
}
private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) {
return lifecyclePolicy
.replace("{{retention}}", String.valueOf(retentionDays))
.replace("{{halfRetention}}", String.valueOf(retentionDays / 2));
}
@Override
public void updateLifecyclePolicy(int retentionDays) throws IOException {
String currentLifecyclePolicy =
EntityUtils.toString(
performRequest(
"GET",
String.format(
"/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)))
.getEntity());
if (new IndexLifecyclePolicyConfig(
getStringWithClusterAlias(lifecyclePolicyName),
currentLifecyclePolicy,
IndexLifecyclePolicyConfig.SearchType.ELASTICSEARCH)
.getRetentionDays()
!= retentionDays) {
String updatedLifecyclePolicy =
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays);
performRequest(
"PUT",
String.format("/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)),
updatedLifecyclePolicy);
}
}
@Override
public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", String.format("/_data_stream/%s", name));

View File

@ -1,20 +1,16 @@
package org.openmetadata.service.apps.bundles.insights.search.opensearch;
import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate;
import org.openmetadata.service.search.models.IndexMapping;
import os.org.opensearch.client.Request;
import os.org.opensearch.client.Response;
import os.org.opensearch.client.ResponseException;
import os.org.opensearch.client.RestClient;
public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
private final String resourcePath = "/dataInsights/opensearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
private final String clusterAlias;
public OpenSearchDataInsightsClient(RestClient client, String clusterAlias) {
@ -39,21 +35,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
return client.performRequest(request);
}
@Override
public void createLifecyclePolicy(String name, String policy) throws IOException {
try {
performRequest("PUT", String.format("/_plugins/_ism/policies/%s", name), policy);
} catch (ResponseException ex) {
// Conflict since the Policy already exists
if (ex.getResponse().getStatusLine().getStatusCode() == 409) {
performRequest("DELETE", String.format("/_plugins/_ism/policies/%s", name));
performRequest("PUT", String.format("/_plugins/_ism/policies/%s", name), policy);
} else {
throw ex;
}
}
}
@Override
public void createComponentTemplate(String name, String template) throws IOException {
performRequest("PUT", String.format("/_component_template/%s", name), template);
@ -83,11 +64,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
String language,
int retentionDays)
throws IOException {
createLifecyclePolicy(
getStringWithClusterAlias(lifecyclePolicyName),
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
getStringWithClusterAlias("di-data-assets-mapping"),
buildMapping(
@ -102,36 +78,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
createDataStream(name);
}
private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) {
return lifecyclePolicy
.replace("{{retention}}", String.valueOf(retentionDays))
.replace("{{halfRetention}}", String.valueOf(retentionDays / 2));
}
@Override
public void updateLifecyclePolicy(int retentionDays) throws IOException {
String currentLifecyclePolicy =
EntityUtils.toString(
performRequest(
"GET",
String.format(
"/_plugins/_ism/policies/%s",
getStringWithClusterAlias(lifecyclePolicyName)))
.getEntity());
if (new IndexLifecyclePolicyConfig(
getStringWithClusterAlias(lifecyclePolicyName),
currentLifecyclePolicy,
IndexLifecyclePolicyConfig.SearchType.OPENSEARCH)
.getRetentionDays()
!= retentionDays) {
String updatedLifecyclePolicy =
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays);
createLifecyclePolicy(getStringWithClusterAlias(lifecyclePolicyName), updatedLifecyclePolicy);
}
}
@Override
public void deleteDataAssetDataStream(String name) throws IOException {
performRequest("DELETE", String.format("/_data_stream/%s", name));

View File

@ -4,6 +4,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.EventType.ENTITY_CREATED;
import static org.openmetadata.schema.type.EventType.ENTITY_DELETED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.getDataStreamName;
import java.util.ArrayList;
import java.util.HashSet;
@ -23,6 +24,7 @@ import org.openmetadata.schema.configuration.ExecutorConfiguration;
import org.openmetadata.schema.configuration.HistoryCleanUpConfiguration;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.email.SmtpSettings;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
import org.openmetadata.schema.service.configuration.slackApp.SlackAppConfiguration;
@ -37,6 +39,7 @@ import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.CollectionDAO.SystemDAO;
@ -48,6 +51,7 @@ import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.secrets.masker.PasswordEntityMasker;
import org.openmetadata.service.security.JwtFilter;
import org.openmetadata.service.security.auth.LoginAttemptCache;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.openmetadata.service.util.RestUtil;
@ -490,12 +494,21 @@ public class SystemRepository {
&& searchRepository
.getSearchClient()
.indexExists(Entity.getSearchRepository().getIndexOrAliasName(INDEX_NAME))) {
return new StepValidation()
.withDescription(ValidationStepDescription.SEARCH.key)
.withPassed(Boolean.TRUE)
.withMessage(
String.format(
"Connected to %s", applicationConfig.getElasticSearchConfiguration().getHost()));
if (validateDataInsights()) {
return new StepValidation()
.withDescription(ValidationStepDescription.SEARCH.key)
.withPassed(Boolean.TRUE)
.withMessage(
String.format(
"Connected to %s",
applicationConfig.getElasticSearchConfiguration().getHost()));
} else {
return new StepValidation()
.withDescription(ValidationStepDescription.SEARCH.key)
.withPassed(Boolean.FALSE)
.withMessage(
"Data Insights Application is Installed but it is not reachable or available");
}
} else {
return new StepValidation()
.withDescription(ValidationStepDescription.SEARCH.key)
@ -504,6 +517,28 @@ public class SystemRepository {
}
}
private boolean validateDataInsights() {
boolean isValid = false;
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
try {
App dataInsightsApp =
appRepository.getByName(null, "DataInsightsApplication", EntityUtil.Fields.EMPTY_FIELDS);
SearchRepository searchRepository = Entity.getSearchRepository();
String dataStreamName = getDataStreamName(searchRepository.getClusterAlias(), Entity.TABLE);
if (Boolean.TRUE.equals(searchRepository.getSearchClient().isClientAvailable())
&& searchRepository.getSearchClient().indexExists(dataStreamName)) {
isValid = true;
}
} catch (EntityNotFoundException e) {
isValid = true;
LOG.info("Data Insights Application is not installed. Skip Validation.");
}
return isValid;
}
private StepValidation getPipelineServiceClientValidation(
OpenMetadataApplicationConfig applicationConfig,
PipelineServiceClientInterface pipelineServiceClient) {

View File

@ -1,31 +0,0 @@
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_primary_shard_size": "10gb",
"max_age": "{{halfRetention}}d"
}
}
},
"warm": {
"min_age": "{{halfRetention}}d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
}
}
},
"delete": {
"min_age": "{{retention}}d",
"actions": {
"delete": {}
}
}
}
}
}

View File

@ -4,8 +4,7 @@
],
"data_stream": {},
"composed_of": [
"di-data-assets-mapping",
"di-data-assets-settings"
"di-data-assets-mapping"
],
"priority": 500
}

View File

@ -1,62 +0,0 @@
{
"policy": {
"description": "Lifecycle Policy for DataAssets Data Stream.",
"default_state": "hot",
"states": [
{
"name": "hot",
"actions": [
{
"rollover": {
"min_primary_shard_size": "10gb",
"min_index_age": "{{halfRetention}}d"
}
}
],
"transitions": [
{
"state_name": "warm",
"conditions": {
"min_index_age": "{{halfRetention}}d"
}
}
]
},
{
"name": "warm",
"actions": [
{
"shrink": {
"num_new_shards": 1
}
},
{
"force_merge": {
"max_num_segments": 1
}
}
],
"transitions": [
{
"state_name": "delete",
"conditions": {
"min_index_age": "{{retention}}d"
}
}
]
},
{
"name": "delete",
"actions": [
{
"delete": {}
}
]
}
],
"ism_template": {
"index_patterns": ["di-data-assets-*"],
"priority": 500
}
}
}