diff --git a/metadata-io/src/main/java/com/linkedin/metadata/config/CacheConfiguration.java b/metadata-io/src/main/java/com/linkedin/metadata/config/CacheConfiguration.java new file mode 100644 index 0000000000..b0f7cbda3a --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/config/CacheConfiguration.java @@ -0,0 +1,10 @@ +package com.linkedin.metadata.config; + +import lombok.Data; + + +@Data +public class CacheConfiguration { + PrimaryCacheConfiguration primary; + HomepageCacheConfiguration homepage; +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/config/EntityDocCountCacheConfiguration.java b/metadata-io/src/main/java/com/linkedin/metadata/config/EntityDocCountCacheConfiguration.java new file mode 100644 index 0000000000..5fab3d5893 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/config/EntityDocCountCacheConfiguration.java @@ -0,0 +1,9 @@ +package com.linkedin.metadata.config; + +import lombok.Data; + + +@Data +public class EntityDocCountCacheConfiguration { + long ttlSeconds; +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/config/HomepageCacheConfiguration.java b/metadata-io/src/main/java/com/linkedin/metadata/config/HomepageCacheConfiguration.java new file mode 100644 index 0000000000..1252d531ab --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/config/HomepageCacheConfiguration.java @@ -0,0 +1,9 @@ +package com.linkedin.metadata.config; + +import lombok.Data; + + +@Data +public class HomepageCacheConfiguration { + EntityDocCountCacheConfiguration entityCounts; +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/config/PrimaryCacheConfiguration.java b/metadata-io/src/main/java/com/linkedin/metadata/config/PrimaryCacheConfiguration.java new file mode 100644 index 0000000000..881d30660c --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/config/PrimaryCacheConfiguration.java @@ -0,0 +1,10 @@ +package com.linkedin.metadata.config; + +import lombok.Data; + + +@Data +public class PrimaryCacheConfiguration { + long ttlSeconds; + long maxSize; +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/aggregator/AllEntitiesSearchAggregator.java b/metadata-io/src/main/java/com/linkedin/metadata/search/aggregator/AllEntitiesSearchAggregator.java index 8a093f792d..cff9ed8b65 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/aggregator/AllEntitiesSearchAggregator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/aggregator/AllEntitiesSearchAggregator.java @@ -3,6 +3,7 @@ package com.linkedin.metadata.search.aggregator; import com.codahale.metrics.Timer; import com.linkedin.data.template.GetMode; import com.linkedin.data.template.LongMap; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.SearchFlags; import com.linkedin.metadata.query.filter.Filter; @@ -54,11 +55,12 @@ public class AllEntitiesSearchAggregator { EntityRegistry entityRegistry, EntitySearchService entitySearchService, CachingEntitySearchService cachingEntitySearchService, - SearchRanker searchRanker) { + SearchRanker searchRanker, + EntityDocCountCacheConfiguration entityDocCountCacheConfiguration) { _entitySearchService = Objects.requireNonNull(entitySearchService); _searchRanker = Objects.requireNonNull(searchRanker); _cachingEntitySearchService = Objects.requireNonNull(cachingEntitySearchService); - _entityDocCountCache = new EntityDocCountCache(entityRegistry, entitySearchService); + _entityDocCountCache = new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration); _maxAggregationValueCount = DEFAULT_MAX_AGGREGATION_VALUES; // TODO: Make this externally configurable } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java b/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java index 40192e4cf6..6369876dd3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/cache/EntityDocCountCache.java @@ -1,8 +1,10 @@ package com.linkedin.metadata.search.cache; import com.google.common.base.Suppliers; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.utils.ConcurrencyUtils; import io.opentelemetry.extension.annotations.WithSpan; import java.util.List; import java.util.Map; @@ -17,17 +19,18 @@ public class EntityDocCountCache { private final EntitySearchService _entitySearchService; private final Supplier> entityDocCount; - public EntityDocCountCache(EntityRegistry entityRegistry, EntitySearchService entitySearchService) { + public EntityDocCountCache(EntityRegistry entityRegistry, EntitySearchService entitySearchService, + EntityDocCountCacheConfiguration config) { _entityRegistry = entityRegistry; _entitySearchService = entitySearchService; - entityDocCount = Suppliers.memoizeWithExpiration(this::fetchEntityDocCount, 1, TimeUnit.MINUTES); + entityDocCount = Suppliers.memoizeWithExpiration(this::fetchEntityDocCount, config.getTtlSeconds(), TimeUnit.SECONDS); } private Map fetchEntityDocCount() { - return _entityRegistry.getEntitySpecs() - .keySet() - .stream() - .collect(Collectors.toMap(Function.identity(), _entitySearchService::docCount)); + return ConcurrencyUtils + .transformAndCollectAsync(_entityRegistry.getEntitySpecs().keySet(), + Function.identity(), + Collectors.toMap(Function.identity(), _entitySearchService::docCount)); } @WithSpan diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ESSampleDataFixture.java b/metadata-io/src/test/java/com/linkedin/metadata/ESSampleDataFixture.java index 428b8185c8..89b66ef8ad 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/ESSampleDataFixture.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/ESSampleDataFixture.java @@ -3,6 +3,7 @@ package com.linkedin.metadata; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.client.JavaEntityClient; import com.linkedin.metadata.config.ElasticSearchConfiguration; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.SearchService; @@ -102,9 +103,11 @@ public class ESSampleDataFixture { int batchSize = 100; SearchRanker ranker = new SimpleRanker(); CacheManager cacheManager = new ConcurrentMapCacheManager(); + EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration(); + entityDocCountCacheConfiguration.setTtlSeconds(600L); SearchService service = new SearchService( - new EntityDocCountCache(entityRegistry, entitySearchService), + new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration), new CachingEntitySearchService( cacheManager, entitySearchService, @@ -122,7 +125,8 @@ public class ESSampleDataFixture { batchSize, false ), - ranker + ranker, + entityDocCountCacheConfiguration ), batchSize, false diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ESSearchLineageFixture.java b/metadata-io/src/test/java/com/linkedin/metadata/ESSearchLineageFixture.java index 91b7959bd2..64dd5053a8 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/ESSearchLineageFixture.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/ESSearchLineageFixture.java @@ -3,6 +3,7 @@ package com.linkedin.metadata; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.client.JavaEntityClient; import com.linkedin.metadata.config.ElasticSearchConfiguration; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; @@ -149,9 +150,11 @@ public class ESSearchLineageFixture { int batchSize = 100; SearchRanker ranker = new SimpleRanker(); CacheManager cacheManager = new ConcurrentMapCacheManager(); + EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration(); + entityDocCountCacheConfiguration.setTtlSeconds(600L); SearchService service = new SearchService( - new EntityDocCountCache(entityRegistry, entitySearchService), + new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration), new CachingEntitySearchService( cacheManager, entitySearchService, @@ -169,7 +172,8 @@ public class ESSearchLineageFixture { batchSize, false ), - ranker + ranker, + entityDocCountCacheConfiguration ), batchSize, false diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java index 00fc49c6ce..24735d93d0 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java @@ -9,6 +9,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; import com.linkedin.metadata.ESTestConfiguration; import com.linkedin.metadata.TestEntityUtil; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.LineageDirection; @@ -91,13 +92,16 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { private void resetService(boolean withCache) { CachingEntitySearchService cachingEntitySearchService = new CachingEntitySearchService(_cacheManager, _elasticSearchService, 100, true); + EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration(); + entityDocCountCacheConfiguration.setTtlSeconds(600L); _lineageSearchService = new LineageSearchService( new SearchService( - new EntityDocCountCache(_entityRegistry, _elasticSearchService), + new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration), cachingEntitySearchService, new CachingAllEntitiesSearchAggregator( _cacheManager, - new AllEntitiesSearchAggregator(_entityRegistry, _elasticSearchService, cachingEntitySearchService, new SimpleRanker()), + new AllEntitiesSearchAggregator(_entityRegistry, _elasticSearchService, cachingEntitySearchService, + new SimpleRanker(), entityDocCountCacheConfiguration), 100, true), new SimpleRanker()), diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java index 647b16ad35..29c1f5d9ba 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java @@ -8,6 +8,7 @@ import com.linkedin.common.urn.TestEntityUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.StringArray; import com.linkedin.metadata.ESTestConfiguration; +import com.linkedin.metadata.config.EntityDocCountCacheConfiguration; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import com.linkedin.metadata.query.SearchFlags; @@ -82,8 +83,11 @@ public class SearchServiceTest extends AbstractTestNGSpringContextTests { _elasticSearchService, 100, true); + + EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration(); + entityDocCountCacheConfiguration.setTtlSeconds(600L); _searchService = new SearchService( - new EntityDocCountCache(_entityRegistry, _elasticSearchService), + new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration), cachingEntitySearchService, new CachingAllEntitiesSearchAggregator( _cacheManager, @@ -91,7 +95,7 @@ public class SearchServiceTest extends AbstractTestNGSpringContextTests { _entityRegistry, _elasticSearchService, cachingEntitySearchService, - new SimpleRanker()), + new SimpleRanker(), entityDocCountCacheConfiguration), 100, true), new SimpleRanker()); @@ -161,6 +165,9 @@ public class SearchServiceTest extends AbstractTestNGSpringContextTests { assertEquals(searchResult.getEntities().get(0).getEntity(), urn2); clearCache(); + long docCount = _elasticSearchService.docCount(ENTITY_NAME); + assertEquals(docCount, 2L); + _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString()); _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString()); syncAfterWrite(_bulkProcessor); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java index da03924e41..820b272bed 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java @@ -21,10 +21,10 @@ import org.springframework.context.annotation.Configuration; @Configuration public class CacheConfig { - @Value("${CACHE_TTL_SECONDS:600}") + @Value("${cache.primary.ttlSeconds:600}") private int cacheTtlSeconds; - @Value("${CACHE_MAX_SIZE:10000}") + @Value("${cache.primary.maxSize:10000}") private int cacheMaxSize; @Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}") diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java index 3ecef41ee1..19a4f816b2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java @@ -3,6 +3,7 @@ package com.linkedin.gms.factory.config; import com.datahub.authentication.AuthenticationConfiguration; import com.datahub.authorization.AuthorizationConfiguration; import com.linkedin.datahub.graphql.featureflags.FeatureFlags; +import com.linkedin.metadata.config.CacheConfiguration; import com.linkedin.metadata.config.DataHubConfiguration; import com.linkedin.metadata.config.ElasticSearchConfiguration; import com.linkedin.metadata.config.IngestionConfiguration; @@ -69,4 +70,9 @@ public class ConfigurationProvider { * System Update configurations */ private SystemUpdateConfiguration systemUpdate; + + /** + * Configuration for caching + */ + private CacheConfiguration cache; } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/AllEntitiesSearchAggregatorFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/AllEntitiesSearchAggregatorFactory.java index c3076dca2a..b6e83356b6 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/AllEntitiesSearchAggregatorFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/AllEntitiesSearchAggregatorFactory.java @@ -1,5 +1,6 @@ package com.linkedin.gms.factory.search; +import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.EntitySearchService; @@ -38,11 +39,12 @@ public class AllEntitiesSearchAggregatorFactory { @Bean(name = "allEntitiesSearchAggregator") @Primary @Nonnull - protected AllEntitiesSearchAggregator getInstance() { + protected AllEntitiesSearchAggregator getInstance(ConfigurationProvider configurationProvider) { return new AllEntitiesSearchAggregator( entityRegistry, entitySearchService, cachingEntitySearchService, - searchRanker); + searchRanker, + configurationProvider.getCache().getHomepage().getEntityCounts()); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java index 1ead5669fd..5783c5b8af 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java @@ -1,5 +1,6 @@ package com.linkedin.gms.factory.search; +import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.EntitySearchService; @@ -44,9 +45,10 @@ public class SearchServiceFactory { @Bean(name = "searchService") @Primary @Nonnull - protected SearchService getInstance() { + protected SearchService getInstance(ConfigurationProvider configurationProvider) { return new SearchService( - new EntityDocCountCache(entityRegistry, entitySearchService), + new EntityDocCountCache(entityRegistry, entitySearchService, configurationProvider.getCache() + .getHomepage().getEntityCounts()), cachingEntitySearchService, cachingAllEntitiesSearchAggregator, searchRanker); diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index 304b11c6fb..745533cfdd 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -255,3 +255,11 @@ entityClient: usageClient: retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2} numRetries: ${USAGE_CLIENT_NUM_RETRIES:3} + +cache: + primary: + ttlSeconds: ${CACHE_TTL_SECONDS:600} + maxSize: ${CACHE_MAX_SIZE:10000} + homepage: + entityCounts: + ttlSeconds: ${CACHE_ENTITY_COUNTS_TTL_SECONDS:600} diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/ConcurrencyUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/ConcurrencyUtils.java index 552f0d6b99..551683153a 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/ConcurrencyUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/ConcurrencyUtils.java @@ -1,11 +1,13 @@ package com.linkedin.metadata.utils; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.stream.Collector; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -15,18 +17,24 @@ public class ConcurrencyUtils { private ConcurrencyUtils() { } + public static List transformAndCollectAsync(List originalList, Function transformer) { + return transformAndCollectAsync(originalList, transformer, Collectors.toList()); + } + /** * Transforms original list into the final list using the function transformer in an asynchronous fashion * i.e. each element transform is run as a separate CompleteableFuture and then joined at the end */ - public static List transformAndCollectAsync(List originalList, Function transformer) { - return originalList.stream() + public static OUTPUT transformAndCollectAsync(Collection originalCollection, + Function transformer, Collector collector) { + return originalCollection.stream() .map(element -> CompletableFuture.supplyAsync(() -> transformer.apply(element))) .collect(Collectors.collectingAndThen(Collectors.toList(), completableFutureList -> completableFutureList.stream().map(CompletableFuture::join))) - .collect(Collectors.toList()); + .collect(collector); } + /** * Transforms original list into the final list using the function transformer in an asynchronous fashion * with exceptions handled by the input exceptionHandler @@ -34,13 +42,23 @@ public class ConcurrencyUtils { */ public static List transformAndCollectAsync(List originalList, Function transformer, BiFunction exceptionHandler) { - return originalList.stream() + return transformAndCollectAsync(originalList, transformer, exceptionHandler, Collectors.toList()); + } + + /** + * Transforms original list into the final list using the function transformer in an asynchronous fashion + * with exceptions handled by the input exceptionHandler + * i.e. each element transform is run as a separate CompleteableFuture and then joined at the end + */ + public static OUTPUT transformAndCollectAsync(Collection originalCollection, + Function transformer, BiFunction exceptionHandler, Collector collector) { + return originalCollection.stream() .map(element -> CompletableFuture.supplyAsync(() -> transformer.apply(element)) .exceptionally(e -> exceptionHandler.apply(element, e))) .filter(Objects::nonNull) .collect(Collectors.collectingAndThen(Collectors.toList(), completableFutureList -> completableFutureList.stream().map(CompletableFuture::join))) - .collect(Collectors.toList()); + .collect(collector); } /**