Fix search index issue due to zero runtime changes (#23604)

* Remove new rebuild indexes

* Fix reindex for vector embeddings
This commit is contained in:
Mohit Yadav 2025-09-29 18:06:24 +05:30 committed by GitHub
parent 92c30f2a0a
commit 5de12a8cef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 29 additions and 5 deletions

View File

@ -1,6 +1,7 @@
package org.openmetadata.service.apps.bundles.searchIndex; package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.RECREATE_CONTEXT;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import java.util.List; import java.util.List;
@ -16,6 +17,7 @@ import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.search.IndexMapping; import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.RecreateIndexHandler;
import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.opensearch.OpenSearchClient; import org.openmetadata.service.search.opensearch.OpenSearchClient;
import os.org.opensearch.action.bulk.BackoffPolicy; import os.org.opensearch.action.bulk.BackoffPolicy;
@ -215,7 +217,14 @@ public class OpenSearchBulkSink implements BulkSink {
} else { } else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities; List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
for (EntityInterface entity : entityInterfaces) { for (EntityInterface entity : entityInterfaces) {
addEntity(entity, indexName, recreateIndex, embeddingsEnabled); addEntity(
entity,
indexName,
recreateIndex,
(contextData.containsKey(RECREATE_CONTEXT)
? (RecreateIndexHandler.ReindexContext) contextData.get(RECREATE_CONTEXT)
: null),
embeddingsEnabled);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -235,7 +244,11 @@ public class OpenSearchBulkSink implements BulkSink {
} }
private void addEntity( private void addEntity(
EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) { EntityInterface entity,
String indexName,
boolean recreateIndex,
RecreateIndexHandler.ReindexContext reindexContext,
boolean embeddingsEnabled) {
// Build the search index document using the proper transformation // Build the search index document using the proper transformation
String entityType = Entity.getEntityTypeFromObject(entity); String entityType = Entity.getEntityTypeFromObject(entity);
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
@ -256,7 +269,7 @@ public class OpenSearchBulkSink implements BulkSink {
// If embeddings are enabled, also index to vector_search_index // If embeddings are enabled, also index to vector_search_index
if (embeddingsEnabled) { if (embeddingsEnabled) {
addEntityToVectorIndex(bulkProcessor, entity, recreateIndex); addEntityToVectorIndex(bulkProcessor, entity, recreateIndex, reindexContext);
} }
} }
@ -353,5 +366,8 @@ public class OpenSearchBulkSink implements BulkSink {
* This method will only be called when embeddings are enabled for the entity type. * This method will only be called when embeddings are enabled for the entity type.
*/ */
protected void addEntityToVectorIndex( protected void addEntityToVectorIndex(
BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {} BulkProcessor bulkProcessor,
EntityInterface entity,
boolean recreateIndex,
RecreateIndexHandler.ReindexContext reindexContext) {}
} }

View File

@ -10,6 +10,7 @@ import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_CONFI
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS; import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL; import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.RECREATE_CONTEXT;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
@ -1554,6 +1555,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
Map<String, Object> contextData = new HashMap<>(); Map<String, Object> contextData = new HashMap<>();
contextData.put(ENTITY_TYPE_KEY, entityType); contextData.put(ENTITY_TYPE_KEY, entityType);
contextData.put(RECREATE_INDEX, jobData.getRecreateIndex()); contextData.put(RECREATE_INDEX, jobData.getRecreateIndex());
contextData.put(RECREATE_CONTEXT, recreateContext);
getTargetIndexForEntity(entityType) getTargetIndexForEntity(entityType)
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index)); .ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
return contextData; return contextData;

View File

@ -144,6 +144,11 @@ public class DefaultRecreateHandler implements RecreateIndexHandler {
alias, alias,
target, target,
entityType); entityType);
if (searchClient.indexExists(target)) {
searchClient.deleteIndex(target);
LOG.debug("Replaced old index '{}' for entity '{}'.", target, entityType);
}
} }
} }

View File

@ -51,6 +51,7 @@ public class ReindexingUtil {
public static final String ENTITY_NAME_LIST_KEY = "entityNameList"; public static final String ENTITY_NAME_LIST_KEY = "entityNameList";
public static final String TIMESTAMP_KEY = "@timestamp"; public static final String TIMESTAMP_KEY = "@timestamp";
public static final String TARGET_INDEX_KEY = "targetIndex"; public static final String TARGET_INDEX_KEY = "targetIndex";
public static final String RECREATE_CONTEXT = "recreateContext";
public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) { public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess); stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);

View File

@ -99,6 +99,6 @@ class OpenSearchBulkSinkSimpleTest {
void testAddEntityToVectorIndex() { void testAddEntityToVectorIndex() {
// Test default implementation does nothing (no exception thrown) // Test default implementation does nothing (no exception thrown)
// This should not throw any exception as the default implementation is empty // This should not throw any exception as the default implementation is empty
openSearchBulkSink.addEntityToVectorIndex(null, null, true); openSearchBulkSink.addEntityToVectorIndex(null, null, true, null);
} }
} }