Adds Ability to Extend search functionalty (#17508)

* Adds Ability to Extend search functionalty

* Fix Tests
This commit is contained in:
Mohit Yadav 2024-08-20 23:14:30 +05:30 committed by GitHub
parent bee4bda501
commit d906bcfcac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 45 additions and 25 deletions

View File

@ -596,4 +596,8 @@ public final class Entity {
public static <T> T getDao() { public static <T> T getDao() {
return (T) collectionDAO; return (T) collectionDAO;
} }
public static <T> T getSearchRepo() {
return (T) searchRepository;
}
} }

View File

@ -65,6 +65,7 @@ import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.schema.api.security.AuthorizerConfiguration; import org.openmetadata.schema.api.security.AuthorizerConfiguration;
import org.openmetadata.schema.api.security.ClientType; import org.openmetadata.schema.api.security.ClientType;
import org.openmetadata.schema.configuration.LimitsConfiguration; import org.openmetadata.schema.configuration.LimitsConfiguration;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.services.connections.metadata.AuthProvider; import org.openmetadata.schema.services.connections.metadata.AuthProvider;
import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.apps.scheduler.AppScheduler;
@ -164,9 +165,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory()); jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
Entity.setCollectionDAO(getDao(jdbi)); Entity.setCollectionDAO(getDao(jdbi));
// initialize Search Repository, all repositories use SearchRepository this line should always installSearchRepository(catalogConfig.getElasticSearchConfiguration());
// before initializing repository
new SearchRepository(catalogConfig.getElasticSearchConfiguration());
// Initialize the MigrationValidationClient, used in the Settings Repository // Initialize the MigrationValidationClient, used in the Settings Repository
MigrationValidationClient.initialize(jdbi.onDemand(MigrationDAO.class), catalogConfig); MigrationValidationClient.initialize(jdbi.onDemand(MigrationDAO.class), catalogConfig);
// as first step register all the repositories // as first step register all the repositories
@ -302,6 +301,13 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
} }
} }
protected void installSearchRepository(ElasticSearchConfiguration esConfig) {
// initialize Search Repository, all repositories use SearchRepository this line should always
// before initializing repository
SearchRepository searchRepository = new SearchRepository(esConfig);
Entity.setSearchRepository(searchRepository);
}
private void registerHealthCheck(Environment environment) { private void registerHealthCheck(Environment environment) {
environment environment
.healthChecks() .healthChecks()

View File

@ -115,7 +115,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
// Create an On Demand DAO // Create an On Demand DAO
CollectionDAO dao = Entity.getCollectionDAO(); CollectionDAO dao = Entity.getCollectionDAO();
searchRepository = new SearchRepository(config.getElasticSearchConfiguration()); searchRepository = Entity.getSearchRepository();
AppScheduler.initialize(config, dao, searchRepository); AppScheduler.initialize(config, dao, searchRepository);
// Initialize Default Apps // Initialize Default Apps

View File

@ -135,6 +135,11 @@ public interface SearchClient {
String entityType) String entityType)
throws IOException; throws IOException;
default Response listPageHierarchy(String parent) {
throw new CustomExceptionMessage(
Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD);
}
Map<String, Object> searchLineageInternal( Map<String, Object> searchLineageInternal(
String fqn, String fqn,
int upstreamDepth, int upstreamDepth,

View File

@ -109,30 +109,14 @@ public class SearchRepository {
public SearchRepository(ElasticSearchConfiguration config) { public SearchRepository(ElasticSearchConfiguration config) {
elasticSearchConfiguration = config; elasticSearchConfiguration = config;
if (config != null searchClient = buildSearchClient(config);
&& config.getSearchType() == ElasticSearchConfiguration.SearchType.OPENSEARCH) { searchIndexFactory = buildIndexFactory();
searchClient = new OpenSearchClient(config);
} else {
searchClient = new ElasticSearchClient(config);
}
try {
if (config != null && (!nullOrEmpty(config.getSearchIndexFactoryClassName()))) {
this.searchIndexFactory =
Class.forName(config.getSearchIndexFactoryClassName())
.asSubclass(SearchIndexFactory.class)
.getDeclaredConstructor()
.newInstance();
}
} catch (Exception e) {
LOG.warn("Failed to initialize search index factory using default one", e);
}
language = language =
config != null && config.getSearchIndexMappingLanguage() != null config != null && config.getSearchIndexMappingLanguage() != null
? config.getSearchIndexMappingLanguage().value() ? config.getSearchIndexMappingLanguage().value()
: "en"; : "en";
clusterAlias = config != null ? config.getClusterAlias() : ""; clusterAlias = config != null ? config.getClusterAlias() : "";
loadIndexMappings(); loadIndexMappings();
Entity.setSearchRepository(this);
} }
private void loadIndexMappings() { private void loadIndexMappings() {
@ -164,6 +148,21 @@ public class SearchRepository {
} }
} }
public SearchClient buildSearchClient(ElasticSearchConfiguration config) {
SearchClient sc;
if (config != null
&& config.getSearchType() == ElasticSearchConfiguration.SearchType.OPENSEARCH) {
sc = new OpenSearchClient(config);
} else {
sc = new ElasticSearchClient(config);
}
return sc;
}
public SearchIndexFactory buildIndexFactory() {
return new SearchIndexFactory();
}
public ElasticSearchConfiguration.SearchType getSearchType() { public ElasticSearchConfiguration.SearchType getSearchType() {
return searchClient.getSearchType(); return searchClient.getSearchType();
} }
@ -881,4 +880,8 @@ public class SearchRepository {
} }
return new ArrayList<>(); return new ArrayList<>();
} }
public <T> T getRestHighLevelClient() {
return (T) searchClient;
}
} }

View File

@ -208,7 +208,7 @@ import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
public class ElasticSearchClient implements SearchClient { public class ElasticSearchClient implements SearchClient {
@SuppressWarnings("deprecated") @SuppressWarnings("deprecated")
private final RestHighLevelClient client; protected final RestHighLevelClient client;
private final boolean isClientAvailable; private final boolean isClientAvailable;
public static final NamedXContentRegistry xContentRegistry; public static final NamedXContentRegistry xContentRegistry;

View File

@ -205,7 +205,7 @@ import os.org.opensearch.search.suggest.completion.context.CategoryQueryContext;
@Slf4j @Slf4j
// Not tagged with Repository annotation as it is programmatically initialized // Not tagged with Repository annotation as it is programmatically initialized
public class OpenSearchClient implements SearchClient { public class OpenSearchClient implements SearchClient {
private final RestHighLevelClient client; protected final RestHighLevelClient client;
public static final NamedXContentRegistry X_CONTENT_REGISTRY; public static final NamedXContentRegistry X_CONTENT_REGISTRY;
private final boolean isClientAvailable; private final boolean isClientAvailable;

View File

@ -232,7 +232,9 @@ public abstract class OpenMetadataApplicationTest {
pipelineServiceClientConfiguration, pipelineServiceClientConfiguration,
forceMigrations); forceMigrations);
// Initialize search repository // Initialize search repository
new SearchRepository(config.getElasticSearchConfiguration()); SearchRepository searchRepository =
new SearchRepository(config.getElasticSearchConfiguration());
Entity.setSearchRepository(searchRepository);
Entity.setCollectionDAO(jdbi.onDemand(CollectionDAO.class)); Entity.setCollectionDAO(jdbi.onDemand(CollectionDAO.class));
Entity.initializeRepositories(config, jdbi); Entity.initializeRepositories(config, jdbi);
workflow.loadMigrations(); workflow.loadMigrations();