mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-27 08:44:49 +00:00
changes needed for adding knowledgeCenterIndex (#14223)
* chnages needed for adding knowledgeCenterIndex * fix tests * CI fix
This commit is contained in:
parent
f1e4142acf
commit
db921645f3
@ -35,6 +35,7 @@ import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import javax.ws.rs.BadRequestException;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import lombok.Getter;
|
||||
import lombok.NonNull;
|
||||
@ -63,6 +64,7 @@ import org.openmetadata.service.jdbi3.TokenRepository;
|
||||
import org.openmetadata.service.jdbi3.UsageRepository;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.search.indexes.SearchIndex;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
|
||||
@ -555,4 +557,11 @@ public final class Entity {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static SearchIndex buildSearchIndex(String entityType, Object entity) {
|
||||
if (searchRepository != null) {
|
||||
return searchRepository.getSearchIndexFactory().buildIndex(entityType, entity);
|
||||
}
|
||||
throw new BadRequestException("searchrepository not initialized");
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,6 +90,7 @@ import org.openmetadata.service.monitoring.EventMonitorPublisher;
|
||||
import org.openmetadata.service.resources.CollectionRegistry;
|
||||
import org.openmetadata.service.resources.databases.DatasourceConfig;
|
||||
import org.openmetadata.service.resources.settings.SettingsCache;
|
||||
import org.openmetadata.service.search.SearchIndexFactory;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.secrets.SecretsManager;
|
||||
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
||||
@ -138,7 +139,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
|
||||
|
||||
// initialize Search Repository, all repositories use SearchRepository this line should always before initializing
|
||||
// repository
|
||||
new SearchRepository(catalogConfig.getElasticSearchConfiguration());
|
||||
new SearchRepository(catalogConfig.getElasticSearchConfiguration(), new SearchIndexFactory());
|
||||
// as first step register all the repositories
|
||||
Entity.initializeRepositories(catalogConfig, jdbi);
|
||||
|
||||
|
||||
@ -72,6 +72,7 @@ import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.search.SearchIndexFactory;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.secrets.SecretsManager;
|
||||
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
||||
@ -107,7 +108,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
|
||||
|
||||
// Create an On Demand DAO
|
||||
CollectionDAO dao = Entity.getCollectionDAO();
|
||||
searchRepository = new SearchRepository(config.getElasticSearchConfiguration());
|
||||
searchRepository = new SearchRepository(config.getElasticSearchConfiguration(), new SearchIndexFactory());
|
||||
|
||||
try {
|
||||
AppScheduler.initialize(dao, searchRepository);
|
||||
|
||||
@ -73,9 +73,8 @@ import org.openmetadata.service.search.indexes.WebAnalyticUserActivityReportData
|
||||
|
||||
@Slf4j
|
||||
public class SearchIndexFactory {
|
||||
private SearchIndexFactory() {}
|
||||
|
||||
public static SearchIndex buildIndex(String entityType, Object entity) {
|
||||
public SearchIndex buildIndex(String entityType, Object entity) {
|
||||
switch (entityType) {
|
||||
case Entity.TABLE:
|
||||
return new TableIndex((Table) entity);
|
||||
@ -150,8 +149,11 @@ public class SearchIndexFactory {
|
||||
case Entity.TEST_CASE_RESOLUTION_STATUS:
|
||||
return new TestCaseResolutionStatusIndex((TestCaseResolutionStatus) entity);
|
||||
default:
|
||||
LOG.warn("Ignoring Entity Type {}", entityType);
|
||||
return buildExternalIndexes(entityType, entity);
|
||||
}
|
||||
}
|
||||
|
||||
protected SearchIndex buildExternalIndexes(String entityType, Object entity) {
|
||||
throw new IllegalArgumentException(String.format("Entity Type [%s] is not valid for Index Factory", entityType));
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,6 +36,7 @@ import java.util.SortedMap;
|
||||
import javax.json.JsonObject;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
@ -69,6 +70,8 @@ public class SearchRepository {
|
||||
|
||||
private final String language;
|
||||
|
||||
@Getter @Setter private SearchIndexFactory searchIndexFactory;
|
||||
|
||||
private final List<String> inheritableFields =
|
||||
List.of(Entity.FIELD_OWNER, Entity.FIELD_DOMAIN, Entity.FIELD_DISABLED);
|
||||
private final List<String> propagateFields = List.of(Entity.FIELD_TAGS);
|
||||
@ -85,13 +88,14 @@ public class SearchRepository {
|
||||
|
||||
public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
|
||||
|
||||
public SearchRepository(ElasticSearchConfiguration config) {
|
||||
public SearchRepository(ElasticSearchConfiguration config, SearchIndexFactory searchIndexFactory) {
|
||||
elasticSearchConfiguration = config;
|
||||
if (config != null && config.getSearchType() == ElasticSearchConfiguration.SearchType.OPENSEARCH) {
|
||||
searchClient = new OpenSearchClient(config);
|
||||
} else {
|
||||
searchClient = new ElasticSearchClient(config);
|
||||
}
|
||||
this.searchIndexFactory = searchIndexFactory;
|
||||
this.language = config != null ? config.getSearchIndexMappingLanguage().value() : "en";
|
||||
loadIndexMappings();
|
||||
Entity.setSearchRepository(this);
|
||||
@ -106,17 +110,29 @@ public class SearchRepository {
|
||||
}
|
||||
|
||||
private void loadIndexMappings() {
|
||||
Set<String> entities;
|
||||
entityIndexMap = new HashMap<>();
|
||||
try (InputStream in = getClass().getResourceAsStream("/elasticsearch/indexMapping.json")) {
|
||||
assert in != null;
|
||||
JsonObject jsonPayload = JsonUtils.readJson(new String(in.readAllBytes())).asJsonObject();
|
||||
Set<String> entities = jsonPayload.keySet();
|
||||
entities = jsonPayload.keySet();
|
||||
for (String s : entities) {
|
||||
entityIndexMap.put(s, JsonUtils.readValue(jsonPayload.get(s).toString(), IndexMapping.class));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to load indexMapping.json");
|
||||
}
|
||||
try (InputStream in2 = getClass().getResourceAsStream("/elasticsearch/collate/indexMapping.json")) {
|
||||
if (in2 != null) {
|
||||
JsonObject jsonPayload = JsonUtils.readJson(new String(in2.readAllBytes())).asJsonObject();
|
||||
entities = jsonPayload.keySet();
|
||||
for (String s : entities) {
|
||||
entityIndexMap.put(s, JsonUtils.readValue(jsonPayload.get(s).toString(), IndexMapping.class));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to load indexMapping.json");
|
||||
}
|
||||
}
|
||||
|
||||
public ElasticSearchConfiguration.SearchType getSearchType() {
|
||||
@ -202,7 +218,7 @@ public class SearchRepository {
|
||||
String entityType = entity.getEntityReference().getType();
|
||||
try {
|
||||
IndexMapping indexMapping = entityIndexMap.get(entityType);
|
||||
SearchIndex index = SearchIndexFactory.buildIndex(entityType, entity);
|
||||
SearchIndex index = searchIndexFactory.buildIndex(entityType, entity);
|
||||
String doc = JsonUtils.pojoToJson(index.buildESDoc());
|
||||
searchClient.createEntity(indexMapping.getIndexName(), entityId, doc);
|
||||
} catch (Exception ie) {
|
||||
@ -226,7 +242,7 @@ public class SearchRepository {
|
||||
String entityId = entity.getId().toString();
|
||||
try {
|
||||
IndexMapping indexMapping = entityIndexMap.get(entityType);
|
||||
SearchIndex index = SearchIndexFactory.buildIndex(entityType, entity);
|
||||
SearchIndex index = searchIndexFactory.buildIndex(entityType, entity);
|
||||
String doc = JsonUtils.pojoToJson(index.buildESDoc());
|
||||
searchClient.createTimeSeriesEntity(indexMapping.getIndexName(), entityId, doc);
|
||||
} catch (Exception ie) {
|
||||
@ -250,7 +266,7 @@ public class SearchRepository {
|
||||
&& Objects.equals(entity.getVersion(), entity.getChangeDescription().getPreviousVersion())) {
|
||||
scriptTxt = getScriptWithParams(entity, doc);
|
||||
} else {
|
||||
SearchIndex elasticSearchIndex = SearchIndexFactory.buildIndex(entityType, entity);
|
||||
SearchIndex elasticSearchIndex = searchIndexFactory.buildIndex(entityType, entity);
|
||||
doc = elasticSearchIndex.buildESDoc();
|
||||
}
|
||||
searchClient.updateEntity(indexMapping.getIndexName(), entityId, doc, scriptTxt);
|
||||
|
||||
@ -905,6 +905,7 @@ public class ElasticSearchClient implements SearchClient {
|
||||
AggregationBuilders.terms(OWNER_DISPLAY_NAME_KEYWORD)
|
||||
.field(OWNER_DISPLAY_NAME_KEYWORD)
|
||||
.size(MAX_AGGREGATE_SIZE))
|
||||
.aggregation(AggregationBuilders.terms("owner.displayName").field("owner.displayName").size(MAX_AGGREGATE_SIZE))
|
||||
.aggregation(
|
||||
AggregationBuilders.terms(DOMAIN_DISPLAY_NAME_KEYWORD)
|
||||
.field(DOMAIN_DISPLAY_NAME_KEYWORD)
|
||||
|
||||
@ -15,7 +15,6 @@ import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.system.StepStats;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.ProcessorException;
|
||||
import org.openmetadata.service.search.SearchIndexFactory;
|
||||
import org.openmetadata.service.search.models.IndexMapping;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
@ -73,7 +72,7 @@ public class ElasticSearchEntitiesProcessor implements Processor<BulkRequest, Re
|
||||
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexMapping.getIndexName(), entity.getId().toString());
|
||||
updateRequest.doc(
|
||||
JsonUtils.pojoToJson(Objects.requireNonNull(SearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()),
|
||||
JsonUtils.pojoToJson(Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)).buildESDoc()),
|
||||
XContentType.JSON);
|
||||
updateRequest.docAsUpsert(true);
|
||||
return updateRequest;
|
||||
|
||||
@ -12,7 +12,6 @@ import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.system.StepStats;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.ProcessorException;
|
||||
import org.openmetadata.service.search.SearchIndexFactory;
|
||||
import org.openmetadata.service.search.models.IndexMapping;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
@ -73,7 +72,7 @@ public class OpenSearchEntitiesProcessor implements Processor<BulkRequest, Resul
|
||||
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexMapping.getIndexName(), entity.getId().toString());
|
||||
updateRequest.doc(
|
||||
JsonUtils.pojoToJson(Objects.requireNonNull(SearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()),
|
||||
JsonUtils.pojoToJson(Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)).buildESDoc()),
|
||||
XContentType.JSON);
|
||||
updateRequest.docAsUpsert(true);
|
||||
return updateRequest;
|
||||
|
||||
@ -50,6 +50,7 @@ import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocato
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionType;
|
||||
import org.openmetadata.service.migration.api.MigrationWorkflow;
|
||||
import org.openmetadata.service.resources.databases.DatasourceConfig;
|
||||
import org.openmetadata.service.search.SearchIndexFactory;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
||||
import org.openmetadata.service.util.jdbi.DatabaseAuthenticationProviderFactory;
|
||||
@ -255,7 +256,8 @@ public final class TablesInitializer {
|
||||
jdbi.installPlugin(new SqlObjectPlugin());
|
||||
jdbi.getConfig(SqlObjects.class)
|
||||
.setSqlLocator(new ConnectionAwareAnnotationSqlLocator(config.getDataSourceFactory().getDriverClass()));
|
||||
SearchRepository searchRepository = new SearchRepository(config.getElasticSearchConfiguration());
|
||||
SearchRepository searchRepository =
|
||||
new SearchRepository(config.getElasticSearchConfiguration(), new SearchIndexFactory());
|
||||
|
||||
// Initialize secrets manager
|
||||
SecretsManagerFactory.createSecretsManager(config.getSecretsManagerConfiguration(), config.getClusterName());
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user