From 73eca212ce29ac149bc94337a9a65be1a6ef8cc3 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Thu, 16 Oct 2025 12:43:41 +0530 Subject: [PATCH] Fix Indexing Recreate (#23867) * Fix Indexing Recreate * Fix OpenMetadata Operation Creation and Deletion * Default Should Cleanup all precreated prefixed indices --- .../bundles/insights/DataInsightsApp.java | 4 +- .../searchIndex/OpenSearchBulkSink.java | 8 +- .../bundles/searchIndex/SearchIndexApp.java | 49 ++- .../search/DefaultRecreateHandler.java | 303 ++++++++++-------- .../service/search/EntityReindexContext.java | 20 ++ .../service/search/IndexManagementClient.java | 8 + .../service/search/RecreateIndexHandler.java | 66 +--- .../service/search/ReindexContext.java | 67 ++++ .../service/search/SearchRepository.java | 33 +- .../elasticsearch/ElasticSearchClient.java | 5 + .../ElasticSearchIndexManager.java | 21 ++ .../search/opensearch/OpenSearchClient.java | 5 + .../opensearch/OpenSearchIndexManager.java | 21 ++ .../service/util/OpenMetadataOperations.java | 78 +++-- .../searchIndex/SearchIndexAppTest.java | 50 +-- 15 files changed, 466 insertions(+), 272 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/search/EntityReindexContext.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/search/ReindexContext.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index cdfc5955ece..1927b2d0f0d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -149,7 +149,7 @@ public class DataInsightsApp extends AbstractNativeApplication { } } - private void 
createDataQualityDataIndex() { + public void createDataQualityDataIndex() { try { createIndexInternal(Entity.TEST_CASE_RESULT); createIndexInternal(Entity.TEST_CASE_RESOLUTION_STATUS); @@ -165,7 +165,7 @@ public class DataInsightsApp extends AbstractNativeApplication { deleteIndexInternal(Entity.TEST_CASE_RESOLUTION_STATUS); } - private void createOrUpdateDataAssetsDataStream() { + public void createOrUpdateDataAssetsDataStream() { DataInsightsSearchInterface searchInterface = getSearchInterface(); ElasticSearchConfiguration config = searchRepository.getSearchConfiguration(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java index 08665994fb1..88152e64673 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java @@ -17,7 +17,7 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.search.IndexMapping; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.SearchIndexException; -import org.openmetadata.service.search.RecreateIndexHandler; +import org.openmetadata.service.search.ReindexContext; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.search.opensearch.OpenSearchClient; import os.org.opensearch.action.bulk.BackoffPolicy; @@ -222,7 +222,7 @@ public class OpenSearchBulkSink implements BulkSink { indexName, recreateIndex, (contextData.containsKey(RECREATE_CONTEXT) - ? (RecreateIndexHandler.ReindexContext) contextData.get(RECREATE_CONTEXT) + ? 
(ReindexContext) contextData.get(RECREATE_CONTEXT) : null), embeddingsEnabled); } @@ -247,7 +247,7 @@ public class OpenSearchBulkSink implements BulkSink { EntityInterface entity, String indexName, boolean recreateIndex, - RecreateIndexHandler.ReindexContext reindexContext, + ReindexContext reindexContext, boolean embeddingsEnabled) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); @@ -369,5 +369,5 @@ public class OpenSearchBulkSink implements BulkSink { BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex, - RecreateIndexHandler.ReindexContext reindexContext) {} + ReindexContext reindexContext) {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 2b799f786e8..9a495eedddc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -63,8 +63,9 @@ import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.SystemRepository; +import org.openmetadata.service.search.EntityReindexContext; import org.openmetadata.service.search.RecreateIndexHandler; -import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext; +import org.openmetadata.service.search.ReindexContext; import org.openmetadata.service.search.SearchClusterMetrics; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.socket.WebSocketManager; @@ -259,7 +260,24 @@ public class SearchIndexApp extends AbstractNativeApplication { success = jobData != null && 
jobData.getStatus() == EventPublisherJob.Status.COMPLETED; handleJobCompletion(); } finally { - finalizeRecreateIndexes(success); + finalizeAllEntityReindex(success); + } + } + + private void finalizeAllEntityReindex(boolean finalSuccess) { + try { + recreateContext + .getEntities() + .forEach( + entityType -> { + try { + finalizeEntityReindex(entityType, finalSuccess); + } catch (Exception ex) { + LOG.error("Failed to finalize reindex for entity: {}", entityType, ex); + } + }); + } finally { + recreateContext = null; } } @@ -371,17 +389,36 @@ public class SearchIndexApp extends AbstractNativeApplication { return Optional.empty(); } - private void finalizeRecreateIndexes(boolean success) { + private void finalizeEntityReindex(String entityType, boolean success) { if (recreateIndexHandler == null || recreateContext == null) { return; } + String originalIndex = recreateContext.getOriginalIndex(entityType).orElse(null); + String canonicalIndex = recreateContext.getCanonicalIndex(entityType).orElse(null); + String activeIndex = recreateContext.getOriginalIndex(entityType).orElse(null); + String stagedIndex = recreateContext.getStagedIndex(entityType).orElse(null); + String canonicalAlias = recreateContext.getCanonicalAlias(entityType).orElse(null); + Set existingAliases = recreateContext.getExistingAliases(entityType); + Set parentAliases = + new HashSet<>(listOrEmpty(recreateContext.getParentAliases(entityType))); + + EntityReindexContext entityReindexContext = + EntityReindexContext.builder() + .entityType(entityType) + .originalIndex(originalIndex) + .canonicalIndex(canonicalIndex) + .activeIndex(activeIndex) + .stagedIndex(stagedIndex) + .canonicalAliases(canonicalAlias) + .existingAliases(existingAliases) + .parentAliases(parentAliases) + .build(); + try { - recreateIndexHandler.finalizeReindex(recreateContext, success); + recreateIndexHandler.finalizeReindex(entityReindexContext, success); } catch (Exception ex) { LOG.error("Failed to finalize index recreation 
flow", ex); - } finally { - recreateContext = null; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index 974036af36c..8fd58d66be7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -1,13 +1,12 @@ package org.openmetadata.service.search; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; + import java.util.HashSet; -import java.util.List; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.search.IndexMapping; import org.openmetadata.service.Entity; -import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext; /** * Default implementation of RecreateHandler that provides zero-downtime index recreation. 
@@ -20,13 +19,10 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { ReindexContext context = new ReindexContext(); SearchRepository searchRepository = Entity.getSearchRepository(); - if (CommonUtil.nullOrEmpty(entities)) { + if (nullOrEmpty(entities)) { return context; } - String clusterAlias = searchRepository.getClusterAlias(); - SearchClient searchClient = searchRepository.getSearchClient(); - for (String entityType : entities) { IndexMapping indexMapping = searchRepository.getIndexMapping(entityType); if (indexMapping == null) { @@ -35,160 +31,187 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { continue; } - String canonicalIndexName = indexMapping.getIndexName(clusterAlias); - String activeIndexName = canonicalIndexName; - - if (!searchClient.indexExists(canonicalIndexName)) { - Set aliasTargets = - searchClient.getIndicesByAlias(indexMapping.getAlias(clusterAlias)); - if (!aliasTargets.isEmpty()) { - activeIndexName = aliasTargets.iterator().next(); - LOG.debug( - "Resolved active index '{}' for entity '{}' via alias '{}'.", - activeIndexName, - entityType, - indexMapping.getAlias(clusterAlias)); - } else { - LOG.debug( - "No existing index or alias found for entity '{}'. Rebuilding from scratch.", - entityType); - activeIndexName = null; - } - } - - String mappingContent = searchRepository.readIndexMapping(indexMapping); - if (mappingContent == null) { - LOG.warn( - "Unable to read index mapping content for '{}'. Skipping staged recreation.", - canonicalIndexName); - continue; - } - - String stagedIndexName = buildStagedIndexName(canonicalIndexName); - searchClient.createIndex(stagedIndexName, mappingContent); - - Set existingAliases = - activeIndexName != null ? 
searchClient.getAliases(activeIndexName) : Set.of(); - - context.add( - entityType, - canonicalIndexName, - activeIndexName, - stagedIndexName, - existingAliases, - indexMapping.getAlias(clusterAlias), - indexMapping.getParentAliases(clusterAlias)); - LOG.info( - "Created staged index '{}' for entity '{}' to support zero-downtime recreation.", - stagedIndexName, - entityType); + recreateIndexFromMapping(context, indexMapping, entityType); } return context; } @Override - public void finalizeReindex(ReindexContext context, boolean success) { - if (context == null || context.isEmpty()) { - return; - } + public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess) { + String entityType = context.getEntityType(); + String canonicalIndex = context.getCanonicalIndex(); + String activeIndex = context.getActiveIndex(); + String stagedIndex = context.getStagedIndex(); + Set existingAliases = context.getExistingAliases(); + String canonicalAlias = context.getCanonicalAliases(); + Set parentAliases = context.getParentAliases(); SearchRepository searchRepository = Entity.getSearchRepository(); SearchClient searchClient = searchRepository.getSearchClient(); - for (String entityType : context.getEntities()) { - String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null); - String activeIndex = context.getOriginalIndex(entityType).orElse(null); - String stagedIndex = context.getStagedIndex(entityType).orElse(null); + if (canonicalIndex == null || stagedIndex == null) { + LOG.error( + "Cannot finalize reindex for entity '{}'. 
Missing canonical or staged index name.", + entityType); + return; + } - if (canonicalIndex == null || stagedIndex == null) { - continue; - } + if (reindexSuccess) { + try { + Set aliasesToAttach = new HashSet<>(); - if (success) { - try { - Set aliasesToAttach = new HashSet<>(); - aliasesToAttach.addAll(context.getExistingAliases(entityType)); - context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add); - aliasesToAttach.add(canonicalIndex); - List parentAliases = context.getParentAliases(entityType); - if (parentAliases != null) { - parentAliases.stream() - .filter(alias -> alias != null && !alias.isBlank()) - .forEach(aliasesToAttach::add); - } - aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank()); + // Existing Aliases + existingAliases.stream() + .filter(alias -> alias != null && !alias.isBlank()) + .forEach(aliasesToAttach::add); - for (String alias : aliasesToAttach) { - Set targets = searchClient.getIndicesByAlias(alias); - for (String target : targets) { - if (target.equals(stagedIndex)) { - continue; - } - - boolean belongsToEntity = - target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_"); - - if (!belongsToEntity) { - LOG.debug( - "Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.", - alias, - target, - entityType); - continue; - } - - searchClient.removeAliases(target, Set.of(alias)); - LOG.info( - "Removed alias '{}' from index '{}' during promotion for entity '{}'.", - alias, - target, - entityType); - - if (searchClient.indexExists(target)) { - searchClient.deleteIndex(target); - LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType); - } - } - } - - if (activeIndex != null && searchClient.indexExists(activeIndex)) { - searchClient.deleteIndex(activeIndex); - LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType); - } - - if (!aliasesToAttach.isEmpty()) { - searchClient.addAliases(stagedIndex, aliasesToAttach); - } - 
LOG.info( - "Promoted staged index '{}' to serve entity '{}' (aliases: {}).", - stagedIndex, - entityType, - aliasesToAttach); - } catch (Exception ex) { - LOG.error( - "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); + // Canonical Alias + if (!nullOrEmpty(canonicalAlias)) { + aliasesToAttach.add(canonicalAlias); } - } else { - try { - if (searchClient.indexExists(stagedIndex)) { - searchClient.deleteIndex(stagedIndex); - LOG.info( - "Deleted staged index '{}' after unsuccessful reindex for entity '{}'.", - stagedIndex, + + // Parent Aliases + parentAliases.stream() + .filter(alias -> alias != null && !alias.isBlank()) + .forEach(aliasesToAttach::add); + + // Remove any null or blank aliases + aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank()); + + Set allEntityIndices = searchClient.listIndicesByPrefix(canonicalIndex); + for (String oldIndex : allEntityIndices) { + if (oldIndex.equals(stagedIndex)) { + LOG.debug( + "Skipping deletion of staged index '{}' for entity '{}'.", stagedIndex, entityType); + continue; + } + + if (activeIndex != null && oldIndex.equals(activeIndex)) { + LOG.debug( + "Skipping deletion of currently active index '{}' for entity '{}' (will be deleted after alias swap).", + activeIndex, entityType); + continue; + } + + try { + if (searchClient.indexExists(oldIndex)) { + searchClient.deleteIndex(oldIndex); + LOG.info("Cleaned up old index '{}' for entity '{}'.", oldIndex, entityType); + } + } catch (Exception deleteEx) { + LOG.warn( + "Failed to delete old index '{}' for entity '{}'.", oldIndex, entityType, deleteEx); } - } catch (Exception ex) { - LOG.warn( - "Failed to delete staged index '{}' for entity '{}' after failure.", - stagedIndex, - entityType, - ex); } + + if (activeIndex != null && searchClient.indexExists(activeIndex)) { + searchClient.deleteIndex(activeIndex); + LOG.info( + "Deleted previously active index '{}' for entity '{}'.", activeIndex, entityType); + } + + if 
(!aliasesToAttach.isEmpty()) { + searchClient.addAliases(stagedIndex, aliasesToAttach); + } + LOG.info( + "Promoted staged index '{}' to serve entity '{}' (aliases: {}).", + stagedIndex, + entityType, + aliasesToAttach); + } catch (Exception ex) { + LOG.error( + "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); + } + } else { + try { + if (searchClient.indexExists(stagedIndex)) { + searchClient.deleteIndex(stagedIndex); + LOG.info( + "Deleted staged index '{}' after unsuccessful reindex for entity '{}'.", + stagedIndex, + entityType); + } + } catch (Exception ex) { + LOG.warn( + "Failed to delete staged index '{}' for entity '{}' after failure.", + stagedIndex, + entityType, + ex); } } } + protected void recreateIndexFromMapping( + ReindexContext context, IndexMapping indexMapping, String entityType) { + if (indexMapping == null) { + LOG.warn("IndexMapping is null for entityType '{}'. Cannot recreate index.", entityType); + return; + } + + if (context == null) { + LOG.warn("ReindexContext is null for entityType '{}'. Cannot recreate index.", entityType); + return; + } + + SearchRepository searchRepository = Entity.getSearchRepository(); + String clusterAlias = searchRepository.getClusterAlias(); + SearchClient searchClient = searchRepository.getSearchClient(); + + String canonicalIndexName = indexMapping.getIndexName(clusterAlias); + String activeIndexName = canonicalIndexName; + + if (!searchClient.indexExists(canonicalIndexName)) { + Set aliasTargets = + searchClient.getIndicesByAlias(indexMapping.getAlias(clusterAlias)); + if (!aliasTargets.isEmpty()) { + activeIndexName = aliasTargets.iterator().next(); + LOG.debug( + "Resolved active index '{}' for entity '{}' via alias '{}'.", + activeIndexName, + entityType, + indexMapping.getAlias(clusterAlias)); + } else { + LOG.debug( + "No existing index or alias found for entity '{}'. 
Rebuilding from scratch.", + entityType); + activeIndexName = null; + } + } + + String mappingContent = searchRepository.readIndexMapping(indexMapping); + if (mappingContent == null) { + LOG.warn( + "Unable to read index mapping content for '{}'. Cannot recreate index.", + canonicalIndexName); + return; + } + + String stagedIndexName = buildStagedIndexName(canonicalIndexName); + searchClient.createIndex(stagedIndexName, mappingContent); + + Set existingAliases = + activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>(); + + // Add the default index + existingAliases.add(indexMapping.getAlias(clusterAlias)); + existingAliases.add(indexMapping.getIndexName(clusterAlias)); + context.add( + entityType, + canonicalIndexName, + activeIndexName, + stagedIndexName, + existingAliases, + indexMapping.getAlias(clusterAlias), + indexMapping.getParentAliases(clusterAlias)); + + LOG.info( + "Created staged index '{}' for entity '{}' using provided IndexMapping.", + stagedIndexName, + entityType); + } + private String buildStagedIndexName(String originalIndexName) { return String.format("%s_rebuild_%d", originalIndexName, System.currentTimeMillis()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityReindexContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityReindexContext.java new file mode 100644 index 00000000000..70c7c4ab487 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/EntityReindexContext.java @@ -0,0 +1,20 @@ +package org.openmetadata.service.search; + +import java.util.Set; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@Builder +public class EntityReindexContext { + String entityType; + String canonicalIndex; + String originalIndex; + String activeIndex; + String stagedIndex; + String canonicalAliases; + Set existingAliases; + Set parentAliases; +} diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/search/IndexManagementClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/IndexManagementClient.java index 025c9305834..1e54c0fea35 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/IndexManagementClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/IndexManagementClient.java @@ -102,4 +102,12 @@ public interface IndexManagementClient { * @return set of indices that have the alias */ Set getIndicesByAlias(String aliasName); + + /** + * Get all indices that match the given prefix. + * + * @param prefix the prefix to match index names against + * @return set of indices that start with the prefix + */ + Set listIndicesByPrefix(String prefix); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java index 5d746c21482..53717f42703 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java @@ -1,11 +1,5 @@ package org.openmetadata.service.search; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -15,63 +9,5 @@ import java.util.Set; public interface RecreateIndexHandler { ReindexContext reCreateIndexes(Set entities); - default void finalizeReindex(ReindexContext context, boolean success) {} - - class ReindexContext { - private final Map canonicalIndexByEntity = new HashMap<>(); - private final Map originalIndexByEntity = new HashMap<>(); - private final Map stagedIndexByEntity = new HashMap<>(); - private final Map> existingAliasesByEntity = new HashMap<>(); - private final Map 
canonicalAliasByEntity = new HashMap<>(); - private final Map> parentAliasesByEntity = new HashMap<>(); - - public void add( - String entity, - String canonicalIndex, - String originalIndex, - String stagedIndex, - Set existingAliases, - String canonicalAlias, - List parentAliases) { - canonicalIndexByEntity.put(entity, canonicalIndex); - originalIndexByEntity.put(entity, originalIndex); - stagedIndexByEntity.put(entity, stagedIndex); - existingAliasesByEntity.put( - entity, new HashSet<>(Optional.ofNullable(existingAliases).orElseGet(HashSet::new))); - canonicalAliasByEntity.put(entity, canonicalAlias); - parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of()); - } - - public Optional getCanonicalIndex(String entity) { - return Optional.ofNullable(canonicalIndexByEntity.get(entity)); - } - - public Set getEntities() { - return Collections.unmodifiableSet(stagedIndexByEntity.keySet()); - } - - public Optional getStagedIndex(String entity) { - return Optional.ofNullable(stagedIndexByEntity.get(entity)); - } - - public Optional getOriginalIndex(String entity) { - return Optional.ofNullable(originalIndexByEntity.get(entity)); - } - - public Set getExistingAliases(String entity) { - return existingAliasesByEntity.getOrDefault(entity, Collections.emptySet()); - } - - public Optional getCanonicalAlias(String entity) { - return Optional.ofNullable(canonicalAliasByEntity.get(entity)); - } - - public List getParentAliases(String entity) { - return parentAliasesByEntity.getOrDefault(entity, List.of()); - } - - public boolean isEmpty() { - return stagedIndexByEntity.isEmpty(); - } - } + default void finalizeReindex(EntityReindexContext entityReindexContext, boolean reindexSuccess) {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/ReindexContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/ReindexContext.java new file mode 100644 index 00000000000..c73989f3163 --- /dev/null +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/search/ReindexContext.java @@ -0,0 +1,67 @@ +package org.openmetadata.service.search; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ReindexContext { + private final Map canonicalIndexByEntity = new HashMap<>(); + private final Map originalIndexByEntity = new HashMap<>(); + private final Map stagedIndexByEntity = new HashMap<>(); + private final Map> existingAliasesByEntity = new HashMap<>(); + private final Map canonicalAliasByEntity = new HashMap<>(); + private final Map> parentAliasesByEntity = new HashMap<>(); + + public void add( + String entity, + String canonicalIndex, + String originalIndex, + String stagedIndex, + Set existingAliases, + String canonicalAlias, + List parentAliases) { + canonicalIndexByEntity.put(entity, canonicalIndex); + originalIndexByEntity.put(entity, originalIndex); + stagedIndexByEntity.put(entity, stagedIndex); + existingAliasesByEntity.put( + entity, new HashSet<>(Optional.ofNullable(existingAliases).orElseGet(HashSet::new))); + canonicalAliasByEntity.put(entity, canonicalAlias); + parentAliasesByEntity.put(entity, parentAliases != null ? 
parentAliases : List.of()); + } + + public Optional getCanonicalIndex(String entity) { + return Optional.ofNullable(canonicalIndexByEntity.get(entity)); + } + + public Set getEntities() { + return Collections.unmodifiableSet(stagedIndexByEntity.keySet()); + } + + public Optional getStagedIndex(String entity) { + return Optional.ofNullable(stagedIndexByEntity.get(entity)); + } + + public Optional getOriginalIndex(String entity) { + return Optional.ofNullable(originalIndexByEntity.get(entity)); + } + + public Set getExistingAliases(String entity) { + return existingAliasesByEntity.getOrDefault(entity, Collections.emptySet()); + } + + public Optional getCanonicalAlias(String entity) { + return Optional.ofNullable(canonicalAliasByEntity.get(entity)); + } + + public List getParentAliases(String entity) { + return parentAliasesByEntity.getOrDefault(entity, List.of()); + } + + public boolean isEmpty() { + return stagedIndexByEntity.isEmpty(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index f15b838296a..0f431090890 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -1,5 +1,6 @@ package org.openmetadata.service.search; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.search.IndexMapping.INDEX_NAME_SEPARATOR; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; @@ -236,10 +237,36 @@ public class SearchRepository { public void createIndexes() { RecreateIndexHandler recreateIndexHandler = this.createReindexHandler(); - RecreateIndexHandler.ReindexContext context = - recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet()); 
+ ReindexContext context = recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet()); if (context != null) { - recreateIndexHandler.finalizeReindex(context, true); + for (String entityType : context.getEntities()) { + try { + String originalIndex = context.getOriginalIndex(entityType).orElse(null); + String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null); + String activeIndex = context.getOriginalIndex(entityType).orElse(null); + String stagedIndex = context.getStagedIndex(entityType).orElse(null); + String canonicalAlias = context.getCanonicalAlias(entityType).orElse(null); + Set existingAliases = context.getExistingAliases(entityType); + Set parentAliases = + new HashSet<>(listOrEmpty(context.getParentAliases(entityType))); + + EntityReindexContext entityReindexContext = + EntityReindexContext.builder() + .entityType(entityType) + .originalIndex(originalIndex) + .canonicalIndex(canonicalIndex) + .activeIndex(activeIndex) + .stagedIndex(stagedIndex) + .canonicalAliases(canonicalAlias) + .existingAliases(existingAliases) + .parentAliases(parentAliases) + .build(); + recreateIndexHandler.finalizeReindex(entityReindexContext, true); + + } catch (Exception ex) { + LOG.error("Failed to recreate index for entity {}", entityType, ex); + } + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index c34121961e4..551001b46dd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -346,6 +346,11 @@ public class ElasticSearchClient implements SearchClient { return indexManager.getIndicesByAlias(aliasName); } + @Override + public Set listIndicesByPrefix(String prefix) { + return 
indexManager.listIndicesByPrefix(prefix); + } + @Override public void updateIndex(IndexMapping indexMapping, String indexMappingContent) { indexManager.updateIndex(indexMapping, indexMappingContent); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java index b206afaf87b..d86ec7d7b22 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java @@ -287,4 +287,25 @@ public class ElasticSearchIndexManager implements IndexManagementClient { } return indices; } + + @Override + public Set listIndicesByPrefix(String prefix) { + Set indices = new HashSet<>(); + if (!isClientAvailable) { + LOG.error("ElasticSearch client is not available. 
Cannot list indices by prefix."); + return indices; + } + try { + String pattern = prefix + "*"; + GetAliasRequest request = GetAliasRequest.of(g -> g.index(pattern)); + GetAliasResponse response = client.indices().getAlias(request); + + indices.addAll(response.result().keySet()); + + LOG.info("Retrieved {} indices matching prefix '{}': {}", indices.size(), prefix, indices); + } catch (Exception e) { + LOG.error("Failed to list indices by prefix {} due to", prefix, e); + } + return indices; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index d3d5927bb8c..03894084275 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -352,6 +352,11 @@ public class OpenSearchClient implements SearchClient { return indexManager.getIndicesByAlias(aliasName); } + @Override + public Set listIndicesByPrefix(String prefix) { + return indexManager.listIndicesByPrefix(prefix); + } + @Override public void updateIndex(IndexMapping indexMapping, String indexMappingContent) { indexManager.updateIndex(indexMapping, indexMappingContent); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexManager.java index b33daee172e..984d6a107fa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexManager.java @@ -411,4 +411,25 @@ public class OpenSearchIndexManager implements IndexManagementClient { } return indices; } + + @Override + public Set 
listIndicesByPrefix(String prefix) { + Set indices = new HashSet<>(); + if (!isClientAvailable) { + LOG.error("OpenSearch client is not available. Cannot list indices by prefix."); + return indices; + } + try { + String pattern = prefix + "*"; + GetAliasRequest request = GetAliasRequest.of(g -> g.index(pattern)); + GetAliasResponse response = client.indices().getAlias(request); + + indices.addAll(response.result().keySet()); + + LOG.info("Retrieved {} indices matching prefix '{}': {}", indices.size(), prefix, indices); + } catch (Exception e) { + LOG.error("Failed to list indices by prefix {} due to", prefix, e); + } + return indices; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index e5086378fda..402b9f32f68 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -38,7 +38,6 @@ import java.util.Objects; import java.util.Scanner; import java.util.Set; import java.util.concurrent.Callable; -import java.util.stream.Collectors; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.flywaydb.core.Flyway; @@ -67,6 +66,7 @@ import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.search.IndexMapping; import org.openmetadata.search.IndexMappingLoader; import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; @@ -152,7 +152,7 @@ public class OpenMetadataOperations implements Callable { LOG.info( "Subcommand needed: 'info', 'validate', 'repair', 'check-connection', " + "'drop-create', 'changelog', 'migrate', 'migrate-secrets', 
'reindex', 'reindex-rdf', 'deploy-pipelines', " - + "'dbServiceCleanup', 'relationshipCleanup', 'drop-indexes', 'remove-security-config'"); + + "'dbServiceCleanup', 'relationshipCleanup', 'drop-indexes', 'remove-security-config', 'create-indexes'"); LOG.info( "Use 'reindex --auto-tune' for automatic performance optimization based on cluster capabilities"); return 0; @@ -1435,15 +1435,28 @@ public class OpenMetadataOperations implements Callable { // Drop regular search repository indexes for (String entityType : searchRepository.getEntityIndexMap().keySet()) { LOG.info("Dropping index for entity type: {}", entityType); - searchRepository.deleteIndex(searchRepository.getIndexMapping(entityType)); + IndexMapping entityIndexMapping = searchRepository.getIndexMapping(entityType); + Set allEntityIndices = + searchRepository + .getSearchClient() + .listIndicesByPrefix( + entityIndexMapping.getIndexName(searchRepository.getClusterAlias())); + for (String oldIndex : allEntityIndices) { + try { + if (searchRepository.getSearchClient().indexExists(oldIndex)) { + searchRepository.getSearchClient().deleteIndex(oldIndex); + LOG.info("Cleaned up old index '{}' for entity '{}'.", oldIndex, entityType); + } + } catch (Exception deleteEx) { + LOG.warn( + "Failed to delete old index '{}' for entity '{}'.", oldIndex, entityType, deleteEx); + } + } } // Drop data streams and data quality indexes created by DataInsightsApp dropDataInsightsIndexes(); - // Drop orphaned rebuild indexes from zero-downtime reindexing - dropRebuildIndexes(); - LOG.info("All indexes dropped successfully."); return 0; } catch (Exception e) { @@ -1452,6 +1465,22 @@ public class OpenMetadataOperations implements Callable { } } + @Command(name = "create-indexes", description = "Creates Indexes for Elastic/OpenSearch") + public Integer createIndexes() { + try { + LOG.info("Creating indexes for search engine..."); + parseConfig(); + searchRepository.createIndexes(); + createDataInsightsIndexes(); + 
Entity.cleanup(); + LOG.info("All indexes created successfully."); + return 0; + } catch (Exception e) { + LOG.error("Failed to create indexes due to ", e); + return 1; + } + } + private void dropDataInsightsIndexes() { try { LOG.info("Dropping Data Insights data streams and indexes..."); @@ -1474,36 +1503,25 @@ public class OpenMetadataOperations implements Callable { } } - private void dropRebuildIndexes() { + private void createDataInsightsIndexes() { try { - LOG.info("Dropping orphaned rebuild indexes from zero-downtime reindexing..."); + LOG.info("Create Data Insights data streams and indexes..."); - Set allIndices = getAllIndices(); - List rebuildIndices = - allIndices.stream() - .filter(index -> index.contains("_rebuild_")) - .collect(Collectors.toList()); + // Create a DataInsightsApp instance to access its index creation methods + DataInsightsApp dataInsightsApp = new DataInsightsApp(collectionDAO, searchRepository); - if (rebuildIndices.isEmpty()) { - LOG.info("No rebuild indexes found to delete."); - return; - } + // Create or update data assets data streams + LOG.info("Create/Update data assets data streams..."); + dataInsightsApp.createOrUpdateDataAssetsDataStream(); - LOG.info("Found {} rebuild indexes to delete: {}", rebuildIndices.size(), rebuildIndices); + // Create data quality indexes + LOG.info("Create/Updated data quality indexes..."); + dataInsightsApp.createDataQualityDataIndex(); - for (String index : rebuildIndices) { - try { - searchRepository.getSearchClient().deleteIndex(index); - LOG.info("Deleted rebuild index: {}", index); - } catch (Exception ex) { - LOG.warn("Failed to delete rebuild index {}: {}", index, ex.getMessage()); - } - } - - LOG.info("Rebuild index cleanup completed."); + LOG.info("Data Insights indexes and data streams created successfully."); } catch (Exception e) { - LOG.warn("Failed to drop rebuild indexes: {}", e.getMessage()); - LOG.debug("Rebuild index cleanup error details: ", e); + LOG.warn("Failed to create some Data Insights indexes: 
{}", e.getMessage()); + LOG.debug("Data Insights index creation error details: ", e); } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java index 7089243ae20..2a523ca7cc2 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java @@ -60,7 +60,8 @@ import org.openmetadata.service.OpenMetadataApplicationTest; import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.search.DefaultRecreateHandler; -import org.openmetadata.service.search.RecreateIndexHandler; +import org.openmetadata.service.search.EntityReindexContext; +import org.openmetadata.service.search.ReindexContext; import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.socket.WebSocketManager; @@ -201,7 +202,7 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { searchIndexApp.init(testApp); - RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); + ReindexContext context = new ReindexContext(); context.add( "table", "cluster_table", @@ -245,17 +246,20 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { try (MockedStatic entityMock = mockStatic(Entity.class)) { entityMock.when(Entity::getSearchRepository).thenReturn(repo); - RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); - context.add( - "table", - "table_search_index", - "table_search_index_rebuild_old", - "table_search_index_rebuild_new", - Set.of("table", "table_search_index", "all", "dataAsset"), - "table", - List.of("all", 
"dataAsset", "database", "databaseSchema", "databaseService")); + EntityReindexContext entityReindexContext = + EntityReindexContext.builder() + .entityType("table") + .canonicalIndex("table_search_index") + .originalIndex("table_search_index_rebuild_old") + .activeIndex("table_search_index_rebuild_old") + .stagedIndex("table_search_index_rebuild_new") + .existingAliases(Set.of("table", "table_search_index", "all", "dataAsset")) + .canonicalAliases("table") + .parentAliases( + Set.of("all", "dataAsset", "database", "databaseSchema", "databaseService")) + .build(); - new DefaultRecreateHandler().finalizeReindex(context, true); + new DefaultRecreateHandler().finalizeReindex(entityReindexContext, true); } assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old")); @@ -292,17 +296,19 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest { try (MockedStatic entityMock = mockStatic(Entity.class)) { entityMock.when(Entity::getSearchRepository).thenReturn(repo); - RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext(); - context.add( - "table", - "table_search_index", - "table_search_index_rebuild_old1", - "table_search_index_rebuild_new", - Set.of("table", "table_search_index", "all", "dataAsset"), - "table", - List.of("all", "dataAsset")); + EntityReindexContext entityReindexContext = + EntityReindexContext.builder() + .entityType("table") + .canonicalIndex("table_search_index") + .originalIndex("table_search_index_rebuild_old1") + .activeIndex("table_search_index_rebuild_old1") + .stagedIndex("table_search_index_rebuild_new") + .existingAliases(Set.of("table", "table_search_index", "all", "dataAsset")) + .canonicalAliases("table") + .parentAliases(Set.of("all", "dataAsset")) + .build(); - new DefaultRecreateHandler().finalizeReindex(context, true); + new DefaultRecreateHandler().finalizeReindex(entityReindexContext, true); } assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old1"));