Refactor elasticsearch search indexed (#13451)

This commit is contained in:
jmacryl 2025-05-14 15:57:08 +02:00 committed by GitHub
parent c756af31b1
commit 5749f6f970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 302 additions and 471 deletions

View File

@ -7,7 +7,6 @@ import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.registry.SchemaRegistryService;
import com.linkedin.metadata.registry.SchemaRegistryServiceImpl;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.mxe.TopicConventionImpl;
import io.ebean.Database;
import org.springframework.boot.test.context.TestConfiguration;
@ -31,8 +30,6 @@ public class UpgradeCliApplicationTestConfiguration {
@MockBean public ConfigEntityRegistry configEntityRegistry;
@MockBean public EntityIndexBuilders entityIndexBuilders;
@Bean
public SchemaRegistryService schemaRegistryService() {
return new SchemaRegistryServiceImpl(new TopicConventionImpl());

View File

@ -137,6 +137,11 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd
return searchDocument.toString();
}
@Override
public ESIndexBuilder getIndexBuilder() {
return _indexBuilder;
}
@Override
public LineageRegistry getLineageRegistry() {
return _lineageRegistry;

View File

@ -6,6 +6,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.browse.BrowseResult;
import com.linkedin.metadata.browse.BrowseResultV2;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
@ -13,8 +14,7 @@ import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.*;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
@ -25,13 +25,7 @@ import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -44,6 +38,10 @@ import org.opensearch.action.search.SearchResponse;
@Slf4j
@RequiredArgsConstructor
public class ElasticSearchService implements EntitySearchService, ElasticSearchIndexed {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
@Getter private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;
public static final SearchFlags DEFAULT_SERVICE_SEARCH_FLAGS =
new SearchFlags()
@ -63,20 +61,68 @@ public class ElasticSearchService implements EntitySearchService, ElasticSearchI
+ "if (ctx._source.runId.length > params.maxRunIds) { ctx._source.runId.remove(0) } } "
+ "} else { ctx._source.runId = [params.runId] }";
private final EntityIndexBuilders indexBuilders;
@VisibleForTesting @Getter private final ESSearchDAO esSearchDAO;
private final ESBrowseDAO esBrowseDAO;
private final ESWriteDAO esWriteDAO;
@Override
public void reindexAll(Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
indexBuilders.reindexAll(properties);
for (ReindexConfig config : buildReindexConfigs(properties)) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public List<ReindexConfig> buildReindexConfigs(
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) throws IOException {
return indexBuilders.buildReindexConfigs(properties);
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream()
.map(
entitySpec -> {
try {
Map<String, Object> mappings =
MappingsBuilder.getMappings(entityRegistry, entitySpec, properties);
return indexBuilder.buildReindexState(
indexConvention.getIndexName(entitySpec), mappings, settings);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
/**
* Given a structured property generate all entity index configurations impacted by it, preserving
* existing properties
*
* @param property the new property
* @return index configurations impacted by the new property
*/
public List<ReindexConfig> buildReindexConfigsWithNewStructProp(
Urn urn, StructuredPropertyDefinition property) {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream()
.map(
entitySpec -> {
try {
Map<String, Object> mappings =
MappingsBuilder.getMappings(
entityRegistry, entitySpec, List.of(Pair.of(urn, property)));
return indexBuilder.buildReindexState(
indexConvention.getIndexName(entitySpec), mappings, settings, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.filter(Objects::nonNull)
.filter(ReindexConfig::hasNewStructuredProperty)
.collect(Collectors.toList());
}
@Override
@ -118,7 +164,7 @@ public class ElasticSearchService implements EntitySearchService, ElasticSearchI
@Override
public void appendRunId(
@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId) {
final String docId = indexBuilders.getIndexConvention().getEntityDocumentId(urn);
final String docId = getIndexConvention().getEntityDocumentId(urn);
log.debug(
"Appending run id for entity name: {}, doc id: {}, run id: {}",
@ -458,6 +504,11 @@ public class ElasticSearchService implements EntitySearchService, ElasticSearchI
@Override
public IndexConvention getIndexConvention() {
return indexBuilders.getIndexConvention();
return indexConvention;
}
@Override
public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}
}

View File

@ -1,90 +0,0 @@
package com.linkedin.metadata.search.elasticsearch.indexbuilder;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
@Slf4j
public class EntityIndexBuilders implements ElasticSearchIndexed {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
@Getter private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;
public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}
@Override
public void reindexAll(Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
for (ReindexConfig config : buildReindexConfigs(properties)) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public List<ReindexConfig> buildReindexConfigs(
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream()
.map(
entitySpec -> {
try {
Map<String, Object> mappings =
MappingsBuilder.getMappings(entityRegistry, entitySpec, properties);
return indexBuilder.buildReindexState(
indexConvention.getIndexName(entitySpec), mappings, settings);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
/**
* Given a structured property generate all entity index configurations impacted by it, preserving
* existing properties
*
* @param property the new property
* @return index configurations impacted by the new property
*/
public List<ReindexConfig> buildReindexConfigsWithNewStructProp(
Urn urn, StructuredPropertyDefinition property) {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream()
.map(
entitySpec -> {
try {
Map<String, Object> mappings =
MappingsBuilder.getMappings(
entityRegistry, entitySpec, List.of(Pair.of(urn, property)));
return indexBuilder.buildReindexState(
indexConvention.getIndexName(entitySpec), mappings, settings, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.filter(Objects::nonNull)
.filter(ReindexConfig::hasNewStructuredProperty)
.collect(Collectors.toList());
}
}

View File

@ -21,8 +21,7 @@ import com.linkedin.metadata.entity.SearchIndicesService;
import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
@ -52,11 +51,10 @@ import lombok.extern.slf4j.Slf4j;
public class UpdateIndicesService implements SearchIndicesService {
@VisibleForTesting @Getter private final UpdateGraphIndicesService updateGraphIndicesService;
private final EntitySearchService entitySearchService;
private final ElasticSearchService elasticSearchService;
private final TimeseriesAspectService timeseriesAspectService;
private final SystemMetadataService systemMetadataService;
private final SearchDocumentTransformer searchDocumentTransformer;
private final EntityIndexBuilders entityIndexBuilders;
@Nonnull private final String idHashAlgo;
@Getter private final boolean searchDiffMode;
@ -78,19 +76,17 @@ public class UpdateIndicesService implements SearchIndicesService {
public UpdateIndicesService(
UpdateGraphIndicesService updateGraphIndicesService,
EntitySearchService entitySearchService,
ElasticSearchService elasticSearchService,
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
EntityIndexBuilders entityIndexBuilders,
@Nonnull String idHashAlgo) {
this(
updateGraphIndicesService,
entitySearchService,
elasticSearchService,
timeseriesAspectService,
systemMetadataService,
searchDocumentTransformer,
entityIndexBuilders,
idHashAlgo,
true,
true,
@ -99,21 +95,19 @@ public class UpdateIndicesService implements SearchIndicesService {
public UpdateIndicesService(
UpdateGraphIndicesService updateGraphIndicesService,
EntitySearchService entitySearchService,
ElasticSearchService elasticSearchService,
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
EntityIndexBuilders entityIndexBuilders,
@Nonnull String idHashAlgo,
boolean searchDiffMode,
boolean structuredPropertiesHookEnabled,
boolean structuredPropertiesWriteEnabled) {
this.updateGraphIndicesService = updateGraphIndicesService;
this.entitySearchService = entitySearchService;
this.elasticSearchService = elasticSearchService;
this.timeseriesAspectService = timeseriesAspectService;
this.systemMetadataService = systemMetadataService;
this.searchDocumentTransformer = searchDocumentTransformer;
this.entityIndexBuilders = entityIndexBuilders;
this.idHashAlgo = idHashAlgo;
this.searchDiffMode = searchDiffMode;
this.structuredPropertiesHookEnabled = structuredPropertiesHookEnabled;
@ -229,7 +223,7 @@ public class UpdateIndicesService implements SearchIndicesService {
newDefinition.getEntityTypes().removeAll(oldEntityTypes);
if (newDefinition.getEntityTypes().size() > 0) {
entityIndexBuilders
elasticSearchService
.buildReindexConfigsWithNewStructProp(urn, newDefinition)
.forEach(
reindexState -> {
@ -238,7 +232,7 @@ public class UpdateIndicesService implements SearchIndicesService {
"Applying new structured property {} to index {}",
newDefinition,
reindexState.name());
entityIndexBuilders.getIndexBuilder().applyMappings(reindexState, false);
elasticSearchService.getIndexBuilder().applyMappings(reindexState, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -348,7 +342,7 @@ public class UpdateIndicesService implements SearchIndicesService {
return;
}
final String docId = entityIndexBuilders.getIndexConvention().getEntityDocumentId(urn);
final String docId = elasticSearchService.getIndexConvention().getEntityDocumentId(urn);
if (searchDiffMode
&& (systemMetadata == null
@ -387,7 +381,7 @@ public class UpdateIndicesService implements SearchIndicesService {
searchDocument.get(), previousSearchDocument.orElse(null))
.toString();
entitySearchService.upsertDocument(opContext, entityName, finalDocument, docId);
elasticSearchService.upsertDocument(opContext, entityName, finalDocument, docId);
}
/** Process snapshot and update time-series index */
@ -458,7 +452,7 @@ public class UpdateIndicesService implements SearchIndicesService {
}
if (isKeyAspect) {
entitySearchService.deleteDocument(opContext, entityName, docId);
elasticSearchService.deleteDocument(opContext, entityName, docId);
return;
}
@ -478,6 +472,6 @@ public class UpdateIndicesService implements SearchIndicesService {
return;
}
entitySearchService.upsertDocument(opContext, entityName, searchDocument.get(), docId);
elasticSearchService.upsertDocument(opContext, entityName, searchDocument.get(), docId);
}
}

View File

@ -1,6 +1,7 @@
package com.linkedin.metadata.shared;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
@ -24,4 +25,6 @@ public interface ElasticSearchIndexed {
*/
void reindexAll(Collection<Pair<Urn, StructuredPropertyDefinition>> properties)
throws IOException;
ESIndexBuilder getIndexBuilder();
}

View File

@ -106,6 +106,11 @@ public class ElasticSearchSystemMetadataService
}
}
@Override
public ESIndexBuilder getIndexBuilder() {
return _indexBuilder;
}
@Override
public void deleteAspect(String urn, String aspect) {
_esDAO.deleteByUrnAspect(urn, aspect);

View File

@ -16,10 +16,12 @@ import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.query.request.SearchAfterWrapper;
@ -32,8 +34,8 @@ import com.linkedin.metadata.timeseries.GenericTimeseriesDocument;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.timeseries.TimeseriesScrollResult;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders;
import com.linkedin.metadata.timeseries.elastic.query.ESAggregatedStatsDAO;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
@ -47,14 +49,7 @@ import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -105,20 +100,23 @@ public class ElasticSearchTimeseriesAspectService
private final ESBulkProcessor bulkProcessor;
private final int numRetries;
private final TimeseriesAspectIndexBuilders indexBuilders;
private final RestHighLevelClient searchClient;
private final ESAggregatedStatsDAO esAggregatedStatsDAO;
private final QueryFilterRewriteChain queryFilterRewriteChain;
private final ExecutorService queryPool;
@Nonnull private final EntityRegistry entityRegistry;
@Nonnull private final IndexConvention indexConvention;
@Nonnull private final ESIndexBuilder indexBuilder;
public ElasticSearchTimeseriesAspectService(
@Nonnull RestHighLevelClient searchClient,
@Nonnull TimeseriesAspectIndexBuilders indexBuilders,
@Nonnull ESBulkProcessor bulkProcessor,
int numRetries,
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
@Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig) {
this.indexBuilders = indexBuilders;
@Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig,
@Nonnull EntityRegistry entityRegistry,
@Nonnull IndexConvention indexConvention,
@Nonnull ESIndexBuilder indexBuilder) {
this.searchClient = searchClient;
this.bulkProcessor = bulkProcessor;
this.numRetries = numRetries;
@ -132,6 +130,9 @@ public class ElasticSearchTimeseriesAspectService
new ArrayBlockingQueue<>(
timeseriesAspectServiceConfig.getQuery().getQueueSize()), // fixed size queue
new ThreadPoolExecutor.CallerRunsPolicy());
this.entityRegistry = entityRegistry;
this.indexConvention = indexConvention;
this.indexBuilder = indexBuilder;
esAggregatedStatsDAO = new ESAggregatedStatsDAO(searchClient, queryFilterRewriteChain);
}
@ -225,19 +226,75 @@ public class ElasticSearchTimeseriesAspectService
@Override
public List<ReindexConfig> buildReindexConfigs(
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) throws IOException {
return indexBuilders.buildReindexConfigs(properties);
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
return entityRegistry.getEntitySpecs().values().stream()
.flatMap(
entitySpec ->
entitySpec.getAspectSpecs().stream()
.map(aspectSpec -> Pair.of(entitySpec, aspectSpec)))
.filter(pair -> pair.getSecond().isTimeseries())
.map(
pair -> {
try {
return indexBuilder.buildReindexState(
indexConvention.getTimeseriesAspectIndexName(
pair.getFirst().getName(), pair.getSecond().getName()),
MappingsBuilder.getMappings(pair.getSecond()),
Collections.emptyMap());
} catch (IOException e) {
log.error(
"Issue while building timeseries field index for entity {} aspect {}",
pair.getFirst().getName(),
pair.getSecond().getName());
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
public String reindexAsync(
String index, @Nullable QueryBuilder filterQuery, BatchWriteOperationsOptions options)
throws Exception {
return indexBuilders.reindexAsync(index, filterQuery, options);
Optional<Pair<String, String>> entityAndAspect = indexConvention.getEntityAndAspectName(index);
if (entityAndAspect.isEmpty()) {
throw new IllegalArgumentException("Could not extract entity and aspect from index " + index);
}
String entityName = entityAndAspect.get().getFirst();
String aspectName = entityAndAspect.get().getSecond();
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
for (String aspect : entitySpec.getAspectSpecMap().keySet()) {
if (aspect.toLowerCase().equals(aspectName)) {
aspectName = aspect;
break;
}
}
if (!entitySpec.hasAspect(aspectName)) {
throw new IllegalArgumentException(
String.format("Could not find aspect %s of entity %s", aspectName, entityName));
}
ReindexConfig config =
indexBuilder.buildReindexState(
index,
MappingsBuilder.getMappings(
entityRegistry.getEntitySpec(entityName).getAspectSpec(aspectName)),
Collections.emptyMap());
return indexBuilder.reindexInPlaceAsync(index, filterQuery, options, config);
}
@Override
public void reindexAll(Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
indexBuilders.reindexAll(properties);
for (ReindexConfig config : buildReindexConfigs(properties)) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}
@Override

View File

@ -1,99 +0,0 @@
package com.linkedin.metadata.timeseries.elastic.indexbuilder;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.timeseries.BatchWriteOperationsOptions;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.index.query.QueryBuilder;
@Slf4j
@RequiredArgsConstructor
public class TimeseriesAspectIndexBuilders implements ElasticSearchIndexed {
@Nonnull private final ESIndexBuilder indexBuilder;
@Nonnull private final EntityRegistry entityRegistry;
@Nonnull private final IndexConvention indexConvention;
@Override
public void reindexAll(Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
for (ReindexConfig config : buildReindexConfigs(properties)) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public String reindexAsync(
String index, @Nullable QueryBuilder filterQuery, BatchWriteOperationsOptions options)
throws Exception {
Optional<Pair<String, String>> entityAndAspect = indexConvention.getEntityAndAspectName(index);
if (entityAndAspect.isEmpty()) {
throw new IllegalArgumentException("Could not extract entity and aspect from index " + index);
}
String entityName = entityAndAspect.get().getFirst();
String aspectName = entityAndAspect.get().getSecond();
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
for (String aspect : entitySpec.getAspectSpecMap().keySet()) {
if (aspect.toLowerCase().equals(aspectName)) {
aspectName = aspect;
break;
}
}
if (!entitySpec.hasAspect(aspectName)) {
throw new IllegalArgumentException(
String.format("Could not find aspect %s of entity %s", aspectName, entityName));
}
ReindexConfig config =
indexBuilder.buildReindexState(
index,
MappingsBuilder.getMappings(
entityRegistry.getEntitySpec(entityName).getAspectSpec(aspectName)),
Collections.emptyMap());
return indexBuilder.reindexInPlaceAsync(index, filterQuery, options, config);
}
@Override
public List<ReindexConfig> buildReindexConfigs(
Collection<Pair<Urn, StructuredPropertyDefinition>> properties) {
return entityRegistry.getEntitySpecs().values().stream()
.flatMap(
entitySpec ->
entitySpec.getAspectSpecs().stream()
.map(aspectSpec -> Pair.of(entitySpec, aspectSpec)))
.filter(pair -> pair.getSecond().isTimeseries())
.map(
pair -> {
try {
return indexBuilder.buildReindexState(
indexConvention.getTimeseriesAspectIndexName(
pair.getFirst().getName(), pair.getSecond().getName()),
MappingsBuilder.getMappings(pair.getSecond()),
Collections.emptyMap());
} catch (IOException e) {
log.error(
"Issue while building timeseries field index for entity {} aspect {}",
pair.getFirst().getName(),
pair.getSecond().getName());
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
}

View File

@ -12,7 +12,6 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -46,15 +45,15 @@ public class ElasticSearchServiceTest {
@BeforeMethod
public void setup() {
MockitoAnnotations.openMocks(this);
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
testInstance =
new ElasticSearchService(
mock(ESIndexBuilder.class),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
mock(SettingsBuilder.class));
testInstance =
new ElasticSearchService(
indexBuilders, mock(ESSearchDAO.class), mock(ESBrowseDAO.class), mockEsWriteDAO);
mock(SettingsBuilder.class),
mock(ESSearchDAO.class),
mock(ESBrowseDAO.class),
mockEsWriteDAO);
}
@Test
@ -129,19 +128,16 @@ public class ElasticSearchServiceTest {
@Test
public void testRaw_WithValidUrns() {
// Mock dependencies
ESSearchDAO mockEsSearchDAO = mock(ESSearchDAO.class);
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
testInstance =
new ElasticSearchService(
mock(ESIndexBuilder.class),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
mock(SettingsBuilder.class));
// Create test instance with mocked ESSearchDAO
testInstance =
new ElasticSearchService(
indexBuilders, mockEsSearchDAO, mock(ESBrowseDAO.class), mockEsWriteDAO);
mock(SettingsBuilder.class),
mockEsSearchDAO,
mock(ESBrowseDAO.class),
mockEsWriteDAO);
// Create test data
Urn urn1 = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset1,PROD)");
@ -193,19 +189,16 @@ public class ElasticSearchServiceTest {
@Test
public void testRaw_WithEmptyHits() {
// Mock dependencies
ESSearchDAO mockEsSearchDAO = mock(ESSearchDAO.class);
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
testInstance =
new ElasticSearchService(
mock(ESIndexBuilder.class),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
mock(SettingsBuilder.class));
// Create test instance with mocked ESSearchDAO
testInstance =
new ElasticSearchService(
indexBuilders, mockEsSearchDAO, mock(ESBrowseDAO.class), mockEsWriteDAO);
mock(SettingsBuilder.class),
mockEsSearchDAO,
mock(ESBrowseDAO.class),
mockEsWriteDAO);
// Create test data
Urn urn1 = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset1,PROD)");
@ -253,20 +246,16 @@ public class ElasticSearchServiceTest {
@Test
public void testRaw_WithNullHits() {
// Mock dependencies
ESSearchDAO mockEsSearchDAO = mock(ESSearchDAO.class);
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
testInstance =
new ElasticSearchService(
mock(ESIndexBuilder.class),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
mock(SettingsBuilder.class));
// Create test instance with mocked ESSearchDAO
testInstance =
new ElasticSearchService(
indexBuilders, mockEsSearchDAO, mock(ESBrowseDAO.class), mockEsWriteDAO);
mock(SettingsBuilder.class),
mockEsSearchDAO,
mock(ESBrowseDAO.class),
mockEsWriteDAO);
// Create test data
Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset1,PROD)");
Set<Urn> urns = Set.of(urn);
@ -296,20 +285,16 @@ public class ElasticSearchServiceTest {
@Test
public void testRaw_WithEmptyUrns() {
// Mock dependencies
ESSearchDAO mockEsSearchDAO = mock(ESSearchDAO.class);
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
testInstance =
new ElasticSearchService(
mock(ESIndexBuilder.class),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
mock(SettingsBuilder.class));
// Create test instance with mocked ESSearchDAO
testInstance =
new ElasticSearchService(
indexBuilders, mockEsSearchDAO, mock(ESBrowseDAO.class), mockEsWriteDAO);
mock(SettingsBuilder.class),
mockEsSearchDAO,
mock(ESBrowseDAO.class),
mockEsWriteDAO);
// Create empty set of URNs
Set<Urn> emptyUrns = Collections.emptySet();

View File

@ -52,7 +52,6 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -193,12 +192,6 @@ public abstract class LineageServiceTestBase extends AbstractTestNGSpringContext
@Nonnull
private ElasticSearchService buildEntitySearchService() {
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
getIndexBuilder(),
operationContext.getEntityRegistry(),
operationContext.getSearchContext().getIndexConvention(),
settingsBuilder);
searchClientSpy = spy(getSearchClient());
ESSearchDAO searchDAO =
new ESSearchDAO(
@ -212,7 +205,16 @@ public abstract class LineageServiceTestBase extends AbstractTestNGSpringContext
new ESBrowseDAO(
searchClientSpy, getSearchConfiguration(), null, QueryFilterRewriteChain.EMPTY);
ESWriteDAO writeDAO = new ESWriteDAO(searchClientSpy, getBulkProcessor(), 1);
return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
ElasticSearchService searchService =
new ElasticSearchService(
getIndexBuilder(),
operationContext.getEntityRegistry(),
operationContext.getSearchContext().getIndexConvention(),
settingsBuilder,
searchDAO,
browseDAO,
writeDAO);
return searchService;
}
private void clearCache(boolean withLightingCache) {

View File

@ -26,7 +26,6 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -119,12 +118,6 @@ public abstract class SearchServiceTestBase extends AbstractTestNGSpringContextT
@Nonnull
private ElasticSearchService buildEntitySearchService() {
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
getIndexBuilder(),
operationContext.getEntityRegistry(),
operationContext.getSearchContext().getIndexConvention(),
settingsBuilder);
ESSearchDAO searchDAO =
new ESSearchDAO(
getSearchClient(),
@ -137,7 +130,16 @@ public abstract class SearchServiceTestBase extends AbstractTestNGSpringContextT
new ESBrowseDAO(
getSearchClient(), getSearchConfiguration(), null, QueryFilterRewriteChain.EMPTY);
ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), getBulkProcessor(), 1);
return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
ElasticSearchService searchService =
new ElasticSearchService(
getIndexBuilder(),
operationContext.getEntityRegistry(),
operationContext.getSearchContext().getIndexConvention(),
settingsBuilder,
searchDAO,
browseDAO,
writeDAO);
return searchService;
}
private void clearCache() {

View File

@ -17,7 +17,6 @@ import com.linkedin.metadata.config.search.SearchConfiguration;
import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -85,12 +84,6 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest
@Nonnull
private ElasticSearchService buildService() {
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(
getIndexBuilder(),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder);
ESSearchDAO searchDAO =
new ESSearchDAO(
getSearchClient(),
@ -104,7 +97,14 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest
getSearchClient(), getSearchConfiguration(), null, QueryFilterRewriteChain.EMPTY);
ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), getBulkProcessor(), 1);
ElasticSearchService searchService =
new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
new ElasticSearchService(
getIndexBuilder(),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder,
searchDAO,
browseDAO,
writeDAO);
return searchService;
}

View File

@ -12,8 +12,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
@ -30,11 +29,10 @@ import org.testng.annotations.Test;
public class UpdateIndicesServiceTest {
@Mock private UpdateGraphIndicesService updateGraphIndicesService;
@Mock private EntitySearchService entitySearchService;
@Mock private ElasticSearchService entitySearchService;
@Mock private TimeseriesAspectService timeseriesAspectService;
@Mock private SystemMetadataService systemMetadataService;
@Mock private SearchDocumentTransformer searchDocumentTransformer;
@Mock private EntityIndexBuilders entityIndexBuilders;
private OperationContext operationContext;
private UpdateIndicesService updateIndicesService;
@ -50,7 +48,6 @@ public class UpdateIndicesServiceTest {
timeseriesAspectService,
systemMetadataService,
searchDocumentTransformer,
entityIndexBuilders,
"MD5");
}

View File

@ -43,7 +43,6 @@ import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewrit
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders;
import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
@ -146,14 +145,13 @@ public abstract class TimeseriesAspectServiceTestBase extends AbstractTestNGSpri
private ElasticSearchTimeseriesAspectService buildService() {
return new ElasticSearchTimeseriesAspectService(
getSearchClient(),
new TimeseriesAspectIndexBuilders(
getIndexBuilder(),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention()),
getBulkProcessor(),
1,
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
TimeseriesAspectServiceConfig.builder().build(),
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
getIndexBuilder());
}
/*

View File

@ -8,15 +8,16 @@ import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.timeseries.TimeseriesIndexSizeResult;
import io.datahubproject.metadata.context.OperationContext;
@ -43,18 +44,20 @@ public class TimeseriesAspectServiceUnitTest {
private final RestHighLevelClient searchClient = mock(RestHighLevelClient.class);
private final IndexConvention indexConvention = mock(IndexConvention.class);
private final TimeseriesAspectIndexBuilders timeseriesAspectIndexBuilders =
mock(TimeseriesAspectIndexBuilders.class);
private final ESBulkProcessor bulkProcessor = mock(ESBulkProcessor.class);
private final RestClient restClient = mock(RestClient.class);
private final EntityRegistry entityRegistry = mock(EntityRegistry.class);
private final ESIndexBuilder indexBuilder = mock(ESIndexBuilder.class);
private final TimeseriesAspectService _timeseriesAspectService =
new ElasticSearchTimeseriesAspectService(
searchClient,
timeseriesAspectIndexBuilders,
bulkProcessor,
0,
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
TimeseriesAspectServiceConfig.builder().build(),
entityRegistry,
indexConvention,
indexBuilder);
private final OperationContext opContext =
TestOperationContexts.systemContextNoSearchAuthorization(indexConvention);
@ -62,7 +65,7 @@ public class TimeseriesAspectServiceUnitTest {
@BeforeMethod
public void resetMocks() {
reset(searchClient, indexConvention, timeseriesAspectIndexBuilders, bulkProcessor, restClient);
reset(searchClient, indexConvention, bulkProcessor, restClient, entityRegistry, indexBuilder);
}
@Test

View File

@ -25,7 +25,6 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -118,18 +117,6 @@ public class SampleDataFixtureConfiguration {
return "long_tail";
}
@Bean(name = "sampleDataEntityIndexBuilders")
protected EntityIndexBuilders entityIndexBuilders(
@Qualifier("sampleDataOperationContext") OperationContext opContext) {
return entityIndexBuildersHelper(opContext);
}
@Bean(name = "longTailEntityIndexBuilders")
protected EntityIndexBuilders longTailEntityIndexBuilders(
@Qualifier("longTailOperationContext") OperationContext opContext) {
return entityIndexBuildersHelper(opContext);
}
@Bean(name = "sampleDataOperationContext")
protected OperationContext sampleDataOperationContext(
@Qualifier("sampleDataIndexConvention") IndexConvention indexConvention) {
@ -152,7 +139,8 @@ public class SampleDataFixtureConfiguration {
.build(testOpContext.getSessionAuthentication(), true);
}
protected EntityIndexBuilders entityIndexBuildersHelper(OperationContext opContext) {
protected ElasticSearchService entitySearchServiceHelper(OperationContext opContext)
throws IOException {
GitVersion gitVersion = new GitVersion("0.0.0-test", "123456", Optional.empty());
ESIndexBuilder indexBuilder =
new ESIndexBuilder(
@ -170,30 +158,6 @@ public class SampleDataFixtureConfiguration {
IndexConfiguration indexConfiguration = new IndexConfiguration();
indexConfiguration.setMinSearchFilterLength(3);
SettingsBuilder settingsBuilder = new SettingsBuilder(null, indexConfiguration);
return new EntityIndexBuilders(
indexBuilder,
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder);
}
@Bean(name = "sampleDataEntitySearchService")
protected ElasticSearchService entitySearchService(
@Qualifier("sampleDataEntityIndexBuilders") EntityIndexBuilders indexBuilders)
throws IOException {
return entitySearchServiceHelper(indexBuilders);
}
@Bean(name = "longTailEntitySearchService")
protected ElasticSearchService longTailEntitySearchService(
@Qualifier("longTailEntityIndexBuilders") EntityIndexBuilders longTaiIndexBuilders)
throws IOException {
return entitySearchServiceHelper(longTaiIndexBuilders);
}
protected ElasticSearchService entitySearchServiceHelper(EntityIndexBuilders indexBuilders)
throws IOException {
ESSearchDAO searchDAO =
new ESSearchDAO(
_searchClient,
@ -210,7 +174,27 @@ public class SampleDataFixtureConfiguration {
_customSearchConfiguration,
queryFilterRewriteChain);
ESWriteDAO writeDAO = new ESWriteDAO(_searchClient, _bulkProcessor, 1);
return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
return new ElasticSearchService(
indexBuilder,
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder,
searchDAO,
browseDAO,
writeDAO);
}
@Bean(name = "sampleDataEntitySearchService")
protected ElasticSearchService entitySearchService(
@Qualifier("sampleDataOperationContext") OperationContext opContext) throws IOException {
return entitySearchServiceHelper(opContext);
}
@Bean(name = "longTailEntitySearchService")
protected ElasticSearchService longTailEntitySearchService(
@Qualifier("longTailOperationContext") OperationContext longtailOperationContext)
throws IOException {
return entitySearchServiceHelper(longtailOperationContext);
}
@Bean(name = "sampleDataSearchService")
@ -218,16 +202,11 @@ public class SampleDataFixtureConfiguration {
protected SearchService searchService(
@Qualifier("sampleDataOperationContext") OperationContext sampleDataOperationContext,
@Qualifier("sampleDataEntitySearchService") ElasticSearchService entitySearchService,
@Qualifier("sampleDataEntityIndexBuilders") EntityIndexBuilders indexBuilders,
@Qualifier("sampleDataPrefix") String prefix,
@Qualifier("sampleDataFixtureName") String sampleDataFixtureName)
throws IOException {
return searchServiceHelper(
sampleDataOperationContext,
entitySearchService,
indexBuilders,
prefix,
sampleDataFixtureName);
sampleDataOperationContext, entitySearchService, prefix, sampleDataFixtureName);
}
@Bean(name = "longTailSearchService")
@ -235,22 +214,16 @@ public class SampleDataFixtureConfiguration {
protected SearchService longTailSearchService(
@Qualifier("longTailOperationContext") OperationContext longtailOperationContext,
@Qualifier("longTailEntitySearchService") ElasticSearchService longTailEntitySearchService,
@Qualifier("longTailEntityIndexBuilders") EntityIndexBuilders longTailIndexBuilders,
@Qualifier("longTailPrefix") String longTailPrefix,
@Qualifier("longTailFixtureName") String longTailFixtureName)
throws IOException {
return searchServiceHelper(
longtailOperationContext,
longTailEntitySearchService,
longTailIndexBuilders,
longTailPrefix,
longTailFixtureName);
longtailOperationContext, longTailEntitySearchService, longTailPrefix, longTailFixtureName);
}
public SearchService searchServiceHelper(
OperationContext opContext,
ElasticSearchService entitySearchService,
EntityIndexBuilders indexBuilders,
String prefix,
String fixtureName)
throws IOException {
@ -271,7 +244,7 @@ public class SampleDataFixtureConfiguration {
ranker);
// Build indices & write fixture data
indexBuilders.reindexAll(Collections.emptySet());
entitySearchService.reindexAll(Collections.emptySet());
FixtureReader.builder()
.bulkProcessor(_bulkProcessor)

View File

@ -28,7 +28,6 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -110,9 +109,10 @@ public class SearchLineageFixtureConfiguration {
return conf;
}
@Bean(name = "searchLineageEntityIndexBuilders")
protected EntityIndexBuilders entityIndexBuilders(
@Qualifier("searchLineageOperationContext") OperationContext opContext) {
@Bean(name = "searchLineageEntitySearchService")
protected ElasticSearchService entitySearchService(
@Qualifier("searchLineageOperationContext") OperationContext opContext,
final QueryFilterRewriteChain queryFilterRewriteChain) {
GitVersion gitVersion = new GitVersion("0.0.0-test", "123456", Optional.empty());
ESIndexBuilder indexBuilder =
new ESIndexBuilder(
@ -130,17 +130,6 @@ public class SearchLineageFixtureConfiguration {
IndexConfiguration indexConfiguration = new IndexConfiguration();
indexConfiguration.setMinSearchFilterLength(3);
SettingsBuilder settingsBuilder = new SettingsBuilder(null, indexConfiguration);
return new EntityIndexBuilders(
indexBuilder,
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder);
}
@Bean(name = "searchLineageEntitySearchService")
protected ElasticSearchService entitySearchService(
@Qualifier("searchLineageEntityIndexBuilders") EntityIndexBuilders indexBuilders,
final QueryFilterRewriteChain queryFilterRewriteChain) {
ESSearchDAO searchDAO =
new ESSearchDAO(
searchClient,
@ -154,7 +143,14 @@ public class SearchLineageFixtureConfiguration {
searchClient, searchConfiguration, customSearchConfiguration, queryFilterRewriteChain);
ESWriteDAO writeDAO = new ESWriteDAO(searchClient, bulkProcessor, 1);
return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
return new ElasticSearchService(
indexBuilder,
opContext.getEntityRegistry(),
opContext.getSearchContext().getIndexConvention(),
settingsBuilder,
searchDAO,
browseDAO,
writeDAO);
}
@Bean(name = "searchLineageOperationContext")
@ -233,8 +229,7 @@ public class SearchLineageFixtureConfiguration {
@Nonnull
protected SearchService searchService(
@Qualifier("searchLineageOperationContext") OperationContext opContext,
@Qualifier("searchLineageEntitySearchService") ElasticSearchService entitySearchService,
@Qualifier("searchLineageEntityIndexBuilders") EntityIndexBuilders indexBuilders)
@Qualifier("searchLineageEntitySearchService") ElasticSearchService entitySearchService)
throws IOException {
int batchSize = 100;
@ -254,7 +249,7 @@ public class SearchLineageFixtureConfiguration {
ranker);
// Build indices
indexBuilders.reindexAll(Collections.emptySet());
entitySearchService.reindexAll(Collections.emptySet());
return service;
}

View File

@ -6,7 +6,7 @@ import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService;
import io.datahubproject.metadata.services.RestrictedService;
import io.datahubproject.metadata.services.SecretService;
@ -37,5 +37,5 @@ public class MaeConsumerApplicationTestConfiguration {
@MockBean private ConfigEntityRegistry _configEntityRegistry;
@MockBean public EntityIndexBuilders entityIndexBuilders;
@MockBean public ElasticSearchService elasticSearchService;
}

View File

@ -43,8 +43,7 @@ import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.service.UpdateGraphIndicesService;
import com.linkedin.metadata.service.UpdateIndicesService;
@ -92,13 +91,12 @@ public class UpdateIndicesHookTest {
static final long LAST_OBSERVED_3 = 789L;
private UpdateIndicesHook updateIndicesHook;
private GraphService mockGraphService;
private EntitySearchService mockEntitySearchService;
private ElasticSearchService mockEntitySearchService;
private TimeseriesAspectService mockTimeseriesAspectService;
private SystemMetadataService mockSystemMetadataService;
private SearchDocumentTransformer searchDocumentTransformer;
private DataHubUpgradeKafkaListener mockDataHubUpgradeKafkaListener;
private ConfigurationProvider mockConfigurationProvider;
private EntityIndexBuilders mockEntityIndexBuilders;
private Urn actorUrn;
private UpdateIndicesService updateIndicesService;
private UpdateIndicesHook reprocessUIHook;
@ -111,15 +109,14 @@ public class UpdateIndicesHookTest {
public void setupTest() {
actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN);
mockGraphService = mock(ElasticSearchGraphService.class);
mockEntitySearchService = mock(EntitySearchService.class);
mockEntitySearchService = mock(ElasticSearchService.class);
mockTimeseriesAspectService = mock(TimeseriesAspectService.class);
mockSystemMetadataService = mock(SystemMetadataService.class);
searchDocumentTransformer = new SearchDocumentTransformer(1000, 1000, 1000);
mockDataHubUpgradeKafkaListener = mock(DataHubUpgradeKafkaListener.class);
mockConfigurationProvider = mock(ConfigurationProvider.class);
mockEntityIndexBuilders = mock(EntityIndexBuilders.class);
when(mockEntityIndexBuilders.getIndexConvention()).thenReturn(IndexConventionImpl.noPrefix(""));
when(mockEntitySearchService.getIndexConvention()).thenReturn(IndexConventionImpl.noPrefix(""));
ElasticSearchConfiguration elasticSearchConfiguration = new ElasticSearchConfiguration();
SystemUpdateConfiguration systemUpdateConfiguration = new SystemUpdateConfiguration();
@ -132,9 +129,7 @@ public class UpdateIndicesHookTest {
mockTimeseriesAspectService,
mockSystemMetadataService,
searchDocumentTransformer,
mockEntityIndexBuilders,
"MD5");
opContext = TestOperationContexts.systemContextNoSearchAuthorization();
updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false);
@ -244,7 +239,6 @@ public class UpdateIndicesHookTest {
mockTimeseriesAspectService,
mockSystemMetadataService,
searchDocumentTransformer,
mockEntityIndexBuilders,
"MD5");
updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false);

View File

@ -13,7 +13,6 @@ import com.linkedin.metadata.dao.throttle.ThrottleSensor;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.service.FormService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
@ -81,8 +80,6 @@ public class MCLSpringCommonTestConfiguration {
@MockBean(name = "duheKafkaConsumerFactory")
public DefaultKafkaConsumerFactory<String, GenericRecord> defaultKafkaConsumerFactory;
@MockBean public EntityIndexBuilders entityIndexBuilders;
@Bean(name = "systemOperationContext")
public OperationContext operationContext(
final EntityRegistry entityRegistry,

View File

@ -12,7 +12,6 @@ import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
@ -70,6 +69,4 @@ public class MceConsumerApplicationTestConfiguration {
@MockBean protected ConfigEntityRegistry configEntityRegistry;
@MockBean protected SiblingGraphService siblingGraphService;
@MockBean public EntityIndexBuilders entityIndexBuilders;
}

View File

@ -1,10 +1,9 @@
package com.linkedin.gms.factory.entity.update.indices;
import com.linkedin.gms.factory.search.EntityIndexBuildersFactory;
import com.linkedin.gms.factory.search.ElasticSearchServiceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.service.UpdateGraphIndicesService;
import com.linkedin.metadata.service.UpdateIndicesService;
@ -17,7 +16,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
@Import(EntityIndexBuildersFactory.class)
@Import(ElasticSearchServiceFactory.class)
public class UpdateIndicesServiceFactory {
@Value("${featureFlags.searchServiceDiffModeEnabled}")
@ -43,11 +42,10 @@ public class UpdateIndicesServiceFactory {
@ConditionalOnProperty(name = "entityClient.impl", havingValue = "restli")
public UpdateIndicesService searchIndicesServiceNonGMS(
GraphService graphService,
EntitySearchService entitySearchService,
ElasticSearchService entitySearchService,
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
EntityIndexBuilders entityIndexBuilders,
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) {
return new UpdateIndicesService(
@ -56,7 +54,6 @@ public class UpdateIndicesServiceFactory {
timeseriesAspectService,
systemMetadataService,
searchDocumentTransformer,
entityIndexBuilders,
idHashAlgo,
searchDiffMode,
structuredPropertiesHookEnabled,
@ -67,11 +64,10 @@ public class UpdateIndicesServiceFactory {
@ConditionalOnProperty(name = "entityClient.impl", havingValue = "java", matchIfMissing = true)
public UpdateIndicesService searchIndicesServiceGMS(
final GraphService graphService,
final EntitySearchService entitySearchService,
final ElasticSearchService entitySearchService,
final TimeseriesAspectService timeseriesAspectService,
final SystemMetadataService systemMetadataService,
final SearchDocumentTransformer searchDocumentTransformer,
final EntityIndexBuilders entityIndexBuilders,
final EntityService<?> entityService,
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) {
@ -82,7 +78,6 @@ public class UpdateIndicesServiceFactory {
timeseriesAspectService,
systemMetadataService,
searchDocumentTransformer,
entityIndexBuilders,
idHashAlgo,
searchDiffMode,
structuredPropertiesHookEnabled,

View File

@ -7,8 +7,8 @@ import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.metadata.config.search.SearchConfiguration;
import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO;
import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO;
@ -37,9 +37,9 @@ public class ElasticSearchServiceFactory {
@Qualifier("settingsBuilder")
private SettingsBuilder settingsBuilder;
@Autowired private EntityIndexBuilders entityIndexBuilders;
@Autowired private ConfigurationProvider configurationProvider;
@Autowired
@Qualifier("entityRegistry")
private EntityRegistry entityRegistry;
@Bean(name = "elasticSearchService")
@Nonnull
@ -66,7 +66,10 @@ public class ElasticSearchServiceFactory {
customSearchConfiguration,
queryFilterRewriteChain);
return new ElasticSearchService(
entityIndexBuilders,
components.getIndexBuilder(),
entityRegistry,
components.getIndexConvention(),
settingsBuilder,
esSearchDAO,
new ESBrowseDAO(
components.getSearchClient(),

View File

@ -1,34 +0,0 @@
package com.linkedin.gms.factory.search;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EntityIndexBuildersFactory {
@Autowired
@Qualifier("baseElasticSearchComponents")
private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components;
@Autowired
@Qualifier("entityRegistry")
private EntityRegistry entityRegistry;
@Autowired
@Qualifier("settingsBuilder")
private SettingsBuilder settingsBuilder;
@Bean
protected EntityIndexBuilders entityIndexBuilders() {
return new EntityIndexBuilders(
components.getIndexBuilder(),
entityRegistry,
components.getIndexConvention(),
settingsBuilder);
}
}

View File

@ -3,10 +3,10 @@ package com.linkedin.gms.factory.timeseries;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -32,11 +32,12 @@ public class ElasticSearchTimeseriesAspectServiceFactory {
final ConfigurationProvider configurationProvider) {
return new ElasticSearchTimeseriesAspectService(
components.getSearchClient(),
new TimeseriesAspectIndexBuilders(
components.getIndexBuilder(), entityRegistry, components.getIndexConvention()),
components.getBulkProcessor(),
components.getNumRetries(),
queryFilterRewriteChain,
configurationProvider.getTimeseriesAspectService());
TimeseriesAspectServiceConfig.builder().build(),
entityRegistry,
components.getIndexConvention(),
components.getIndexBuilder());
}
}