mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-30 18:17:53 +00:00
Fix Indexing Recreate (#23867)
* Fix Indexing Recreate * Fix OpenMetadata Operation Creation and Deletion * Default Should Cleanup all precreated prefixed indices
This commit is contained in:
parent
b6544b6855
commit
73eca212ce
@ -149,7 +149,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
}
|
||||
}
|
||||
|
||||
private void createDataQualityDataIndex() {
|
||||
public void createDataQualityDataIndex() {
|
||||
try {
|
||||
createIndexInternal(Entity.TEST_CASE_RESULT);
|
||||
createIndexInternal(Entity.TEST_CASE_RESOLUTION_STATUS);
|
||||
@ -165,7 +165,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
|
||||
deleteIndexInternal(Entity.TEST_CASE_RESOLUTION_STATUS);
|
||||
}
|
||||
|
||||
private void createOrUpdateDataAssetsDataStream() {
|
||||
public void createOrUpdateDataAssetsDataStream() {
|
||||
DataInsightsSearchInterface searchInterface = getSearchInterface();
|
||||
|
||||
ElasticSearchConfiguration config = searchRepository.getSearchConfiguration();
|
||||
|
||||
@ -17,7 +17,7 @@ import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.search.IndexMapping;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.SearchIndexException;
|
||||
import org.openmetadata.service.search.RecreateIndexHandler;
|
||||
import org.openmetadata.service.search.ReindexContext;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.search.opensearch.OpenSearchClient;
|
||||
import os.org.opensearch.action.bulk.BackoffPolicy;
|
||||
@ -222,7 +222,7 @@ public class OpenSearchBulkSink implements BulkSink {
|
||||
indexName,
|
||||
recreateIndex,
|
||||
(contextData.containsKey(RECREATE_CONTEXT)
|
||||
? (RecreateIndexHandler.ReindexContext) contextData.get(RECREATE_CONTEXT)
|
||||
? (ReindexContext) contextData.get(RECREATE_CONTEXT)
|
||||
: null),
|
||||
embeddingsEnabled);
|
||||
}
|
||||
@ -247,7 +247,7 @@ public class OpenSearchBulkSink implements BulkSink {
|
||||
EntityInterface entity,
|
||||
String indexName,
|
||||
boolean recreateIndex,
|
||||
RecreateIndexHandler.ReindexContext reindexContext,
|
||||
ReindexContext reindexContext,
|
||||
boolean embeddingsEnabled) {
|
||||
// Build the search index document using the proper transformation
|
||||
String entityType = Entity.getEntityTypeFromObject(entity);
|
||||
@ -369,5 +369,5 @@ public class OpenSearchBulkSink implements BulkSink {
|
||||
BulkProcessor bulkProcessor,
|
||||
EntityInterface entity,
|
||||
boolean recreateIndex,
|
||||
RecreateIndexHandler.ReindexContext reindexContext) {}
|
||||
ReindexContext reindexContext) {}
|
||||
}
|
||||
|
||||
@ -63,8 +63,9 @@ import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.jdbi3.SystemRepository;
|
||||
import org.openmetadata.service.search.EntityReindexContext;
|
||||
import org.openmetadata.service.search.RecreateIndexHandler;
|
||||
import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;
|
||||
import org.openmetadata.service.search.ReindexContext;
|
||||
import org.openmetadata.service.search.SearchClusterMetrics;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.socket.WebSocketManager;
|
||||
@ -259,7 +260,24 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
success = jobData != null && jobData.getStatus() == EventPublisherJob.Status.COMPLETED;
|
||||
handleJobCompletion();
|
||||
} finally {
|
||||
finalizeRecreateIndexes(success);
|
||||
finalizeAllEntityReindex(success);
|
||||
}
|
||||
}
|
||||
|
||||
private void finalizeAllEntityReindex(boolean finalSuccess) {
|
||||
try {
|
||||
recreateContext
|
||||
.getEntities()
|
||||
.forEach(
|
||||
entityType -> {
|
||||
try {
|
||||
finalizeEntityReindex(entityType, finalSuccess);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed to finalize reindex for entity: {}", entityType, ex);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
recreateContext = null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -371,17 +389,36 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private void finalizeRecreateIndexes(boolean success) {
|
||||
private void finalizeEntityReindex(String entityType, boolean success) {
|
||||
if (recreateIndexHandler == null || recreateContext == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String originalIndex = recreateContext.getOriginalIndex(entityType).orElse(null);
|
||||
String canonicalIndex = recreateContext.getCanonicalIndex(entityType).orElse(null);
|
||||
String activeIndex = recreateContext.getOriginalIndex(entityType).orElse(null);
|
||||
String stagedIndex = recreateContext.getStagedIndex(entityType).orElse(null);
|
||||
String canonicalAlias = recreateContext.getCanonicalAlias(entityType).orElse(null);
|
||||
Set<String> existingAliases = recreateContext.getExistingAliases(entityType);
|
||||
Set<String> parentAliases =
|
||||
new HashSet<>(listOrEmpty(recreateContext.getParentAliases(entityType)));
|
||||
|
||||
EntityReindexContext entityReindexContext =
|
||||
EntityReindexContext.builder()
|
||||
.entityType(entityType)
|
||||
.originalIndex(originalIndex)
|
||||
.canonicalIndex(canonicalIndex)
|
||||
.activeIndex(activeIndex)
|
||||
.stagedIndex(stagedIndex)
|
||||
.canonicalAliases(canonicalAlias)
|
||||
.existingAliases(existingAliases)
|
||||
.parentAliases(parentAliases)
|
||||
.build();
|
||||
|
||||
try {
|
||||
recreateIndexHandler.finalizeReindex(recreateContext, success);
|
||||
recreateIndexHandler.finalizeReindex(entityReindexContext, success);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed to finalize index recreation flow", ex);
|
||||
} finally {
|
||||
recreateContext = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,13 +1,12 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.search.IndexMapping;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;
|
||||
|
||||
/**
|
||||
* Default implementation of RecreateHandler that provides zero-downtime index recreation.
|
||||
@ -20,13 +19,10 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
|
||||
ReindexContext context = new ReindexContext();
|
||||
SearchRepository searchRepository = Entity.getSearchRepository();
|
||||
|
||||
if (CommonUtil.nullOrEmpty(entities)) {
|
||||
if (nullOrEmpty(entities)) {
|
||||
return context;
|
||||
}
|
||||
|
||||
String clusterAlias = searchRepository.getClusterAlias();
|
||||
SearchClient<?> searchClient = searchRepository.getSearchClient();
|
||||
|
||||
for (String entityType : entities) {
|
||||
IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
|
||||
if (indexMapping == null) {
|
||||
@ -35,160 +31,187 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
|
||||
continue;
|
||||
}
|
||||
|
||||
String canonicalIndexName = indexMapping.getIndexName(clusterAlias);
|
||||
String activeIndexName = canonicalIndexName;
|
||||
|
||||
if (!searchClient.indexExists(canonicalIndexName)) {
|
||||
Set<String> aliasTargets =
|
||||
searchClient.getIndicesByAlias(indexMapping.getAlias(clusterAlias));
|
||||
if (!aliasTargets.isEmpty()) {
|
||||
activeIndexName = aliasTargets.iterator().next();
|
||||
LOG.debug(
|
||||
"Resolved active index '{}' for entity '{}' via alias '{}'.",
|
||||
activeIndexName,
|
||||
entityType,
|
||||
indexMapping.getAlias(clusterAlias));
|
||||
} else {
|
||||
LOG.debug(
|
||||
"No existing index or alias found for entity '{}'. Rebuilding from scratch.",
|
||||
entityType);
|
||||
activeIndexName = null;
|
||||
}
|
||||
}
|
||||
|
||||
String mappingContent = searchRepository.readIndexMapping(indexMapping);
|
||||
if (mappingContent == null) {
|
||||
LOG.warn(
|
||||
"Unable to read index mapping content for '{}'. Skipping staged recreation.",
|
||||
canonicalIndexName);
|
||||
continue;
|
||||
}
|
||||
|
||||
String stagedIndexName = buildStagedIndexName(canonicalIndexName);
|
||||
searchClient.createIndex(stagedIndexName, mappingContent);
|
||||
|
||||
Set<String> existingAliases =
|
||||
activeIndexName != null ? searchClient.getAliases(activeIndexName) : Set.of();
|
||||
|
||||
context.add(
|
||||
entityType,
|
||||
canonicalIndexName,
|
||||
activeIndexName,
|
||||
stagedIndexName,
|
||||
existingAliases,
|
||||
indexMapping.getAlias(clusterAlias),
|
||||
indexMapping.getParentAliases(clusterAlias));
|
||||
LOG.info(
|
||||
"Created staged index '{}' for entity '{}' to support zero-downtime recreation.",
|
||||
stagedIndexName,
|
||||
entityType);
|
||||
recreateIndexFromMapping(context, indexMapping, entityType);
|
||||
}
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeReindex(ReindexContext context, boolean success) {
|
||||
if (context == null || context.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess) {
|
||||
String entityType = context.getEntityType();
|
||||
String canonicalIndex = context.getCanonicalIndex();
|
||||
String activeIndex = context.getActiveIndex();
|
||||
String stagedIndex = context.getStagedIndex();
|
||||
Set<String> existingAliases = context.getExistingAliases();
|
||||
String canonicalAlias = context.getCanonicalAliases();
|
||||
Set<String> parentAliases = context.getParentAliases();
|
||||
|
||||
SearchRepository searchRepository = Entity.getSearchRepository();
|
||||
SearchClient<?> searchClient = searchRepository.getSearchClient();
|
||||
|
||||
for (String entityType : context.getEntities()) {
|
||||
String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null);
|
||||
String activeIndex = context.getOriginalIndex(entityType).orElse(null);
|
||||
String stagedIndex = context.getStagedIndex(entityType).orElse(null);
|
||||
if (canonicalIndex == null || stagedIndex == null) {
|
||||
LOG.error(
|
||||
"Cannot finalize reindex for entity '{}'. Missing canonical or staged index name.",
|
||||
entityType);
|
||||
return;
|
||||
}
|
||||
|
||||
if (canonicalIndex == null || stagedIndex == null) {
|
||||
continue;
|
||||
}
|
||||
if (reindexSuccess) {
|
||||
try {
|
||||
Set<String> aliasesToAttach = new HashSet<>();
|
||||
|
||||
if (success) {
|
||||
try {
|
||||
Set<String> aliasesToAttach = new HashSet<>();
|
||||
aliasesToAttach.addAll(context.getExistingAliases(entityType));
|
||||
context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add);
|
||||
aliasesToAttach.add(canonicalIndex);
|
||||
List<String> parentAliases = context.getParentAliases(entityType);
|
||||
if (parentAliases != null) {
|
||||
parentAliases.stream()
|
||||
.filter(alias -> alias != null && !alias.isBlank())
|
||||
.forEach(aliasesToAttach::add);
|
||||
}
|
||||
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());
|
||||
// Existing Aliases
|
||||
existingAliases.stream()
|
||||
.filter(alias -> alias != null && !alias.isBlank())
|
||||
.forEach(aliasesToAttach::add);
|
||||
|
||||
for (String alias : aliasesToAttach) {
|
||||
Set<String> targets = searchClient.getIndicesByAlias(alias);
|
||||
for (String target : targets) {
|
||||
if (target.equals(stagedIndex)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean belongsToEntity =
|
||||
target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_");
|
||||
|
||||
if (!belongsToEntity) {
|
||||
LOG.debug(
|
||||
"Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.",
|
||||
alias,
|
||||
target,
|
||||
entityType);
|
||||
continue;
|
||||
}
|
||||
|
||||
searchClient.removeAliases(target, Set.of(alias));
|
||||
LOG.info(
|
||||
"Removed alias '{}' from index '{}' during promotion for entity '{}'.",
|
||||
alias,
|
||||
target,
|
||||
entityType);
|
||||
|
||||
if (searchClient.indexExists(target)) {
|
||||
searchClient.deleteIndex(target);
|
||||
LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
|
||||
searchClient.deleteIndex(activeIndex);
|
||||
LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType);
|
||||
}
|
||||
|
||||
if (!aliasesToAttach.isEmpty()) {
|
||||
searchClient.addAliases(stagedIndex, aliasesToAttach);
|
||||
}
|
||||
LOG.info(
|
||||
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
|
||||
stagedIndex,
|
||||
entityType,
|
||||
aliasesToAttach);
|
||||
} catch (Exception ex) {
|
||||
LOG.error(
|
||||
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
|
||||
// Canonical Alias
|
||||
if (!nullOrEmpty(canonicalAlias)) {
|
||||
aliasesToAttach.add(canonicalAlias);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
if (searchClient.indexExists(stagedIndex)) {
|
||||
searchClient.deleteIndex(stagedIndex);
|
||||
LOG.info(
|
||||
"Deleted staged index '{}' after unsuccessful reindex for entity '{}'.",
|
||||
stagedIndex,
|
||||
|
||||
// Parent Aliases
|
||||
parentAliases.stream()
|
||||
.filter(alias -> alias != null && !alias.isBlank())
|
||||
.forEach(aliasesToAttach::add);
|
||||
|
||||
// Remove any null or blank aliases
|
||||
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());
|
||||
|
||||
Set<String> allEntityIndices = searchClient.listIndicesByPrefix(canonicalIndex);
|
||||
for (String oldIndex : allEntityIndices) {
|
||||
if (oldIndex.equals(stagedIndex)) {
|
||||
LOG.debug(
|
||||
"Skipping deletion of staged index '{}' for entity '{}'.", stagedIndex, entityType);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (activeIndex != null && oldIndex.equals(activeIndex)) {
|
||||
LOG.debug(
|
||||
"Skipping deletion of currently active index '{}' for entity '{}' (will be deleted after alias swap).",
|
||||
activeIndex,
|
||||
entityType);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
if (searchClient.indexExists(oldIndex)) {
|
||||
searchClient.deleteIndex(oldIndex);
|
||||
LOG.info("Cleaned up old index '{}' for entity '{}'.", oldIndex, entityType);
|
||||
}
|
||||
} catch (Exception deleteEx) {
|
||||
LOG.warn(
|
||||
"Failed to delete old index '{}' for entity '{}'.", oldIndex, entityType, deleteEx);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(
|
||||
"Failed to delete staged index '{}' for entity '{}' after failure.",
|
||||
stagedIndex,
|
||||
entityType,
|
||||
ex);
|
||||
}
|
||||
|
||||
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
|
||||
searchClient.deleteIndex(activeIndex);
|
||||
LOG.info(
|
||||
"Deleted previously active index '{}' for entity '{}'.", activeIndex, entityType);
|
||||
}
|
||||
|
||||
if (!aliasesToAttach.isEmpty()) {
|
||||
searchClient.addAliases(stagedIndex, aliasesToAttach);
|
||||
}
|
||||
LOG.info(
|
||||
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
|
||||
stagedIndex,
|
||||
entityType,
|
||||
aliasesToAttach);
|
||||
} catch (Exception ex) {
|
||||
LOG.error(
|
||||
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
if (searchClient.indexExists(stagedIndex)) {
|
||||
searchClient.deleteIndex(stagedIndex);
|
||||
LOG.info(
|
||||
"Deleted staged index '{}' after unsuccessful reindex for entity '{}'.",
|
||||
stagedIndex,
|
||||
entityType);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(
|
||||
"Failed to delete staged index '{}' for entity '{}' after failure.",
|
||||
stagedIndex,
|
||||
entityType,
|
||||
ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void recreateIndexFromMapping(
|
||||
ReindexContext context, IndexMapping indexMapping, String entityType) {
|
||||
if (indexMapping == null) {
|
||||
LOG.warn("IndexMapping is null for entityType '{}'. Cannot recreate index.", entityType);
|
||||
return;
|
||||
}
|
||||
|
||||
if (context == null) {
|
||||
LOG.warn("ReindexContext is null for entityType '{}'. Cannot recreate index.", entityType);
|
||||
return;
|
||||
}
|
||||
|
||||
SearchRepository searchRepository = Entity.getSearchRepository();
|
||||
String clusterAlias = searchRepository.getClusterAlias();
|
||||
SearchClient<?> searchClient = searchRepository.getSearchClient();
|
||||
|
||||
String canonicalIndexName = indexMapping.getIndexName(clusterAlias);
|
||||
String activeIndexName = canonicalIndexName;
|
||||
|
||||
if (!searchClient.indexExists(canonicalIndexName)) {
|
||||
Set<String> aliasTargets =
|
||||
searchClient.getIndicesByAlias(indexMapping.getAlias(clusterAlias));
|
||||
if (!aliasTargets.isEmpty()) {
|
||||
activeIndexName = aliasTargets.iterator().next();
|
||||
LOG.debug(
|
||||
"Resolved active index '{}' for entity '{}' via alias '{}'.",
|
||||
activeIndexName,
|
||||
entityType,
|
||||
indexMapping.getAlias(clusterAlias));
|
||||
} else {
|
||||
LOG.debug(
|
||||
"No existing index or alias found for entity '{}'. Rebuilding from scratch.",
|
||||
entityType);
|
||||
activeIndexName = null;
|
||||
}
|
||||
}
|
||||
|
||||
String mappingContent = searchRepository.readIndexMapping(indexMapping);
|
||||
if (mappingContent == null) {
|
||||
LOG.warn(
|
||||
"Unable to read index mapping content for '{}'. Cannot recreate index.",
|
||||
canonicalIndexName);
|
||||
return;
|
||||
}
|
||||
|
||||
String stagedIndexName = buildStagedIndexName(canonicalIndexName);
|
||||
searchClient.createIndex(stagedIndexName, mappingContent);
|
||||
|
||||
Set<String> existingAliases =
|
||||
activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>();
|
||||
|
||||
// Add the default index
|
||||
existingAliases.add(indexMapping.getAlias(clusterAlias));
|
||||
existingAliases.add(indexMapping.getIndexName(clusterAlias));
|
||||
context.add(
|
||||
entityType,
|
||||
canonicalIndexName,
|
||||
activeIndexName,
|
||||
stagedIndexName,
|
||||
existingAliases,
|
||||
indexMapping.getAlias(clusterAlias),
|
||||
indexMapping.getParentAliases(clusterAlias));
|
||||
|
||||
LOG.info(
|
||||
"Created staged index '{}' for entity '{}' using provided IndexMapping.",
|
||||
stagedIndexName,
|
||||
entityType);
|
||||
}
|
||||
|
||||
private String buildStagedIndexName(String originalIndexName) {
|
||||
return String.format("%s_rebuild_%d", originalIndexName, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@ -0,0 +1,20 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import java.util.Set;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
public class EntityReindexContext {
|
||||
String entityType;
|
||||
String canonicalIndex;
|
||||
String originalIndex;
|
||||
String activeIndex;
|
||||
String stagedIndex;
|
||||
String canonicalAliases;
|
||||
Set<String> existingAliases;
|
||||
Set<String> parentAliases;
|
||||
}
|
||||
@ -102,4 +102,12 @@ public interface IndexManagementClient {
|
||||
* @return set of indices that have the alias
|
||||
*/
|
||||
Set<String> getIndicesByAlias(String aliasName);
|
||||
|
||||
/**
|
||||
* Get all indices that match the given prefix.
|
||||
*
|
||||
* @param prefix the prefix to match index names against
|
||||
* @return set of indices that start with the prefix
|
||||
*/
|
||||
Set<String> listIndicesByPrefix(String prefix);
|
||||
}
|
||||
|
||||
@ -1,11 +1,5 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -15,63 +9,5 @@ import java.util.Set;
|
||||
public interface RecreateIndexHandler {
|
||||
ReindexContext reCreateIndexes(Set<String> entities);
|
||||
|
||||
default void finalizeReindex(ReindexContext context, boolean success) {}
|
||||
|
||||
class ReindexContext {
|
||||
private final Map<String, String> canonicalIndexByEntity = new HashMap<>();
|
||||
private final Map<String, String> originalIndexByEntity = new HashMap<>();
|
||||
private final Map<String, String> stagedIndexByEntity = new HashMap<>();
|
||||
private final Map<String, Set<String>> existingAliasesByEntity = new HashMap<>();
|
||||
private final Map<String, String> canonicalAliasByEntity = new HashMap<>();
|
||||
private final Map<String, List<String>> parentAliasesByEntity = new HashMap<>();
|
||||
|
||||
public void add(
|
||||
String entity,
|
||||
String canonicalIndex,
|
||||
String originalIndex,
|
||||
String stagedIndex,
|
||||
Set<String> existingAliases,
|
||||
String canonicalAlias,
|
||||
List<String> parentAliases) {
|
||||
canonicalIndexByEntity.put(entity, canonicalIndex);
|
||||
originalIndexByEntity.put(entity, originalIndex);
|
||||
stagedIndexByEntity.put(entity, stagedIndex);
|
||||
existingAliasesByEntity.put(
|
||||
entity, new HashSet<>(Optional.ofNullable(existingAliases).orElseGet(HashSet::new)));
|
||||
canonicalAliasByEntity.put(entity, canonicalAlias);
|
||||
parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of());
|
||||
}
|
||||
|
||||
public Optional<String> getCanonicalIndex(String entity) {
|
||||
return Optional.ofNullable(canonicalIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Set<String> getEntities() {
|
||||
return Collections.unmodifiableSet(stagedIndexByEntity.keySet());
|
||||
}
|
||||
|
||||
public Optional<String> getStagedIndex(String entity) {
|
||||
return Optional.ofNullable(stagedIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Optional<String> getOriginalIndex(String entity) {
|
||||
return Optional.ofNullable(originalIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Set<String> getExistingAliases(String entity) {
|
||||
return existingAliasesByEntity.getOrDefault(entity, Collections.emptySet());
|
||||
}
|
||||
|
||||
public Optional<String> getCanonicalAlias(String entity) {
|
||||
return Optional.ofNullable(canonicalAliasByEntity.get(entity));
|
||||
}
|
||||
|
||||
public List<String> getParentAliases(String entity) {
|
||||
return parentAliasesByEntity.getOrDefault(entity, List.of());
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return stagedIndexByEntity.isEmpty();
|
||||
}
|
||||
}
|
||||
default void finalizeReindex(EntityReindexContext entityReindexContext, boolean reindexSuccess) {}
|
||||
}
|
||||
|
||||
@ -0,0 +1,67 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class ReindexContext {
|
||||
private final Map<String, String> canonicalIndexByEntity = new HashMap<>();
|
||||
private final Map<String, String> originalIndexByEntity = new HashMap<>();
|
||||
private final Map<String, String> stagedIndexByEntity = new HashMap<>();
|
||||
private final Map<String, Set<String>> existingAliasesByEntity = new HashMap<>();
|
||||
private final Map<String, String> canonicalAliasByEntity = new HashMap<>();
|
||||
private final Map<String, List<String>> parentAliasesByEntity = new HashMap<>();
|
||||
|
||||
public void add(
|
||||
String entity,
|
||||
String canonicalIndex,
|
||||
String originalIndex,
|
||||
String stagedIndex,
|
||||
Set<String> existingAliases,
|
||||
String canonicalAlias,
|
||||
List<String> parentAliases) {
|
||||
canonicalIndexByEntity.put(entity, canonicalIndex);
|
||||
originalIndexByEntity.put(entity, originalIndex);
|
||||
stagedIndexByEntity.put(entity, stagedIndex);
|
||||
existingAliasesByEntity.put(
|
||||
entity, new HashSet<>(Optional.ofNullable(existingAliases).orElseGet(HashSet::new)));
|
||||
canonicalAliasByEntity.put(entity, canonicalAlias);
|
||||
parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of());
|
||||
}
|
||||
|
||||
public Optional<String> getCanonicalIndex(String entity) {
|
||||
return Optional.ofNullable(canonicalIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Set<String> getEntities() {
|
||||
return Collections.unmodifiableSet(stagedIndexByEntity.keySet());
|
||||
}
|
||||
|
||||
public Optional<String> getStagedIndex(String entity) {
|
||||
return Optional.ofNullable(stagedIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Optional<String> getOriginalIndex(String entity) {
|
||||
return Optional.ofNullable(originalIndexByEntity.get(entity));
|
||||
}
|
||||
|
||||
public Set<String> getExistingAliases(String entity) {
|
||||
return existingAliasesByEntity.getOrDefault(entity, Collections.emptySet());
|
||||
}
|
||||
|
||||
public Optional<String> getCanonicalAlias(String entity) {
|
||||
return Optional.ofNullable(canonicalAliasByEntity.get(entity));
|
||||
}
|
||||
|
||||
public List<String> getParentAliases(String entity) {
|
||||
return parentAliasesByEntity.getOrDefault(entity, List.of());
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return stagedIndexByEntity.isEmpty();
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.search.IndexMapping.INDEX_NAME_SEPARATOR;
|
||||
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
|
||||
@ -236,10 +237,36 @@ public class SearchRepository {
|
||||
|
||||
public void createIndexes() {
|
||||
RecreateIndexHandler recreateIndexHandler = this.createReindexHandler();
|
||||
RecreateIndexHandler.ReindexContext context =
|
||||
recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet());
|
||||
ReindexContext context = recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet());
|
||||
if (context != null) {
|
||||
recreateIndexHandler.finalizeReindex(context, true);
|
||||
for (String entityType : context.getEntities()) {
|
||||
try {
|
||||
String originalIndex = context.getOriginalIndex(entityType).orElse(null);
|
||||
String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null);
|
||||
String activeIndex = context.getOriginalIndex(entityType).orElse(null);
|
||||
String stagedIndex = context.getStagedIndex(entityType).orElse(null);
|
||||
String canonicalAlias = context.getCanonicalAlias(entityType).orElse(null);
|
||||
Set<String> existingAliases = context.getExistingAliases(entityType);
|
||||
Set<String> parentAliases =
|
||||
new HashSet<>(listOrEmpty(context.getParentAliases(entityType)));
|
||||
|
||||
EntityReindexContext entityReindexContext =
|
||||
EntityReindexContext.builder()
|
||||
.entityType(entityType)
|
||||
.originalIndex(originalIndex)
|
||||
.canonicalIndex(canonicalIndex)
|
||||
.activeIndex(activeIndex)
|
||||
.stagedIndex(stagedIndex)
|
||||
.canonicalAliases(canonicalAlias)
|
||||
.existingAliases(existingAliases)
|
||||
.parentAliases(parentAliases)
|
||||
.build();
|
||||
recreateIndexHandler.finalizeReindex(entityReindexContext, true);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed to recreate index for entity {}", entityType, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -346,6 +346,11 @@ public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
|
||||
return indexManager.getIndicesByAlias(aliasName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> listIndicesByPrefix(String prefix) {
|
||||
return indexManager.listIndicesByPrefix(prefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
|
||||
indexManager.updateIndex(indexMapping, indexMappingContent);
|
||||
|
||||
@ -287,4 +287,25 @@ public class ElasticSearchIndexManager implements IndexManagementClient {
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> listIndicesByPrefix(String prefix) {
|
||||
Set<String> indices = new HashSet<>();
|
||||
if (!isClientAvailable) {
|
||||
LOG.error("ElasticSearch client is not available. Cannot list indices by prefix.");
|
||||
return indices;
|
||||
}
|
||||
try {
|
||||
String pattern = prefix + "*";
|
||||
GetAliasRequest request = GetAliasRequest.of(g -> g.index(pattern));
|
||||
GetAliasResponse response = client.indices().getAlias(request);
|
||||
|
||||
indices.addAll(response.result().keySet());
|
||||
|
||||
LOG.info("Retrieved {} indices matching prefix '{}': {}", indices.size(), prefix, indices);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to list indices by prefix {} due to", prefix, e);
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
}
|
||||
|
||||
@ -352,6 +352,11 @@ public class OpenSearchClient implements SearchClient<RestHighLevelClient> {
|
||||
return indexManager.getIndicesByAlias(aliasName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> listIndicesByPrefix(String prefix) {
|
||||
return indexManager.listIndicesByPrefix(prefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
|
||||
indexManager.updateIndex(indexMapping, indexMappingContent);
|
||||
|
||||
@ -411,4 +411,25 @@ public class OpenSearchIndexManager implements IndexManagementClient {
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> listIndicesByPrefix(String prefix) {
|
||||
Set<String> indices = new HashSet<>();
|
||||
if (!isClientAvailable) {
|
||||
LOG.error("OpenSearch client is not available. Cannot list indices by prefix.");
|
||||
return indices;
|
||||
}
|
||||
try {
|
||||
String pattern = prefix + "*";
|
||||
GetAliasRequest request = GetAliasRequest.of(g -> g.index(pattern));
|
||||
GetAliasResponse response = client.indices().getAlias(request);
|
||||
|
||||
indices.addAll(response.result().keySet());
|
||||
|
||||
LOG.info("Retrieved {} indices matching prefix '{}': {}", indices.size(), prefix, indices);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to list indices by prefix {} due to", prefix, e);
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,7 +38,6 @@ import java.util.Objects;
|
||||
import java.util.Scanner;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flywaydb.core.Flyway;
|
||||
@ -67,6 +66,7 @@ import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.sdk.PipelineServiceClientInterface;
|
||||
import org.openmetadata.search.IndexMapping;
|
||||
import org.openmetadata.search.IndexMappingLoader;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
@ -152,7 +152,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
LOG.info(
|
||||
"Subcommand needed: 'info', 'validate', 'repair', 'check-connection', "
|
||||
+ "'drop-create', 'changelog', 'migrate', 'migrate-secrets', 'reindex', 'reindex-rdf', 'deploy-pipelines', "
|
||||
+ "'dbServiceCleanup', 'relationshipCleanup', 'drop-indexes', 'remove-security-config'");
|
||||
+ "'dbServiceCleanup', 'relationshipCleanup', 'drop-indexes', 'remove-security-config', 'create-indexes'");
|
||||
LOG.info(
|
||||
"Use 'reindex --auto-tune' for automatic performance optimization based on cluster capabilities");
|
||||
return 0;
|
||||
@ -1435,15 +1435,28 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
// Drop regular search repository indexes
|
||||
for (String entityType : searchRepository.getEntityIndexMap().keySet()) {
|
||||
LOG.info("Dropping index for entity type: {}", entityType);
|
||||
searchRepository.deleteIndex(searchRepository.getIndexMapping(entityType));
|
||||
IndexMapping entityIndexMapping = searchRepository.getIndexMapping(entityType);
|
||||
Set<String> allEntityIndices =
|
||||
searchRepository
|
||||
.getSearchClient()
|
||||
.listIndicesByPrefix(
|
||||
entityIndexMapping.getIndexName(searchRepository.getClusterAlias()));
|
||||
for (String oldIndex : allEntityIndices) {
|
||||
try {
|
||||
if (searchRepository.getSearchClient().indexExists(oldIndex)) {
|
||||
searchRepository.getSearchClient().deleteIndex(oldIndex);
|
||||
LOG.info("Cleaned up old index '{}' for entity '{}'.", oldIndex, entityType);
|
||||
}
|
||||
} catch (Exception deleteEx) {
|
||||
LOG.warn(
|
||||
"Failed to delete old index '{}' for entity '{}'.", oldIndex, entityType, deleteEx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Drop data streams and data quality indexes created by DataInsightsApp
|
||||
dropDataInsightsIndexes();
|
||||
|
||||
// Drop orphaned rebuild indexes from zero-downtime reindexing
|
||||
dropRebuildIndexes();
|
||||
|
||||
LOG.info("All indexes dropped successfully.");
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
@ -1452,6 +1465,22 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "create-indexes", description = "Creates Indexes for Elastic/OpenSearch")
|
||||
public Integer createIndexes() {
|
||||
try {
|
||||
LOG.info("Creating indexes for search engine...");
|
||||
parseConfig();
|
||||
searchRepository.createIndexes();
|
||||
createDataInsightsIndexes();
|
||||
Entity.cleanup();
|
||||
LOG.info("All indexes created successfully.");
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to drop create due to ", e);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
private void dropDataInsightsIndexes() {
|
||||
try {
|
||||
LOG.info("Dropping Data Insights data streams and indexes...");
|
||||
@ -1474,36 +1503,25 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
}
|
||||
}
|
||||
|
||||
private void dropRebuildIndexes() {
|
||||
private void createDataInsightsIndexes() {
|
||||
try {
|
||||
LOG.info("Dropping orphaned rebuild indexes from zero-downtime reindexing...");
|
||||
LOG.info("Create Data Insights data streams and indexes...");
|
||||
|
||||
Set<String> allIndices = getAllIndices();
|
||||
List<String> rebuildIndices =
|
||||
allIndices.stream()
|
||||
.filter(index -> index.contains("_rebuild_"))
|
||||
.collect(Collectors.toList());
|
||||
// Create a DataInsightsApp instance to access its cleanup methods
|
||||
DataInsightsApp dataInsightsApp = new DataInsightsApp(collectionDAO, searchRepository);
|
||||
|
||||
if (rebuildIndices.isEmpty()) {
|
||||
LOG.info("No rebuild indexes found to delete.");
|
||||
return;
|
||||
}
|
||||
// Drop data assets data streams
|
||||
LOG.info("Create/Update data assets data streams...");
|
||||
dataInsightsApp.createOrUpdateDataAssetsDataStream();
|
||||
|
||||
LOG.info("Found {} rebuild indexes to delete: {}", rebuildIndices.size(), rebuildIndices);
|
||||
// Drop data quality indexes
|
||||
LOG.info("Create/Updated data quality indexes...");
|
||||
dataInsightsApp.createDataQualityDataIndex();
|
||||
|
||||
for (String index : rebuildIndices) {
|
||||
try {
|
||||
searchRepository.getSearchClient().deleteIndex(index);
|
||||
LOG.info("Deleted rebuild index: {}", index);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to delete rebuild index {}: {}", index, ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Rebuild index cleanup completed.");
|
||||
LOG.info("Data Insights indexes and data streams created successfully.");
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to drop rebuild indexes: {}", e.getMessage());
|
||||
LOG.debug("Rebuild index cleanup error details: ", e);
|
||||
LOG.warn("Failed to create some Data Insights indexes: {}", e.getMessage());
|
||||
LOG.debug("Data Insights index creation error details: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -60,7 +60,8 @@ import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||
import org.openmetadata.service.exception.SearchIndexException;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.search.DefaultRecreateHandler;
|
||||
import org.openmetadata.service.search.RecreateIndexHandler;
|
||||
import org.openmetadata.service.search.EntityReindexContext;
|
||||
import org.openmetadata.service.search.ReindexContext;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.socket.WebSocketManager;
|
||||
@ -201,7 +202,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
|
||||
|
||||
searchIndexApp.init(testApp);
|
||||
|
||||
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
|
||||
ReindexContext context = new ReindexContext();
|
||||
context.add(
|
||||
"table",
|
||||
"cluster_table",
|
||||
@ -245,17 +246,20 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
|
||||
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||
entityMock.when(Entity::getSearchRepository).thenReturn(repo);
|
||||
|
||||
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
|
||||
context.add(
|
||||
"table",
|
||||
"table_search_index",
|
||||
"table_search_index_rebuild_old",
|
||||
"table_search_index_rebuild_new",
|
||||
Set.of("table", "table_search_index", "all", "dataAsset"),
|
||||
"table",
|
||||
List.of("all", "dataAsset", "database", "databaseSchema", "databaseService"));
|
||||
EntityReindexContext entityReindexContext =
|
||||
EntityReindexContext.builder()
|
||||
.entityType("table")
|
||||
.canonicalIndex("table_search_index")
|
||||
.originalIndex("table_search_index_rebuild_old")
|
||||
.activeIndex("table_search_index_rebuild_old")
|
||||
.stagedIndex("table_search_index_rebuild_new")
|
||||
.existingAliases(Set.of("table", "table_search_index", "all", "dataAsset"))
|
||||
.canonicalAliases("table")
|
||||
.parentAliases(
|
||||
Set.of("all", "dataAsset", "database", "databaseSchema", "databaseService"))
|
||||
.build();
|
||||
|
||||
new DefaultRecreateHandler().finalizeReindex(context, true);
|
||||
new DefaultRecreateHandler().finalizeReindex(entityReindexContext, true);
|
||||
}
|
||||
|
||||
assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old"));
|
||||
@ -292,17 +296,19 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
|
||||
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
|
||||
entityMock.when(Entity::getSearchRepository).thenReturn(repo);
|
||||
|
||||
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
|
||||
context.add(
|
||||
"table",
|
||||
"table_search_index",
|
||||
"table_search_index_rebuild_old1",
|
||||
"table_search_index_rebuild_new",
|
||||
Set.of("table", "table_search_index", "all", "dataAsset"),
|
||||
"table",
|
||||
List.of("all", "dataAsset"));
|
||||
EntityReindexContext entityReindexContext =
|
||||
EntityReindexContext.builder()
|
||||
.entityType("table")
|
||||
.canonicalIndex("table_search_index")
|
||||
.originalIndex("table_search_index_rebuild_old1")
|
||||
.activeIndex("table_search_index_rebuild_old1")
|
||||
.stagedIndex("table_search_index_rebuild_new")
|
||||
.existingAliases(Set.of("table", "table_search_index", "all", "dataAsset"))
|
||||
.canonicalAliases("table")
|
||||
.parentAliases(Set.of("all", "dataAsset"))
|
||||
.build();
|
||||
|
||||
new DefaultRecreateHandler().finalizeReindex(context, true);
|
||||
new DefaultRecreateHandler().finalizeReindex(entityReindexContext, true);
|
||||
}
|
||||
|
||||
assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old1"));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user