Prepare OSS for Vector Embedding via SearchIndexApp (#22482)

* Prepare OSS for Vector Embedding

* Add Bulk Processor and Recreate to addEntityToVectorIndex

* Add RestHighLevelClient get to SearchClient

* Add ReindexHandler

* Adding SPI for SearchRepository

* Fix Failing Tests
This commit is contained in:
Mohit Yadav 2025-07-25 09:56:06 +05:30 committed by GitHub
parent 56267a9989
commit f519b44fad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 498 additions and 47 deletions

View File

@ -38,7 +38,7 @@ import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
public class ElasticSearchBulkSink implements BulkSink {
private final ElasticSearchClient searchClient;
private final SearchRepository searchRepository;
protected final SearchRepository searchRepository;
private final BulkProcessor bulkProcessor;
private final StepStats stats = new StepStats();
@ -191,6 +191,9 @@ public class ElasticSearchBulkSink implements BulkSink {
Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
// Check if embeddings are enabled for this specific entity type
boolean embeddingsEnabled = isVectorEmbeddingEnabledForEntity(entityType);
IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
if (indexMapping == null) {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
@ -208,7 +211,7 @@ public class ElasticSearchBulkSink implements BulkSink {
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
for (EntityInterface entity : entityInterfaces) {
addEntity(entity, indexName, recreateIndex);
addEntity(entity, indexName, recreateIndex, embeddingsEnabled);
}
}
} catch (Exception e) {
@ -227,7 +230,8 @@ public class ElasticSearchBulkSink implements BulkSink {
}
}
private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) {
private void addEntity(
EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) {
// Build the search index document using the proper transformation
String entityType = Entity.getEntityTypeFromObject(entity);
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
@ -245,6 +249,11 @@ public class ElasticSearchBulkSink implements BulkSink {
updateRequest.docAsUpsert(true);
bulkProcessor.add(updateRequest);
}
// If embeddings are enabled, also index to vector_search_index
if (embeddingsEnabled) {
addEntityToVectorIndex(bulkProcessor, entity, recreateIndex);
}
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
@ -324,4 +333,19 @@ public class ElasticSearchBulkSink implements BulkSink {
this.maxConcurrentRequests = concurrentRequests;
LOG.info("Concurrent requests updated to: {}", concurrentRequests);
}
/**
* Checks if vector embeddings are enabled for a specific entity type.
* This combines SearchRepository capability check with job configuration.
*/
protected boolean isVectorEmbeddingEnabledForEntity(String entityType) {
return false;
}
/**
* Adds entity to vector_search_index for embedding search.
* This method will only be called when embeddings are enabled for the entity type.
*/
protected void addEntityToVectorIndex(
BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {}
}

View File

@ -38,7 +38,7 @@ import os.org.opensearch.common.xcontent.XContentType;
public class OpenSearchBulkSink implements BulkSink {
private final OpenSearchClient searchClient;
private final SearchRepository searchRepository;
protected final SearchRepository searchRepository;
private final BulkProcessor bulkProcessor;
private final StepStats stats = new StepStats();
@ -191,6 +191,9 @@ public class OpenSearchBulkSink implements BulkSink {
Boolean recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
// Check if embeddings are enabled for this specific entity type
boolean embeddingsEnabled = isVectorEmbeddingEnabledForEntity(entityType);
IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
if (indexMapping == null) {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
@ -208,7 +211,7 @@ public class OpenSearchBulkSink implements BulkSink {
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
for (EntityInterface entity : entityInterfaces) {
addEntity(entity, indexName, recreateIndex);
addEntity(entity, indexName, recreateIndex, embeddingsEnabled);
}
}
} catch (Exception e) {
@ -227,7 +230,8 @@ public class OpenSearchBulkSink implements BulkSink {
}
}
private void addEntity(EntityInterface entity, String indexName, boolean recreateIndex) {
private void addEntity(
EntityInterface entity, String indexName, boolean recreateIndex, boolean embeddingsEnabled) {
// Build the search index document using the proper transformation
String entityType = Entity.getEntityTypeFromObject(entity);
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
@ -245,6 +249,11 @@ public class OpenSearchBulkSink implements BulkSink {
updateRequest.docAsUpsert(true);
bulkProcessor.add(updateRequest);
}
// If embeddings are enabled, also index to vector_search_index
if (embeddingsEnabled) {
addEntityToVectorIndex(bulkProcessor, entity, recreateIndex);
}
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
@ -324,4 +333,19 @@ public class OpenSearchBulkSink implements BulkSink {
this.maxConcurrentRequests = concurrentRequests;
LOG.info("Concurrent requests updated to: {}", concurrentRequests);
}
/**
* Checks if vector embeddings are enabled for a specific entity type.
* This combines SearchRepository capability check with job configuration.
*/
protected boolean isVectorEmbeddingEnabledForEntity(String entityType) {
return false;
}
/**
* Adds entity to vector_search_index for embedding search.
* This method will only be called when embeddings are enabled for the entity type.
*/
protected void addEntityToVectorIndex(
BulkProcessor bulkProcessor, EntityInterface entity, boolean recreateIndex) {}
}

View File

@ -47,7 +47,6 @@ import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.exception.AppException;
@ -57,6 +56,7 @@ import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.search.RecreateIndexHandler;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
@ -91,6 +91,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
QUERY_COST_RECORD);
private BulkSink searchIndexSink;
private RecreateIndexHandler recreateIndexHandler;
@Getter private EventPublisherJob jobData;
private ExecutorService producerExecutor;
@ -365,23 +366,17 @@ public class SearchIndexApp extends AbstractNativeApplication {
ElasticSearchConfiguration.SearchType searchType = searchRepository.getSearchType();
jobLogger.addInitDetail("Search type", searchType);
if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.searchIndexSink =
new OpenSearchBulkSink(
searchRepository,
jobData.getBatchSize(),
jobData.getMaxConcurrentRequests(),
jobData.getPayLoadSize());
LOG.debug("Initialized OpenSearchBulkSink with batch size: {}", jobData.getBatchSize());
} else {
this.searchIndexSink =
new ElasticSearchBulkSink(
searchRepository,
jobData.getBatchSize(),
jobData.getMaxConcurrentRequests(),
jobData.getPayLoadSize());
LOG.debug("Initialized ElasticSearchBulkSink with batch size: {}", jobData.getBatchSize());
}
this.searchIndexSink =
searchRepository.createBulkSink(
jobData.getBatchSize(), jobData.getMaxConcurrentRequests(), jobData.getPayLoadSize());
LOG.debug(
"Initialized {} with batch size: {}",
searchIndexSink.getClass().getSimpleName(),
jobData.getBatchSize());
this.recreateIndexHandler = searchRepository.createReindexHandler();
LOG.debug(
"Initialized {} for reindex cleanup", recreateIndexHandler.getClass().getSimpleName());
return clusterMetrics;
}
@ -884,21 +879,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
private void reCreateIndexes(Set<String> entities) {
for (String entityType : entities) {
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType);
return;
}
IndexMapping indexType = searchRepository.getIndexMapping(entityType);
if (indexType == null) {
LOG.warn(
"No index mapping found for entityType '{}'. Skipping index recreation.", entityType);
continue;
}
searchRepository.deleteIndex(indexType);
searchRepository.createIndex(indexType);
LOG.debug("Recreated index for entityType '{}'.", entityType);
}
recreateIndexHandler.reCreateIndexes(entities);
}
private Source<?> createSource(String entityType) {

View File

@ -0,0 +1,29 @@
package org.openmetadata.service.search;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.Entity;
/**
* Default implementation of RecreateHandler that provides basic logging.
* This can be overridden to provide more sophisticated cleanup operations.
*/
@Slf4j
public class DefaultRecreateHandler implements RecreateIndexHandler {
@Override
public void reCreateIndexes(Set<String> entities) {
SearchRepository searchRepository = Entity.getSearchRepository();
for (String entityType : entities) {
IndexMapping indexType = searchRepository.getIndexMapping(entityType);
if (indexType == null) {
LOG.warn(
"No index mapping found for entityType '{}'. Skipping index recreation.", entityType);
continue;
}
searchRepository.deleteIndex(indexType);
searchRepository.createIndex(indexType);
LOG.debug("Recreated index for entityType '{}'.", entityType);
}
}
}

View File

@ -0,0 +1,26 @@
package org.openmetadata.service.search;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
/**
* Implementation of SearchRepositoryProvider.
* This is the default provider that creates standard OpenMetadata SearchRepository instances.
*/
public class DefaultSearchRepositoryProvider implements SearchRepositoryProvider {
@Override
public SearchRepository createSearchRepository(
ElasticSearchConfiguration elasticSearchConfiguration, int maxSize) {
return new SearchRepository(elasticSearchConfiguration, maxSize);
}
@Override
public int getPriority() {
return 0;
}
@Override
public boolean isAvailable() {
return true;
}
}

View File

@ -0,0 +1,11 @@
package org.openmetadata.service.search;
import java.util.Set;
/**
* Interface for cleaning up resources during reindexing operations.
* This allows for different implementations to be provided for different deployment tiers.
*/
public interface RecreateIndexHandler {
void reCreateIndexes(Set<String> entities);
}

View File

@ -34,7 +34,7 @@ import os.org.opensearch.action.bulk.BulkRequest;
import os.org.opensearch.action.bulk.BulkResponse;
import os.org.opensearch.client.RequestOptions;
public interface SearchClient {
public interface SearchClient<T> {
String UPSTREAM_LINEAGE_FIELD = "upstreamLineage";
String FQN_FIELD = "fullyQualifiedName";
ExecutorService asyncExecutor = Executors.newFixedThreadPool(1);
@ -447,6 +447,8 @@ public interface SearchClient {
Object getClient();
T getHighLevelClient();
SearchHealthStatus getSearchHealthStatus() throws IOException;
QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException;

View File

@ -106,6 +106,9 @@ import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.search.IndexMappingLoader;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.ElasticSearchBulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.OpenSearchBulkSink;
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
import org.openmetadata.service.events.lifecycle.handlers.SearchIndexHandler;
import org.openmetadata.service.jdbi3.EntityRepository;
@ -225,9 +228,8 @@ public class SearchRepository {
}
public void createIndexes() {
for (IndexMapping indexMapping : entityIndexMap.values()) {
createIndex(indexMapping);
}
RecreateIndexHandler recreateIndexHandler = this.createReindexHandler();
recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet());
}
public void updateIndexes() {
@ -1420,4 +1422,40 @@ public class SearchRepository {
LOG.error("Failed to initialize NLQ service", e);
}
}
/**
* Creates a BulkSink instance with vector embedding configuration.
* This method can be overridden in subclasses to provide different implementations.
*/
public BulkSink createBulkSink(
int batchSize, int maxConcurrentRequests, long maxPayloadSizeBytes) {
ElasticSearchConfiguration.SearchType searchType = getSearchType();
if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
return new OpenSearchBulkSink(this, batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
} else {
return new ElasticSearchBulkSink(this, batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
}
}
/**
* Creates a ReindexHandler instance for recreate operations during reindexing.
* This method can be overridden in subclasses to provide different implementations.
*/
public RecreateIndexHandler createReindexHandler() {
return new DefaultRecreateHandler();
}
/**
* Checks if vector embedding is enabled.
* This method can be overridden in subclasses to provide different configurations.
*/
@SuppressWarnings("unused")
public boolean isVectorEmbeddingEnabled() {
return false;
}
@SuppressWarnings("unused")
public <T> T getHighLevelClient() {
return (T) searchClient.getHighLevelClient();
}
}

View File

@ -0,0 +1,83 @@
package org.openmetadata.service.search;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
/**
* Factory for creating SearchRepository instances using Service Provider Interface (SPI).
* This allows different implementations to be plugged in at runtime
* without hardcoded class names or reflection.
*/
@Slf4j
public class SearchRepositoryFactory {
/**
* Creates a SearchRepository instance using the highest priority available provider.
*
* @param elasticSearchConfiguration The Elasticsearch configuration
* @param maxSize The maximum size parameter
* @return SearchRepository instance from the best available provider
*/
public static SearchRepository createSearchRepository(
ElasticSearchConfiguration elasticSearchConfiguration, int maxSize) {
SearchRepositoryProvider provider = findBestProvider();
LOG.info(
"Using SearchRepository provider: {} (priority: {})",
provider.getClass().getSimpleName(),
provider.getPriority());
return provider.createSearchRepository(elasticSearchConfiguration, maxSize);
}
/**
* Finds the best available SearchRepositoryProvider using SPI.
* Returns the provider with the highest priority that is available.
*/
private static SearchRepositoryProvider findBestProvider() {
ServiceLoader<SearchRepositoryProvider> loader =
ServiceLoader.load(SearchRepositoryProvider.class);
List<SearchRepositoryProvider> availableProviders =
StreamSupport.stream(loader.spliterator(), false)
.filter(SearchRepositoryProvider::isAvailable)
.sorted((p1, p2) -> Integer.compare(p2.getPriority(), p1.getPriority()))
.toList();
if (availableProviders.isEmpty()) {
LOG.warn(
"No SearchRepositoryProvider found via SPI, falling back to default OSS implementation");
return new DefaultSearchRepositoryProvider();
}
SearchRepositoryProvider bestProvider = availableProviders.getFirst();
if (availableProviders.size() > 1) {
LOG.info(
"Multiple SearchRepository providers available, selected: {} with priority: {}",
bestProvider.getClass().getSimpleName(),
bestProvider.getPriority());
}
return bestProvider;
}
/**
* Gets all available SearchRepository providers for debugging/inspection.
*
* @return List of all available providers
*/
public static List<SearchRepositoryProvider> getAvailableProviders() {
ServiceLoader<SearchRepositoryProvider> loader =
ServiceLoader.load(SearchRepositoryProvider.class);
return StreamSupport.stream(loader.spliterator(), false)
.filter(SearchRepositoryProvider::isAvailable)
.sorted((p1, p2) -> Integer.compare(p2.getPriority(), p1.getPriority()))
.toList();
}
}

View File

@ -0,0 +1,36 @@
package org.openmetadata.service.search;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
/**
* Service Provider Interface for creating SearchRepository instances.
*/
public interface SearchRepositoryProvider {
/**
* Creates a SearchRepository instance.
*
* @param elasticSearchConfiguration The Elasticsearch configuration
* @param maxSize The maximum size parameter
* @return SearchRepository instance
*/
SearchRepository createSearchRepository(
ElasticSearchConfiguration elasticSearchConfiguration, int maxSize);
/**
* Returns the priority of this provider.
* Higher priority providers will be selected first.
* OSS should return 0, Paid versions should return higher values.
*
* @return priority value
*/
int getPriority();
/**
* Returns true if this provider is available/applicable.
* This allows providers to check for required dependencies, licenses, etc.
*
* @return true if this provider can be used
*/
boolean isAvailable();
}

View File

@ -193,7 +193,7 @@ import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
@Slf4j
public class ElasticSearchClient implements SearchClient {
public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
@SuppressWarnings("deprecated")
@Getter
@ -2453,6 +2453,11 @@ public class ElasticSearchClient implements SearchClient {
return client.getLowLevelClient();
}
@Override
public RestHighLevelClient getHighLevelClient() {
return client;
}
@Override
public SearchHealthStatus getSearchHealthStatus() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();

View File

@ -216,7 +216,7 @@ import os.org.opensearch.search.sort.SortOrder;
@Slf4j
// Not tagged with Repository annotation as it is programmatically initialized
public class OpenSearchClient implements SearchClient {
public class OpenSearchClient implements SearchClient<RestHighLevelClient> {
@Getter protected final RestHighLevelClient client;
private final boolean isClientAvailable;
private final RBACConditionEvaluator rbacConditionEvaluator;
@ -2583,6 +2583,11 @@ public class OpenSearchClient implements SearchClient {
return client.getLowLevelClient();
}
@Override
public RestHighLevelClient getHighLevelClient() {
return client;
}
private void buildSearchRBACQuery(
SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) {
if (shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) {

View File

@ -100,6 +100,7 @@ import org.openmetadata.service.resources.apps.AppMarketPlaceMapper;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.search.IndexMappingVersionTracker;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.SearchRepositoryFactory;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.secrets.SecretsManagerUpdateService;
@ -1435,7 +1436,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
jdbi = JdbiUtils.createAndSetupJDBI(dataSourceFactory);
searchRepository =
new SearchRepository(
SearchRepositoryFactory.createSearchRepository(
config.getElasticSearchConfiguration(), config.getDataSourceFactory().getMaxSize());
// Initialize secrets manager

View File

@ -0,0 +1 @@
org.openmetadata.service.search.DefaultSearchRepositoryProvider

View File

@ -65,6 +65,7 @@ import org.openmetadata.service.resources.events.SlackCallbackResource;
import org.openmetadata.service.resources.events.WebhookCallbackResource;
import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.SearchRepositoryFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
@ -339,7 +340,9 @@ public abstract class OpenMetadataApplicationTest {
private void createIndices() {
ElasticSearchConfiguration esConfig = getEsConfig();
SearchRepository searchRepository = new SearchRepository(esConfig, 50);
SearchRepository searchRepository =
SearchRepositoryFactory.createSearchRepository(esConfig, 50);
Entity.setSearchRepository(searchRepository);
LOG.info("creating indexes.");
searchRepository.createIndexes();
}

View File

@ -80,4 +80,24 @@ class ElasticSearchBulkSinkSimpleTest {
recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
assertEquals(false, recreateIndex);
}
@Test
void testIsVectorEmbeddingEnabledForEntity() {
// Test default implementation returns false
boolean result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("table");
assertEquals(false, result);
result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("user");
assertEquals(false, result);
result = elasticSearchBulkSink.isVectorEmbeddingEnabledForEntity("dashboard");
assertEquals(false, result);
}
@Test
void testAddEntityToVectorIndex() {
// Test default implementation does nothing (no exception thrown)
// This should not throw any exception as the default implementation is empty
elasticSearchBulkSink.addEntityToVectorIndex(null, null, true);
}
}

View File

@ -81,4 +81,24 @@ class OpenSearchBulkSinkSimpleTest {
recreateIndex = (Boolean) contextData.getOrDefault("recreateIndex", false);
assertEquals(false, recreateIndex);
}
@Test
void testIsVectorEmbeddingEnabledForEntity() {
// Test default implementation returns false
boolean result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("table");
assertEquals(false, result);
result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("user");
assertEquals(false, result);
result = openSearchBulkSink.isVectorEmbeddingEnabledForEntity("dashboard");
assertEquals(false, result);
}
@Test
void testAddEntityToVectorIndex() {
// Test default implementation does nothing (no exception thrown)
// This should not throw any exception as the default implementation is empty
openSearchBulkSink.addEntityToVectorIndex(null, null, true);
}
}

View File

@ -36,6 +36,7 @@ import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.IndexingError;
@ -556,4 +557,48 @@ public class SearchIndexAppTest extends OpenMetadataApplicationTest {
resultJobData.getMaxConcurrentRequests() > 0, "Concurrent requests should be positive");
assertTrue(resultJobData.getProducerThreads() > 0, "Producer threads should be positive");
}
@Test
void testSearchIndexSinkInitializationWithElasticSearch() {
App testApp =
new App()
.withName("SearchIndexingApplication")
.withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class));
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
lenient().when(searchRepository.createBulkSink(5, 10, 1000000L)).thenReturn(mockSink);
assertDoesNotThrow(() -> searchIndexApp.init(testApp));
EventPublisherJob jobData = searchIndexApp.getJobData();
assertNotNull(jobData);
assertEquals(5, jobData.getBatchSize());
assertEquals(10, jobData.getMaxConcurrentRequests());
assertEquals(1000000L, jobData.getPayLoadSize());
}
@Test
void testSearchIndexSinkInitializationWithOpenSearch() {
App testApp =
new App()
.withName("SearchIndexingApplication")
.withAppConfiguration(JsonUtils.convertValue(testJobData, Object.class));
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.OPENSEARCH);
lenient().when(searchRepository.createBulkSink(5, 10, 1000000L)).thenReturn(mockSink);
assertDoesNotThrow(() -> searchIndexApp.init(testApp));
EventPublisherJob jobData = searchIndexApp.getJobData();
assertNotNull(jobData);
assertEquals(5, jobData.getBatchSize());
assertEquals(10, jobData.getMaxConcurrentRequests());
assertEquals(1000000L, jobData.getPayLoadSize());
}
}

View File

@ -0,0 +1,97 @@
package org.openmetadata.service.search;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.ElasticSearchBulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.OpenSearchBulkSink;
@ExtendWith(MockitoExtension.class)
class SearchRepositoryTest {
@Mock private SearchRepository searchRepository;
@BeforeEach
void setUp() {
// Create a real instance for testing the new methods
searchRepository = mock(SearchRepository.class);
// Enable calling real methods for the methods we want to test
lenient().when(searchRepository.createBulkSink(10, 2, 1000000L)).thenCallRealMethod();
lenient().when(searchRepository.isVectorEmbeddingEnabled()).thenCallRealMethod();
}
@Test
void testCreateBulkSinkForElasticSearch() {
// Mock SearchRepository to return ElasticSearch type
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
BulkSink bulkSink = searchRepository.createBulkSink(10, 2, 1000000L);
assertNotNull(bulkSink);
assertInstanceOf(ElasticSearchBulkSink.class, bulkSink);
}
@Test
void testCreateBulkSinkForOpenSearch() {
// Mock SearchRepository to return OpenSearch type
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.OPENSEARCH);
BulkSink bulkSink = searchRepository.createBulkSink(10, 2, 1000000L);
assertNotNull(bulkSink);
assertInstanceOf(OpenSearchBulkSink.class, bulkSink);
}
@Test
void testCreateBulkSinkWithDifferentParameters() {
// Test with different parameter values
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
BulkSink bulkSink1 = searchRepository.createBulkSink(50, 5, 5000000L);
assertNotNull(bulkSink1);
assertInstanceOf(ElasticSearchBulkSink.class, bulkSink1);
BulkSink bulkSink2 = searchRepository.createBulkSink(100, 10, 10000000L);
assertNotNull(bulkSink2);
assertInstanceOf(ElasticSearchBulkSink.class, bulkSink2);
}
@Test
void testIsVectorEmbeddingEnabled() {
// Test default implementation returns false
boolean result = searchRepository.isVectorEmbeddingEnabled();
assertFalse(result);
}
@Test
void testCreateBulkSinkParameterValidation() {
lenient()
.when(searchRepository.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
// Test with minimum values
BulkSink bulkSink1 = searchRepository.createBulkSink(1, 1, 1L);
assertNotNull(bulkSink1);
// Test with large values
BulkSink bulkSink2 = searchRepository.createBulkSink(1000, 100, 100000000L);
assertNotNull(bulkSink2);
}
}