diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java index dc93f11b894..bc38f889002 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java @@ -38,7 +38,7 @@ import org.openmetadata.service.search.elasticsearch.ElasticSearchClient; public class ElasticSearchBulkSink implements BulkSink { private final ElasticSearchClient searchClient; - private final SearchRepository searchRepository; + protected final SearchRepository searchRepository; private final BulkProcessor bulkProcessor; private final StepStats stats = new StepStats(); @@ -191,6 +191,9 @@ public class ElasticSearchBulkSink implements BulkSink { Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + // Check if embeddings are enabled for this specific entity type + boolean embeddingsEnabled = isVectorEmbeddingEnabledForEntity(entityType); + IndexMapping indexMapping = searchRepository.getIndexMapping(entityType); if (indexMapping == null) { LOG.debug("No index mapping found for entityType '{}'. 
Skipping indexing.", entityType); @@ -208,7 +211,7 @@ public class ElasticSearchBulkSink implements BulkSink { } else { List entityInterfaces = (List) entities; for (EntityInterface entity : entityInterfaces) { - addEntity(entity, indexName, recreateIndex); + addEntity(entity, indexName, recreateIndex, embeddingsEnabled); } } } catch (Exception e) { @@ -227,7 +230,8 @@ public class ElasticSearchBulkSink implements BulkSink { } } - private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) { + private void addEntity( + EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); @@ -245,6 +249,11 @@ public class ElasticSearchBulkSink implements BulkSink { updateRequest.docAsUpsert(true); bulkProcessor.add(updateRequest); } + + // If embeddings are enabled, also index to vector_search_index + if (embeddingsEnabled) { + addEntityToVectorIndex(bulkProcessor, entity, recreateIndex); + } } private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) { @@ -324,4 +333,19 @@ public class ElasticSearchBulkSink implements BulkSink { this.maxConcurrentRequests = concurrentRequests; LOG.info("Concurrent requests updated to: {}", concurrentRequests); } + + /** + * Checks if vector embeddings are enabled for a specific entity type. + * This combines SearchRepository capability check with job configuration. + */ + protected boolean isVectorEmbeddingEnabledForEntity(String entityType) { + return false; + } + + /** + * Adds entity to vector_search_index for embedding search. + * This method will only be called when embeddings are enabled for the entity type. 
+ */ + protected void addEntityToVectorIndex( + BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java index dd554db92e0..4dc3e7ad381 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java @@ -38,7 +38,7 @@ import os.org.opensearch.common.xcontent.XContentType; public class OpenSearchBulkSink implements BulkSink { private final OpenSearchClient searchClient; - private final SearchRepository searchRepository; + protected final SearchRepository searchRepository; private final BulkProcessor bulkProcessor; private final StepStats stats = new StepStats(); @@ -191,6 +191,9 @@ public class OpenSearchBulkSink implements BulkSink { Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); + // Check if embeddings are enabled for this specific entity type + boolean embeddingsEnabled = isVectorEmbeddingEnabledForEntity(entityType); + IndexMapping indexMapping = searchRepository.getIndexMapping(entityType); if (indexMapping == null) { LOG.debug("No index mapping found for entityType '{}'. 
Skipping indexing.", entityType); @@ -208,7 +211,7 @@ public class OpenSearchBulkSink implements BulkSink { } else { List entityInterfaces = (List) entities; for (EntityInterface entity : entityInterfaces) { - addEntity(entity, indexName, recreateIndex); + addEntity(entity, indexName, recreateIndex, embeddingsEnabled); } } } catch (Exception e) { @@ -227,7 +230,8 @@ public class OpenSearchBulkSink implements BulkSink { } } - private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) { + private void addEntity( + EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) { // Build the search index document using the proper transformation String entityType = Entity.getEntityTypeFromObject(entity); Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc(); @@ -245,6 +249,11 @@ public class OpenSearchBulkSink implements BulkSink { updateRequest.docAsUpsert(true); bulkProcessor.add(updateRequest); } + + // If embeddings are enabled, also index to vector_search_index + if (embeddingsEnabled) { + addEntityToVectorIndex(bulkProcessor, entity, recreateIndex); + } } private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) { @@ -324,4 +333,19 @@ public class OpenSearchBulkSink implements BulkSink { this.maxConcurrentRequests = concurrentRequests; LOG.info("Concurrent requests updated to: {}", concurrentRequests); } + + /** + * Checks if vector embeddings are enabled for a specific entity type. + * This combines SearchRepository capability check with job configuration. + */ + protected boolean isVectorEmbeddingEnabledForEntity(String entityType) { + return false; + } + + /** + * Adds entity to vector_search_index for embedding search. + * This method will only be called when embeddings are enabled for the entity type. 
+ */ + protected void addEntityToVectorIndex( + BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 7b5b5819a8f..259ca3f5ce4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -47,7 +47,6 @@ import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.utils.JsonUtils; -import org.openmetadata.search.IndexMapping; import org.openmetadata.service.Entity; import org.openmetadata.service.apps.AbstractNativeApplication; import org.openmetadata.service.exception.AppException; @@ -57,6 +56,7 @@ import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.SystemRepository; +import org.openmetadata.service.search.RecreateIndexHandler; import org.openmetadata.service.search.SearchClusterMetrics; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.socket.WebSocketManager; @@ -91,6 +91,7 @@ public class SearchIndexApp extends AbstractNativeApplication { QUERY_COST_RECORD); private BulkSink searchIndexSink; + private RecreateIndexHandler recreateIndexHandler; @Getter private EventPublisherJob jobData; private ExecutorService producerExecutor; @@ -365,23 +366,17 @@ public class SearchIndexApp extends AbstractNativeApplication { ElasticSearchConfiguration.SearchType searchType = searchRepository.getSearchType(); jobLogger.addInitDetail("Search type", 
searchType); - if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { - this.searchIndexSink = - new OpenSearchBulkSink( - searchRepository, - jobData.getBatchSize(), - jobData.getMaxConcurrentRequests(), - jobData.getPayLoadSize()); - LOG.debug("Initialized OpenSearchBulkSink with batch size: {}", jobData.getBatchSize()); - } else { - this.searchIndexSink = - new ElasticSearchBulkSink( - searchRepository, - jobData.getBatchSize(), - jobData.getMaxConcurrentRequests(), - jobData.getPayLoadSize()); - LOG.debug("Initialized ElasticSearchBulkSink with batch size: {}", jobData.getBatchSize()); - } + this.searchIndexSink = + searchRepository.createBulkSink( + jobData.getBatchSize(), jobData.getMaxConcurrentRequests(), jobData.getPayLoadSize()); + LOG.debug( + "Initialized {} with batch size: {}", + searchIndexSink.getClass().getSimpleName(), + jobData.getBatchSize()); + + this.recreateIndexHandler = searchRepository.createReindexHandler(); + LOG.debug( + "Initialized {} for reindex cleanup", recreateIndexHandler.getClass().getSimpleName()); return clusterMetrics; } @@ -884,21 +879,7 @@ public class SearchIndexApp extends AbstractNativeApplication { } private void reCreateIndexes(Set entities) { - for (String entityType : entities) { - if (Boolean.FALSE.equals(jobData.getRecreateIndex())) { - LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType); - return; - } - IndexMapping indexType = searchRepository.getIndexMapping(entityType); - if (indexType == null) { - LOG.warn( - "No index mapping found for entityType '{}'. 
Skipping index recreation.", entityType); - continue; - } - searchRepository.deleteIndex(indexType); - searchRepository.createIndex(indexType); - LOG.debug("Recreated index for entityType '{}'.", entityType); - } + recreateIndexHandler.reCreateIndexes(entities); } private Source createSource(String entityType) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java new file mode 100644 index 00000000000..0f0a17dc209 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -0,0 +1,29 @@ +package org.openmetadata.service.search; + +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.search.IndexMapping; +import org.openmetadata.service.Entity; + +/** + * Default RecreateIndexHandler that deletes and recreates the index of each entity type. + * This can be overridden to provide more sophisticated cleanup operations. + */ +@Slf4j +public class DefaultRecreateHandler implements RecreateIndexHandler { + @Override + public void reCreateIndexes(Set entities) { + SearchRepository searchRepository = Entity.getSearchRepository(); + for (String entityType : entities) { + IndexMapping indexType = searchRepository.getIndexMapping(entityType); + if (indexType == null) { + LOG.warn( + "No index mapping found for entityType '{}'. 
Skipping index recreation.", entityType); + continue; + } + searchRepository.deleteIndex(indexType); + searchRepository.createIndex(indexType); + LOG.debug("Recreated index for entityType '{}'.", entityType); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultSearchRepositoryProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultSearchRepositoryProvider.java new file mode 100644 index 00000000000..db80053eb93 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultSearchRepositoryProvider.java @@ -0,0 +1,26 @@ +package org.openmetadata.service.search; + +import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; + +/** + * Implementation of SearchRepositoryProvider. + * This is the default provider that creates standard OpenMetadata SearchRepository instances. + */ +public class DefaultSearchRepositoryProvider implements SearchRepositoryProvider { + + @Override + public SearchRepository createSearchRepository( + ElasticSearchConfiguration elasticSearchConfiguration, int maxSize) { + return new SearchRepository(elasticSearchConfiguration, maxSize); + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public boolean isAvailable() { + return true; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java new file mode 100644 index 00000000000..1f7e24a7c4b --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateIndexHandler.java @@ -0,0 +1,11 @@ +package org.openmetadata.service.search; + +import java.util.Set; + +/** + * Interface for recreating search indexes during reindexing operations. + * This allows for different implementations to be provided for different deployment tiers. 
+ */ +public interface RecreateIndexHandler { + void reCreateIndexes(Set entities); +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 6d4ddac52d0..fd0fd6bfbbc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -34,7 +34,7 @@ import os.org.opensearch.action.bulk.BulkRequest; import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.client.RequestOptions; -public interface SearchClient { +public interface SearchClient { String UPSTREAM_LINEAGE_FIELD = "upstreamLineage"; String FQN_FIELD = "fullyQualifiedName"; ExecutorService asyncExecutor = Executors.newFixedThreadPool(1); @@ -447,6 +447,8 @@ public interface SearchClient { Object getClient(); + T getHighLevelClient(); + SearchHealthStatus getSearchHealthStatus() throws IOException; QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 8289002f935..6d1bfd5e452 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -106,6 +106,9 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.search.IndexMapping; import org.openmetadata.search.IndexMappingLoader; import org.openmetadata.service.Entity; +import org.openmetadata.service.apps.bundles.searchIndex.BulkSink; +import org.openmetadata.service.apps.bundles.searchIndex.ElasticSearchBulkSink; +import org.openmetadata.service.apps.bundles.searchIndex.OpenSearchBulkSink; import 
org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher; import org.openmetadata.service.events.lifecycle.handlers.SearchIndexHandler; import org.openmetadata.service.jdbi3.EntityRepository; @@ -225,9 +228,8 @@ public class SearchRepository { } public void createIndexes() { - for (IndexMapping indexMapping : entityIndexMap.values()) { - createIndex(indexMapping); - } + RecreateIndexHandler recreateIndexHandler = this.createReindexHandler(); + recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet()); } public void updateIndexes() { @@ -1420,4 +1422,40 @@ public class SearchRepository { LOG.error("Failed to initialize NLQ service", e); } } + + /** + * Creates the BulkSink implementation that matches the configured search type. + * This method can be overridden in subclasses to provide different implementations. + */ + public BulkSink createBulkSink( + int batchSize, int maxConcurrentRequests, long maxPayloadSizeBytes) { + ElasticSearchConfiguration.SearchType searchType = getSearchType(); + if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { + return new OpenSearchBulkSink(this, batchSize, maxConcurrentRequests, maxPayloadSizeBytes); + } else { + return new ElasticSearchBulkSink(this, batchSize, maxConcurrentRequests, maxPayloadSizeBytes); + } + } + + /** + * Creates a RecreateIndexHandler instance for recreate operations during reindexing. + * This method can be overridden in subclasses to provide different implementations. + */ + public RecreateIndexHandler createReindexHandler() { + return new DefaultRecreateHandler(); + } + + /** + * Checks if vector embedding is enabled. + * This method can be overridden in subclasses to provide different configurations. 
+ */ + @SuppressWarnings("unused") + public boolean isVectorEmbeddingEnabled() { + return false; + } + + @SuppressWarnings("unused") + public T getHighLevelClient() { + return (T) searchClient.getHighLevelClient(); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryFactory.java new file mode 100644 index 00000000000..1809800baca --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryFactory.java @@ -0,0 +1,83 @@ +package org.openmetadata.service.search; + +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.StreamSupport; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; + +/** + * Factory for creating SearchRepository instances using Service Provider Interface (SPI). + * This allows different implementations to be plugged in at runtime + * without hardcoded class names or reflection. + */ +@Slf4j +public class SearchRepositoryFactory { + + /** + * Creates a SearchRepository instance using the highest priority available provider. + * + * @param elasticSearchConfiguration The Elasticsearch configuration + * @param maxSize The maximum size parameter + * @return SearchRepository instance from the best available provider + */ + public static SearchRepository createSearchRepository( + ElasticSearchConfiguration elasticSearchConfiguration, int maxSize) { + + SearchRepositoryProvider provider = findBestProvider(); + + LOG.info( + "Using SearchRepository provider: {} (priority: {})", + provider.getClass().getSimpleName(), + provider.getPriority()); + + return provider.createSearchRepository(elasticSearchConfiguration, maxSize); + } + + /** + * Finds the best available SearchRepositoryProvider using SPI. + * Returns the provider with the highest priority that is available. 
+ */ + private static SearchRepositoryProvider findBestProvider() { + ServiceLoader loader = + ServiceLoader.load(SearchRepositoryProvider.class); + + List availableProviders = + StreamSupport.stream(loader.spliterator(), false) + .filter(SearchRepositoryProvider::isAvailable) + .sorted((p1, p2) -> Integer.compare(p2.getPriority(), p1.getPriority())) + .toList(); + + if (availableProviders.isEmpty()) { + LOG.warn( + "No SearchRepositoryProvider found via SPI, falling back to default OSS implementation"); + return new DefaultSearchRepositoryProvider(); + } + + SearchRepositoryProvider bestProvider = availableProviders.getFirst(); + + if (availableProviders.size() > 1) { + LOG.info( + "Multiple SearchRepository providers available, selected: {} with priority: {}", + bestProvider.getClass().getSimpleName(), + bestProvider.getPriority()); + } + + return bestProvider; + } + + /** + * Gets all available SearchRepository providers for debugging/inspection. + * + * @return List of all available providers + */ + public static List getAvailableProviders() { + ServiceLoader loader = + ServiceLoader.load(SearchRepositoryProvider.class); + + return StreamSupport.stream(loader.spliterator(), false) + .filter(SearchRepositoryProvider::isAvailable) + .sorted((p1, p2) -> Integer.compare(p2.getPriority(), p1.getPriority())) + .toList(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryProvider.java new file mode 100644 index 00000000000..6efcfda42ec --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepositoryProvider.java @@ -0,0 +1,36 @@ +package org.openmetadata.service.search; + +import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; + +/** + * Service Provider Interface for creating SearchRepository instances. 
+ */ +public interface SearchRepositoryProvider { + + /** + * Creates a SearchRepository instance. + * + * @param elasticSearchConfiguration The Elasticsearch configuration + * @param maxSize The maximum size parameter + * @return SearchRepository instance + */ + SearchRepository createSearchRepository( + ElasticSearchConfiguration elasticSearchConfiguration, int maxSize); + + /** + * Returns the priority of this provider. + * Higher priority providers will be selected first. + * OSS should return 0, Paid versions should return higher values. + * + * @return priority value + */ + int getPriority(); + + /** + * Returns true if this provider is available/applicable. + * This allows providers to check for required dependencies, licenses, etc. + * + * @return true if this provider can be used + */ + boolean isAvailable(); +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 8e67a0051a7..480e272ff7c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -193,7 +193,7 @@ import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; @Slf4j -public class ElasticSearchClient implements SearchClient { +public class ElasticSearchClient implements SearchClient { @SuppressWarnings("deprecated") @Getter @@ -2453,6 +2453,11 @@ public class ElasticSearchClient implements SearchClient { return client.getLowLevelClient(); } + @Override + public RestHighLevelClient getHighLevelClient() { + return client; + } + @Override public SearchHealthStatus getSearchHealthStatus() throws IOException { ClusterHealthRequest request = new ClusterHealthRequest(); diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index d946be536eb..61b755ace7c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -216,7 +216,7 @@ import os.org.opensearch.search.sort.SortOrder; @Slf4j // Not tagged with Repository annotation as it is programmatically initialized -public class OpenSearchClient implements SearchClient { +public class OpenSearchClient implements SearchClient { @Getter protected final RestHighLevelClient client; private final boolean isClientAvailable; private final RBACConditionEvaluator rbacConditionEvaluator; @@ -2583,6 +2583,11 @@ public class OpenSearchClient implements SearchClient { return client.getLowLevelClient(); } + @Override + public RestHighLevelClient getHighLevelClient() { + return client; + } + private void buildSearchRBACQuery( SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) { if (shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index b654a0c5d77..56f7b36457c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -100,6 +100,7 @@ import org.openmetadata.service.resources.apps.AppMarketPlaceMapper; import org.openmetadata.service.resources.databases.DatasourceConfig; import org.openmetadata.service.search.IndexMappingVersionTracker; import org.openmetadata.service.search.SearchRepository; +import 
org.openmetadata.service.search.SearchRepositoryFactory; import org.openmetadata.service.secrets.SecretsManager; import org.openmetadata.service.secrets.SecretsManagerFactory; import org.openmetadata.service.secrets.SecretsManagerUpdateService; @@ -1435,7 +1436,7 @@ public class OpenMetadataOperations implements Callable { jdbi = JdbiUtils.createAndSetupJDBI(dataSourceFactory); searchRepository = - new SearchRepository( + SearchRepositoryFactory.createSearchRepository( config.getElasticSearchConfiguration(), config.getDataSourceFactory().getMaxSize()); // Initialize secrets manager diff --git a/openmetadata-service/src/main/resources/META-INF/services/org.openmetadata.service.search.SearchRepositoryProvider b/openmetadata-service/src/main/resources/META-INF/services/org.openmetadata.service.search.SearchRepositoryProvider new file mode 100644 index 00000000000..03d56e3895e --- /dev/null +++ b/openmetadata-service/src/main/resources/META-INF/services/org.openmetadata.service.search.SearchRepositoryProvider @@ -0,0 +1 @@ +org.openmetadata.service.search.DefaultSearchRepositoryProvider \ No newline at end of file diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java index a23797ac028..fa18fc6bcd4 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java @@ -65,6 +65,7 @@ import org.openmetadata.service.resources.events.SlackCallbackResource; import org.openmetadata.service.resources.events.WebhookCallbackResource; import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.search.SearchRepositoryFactory; import org.testcontainers.containers.GenericContainer; import 
org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; @@ -339,7 +340,9 @@ public abstract class OpenMetadataApplicationTest { private void createIndices() { ElasticSearchConfiguration esConfig = getEsConfig(); - SearchRepository searchRepository = new SearchRepository(esConfig, 50); + SearchRepository searchRepository = + SearchRepositoryFactory.createSearchRepository(esConfig, 50); + Entity.setSearchRepository(searchRepository); LOG.info("creating indexes."); searchRepository.createIndexes(); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java index 758f0e35e52..2d7a89557e6 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java @@ -80,4 +80,24 @@ class ElasticSearchBulkSinkSimpleTest { recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); assertEquals(false, recreateIndex); } + + @Test + void testIsVectorEmbeddingEnabledForEntity() { + // Test default implementation returns false + boolean result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("table"); + assertEquals(false, result); + + result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("user"); + assertEquals(false, result); + + result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("dashboard"); + assertEquals(false, result); + } + + @Test + void testAddEntityToVectorIndex() { + // Test default implementation does nothing (no exception thrown) + // This should not throw any exception as the default implementation is empty + elasticSearchBulkSink.addEntityToVectorIndex(null, 
null, true); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java index 5a06ae708a5..9565d78018c 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSinkSimpleTest.java @@ -81,4 +81,24 @@ class OpenSearchBulkSinkSimpleTest { recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false); assertEquals(false, recreateIndex); } + + @Test + void testIsVectorEmbeddingEnabledForEntity() { + // Test default implementation returns false + boolean result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("table"); + assertEquals(false, result); + + result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("user"); + assertEquals(false, result); + + result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("dashboard"); + assertEquals(false, result); + } + + @Test + void testAddEntityToVectorIndex() { + // Test default implementation does nothing (no exception thrown) + // This should not throw any exception as the default implementation is empty + openSearchBulkSink.addEntityToVectorIndex(null, null, true); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java index 1d42471a59a..cb1bfb1470f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexAppTest.java @@ -36,6 +36,7 @@ import 
org.openmetadata.schema.entity.app.App;
 import org.openmetadata.schema.entity.app.AppRunRecord;
 import org.openmetadata.schema.entity.app.FailureContext;
 import org.openmetadata.schema.entity.app.SuccessContext;
+import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
 import org.openmetadata.schema.system.EntityError;
 import org.openmetadata.schema.system.EventPublisherJob;
 import org.openmetadata.schema.system.IndexingError;
@@ -556,4 +557,36 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
         resultJobData.getMaxConcurrentRequests() > 0, "Concurrent requests should be positive");
     assertTrue(resultJobData.getProducerThreads() > 0, "Producer threads should be positive");
   }
+
+  @Test
+  void testSearchIndexSinkInitializationWithElasticSearch() {
+    assertSinkInitializationFor(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
+  }
+
+  @Test
+  void testSearchIndexSinkInitializationWithOpenSearch() {
+    assertSinkInitializationFor(ElasticSearchConfiguration.SearchType.OPENSEARCH);
+  }
+
+  /**
+   * Initializes the app with the repository stubbed to report {@code searchType}, then checks the
+   * batch size, concurrent request count and payload size exposed by the resulting job data.
+   */
+  private void assertSinkInitializationFor(ElasticSearchConfiguration.SearchType searchType) {
+    App testApp =
+        new App()
+            .withName("SearchIndexingApplication")
+            .withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class));
+
+    lenient().when(searchRepository.getSearchType()).thenReturn(searchType);
+    lenient().when(searchRepository.createBulkSink(5, 10, 1000000L)).thenReturn(mockSink);
+
+    assertDoesNotThrow(() -> searchIndexApp.init(testApp));
+
+    EventPublisherJob jobData = searchIndexApp.getJobData();
+    assertNotNull(jobData);
+    assertEquals(5, jobData.getBatchSize());
+    assertEquals(10, jobData.getMaxConcurrentRequests());
+    assertEquals(1000000L, jobData.getPayLoadSize());
+  }
 }
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchRepositoryTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchRepositoryTest.java
new file mode 100644
index 00000000000..14321c6eb68
--- /dev/null
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchRepositoryTest.java
@@ -0,0 +1,96 @@
+package org.openmetadata.service.search;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.lenient;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
+import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
+import org.openmetadata.service.apps.bundles.searchIndex.ElasticSearchBulkSink;
+import org.openmetadata.service.apps.bundles.searchIndex.OpenSearchBulkSink;
+
+/**
+ * Exercises the real {@code createBulkSink} and {@code isVectorEmbeddingEnabled} implementations
+ * of {@link SearchRepository} on a Mockito mock; the search type is stubbed per test.
+ */
+@ExtendWith(MockitoExtension.class)
+class SearchRepositoryTest {
+
+  @Mock private SearchRepository searchRepository;
+
+  @BeforeEach
+  void setUp() {
+    // Route the methods under test to their real implementations. Argument matchers are needed
+    // for createBulkSink: a stub recorded with literal values only answers calls made with
+    // exactly those values, and any other call would fall back to the mock default of null.
+    lenient()
+        .when(searchRepository.createBulkSink(anyInt(), anyInt(), anyLong()))
+        .thenCallRealMethod();
+    lenient().when(searchRepository.isVectorEmbeddingEnabled()).thenCallRealMethod();
+  }
+
+  @Test
+  void testCreateBulkSinkForElasticSearch() {
+    stubSearchType(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
+
+    BulkSink bulkSink = searchRepository.createBulkSink(10, 2, 1000000L);
+
+    assertNotNull(bulkSink);
+    assertInstanceOf(ElasticSearchBulkSink.class, bulkSink);
+  }
+
+  @Test
+  void testCreateBulkSinkForOpenSearch() {
+    stubSearchType(ElasticSearchConfiguration.SearchType.OPENSEARCH);
+
+    BulkSink bulkSink = searchRepository.createBulkSink(10, 2, 1000000L);
+
+    assertNotNull(bulkSink);
+    assertInstanceOf(OpenSearchBulkSink.class, bulkSink);
+  }
+
+  @Test
+  void testCreateBulkSinkWithDifferentParameters() {
+    stubSearchType(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
+
+    BulkSink bulkSink1 = searchRepository.createBulkSink(50, 5, 5000000L);
+    assertNotNull(bulkSink1);
+    assertInstanceOf(ElasticSearchBulkSink.class, bulkSink1);
+
+    BulkSink bulkSink2 = searchRepository.createBulkSink(100, 10, 10000000L);
+    assertNotNull(bulkSink2);
+    assertInstanceOf(ElasticSearchBulkSink.class, bulkSink2);
+  }
+
+  @Test
+  void testIsVectorEmbeddingEnabled() {
+    // Test default implementation returns false
+    assertFalse(searchRepository.isVectorEmbeddingEnabled());
+  }
+
+  @Test
+  void testCreateBulkSinkParameterValidation() {
+    stubSearchType(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
+
+    // Test with minimum values
+    BulkSink bulkSink1 = searchRepository.createBulkSink(1, 1, 1L);
+    assertNotNull(bulkSink1);
+
+    // Test with large values
+    BulkSink bulkSink2 = searchRepository.createBulkSink(1000, 100, 100000000L);
+    assertNotNull(bulkSink2);
+  }
+
+  /** Makes the mocked repository report {@code searchType} as its search backend. */
+  private void stubSearchType(ElasticSearchConfiguration.SearchType searchType) {
+    lenient().when(searchRepository.getSearchType()).thenReturn(searchType);
+  }
+}