diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index 90a9bdbd397..dce85e5dc34 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -102,18 +102,25 @@ public class DataInsightsApp extends AbstractNativeApplication { .equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) { searchInterface = new ElasticSearchDataInsightsClient( - (RestClient) searchRepository.getSearchClient().getLowLevelClient()); + (RestClient) searchRepository.getSearchClient().getLowLevelClient(), + searchRepository.getClusterAlias()); } else { searchInterface = new OpenSearchDataInsightsClient( (os.org.opensearch.client.RestClient) - searchRepository.getSearchClient().getLowLevelClient()); + searchRepository.getSearchClient().getLowLevelClient(), + searchRepository.getClusterAlias()); } return searchInterface; } - public static String getDataStreamName(String dataAssetType) { - return String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase(); + public static String getDataStreamName(String prefix, String dataAssetType) { + String dataStreamName = + String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase(); + if (!(prefix == null || prefix.isEmpty())) { + dataStreamName = String.format("%s-%s", prefix, dataStreamName); + } + return dataStreamName; } private void createIndexInternal(String entityType) throws IOException { @@ -122,10 +129,12 @@ public class DataInsightsApp extends AbstractNativeApplication { searchRepository.createIndex(resultIndexType); } DataInsightsSearchInterface searchInterface = getSearchInterface(); - if (!searchInterface.dataAssetDataStreamExists(getDataStreamName(entityType))) { + if (!searchInterface.dataAssetDataStreamExists( + getDataStreamName(searchRepository.getClusterAlias(), entityType))) { searchRepository .getSearchClient() - .addIndexAlias(resultIndexType, getDataStreamName(entityType)); + .addIndexAlias( + resultIndexType, getDataStreamName(searchRepository.getClusterAlias(), entityType)); } } @@ -164,7 +173,8 @@ public class DataInsightsApp extends AbstractNativeApplication { try { for (String dataAssetType : dataAssetTypes) { IndexMapping dataAssetIndex = searchRepository.getIndexMapping(dataAssetType); - String dataStreamName = getDataStreamName(dataAssetType); + String dataStreamName = + getDataStreamName(searchRepository.getClusterAlias(), dataAssetType); if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) { searchInterface.createDataAssetsDataStream( dataStreamName, @@ -186,7 +196,8 @@ public class DataInsightsApp extends AbstractNativeApplication { try { for (String dataAssetType : dataAssetTypes) { - String dataStreamName = getDataStreamName(dataAssetType); + String dataStreamName = + getDataStreamName(searchRepository.getClusterAlias(), dataAssetType); if (searchInterface.dataAssetDataStreamExists(dataStreamName)) { searchInterface.deleteDataAssetDataStream(dataStreamName); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java index b32e3a12f71..70fc3019cd5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java @@ -94,4 +94,17 @@ public interface DataInsightsSearchInterface { void updateLifecyclePolicy(int retentionDays) throws IOException; Boolean dataAssetDataStreamExists(String name) throws IOException; + + String getClusterAlias(); + + default String getStringWithClusterAlias(String s) { + return getStringWithClusterAlias(getClusterAlias(), s); + } + + static String getStringWithClusterAlias(String clusterAlias, String s) { + if (!(clusterAlias == null || clusterAlias.isEmpty())) { + return String.format("%s-%s", clusterAlias, s); + } + return s; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexTemplate.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexTemplate.java new file mode 100644 index 00000000000..9fdd464ae55 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/IndexTemplate.java @@ -0,0 +1,36 @@ +package org.openmetadata.service.apps.bundles.insights.search; + +import static org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface.getStringWithClusterAlias; + +import java.util.List; +import java.util.Map; +import org.openmetadata.service.util.JsonUtils; + +public class IndexTemplate { + public static final String COMPOSED_OF = "composed_of"; + public static final String INDEX_PATTERNS = "index_patterns"; + + public static String getIndexTemplateWithClusterAlias( + String clusterAlias, String indexTemplateJson) { + + Map indexTemplateMap = + JsonUtils.readOrConvertValue(indexTemplateJson, Map.class); + List composedOf = + JsonUtils.readOrConvertValue(indexTemplateMap.get(COMPOSED_OF), List.class); + + composedOf = + composedOf.stream().map(part -> getStringWithClusterAlias(clusterAlias, part)).toList(); + indexTemplateMap.put(COMPOSED_OF, composedOf); + + List indexPatterns = + JsonUtils.readOrConvertValue(indexTemplateMap.get(INDEX_PATTERNS), List.class); + + indexPatterns = + indexPatterns.stream() + .map(pattern -> getStringWithClusterAlias(clusterAlias, pattern)) + .toList(); + indexTemplateMap.put(INDEX_PATTERNS, indexPatterns); + + return JsonUtils.pojoToJson(indexTemplateMap); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java index ea3c4227342..8295940ae40 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java @@ -7,15 +7,23 @@ import java.io.IOException; import org.apache.http.util.EntityUtils; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig; +import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate; import org.openmetadata.service.search.models.IndexMapping; public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface { private final RestClient client; private final String resourcePath = "/dataInsights/elasticsearch"; private final String lifecyclePolicyName = "di-data-assets-lifecycle"; + private final String clusterAlias; - public ElasticSearchDataInsightsClient(RestClient client) { + public ElasticSearchDataInsightsClient(RestClient client, String clusterAlias) { this.client = client; + this.clusterAlias = clusterAlias; + } + + @Override + public String getClusterAlias() { + return clusterAlias; } private Response performRequest(String method, String path) throws IOException { @@ -65,22 +73,24 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf int retentionDays) throws IOException { createLifecyclePolicy( - lifecyclePolicyName, + getStringWithClusterAlias(lifecyclePolicyName), buildLifecyclePolicy( readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), retentionDays)); createComponentTemplate( - "di-data-assets-settings", + getStringWithClusterAlias("di-data-assets-settings"), readResource(String.format("%s/indexSettingsTemplate.json", resourcePath))); createComponentTemplate( - "di-data-assets-mapping", + getStringWithClusterAlias("di-data-assets-mapping"), buildMapping( entityType, entityIndexMapping, language, readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)))); createIndexTemplate( - "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); + getStringWithClusterAlias("di-data-assets"), + IndexTemplate.getIndexTemplateWithClusterAlias( + getClusterAlias(), readResource(String.format("%s/indexTemplate.json", resourcePath)))); createDataStream(name); } @@ -94,10 +104,13 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf public void updateLifecyclePolicy(int retentionDays) throws IOException { String currentLifecyclePolicy = EntityUtils.toString( - performRequest("GET", String.format("/_ilm/policy/%s", lifecyclePolicyName)) + performRequest( + "GET", + String.format( + "/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName))) .getEntity()); if (new IndexLifecyclePolicyConfig( - lifecyclePolicyName, + getStringWithClusterAlias(lifecyclePolicyName), currentLifecyclePolicy, IndexLifecyclePolicyConfig.SearchType.ELASTICSEARCH) .getRetentionDays() @@ -107,7 +120,9 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), retentionDays); performRequest( - "PUT", String.format("/_ilm/policy/%s", lifecyclePolicyName), updatedLifecyclePolicy); + "PUT", + String.format("/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)), + updatedLifecyclePolicy); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java index 0c505658bba..b7d261e4ed6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java @@ -4,6 +4,7 @@ import java.io.IOException; import org.apache.http.util.EntityUtils; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig; +import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate; import org.openmetadata.service.search.models.IndexMapping; import os.org.opensearch.client.Request; import os.org.opensearch.client.Response; @@ -14,9 +15,16 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface private final RestClient client; private final String resourcePath = "/dataInsights/opensearch"; private final String lifecyclePolicyName = "di-data-assets-lifecycle"; + private final String clusterAlias; - public OpenSearchDataInsightsClient(RestClient client) { + public OpenSearchDataInsightsClient(RestClient client, String clusterAlias) { this.client = client; + this.clusterAlias = clusterAlias; + } + + @Override + public String getClusterAlias() { + return clusterAlias; } private Response performRequest(String method, String path) throws IOException { @@ -76,19 +84,21 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface int retentionDays) throws IOException { createLifecyclePolicy( - lifecyclePolicyName, + getStringWithClusterAlias(lifecyclePolicyName), buildLifecyclePolicy( readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), retentionDays)); createComponentTemplate( - "di-data-assets-mapping", + getStringWithClusterAlias("di-data-assets-mapping"), buildMapping( entityType, entityIndexMapping, language, readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)))); createIndexTemplate( - "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); + getStringWithClusterAlias("di-data-assets"), + IndexTemplate.getIndexTemplateWithClusterAlias( + getClusterAlias(), readResource(String.format("%s/indexTemplate.json", resourcePath)))); createDataStream(name); } @@ -102,10 +112,14 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface public void updateLifecyclePolicy(int retentionDays) throws IOException { String currentLifecyclePolicy = EntityUtils.toString( - performRequest("GET", String.format("/_plugins/_ism/policies/%s", lifecyclePolicyName)) + performRequest( + "GET", + String.format( + "/_plugins/_ism/policies/%s", + getStringWithClusterAlias(lifecyclePolicyName))) .getEntity()); if (new IndexLifecyclePolicyConfig( - lifecyclePolicyName, + getStringWithClusterAlias(lifecyclePolicyName), currentLifecyclePolicy, IndexLifecyclePolicyConfig.SearchType.OPENSEARCH) .getRetentionDays() @@ -114,7 +128,7 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface buildLifecyclePolicy( readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)), retentionDays); - createLifecyclePolicy(lifecyclePolicyName, updatedLifecyclePolicy); + createLifecyclePolicy(getStringWithClusterAlias(lifecyclePolicyName), updatedLifecyclePolicy); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index 00a89fca779..ae4e5128c87 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -190,9 +190,13 @@ public class DataAssetsWorkflow { contextData.put(END_TIMESTAMP_KEY, endTimestamp); for (PaginatedEntitiesSource source : sources) { - deleteBasedOnDataRetentionPolicy(getDataStreamName(source.getEntityType())); - deleteDataBeforeInserting(getDataStreamName(source.getEntityType())); - contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType())); + deleteBasedOnDataRetentionPolicy( + getDataStreamName(searchRepository.getClusterAlias(), source.getEntityType())); + deleteDataBeforeInserting( + getDataStreamName(searchInterface.getClusterAlias(), source.getEntityType())); + contextData.put( + DATA_STREAM_KEY, + getDataStreamName(searchInterface.getClusterAlias(), source.getEntityType())); contextData.put(ENTITY_TYPE_KEY, source.getEntityType()); contextData.put( ENTITY_TYPE_FIELDS_KEY, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java index 046ffa34692..c1ca0db0e35 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java @@ -59,6 +59,22 @@ public class DataInsightSystemChartRepository extends EntityRepository { .equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) { searchInterface = new ElasticSearchDataInsightsClient( - (RestClient) getSearchRepository().getSearchClient().getLowLevelClient()); + (RestClient) getSearchRepository().getSearchClient().getLowLevelClient(), + getSearchRepository().getClusterAlias()); } else { searchInterface = new OpenSearchDataInsightsClient( (os.org.opensearch.client.RestClient) - getSearchRepository().getSearchClient().getLowLevelClient()); + getSearchRepository().getSearchClient().getLowLevelClient(), + getSearchRepository().getClusterAlias()); } try { for (String dataAssetType : dataAssetTypes) {