diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java index da1bc61648a..08665994fb1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java @@ -1,6 +1,7 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.RECREATE_CONTEXT; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY; import java.util.List; @@ -16,6 +17,7 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.search.IndexMapping; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.search.RecreateIndexHandler; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.search.opensearch.OpenSearchClient; import os.org.opensearch.action.bulk.BackoffPolicy; @@ -215,7 +217,14 @@ public class OpenSearchBulkSink implements BulkSink { } else { List entityInterfaces = (List) entities; for (EntityInterface entity : entityInterfaces) { - addEntity(entity, indexName, recreateIndex, embeddingsEnabled); + addEntity( + entity, + indexName, + recreateIndex, + (contextData.containsKey(RECREATE_CONTEXT) + ? (RecreateIndexHandler.ReindexContext) contextData.get(RECREATE_CONTEXT) + : null), + embeddingsEnabled); } } } catch (Exception e) { @@ -235,7 +244,11 @@ public class OpenSearchBulkSink implements BulkSink { } private void addEntity( - EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) { + EntityInterface entity, + String indexName, + boolean recreateIndex, + RecreateIndexHandler.ReindexContext reindexContext, + boolean embeddingsEnabled) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); @@ -256,7 +269,7 @@ public class OpenSearchBulkSink implements BulkSink { // If embeddings are enabled, also index to vector_search_index if (embeddingsEnabled) { - addEntityToVectorIndex(bulkProcessor, entity, recreateIndex); + addEntityToVectorIndex(bulkProcessor, entity, recreateIndex, reindexContext); } } @@ -353,5 +366,8 @@ public class OpenSearchBulkSink implements BulkSink { * This method will only be called when embeddings are enabled for the entity type. */ protected void addEntityToVectorIndex( - BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {} + BulkProcessor bulkProcessor, + EntityInterface entity, + boolean recreateIndex, + RecreateIndexHandler.ReindexContext reindexContext) {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 5ffe32cd86f..2b799f786e8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -10,6 +10,7 @@ import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_CONFI import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS; import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.RECREATE_CONTEXT; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; @@ -1554,6 +1555,7 @@ public class SearchIndexApp extends AbstractNativeApplication { Map contextData = new HashMap<>(); contextData.put(ENTITY_TYPE_KEY, entityType); contextData.put(RECREATE_INDEX, jobData.getRecreateIndex()); + contextData.put(RECREATE_CONTEXT, recreateContext); getTargetIndexForEntity(entityType) .ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index)); return contextData; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index 561b55180f0..974036af36c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -144,6 +144,11 @@ public class DefaultRecreateHandler implements RecreateIndexHandler { alias, target, entityType); + + if (searchClient.indexExists(target)) { + searchClient.deleteIndex(target); + LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 71491a7eb4b..ed4c4a9e0a4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -51,6 +51,7 @@ public class ReindexingUtil { public static final String ENTITY_NAME_LIST_KEY = "entityNameList"; public static final String TIMESTAMP_KEY = "@timestamp"; public static final String TARGET_INDEX_KEY = "targetIndex"; + public static final String RECREATE_CONTEXT = "recreateContext"; public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) { stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java index 9565d78018c..0a494e6afd2 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java @@ -99,6 +99,6 @@ class OpenSearchBulkSinkSimpleTest { void testAddEntityToVectorIndex() { // Test default implementation does nothing (no exception thrown) // This should not throw any exception as the default implementation is empty - openSearchBulkSink.addEntityToVectorIndex(null, null, true); + openSearchBulkSink.addEntityToVectorIndex(null, null, true, null); } }