Add ClusterAlias to Data Insights indexes (#20222)

This commit is contained in:
IceS2 2025-03-13 03:01:41 -03:00 committed by GitHub
parent a964cda244
commit dc84874a43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 145 additions and 34 deletions

View File

@ -102,18 +102,25 @@ public class DataInsightsApp extends AbstractNativeApplication {
.equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) {
searchInterface =
new ElasticSearchDataInsightsClient(
(RestClient) searchRepository.getSearchClient().getLowLevelClient());
(RestClient) searchRepository.getSearchClient().getLowLevelClient(),
searchRepository.getClusterAlias());
} else {
searchInterface =
new OpenSearchDataInsightsClient(
(os.org.opensearch.client.RestClient)
searchRepository.getSearchClient().getLowLevelClient());
searchRepository.getSearchClient().getLowLevelClient(),
searchRepository.getClusterAlias());
}
return searchInterface;
}
public static String getDataStreamName(String dataAssetType) {
return String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase();
public static String getDataStreamName(String prefix, String dataAssetType) {
String dataStreamName =
String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase();
if (!(prefix == null || prefix.isEmpty())) {
dataStreamName = String.format("%s-%s", prefix, dataStreamName);
}
return dataStreamName;
}
private void createIndexInternal(String entityType) throws IOException {
@ -122,10 +129,12 @@ public class DataInsightsApp extends AbstractNativeApplication {
searchRepository.createIndex(resultIndexType);
}
DataInsightsSearchInterface searchInterface = getSearchInterface();
if (!searchInterface.dataAssetDataStreamExists(getDataStreamName(entityType))) {
if (!searchInterface.dataAssetDataStreamExists(
getDataStreamName(searchRepository.getClusterAlias(), entityType))) {
searchRepository
.getSearchClient()
.addIndexAlias(resultIndexType, getDataStreamName(entityType));
.addIndexAlias(
resultIndexType, getDataStreamName(searchRepository.getClusterAlias(), entityType));
}
}
@ -164,7 +173,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
try {
for (String dataAssetType : dataAssetTypes) {
IndexMapping dataAssetIndex = searchRepository.getIndexMapping(dataAssetType);
String dataStreamName = getDataStreamName(dataAssetType);
String dataStreamName =
getDataStreamName(searchRepository.getClusterAlias(), dataAssetType);
if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.createDataAssetsDataStream(
dataStreamName,
@ -186,7 +196,8 @@ public class DataInsightsApp extends AbstractNativeApplication {
try {
for (String dataAssetType : dataAssetTypes) {
String dataStreamName = getDataStreamName(dataAssetType);
String dataStreamName =
getDataStreamName(searchRepository.getClusterAlias(), dataAssetType);
if (searchInterface.dataAssetDataStreamExists(dataStreamName)) {
searchInterface.deleteDataAssetDataStream(dataStreamName);
}

View File

@ -94,4 +94,17 @@ public interface DataInsightsSearchInterface {
void updateLifecyclePolicy(int retentionDays) throws IOException;
Boolean dataAssetDataStreamExists(String name) throws IOException;
String getClusterAlias();
default String getStringWithClusterAlias(String s) {
return getStringWithClusterAlias(getClusterAlias(), s);
}
static String getStringWithClusterAlias(String clusterAlias, String s) {
if (!(clusterAlias == null || clusterAlias.isEmpty())) {
return String.format("%s-%s", clusterAlias, s);
}
return s;
}
}

View File

@ -0,0 +1,36 @@
package org.openmetadata.service.apps.bundles.insights.search;
import static org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface.getStringWithClusterAlias;
import java.util.List;
import java.util.Map;
import org.openmetadata.service.util.JsonUtils;
public class IndexTemplate {
public static final String COMPOSED_OF = "composed_of";
public static final String INDEX_PATTERNS = "index_patterns";
public static String getIndexTemplateWithClusterAlias(
String clusterAlias, String indexTemplateJson) {
Map<String, Object> indexTemplateMap =
JsonUtils.readOrConvertValue(indexTemplateJson, Map.class);
List<String> composedOf =
JsonUtils.readOrConvertValue(indexTemplateMap.get(COMPOSED_OF), List.class);
composedOf =
composedOf.stream().map(part -> getStringWithClusterAlias(clusterAlias, part)).toList();
indexTemplateMap.put(COMPOSED_OF, composedOf);
List<String> indexPatterns =
JsonUtils.readOrConvertValue(indexTemplateMap.get(INDEX_PATTERNS), List.class);
indexPatterns =
indexPatterns.stream()
.map(pattern -> getStringWithClusterAlias(clusterAlias, pattern))
.toList();
indexTemplateMap.put(INDEX_PATTERNS, indexPatterns);
return JsonUtils.pojoToJson(indexTemplateMap);
}
}

View File

@ -7,15 +7,23 @@ import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate;
import org.openmetadata.service.search.models.IndexMapping;
public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface {
private final RestClient client;
private final String resourcePath = "/dataInsights/elasticsearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
private final String clusterAlias;
public ElasticSearchDataInsightsClient(RestClient client) {
public ElasticSearchDataInsightsClient(RestClient client, String clusterAlias) {
this.client = client;
this.clusterAlias = clusterAlias;
}
@Override
public String getClusterAlias() {
return clusterAlias;
}
private Response performRequest(String method, String path) throws IOException {
@ -65,22 +73,24 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
int retentionDays)
throws IOException {
createLifecyclePolicy(
lifecyclePolicyName,
getStringWithClusterAlias(lifecyclePolicyName),
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
"di-data-assets-settings",
getStringWithClusterAlias("di-data-assets-settings"),
readResource(String.format("%s/indexSettingsTemplate.json", resourcePath)));
createComponentTemplate(
"di-data-assets-mapping",
getStringWithClusterAlias("di-data-assets-mapping"),
buildMapping(
entityType,
entityIndexMapping,
language,
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))));
createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
getStringWithClusterAlias("di-data-assets"),
IndexTemplate.getIndexTemplateWithClusterAlias(
getClusterAlias(), readResource(String.format("%s/indexTemplate.json", resourcePath))));
createDataStream(name);
}
@ -94,10 +104,13 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
public void updateLifecyclePolicy(int retentionDays) throws IOException {
String currentLifecyclePolicy =
EntityUtils.toString(
performRequest("GET", String.format("/_ilm/policy/%s", lifecyclePolicyName))
performRequest(
"GET",
String.format(
"/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)))
.getEntity());
if (new IndexLifecyclePolicyConfig(
lifecyclePolicyName,
getStringWithClusterAlias(lifecyclePolicyName),
currentLifecyclePolicy,
IndexLifecyclePolicyConfig.SearchType.ELASTICSEARCH)
.getRetentionDays()
@ -107,7 +120,9 @@ public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterf
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays);
performRequest(
"PUT", String.format("/_ilm/policy/%s", lifecyclePolicyName), updatedLifecyclePolicy);
"PUT",
String.format("/_ilm/policy/%s", getStringWithClusterAlias(lifecyclePolicyName)),
updatedLifecyclePolicy);
}
}

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.IndexLifecyclePolicyConfig;
import org.openmetadata.service.apps.bundles.insights.search.IndexTemplate;
import org.openmetadata.service.search.models.IndexMapping;
import os.org.opensearch.client.Request;
import os.org.opensearch.client.Response;
@ -14,9 +15,16 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
private final RestClient client;
private final String resourcePath = "/dataInsights/opensearch";
private final String lifecyclePolicyName = "di-data-assets-lifecycle";
private final String clusterAlias;
public OpenSearchDataInsightsClient(RestClient client) {
public OpenSearchDataInsightsClient(RestClient client, String clusterAlias) {
this.client = client;
this.clusterAlias = clusterAlias;
}
@Override
public String getClusterAlias() {
return clusterAlias;
}
private Response performRequest(String method, String path) throws IOException {
@ -76,19 +84,21 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
int retentionDays)
throws IOException {
createLifecyclePolicy(
lifecyclePolicyName,
getStringWithClusterAlias(lifecyclePolicyName),
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays));
createComponentTemplate(
"di-data-assets-mapping",
getStringWithClusterAlias("di-data-assets-mapping"),
buildMapping(
entityType,
entityIndexMapping,
language,
readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))));
createIndexTemplate(
"di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath)));
getStringWithClusterAlias("di-data-assets"),
IndexTemplate.getIndexTemplateWithClusterAlias(
getClusterAlias(), readResource(String.format("%s/indexTemplate.json", resourcePath))));
createDataStream(name);
}
@ -102,10 +112,14 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
public void updateLifecyclePolicy(int retentionDays) throws IOException {
String currentLifecyclePolicy =
EntityUtils.toString(
performRequest("GET", String.format("/_plugins/_ism/policies/%s", lifecyclePolicyName))
performRequest(
"GET",
String.format(
"/_plugins/_ism/policies/%s",
getStringWithClusterAlias(lifecyclePolicyName)))
.getEntity());
if (new IndexLifecyclePolicyConfig(
lifecyclePolicyName,
getStringWithClusterAlias(lifecyclePolicyName),
currentLifecyclePolicy,
IndexLifecyclePolicyConfig.SearchType.OPENSEARCH)
.getRetentionDays()
@ -114,7 +128,7 @@ public class OpenSearchDataInsightsClient implements DataInsightsSearchInterface
buildLifecyclePolicy(
readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath)),
retentionDays);
createLifecyclePolicy(lifecyclePolicyName, updatedLifecyclePolicy);
createLifecyclePolicy(getStringWithClusterAlias(lifecyclePolicyName), updatedLifecyclePolicy);
}
}

View File

@ -190,9 +190,13 @@ public class DataAssetsWorkflow {
contextData.put(END_TIMESTAMP_KEY, endTimestamp);
for (PaginatedEntitiesSource source : sources) {
deleteBasedOnDataRetentionPolicy(getDataStreamName(source.getEntityType()));
deleteDataBeforeInserting(getDataStreamName(source.getEntityType()));
contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType()));
deleteBasedOnDataRetentionPolicy(
getDataStreamName(searchRepository.getClusterAlias(), source.getEntityType()));
deleteDataBeforeInserting(
getDataStreamName(searchInterface.getClusterAlias(), source.getEntityType()));
contextData.put(
DATA_STREAM_KEY,
getDataStreamName(searchInterface.getClusterAlias(), source.getEntityType()));
contextData.put(ENTITY_TYPE_KEY, source.getEntityType());
contextData.put(
ENTITY_TYPE_FIELDS_KEY,

View File

@ -59,6 +59,22 @@ public class DataInsightSystemChartRepository extends EntityRepository<DataInsig
"");
}
public static String getDataInsightsIndexPrefix() {
String clusterAlias = Entity.getSearchRepository().getClusterAlias();
if (!(clusterAlias == null || clusterAlias.isEmpty())) {
return String.format("%s-%s", clusterAlias, DI_SEARCH_INDEX_PREFIX);
}
return DI_SEARCH_INDEX_PREFIX;
}
public static String getDataInsightsSearchIndex() {
String clusterAlias = Entity.getSearchRepository().getClusterAlias();
if (!(clusterAlias == null || clusterAlias.isEmpty())) {
return String.format("%s-%s", clusterAlias, DI_SEARCH_INDEX);
}
return DI_SEARCH_INDEX;
}
@Override
public void setFields(DataInsightCustomChart entity, EntityUtil.Fields fields) {
/* Nothing to do */

View File

@ -2422,7 +2422,7 @@ public class ElasticSearchClient implements SearchClient {
GetMappingsRequest request =
new GetMappingsRequest()
.indices(
DataInsightSystemChartRepository.DI_SEARCH_INDEX_PREFIX
DataInsightSystemChartRepository.getDataInsightsIndexPrefix()
+ "-"
+ type.toLowerCase());

View File

@ -124,7 +124,7 @@ public class ElasticSearchLineChartAggregator
searchSourceBuilder.query(queryFilter);
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
DataInsightSystemChartRepository.DI_SEARCH_INDEX);
DataInsightSystemChartRepository.getDataInsightsSearchIndex());
searchRequest.source(searchSourceBuilder);
return searchRequest;
}

View File

@ -56,7 +56,7 @@ public class ElasticSearchSummaryCardAggregator
searchSourceBuilder.size(0);
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
DataInsightSystemChartRepository.DI_SEARCH_INDEX);
DataInsightSystemChartRepository.getDataInsightsSearchIndex());
searchRequest.source(searchSourceBuilder);
return searchRequest;
}

View File

@ -2388,7 +2388,7 @@ public class OpenSearchClient implements SearchClient {
GetMappingsRequest request =
new GetMappingsRequest()
.indices(
DataInsightSystemChartRepository.DI_SEARCH_INDEX_PREFIX
DataInsightSystemChartRepository.getDataInsightsIndexPrefix()
+ "-"
+ type.toLowerCase());

View File

@ -123,7 +123,7 @@ public class OpenSearchLineChartAggregator implements OpenSearchDynamicChartAggr
searchSourceBuilder.query(queryFilter);
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
DataInsightSystemChartRepository.DI_SEARCH_INDEX);
DataInsightSystemChartRepository.getDataInsightsSearchIndex());
searchRequest.source(searchSourceBuilder);
return searchRequest;
}

View File

@ -54,7 +54,7 @@ public class OpenSearchSummaryCardAggregator implements OpenSearchDynamicChartAg
searchSourceBuilder.size(0);
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
DataInsightSystemChartRepository.DI_SEARCH_INDEX);
DataInsightSystemChartRepository.getDataInsightsSearchIndex());
searchRequest.source(searchSourceBuilder);
return searchRequest;
}

View File

@ -70,12 +70,14 @@ public class KpiResourceTest extends EntityResourceTest<Kpi, CreateKpiRequest> {
.equals(ElasticSearchConfiguration.SearchType.ELASTICSEARCH)) {
searchInterface =
new ElasticSearchDataInsightsClient(
(RestClient) getSearchRepository().getSearchClient().getLowLevelClient());
(RestClient) getSearchRepository().getSearchClient().getLowLevelClient(),
getSearchRepository().getClusterAlias());
} else {
searchInterface =
new OpenSearchDataInsightsClient(
(os.org.opensearch.client.RestClient)
getSearchRepository().getSearchClient().getLowLevelClient());
getSearchRepository().getSearchClient().getLowLevelClient(),
getSearchRepository().getClusterAlias());
}
try {
for (String dataAssetType : dataAssetTypes) {