diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index 11effae38a4..56c913d3a23 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -186,8 +186,6 @@ public class DataInsightsApp extends AbstractNativeApplication { dataAssetIndex, language, dataAssetsConfig.getRetention()); - } else { - searchInterface.updateLifecyclePolicy(dataAssetsConfig.getRetention()); } } } catch (IOException ex) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java index 70fc3019cd5..a859b926185 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java @@ -10,8 +10,6 @@ import org.openmetadata.service.util.JsonUtils; public interface DataInsightsSearchInterface { String DATA_INSIGHTS_SEARCH_CONFIG_PATH = "/dataInsights/config.json"; - void createLifecyclePolicy(String name, String policy) throws IOException; - void createComponentTemplate(String name, String template) throws IOException; void createIndexTemplate(String name, String template) throws IOException; @@ -91,8 +89,6 @@ public interface DataInsightsSearchInterface { void deleteDataAssetDataStream(String name) throws IOException; - void updateLifecyclePolicy(int retentionDays) throws IOException; - Boolean dataAssetDataStreamExists(String name) throws IOException; String getClusterAlias(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexLifecyclePolicyConfig.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexLifecyclePolicyConfig.java deleted file mode 100644 index 69fcf6375dd..00000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexLifecyclePolicyConfig.java +++ /dev/null @@ -1,84 +0,0 @@ -package org.openmetadata.service.apps.bundles.insights.search; - -import java.util.List; -import java.util.Map; -import lombok.Getter; -import org.openmetadata.service.util.JsonUtils; - -@Getter -public class IndexLifecyclePolicyConfig { - private final int retentionDays; - - public enum SearchType { - OPENSEARCH("opensearch"), - ELASTICSEARCH("elasticsearch"); - - private final String value; - - SearchType(String value) { - this.value = value; - } - - public String toString() { - return value; - } - - public static SearchType fromString(String value) { - for (SearchType type : SearchType.values()) { - if (type.value.equalsIgnoreCase(value)) { - return type; - } - } - throw new IllegalArgumentException("Unknown SearchType: " + value); - } - } - - public IndexLifecyclePolicyConfig(String policyName, String policyStr, SearchType searchType) { - this.retentionDays = parseRetentionDays(policyName, policyStr, searchType); - } - - public IndexLifecyclePolicyConfig(int retentionDays) { - this.retentionDays = retentionDays; - } - - private Integer parseRetentionDays(String policyName, String policyStr, SearchType searchType) { - if (searchType.equals(SearchType.ELASTICSEARCH)) { - return parseElasticSearchRetentionDays(policyName, policyStr); - } else { - return parseOpenSearchRetentionDays(policyName, policyStr); - } - } - - private Integer parseOpenSearchRetentionDays(String policyName, String policyStr) { - Map> policyMap = JsonUtils.readOrConvertValue(policyStr, Map.class); - List> states = - JsonUtils.readOrConvertValue(policyMap.get("policy").get("states"), List.class); - for (Map state : states) { - if (state.get("name").equals("warm")) { - List> transitions = - JsonUtils.readOrConvertValue(state.get("transitions"), List.class); - Map conditions = - JsonUtils.readOrConvertValue(transitions.get(0).get("conditions"), Map.class); - return parseRetentionStr(conditions.get("min_index_age")); - } - } - return null; - } - - private int parseElasticSearchRetentionDays(String policyName, String policyStr) { - Map>> policyMap = - JsonUtils.readOrConvertValue(policyStr, Map.class); - Map phasesMap = - JsonUtils.readOrConvertValue( - policyMap.get(policyName).get("policy").get("phases"), Map.class); - Map deletePhase = - JsonUtils.readOrConvertValue(phasesMap.get("delete"), Map.class); - - String retentionStr = (String) deletePhase.get("min_age"); - return parseRetentionStr(retentionStr); - } - - private int parseRetentionStr(String retentionStr) { - return Integer.parseInt(retentionStr.replaceAll("\\D", "")); - } -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java index 8295940ae40..c78950b0c0d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java @@ -4,16 +4,13 @@ import es.org.elasticsearch.client.Request; import es.org.elasticsearch.client.Response; import es.org.elasticsearch.client.RestClient; import java.io.IOException; -import org.apache.http.util.EntityUtils; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; -import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig; import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate; import org.openmetadata.service.search.models.IndexMapping; public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface { private final RestClient client; private final String resourcePath = "/dataInsights/elasticsearch"; - private final String lifecyclePolicyName = "di-data-assets-lifecycle"; private final String clusterAlias; public ElasticSearchDataInsightsClient(RestClient client, String clusterAlias) { @@ -38,11 +35,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf return client.performRequest(request); } - @Override - public void createLifecyclePolicy(String name, String policy) throws IOException { - performRequest("PUT", String.format("/_ilm/policy/%s", name), policy); - } - @Override public void createComponentTemplate(String name, String template) throws IOException { performRequest("PUT", String.format("/_component_template/%s", name), template); @@ -72,14 +64,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf String language, int retentionDays) throws IOException { - createLifecyclePolicy( - getStringWithClusterAlias(lifecyclePolicyName), - buildLifecyclePolicy( - readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), - retentionDays)); - createComponentTemplate( - getStringWithClusterAlias("di-data-assets-settings"), - readResource(String.format("%s/indexSettingsTemplate.json", resourcePath))); createComponentTemplate( getStringWithClusterAlias("di-data-assets-mapping"), buildMapping( @@ -94,38 +78,6 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf createDataStream(name); } - private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) { - return lifecyclePolicy - .replace("{{retention}}", String.valueOf(retentionDays)) - .replace("{{halfRetention}}", String.valueOf(retentionDays / 2)); - } - - @Override - public void updateLifecyclePolicy(int retentionDays) throws IOException { - String currentLifecyclePolicy = - EntityUtils.toString( - performRequest( - "GET", - String.format( - "/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName))) - .getEntity()); - if (new IndexLifecyclePolicyConfig( - getStringWithClusterAlias(lifecyclePolicyName), - currentLifecyclePolicy, - IndexLifecyclePolicyConfig.SearchType.ELASTICSEARCH) - .getRetentionDays() - != retentionDays) { - String updatedLifecyclePolicy = - buildLifecyclePolicy( - readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), - retentionDays); - performRequest( - "PUT", - String.format("/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)), - updatedLifecyclePolicy); - } - } - @Override public void deleteDataAssetDataStream(String name) throws IOException { performRequest("DELETE", String.format("/_data_stream/%s", name)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java index 43378215cd9..46f9abccb92 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java @@ -1,20 +1,16 @@ package org.openmetadata.service.apps.bundles.insights.search.opensearch; import java.io.IOException; -import org.apache.http.util.EntityUtils; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; -import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig; import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate; import org.openmetadata.service.search.models.IndexMapping; import os.org.opensearch.client.Request; import os.org.opensearch.client.Response; -import os.org.opensearch.client.ResponseException; import os.org.opensearch.client.RestClient; public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface { private final RestClient client; private final String resourcePath = "/dataInsights/opensearch"; - private final String lifecyclePolicyName = "di-data-assets-lifecycle"; private final String clusterAlias; public OpenSearchDataInsightsClient(RestClient client, String clusterAlias) { @@ -39,21 +35,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface return client.performRequest(request); } - @Override - public void createLifecyclePolicy(String name, String policy) throws IOException { - try { - performRequest("PUT", String.format("/_plugins/_ism/policies/%s", name), policy); - } catch (ResponseException ex) { - // Conflict since the Policy already exists - if (ex.getResponse().getStatusLine().getStatusCode() == 409) { - performRequest("DELETE", String.format("/_plugins/_ism/policies/%s", name)); - performRequest("PUT", String.format("/_plugins/_ism/policies/%s", name), policy); - } else { - throw ex; - } - } - } - @Override public void createComponentTemplate(String name, String template) throws IOException { performRequest("PUT", String.format("/_component_template/%s", name), template); @@ -83,11 +64,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface String language, int retentionDays) throws IOException { - createLifecyclePolicy( - getStringWithClusterAlias(lifecyclePolicyName), - buildLifecyclePolicy( - readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), - retentionDays)); createComponentTemplate( getStringWithClusterAlias("di-data-assets-mapping"), buildMapping( @@ -102,36 +78,6 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface createDataStream(name); } - private String buildLifecyclePolicy(String lifecyclePolicy, int retentionDays) { - return lifecyclePolicy - .replace("{{retention}}", String.valueOf(retentionDays)) - .replace("{{halfRetention}}", String.valueOf(retentionDays / 2)); - } - - @Override - public void updateLifecyclePolicy(int retentionDays) throws IOException { - String currentLifecyclePolicy = - EntityUtils.toString( - performRequest( - "GET", - String.format( - "/_plugins/_ism/policies/%s", - getStringWithClusterAlias(lifecyclePolicyName))) - .getEntity()); - if (new IndexLifecyclePolicyConfig( - getStringWithClusterAlias(lifecyclePolicyName), - currentLifecyclePolicy, - IndexLifecyclePolicyConfig.SearchType.OPENSEARCH) - .getRetentionDays() - != retentionDays) { - String updatedLifecyclePolicy = - buildLifecyclePolicy( - readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), - retentionDays); - createLifecyclePolicy(getStringWithClusterAlias(lifecyclePolicyName), updatedLifecyclePolicy); - } - } - @Override public void deleteDataAssetDataStream(String name) throws IOException { performRequest("DELETE", String.format("/_data_stream/%s", name)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java index 511e21597cd..ad321c7d996 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java @@ -4,6 +4,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.schema.type.EventType.ENTITY_CREATED; import static org.openmetadata.schema.type.EventType.ENTITY_DELETED; import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED; +import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.getDataStreamName; import java.util.ArrayList; import java.util.HashSet; @@ -23,6 +24,7 @@ import org.openmetadata.schema.configuration.ExecutorConfiguration; import org.openmetadata.schema.configuration.HistoryCleanUpConfiguration; import org.openmetadata.schema.configuration.WorkflowSettings; import org.openmetadata.schema.email.SmtpSettings; +import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig; import org.openmetadata.schema.service.configuration.slackApp.SlackAppConfiguration; @@ -37,6 +39,7 @@ import org.openmetadata.sdk.PipelineServiceClientInterface; import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.exception.CustomExceptionMessage; +import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.fernet.Fernet; import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.jdbi3.CollectionDAO.SystemDAO; @@ -48,6 +51,7 @@ import org.openmetadata.service.secrets.SecretsManagerFactory; import org.openmetadata.service.secrets.masker.PasswordEntityMasker; import org.openmetadata.service.security.JwtFilter; import org.openmetadata.service.security.auth.LoginAttemptCache; +import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; import org.openmetadata.service.util.RestUtil; @@ -490,12 +494,21 @@ public class SystemRepository { && searchRepository .getSearchClient() .indexExists(Entity.getSearchRepository().getIndexOrAliasName(INDEX_NAME))) { - return new StepValidation() - .withDescription(ValidationStepDescription.SEARCH.key) - .withPassed(Boolean.TRUE) - .withMessage( - String.format( - "Connected to %s", applicationConfig.getElasticSearchConfiguration().getHost())); + if (validateDataInsights()) { + return new StepValidation() + .withDescription(ValidationStepDescription.SEARCH.key) + .withPassed(Boolean.TRUE) + .withMessage( + String.format( + "Connected to %s", + applicationConfig.getElasticSearchConfiguration().getHost())); + } else { + return new StepValidation() + .withDescription(ValidationStepDescription.SEARCH.key) + .withPassed(Boolean.FALSE) + .withMessage( + "Data Insights Application is Installed but it is not reachable or available"); + } } else { return new StepValidation() .withDescription(ValidationStepDescription.SEARCH.key) @@ -504,6 +517,28 @@ public class SystemRepository { } } + private boolean validateDataInsights() { + boolean isValid = false; + + AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); + try { + App dataInsightsApp = + appRepository.getByName(null, "DataInsightsApplication", EntityUtil.Fields.EMPTY_FIELDS); + + SearchRepository searchRepository = Entity.getSearchRepository(); + String dataStreamName = getDataStreamName(searchRepository.getClusterAlias(), Entity.TABLE); + + if (Boolean.TRUE.equals(searchRepository.getSearchClient().isClientAvailable()) + && searchRepository.getSearchClient().indexExists(dataStreamName)) { + isValid = true; + } + } catch (EntityNotFoundException e) { + isValid = true; + LOG.info("Data Insights Application is not installed. Skip Validation."); + } + return isValid; + } + private StepValidation getPipelineServiceClientValidation( OpenMetadataApplicationConfig applicationConfig, PipelineServiceClientInterface pipelineServiceClient) { diff --git a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexLifecyclePolicy.json b/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexLifecyclePolicy.json deleted file mode 100644 index a84a3f9afa5..00000000000 --- a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexLifecyclePolicy.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "policy": { - "phases": { - "hot": { - "actions": { - "rollover": { - "max_primary_shard_size": "10gb", - "max_age": "{{halfRetention}}d" - } - } - }, - "warm": { - "min_age": "{{halfRetention}}d", - "actions": { - "shrink": { - "number_of_shards": 1 - }, - "forcemerge": { - "max_num_segments": 1 - } - } - }, - "delete": { - "min_age": "{{retention}}d", - "actions": { - "delete": {} - } - } - } - } -} \ No newline at end of file diff --git a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexTemplate.json b/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexTemplate.json index 2561a30fc9b..a8dbd4ba905 100644 --- a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexTemplate.json +++ b/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexTemplate.json @@ -4,8 +4,7 @@ ], "data_stream": {}, "composed_of": [ - "di-data-assets-mapping", - "di-data-assets-settings" + "di-data-assets-mapping" ], "priority": 500 } \ No newline at end of file diff --git a/openmetadata-service/src/main/resources/dataInsights/opensearch/indexLifecyclePolicy.json b/openmetadata-service/src/main/resources/dataInsights/opensearch/indexLifecyclePolicy.json deleted file mode 100644 index 365f2889607..00000000000 --- a/openmetadata-service/src/main/resources/dataInsights/opensearch/indexLifecyclePolicy.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "policy": { - "description": "Lifecycle Policy for DataAssets Data Stream.", - "default_state": "hot", - "states": [ - { - "name": "hot", - "actions": [ - { - "rollover": { - "min_primary_shard_size": "10gb", - "min_index_age": "{{halfRetention}}d" - } - } - ], - "transitions": [ - { - "state_name": "warm", - "conditions": { - "min_index_age": "{{halfRetention}}d" - } - } - ] - }, - { - "name": "warm", - "actions": [ - { - "shrink": { - "num_new_shards": 1 - } - }, - { - "force_merge": { - "max_num_segments": 1 - } - } - ], - "transitions": [ - { - "state_name": "delete", - "conditions": { - "min_index_age": "{{retention}}d" - } - } - ] - }, - { - "name": "delete", - "actions": [ - { - "delete": {} - } - ] - } - ], - "ism_template": { - "index_patterns": ["di-data-assets-*"], - "priority": 500 - } - } -} \ No newline at end of file