feat(cache): add hazelcast distributed cache option (#6645)

Co-authored-by: Aseem Bansal <asmbansal2@gmail.com>
Authored by RyanHolstien on 2023-01-19 11:03:08 -06:00; committed by GitHub
parent 6332038bb2
commit 8323128437
12 changed files with 316 additions and 73 deletions

@@ -98,6 +98,9 @@ project.ext.externalDependency = [
'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2',
'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
'hadoopCommon3':'org.apache.hadoop:hadoop-common:3.3.4',
'hazelcast':'com.hazelcast:hazelcast:5.2.1',
'hazelcastSpring':'com.hazelcast:hazelcast-spring:5.2.1',
'hazelcastTest':'com.hazelcast:hazelcast:5.2.1:tests',
'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5',

@@ -37,9 +37,12 @@ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.javatuples.Triplet;
import org.springframework.cache.Cache;
import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;
@RequiredArgsConstructor
@Slf4j
@@ -81,14 +84,15 @@ public class LineageSearchService {
@Nonnull List<String> entities, @Nullable String input, @Nullable Integer maxHops, @Nullable Filter inputFilters,
@Nullable SortCriterion sortCriterion, int from, int size) {
// Cache multihop result for faster performance
Triplet<String, LineageDirection, Integer> cacheKey = Triplet.with(sourceUrn.toString(), direction, maxHops);
CachedEntityLineageResult cachedLineageResult = cacheEnabled
? cache.get(Pair.of(sourceUrn, direction), CachedEntityLineageResult.class) : null;
? cache.get(cacheKey, CachedEntityLineageResult.class) : null;
EntityLineageResult lineageResult;
if (cachedLineageResult == null) {
maxHops = maxHops != null ? maxHops : 1000;
lineageResult = _graphService.getLineage(sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops);
if (cacheEnabled) {
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
cache.put(cacheKey, new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
}
} else {
lineageResult = cachedLineageResult.getEntityLineageResult();
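
Note: the lineage cache key widens here from Pair(sourceUrn, direction) to a Triplet that also carries maxHops, so a lookup with one hop limit can no longer be served an entry computed for a different limit. A minimal illustration (sketch only; CorpuserUrn and LineageDirection as used in the test at the end of this diff):

    CorpuserUrn urn = new CorpuserUrn("user");  // any source entity urn
    Triplet<String, LineageDirection, Integer> shallow = Triplet.with(urn.toString(), LineageDirection.DOWNSTREAM, 3);
    Triplet<String, LineageDirection, Integer> deep = Triplet.with(urn.toString(), LineageDirection.DOWNSTREAM, 1000);
    // shallow.equals(deep) == false, whereas the old Pair.of(sourceUrn, direction) key collided for both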

@@ -6,6 +6,7 @@ import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -15,6 +16,8 @@ import lombok.RequiredArgsConstructor;
import lombok.Value;
import org.springframework.cache.Cache;
import static com.datahub.util.RecordUtils.*;
/**
* Wrapper class to allow searching in batches and caching the results.
@@ -33,7 +36,7 @@ public class CacheableSearcher<K> {
private final boolean enableCache;
@Value
public static class QueryPagination {
public static class QueryPagination implements Serializable {
int from;
int size;
}
@@ -88,16 +91,19 @@ public class CacheableSearcher<K> {
QueryPagination batch = getBatchQuerySize(batchId);
SearchResult result;
if (enableCache()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
result = cache.get(cacheKey, SearchResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getBatch_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(SearchResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
}
}
} else {
result = searcher.apply(batch);
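
Note: the cached value changes from the SearchResult object to its JSON string. Pegasus RecordTemplates are not java.io.Serializable, so they cannot be placed into a distributed Hazelcast map directly, while a String (together with the now-Serializable QueryPagination key component) can. The round trip performed above, using the statically imported RecordUtils helpers:

    String json = toJsonString(result);                                  // RecordTemplate -> JSON on cache put
    SearchResult restored = toRecordTemplate(SearchResult.class, json);  // JSON -> RecordTemplate on cache hit
    // restored is value-equal to result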

@@ -1,11 +1,24 @@
package com.linkedin.metadata.search.cache;
import com.linkedin.metadata.graph.EntityLineageResult;
import java.io.Serializable;
import lombok.Data;
import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;
@Data
public class CachedEntityLineageResult {
private final EntityLineageResult entityLineageResult;
public class CachedEntityLineageResult implements Serializable {
private final byte[] entityLineageResult;
private final long timestamp;
public CachedEntityLineageResult(EntityLineageResult lineageResult, long timestamp) {
this.entityLineageResult = gzipCompress(toJsonString(lineageResult));
this.timestamp = timestamp;
}
public EntityLineageResult getEntityLineageResult() {
return toRecordTemplate(EntityLineageResult.class, gzipDecompress(entityLineageResult));
}
}
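
Note: lineage results can be large (the test at the end of this diff caches 10,000 relationships), so the wrapper keeps gzip-compressed JSON bytes rather than the RecordTemplate, which both shrinks the entry and makes it Serializable for Hazelcast. Round-trip sketch mirroring the class above:

    CachedEntityLineageResult entry =
        new CachedEntityLineageResult(lineageResult, System.currentTimeMillis());  // gzipCompress(toJsonString(...))
    EntityLineageResult restored = entry.getEntityLineageResult();                 // gzipDecompress + toRecordTemplate
    // restored is value-equal to the original lineageResult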

@@ -12,6 +12,8 @@ import lombok.RequiredArgsConstructor;
import org.javatuples.Quintet;
import org.springframework.cache.CacheManager;
import static com.datahub.util.RecordUtils.*;
@RequiredArgsConstructor
public class CachingAllEntitiesSearchAggregator {
@@ -27,6 +29,8 @@ public class CachingAllEntitiesSearchAggregator {
return new CacheableSearcher<>(cacheManager.getCache(ALL_ENTITIES_SEARCH_AGGREGATOR_CACHE_NAME), batchSize,
querySize -> aggregator.search(entities, input, postFilters, sortCriterion, querySize.getFrom(),
querySize.getSize(), searchFlags),
querySize -> Quintet.with(entities, input, postFilters, sortCriterion, querySize), searchFlags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entities, input, postFilters != null ? toJsonString(postFilters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), searchFlags, enableCache)
.getSearchResults(from, size);
}
}

@@ -17,6 +17,8 @@ import org.javatuples.Quintet;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import static com.datahub.util.RecordUtils.*;
@RequiredArgsConstructor
public class CachingEntitySearchService {
@@ -115,7 +117,8 @@ public class CachingEntitySearchService {
cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME),
batchSize,
querySize -> getRawSearchResults(entityName, query, filters, sortCriterion, querySize.getFrom(), querySize.getSize()),
querySize -> Quintet.with(entityName, query, filters, sortCriterion, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entityName, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), flags, enableCache).getSearchResults(from, size);
}
@@ -133,16 +136,19 @@ public class CachingEntitySearchService {
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME);
AutoCompleteResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters, limit);
result = cache.get(cacheKey, AutoCompleteResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
}
}
} else {
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
@@ -165,16 +171,19 @@ public class CachingEntitySearchService {
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME);
BrowseResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters, from, size);
result = cache.get(cacheKey, BrowseResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
}
}
} else {
result = getRawBrowseResults(entityName, path, filters, from, size);

@@ -0,0 +1,48 @@
package com.linkedin.metadata.search.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class GZIPUtil {
private GZIPUtil() { }
public static String gzipDecompress(byte[] gzipped) {
String unzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(gzipped);
GZIPInputStream gis = new GZIPInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gis.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
unzipped = bos.toString(StandardCharsets.UTF_8);
} catch (IOException ie) {
throw new IllegalStateException("Error while unzipping value.", ie);
}
return unzipped;
}
public static byte[] gzipCompress(String unzipped) {
byte[] gzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(unzipped.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bos)) {
byte[] buffer = new byte[1024];
int len;
while ((len = bis.read(buffer)) != -1) {
gzipOutputStream.write(buffer, 0, len);
}
gzipOutputStream.finish();
gzipped = bos.toByteArray();
} catch (IOException ie) {
throw new IllegalStateException("Error while gzipping value: " + unzipped);
}
return gzipped;
}
}
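
Note: a quick usage sketch of the helper above; compressing and then decompressing returns the original string:

    String original = "{\"relationships\": []}";               // any JSON payload
    byte[] compressed = GZIPUtil.gzipCompress(original);
    String roundTripped = GZIPUtil.gzipDecompress(compressed);
    // roundTripped.equals(original) == true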

@@ -15,6 +15,8 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson
compile externalDependency.hazelcast
compile externalDependency.hazelcastSpring
compile externalDependency.kafkaClients
compile externalDependency.kafkaAvroSerde
compileOnly externalDependency.lombok
@@ -39,6 +41,7 @@ dependencies {
testCompile externalDependency.mockito
testCompile externalDependency.testng
testCompile externalDependency.hazelcastTest
}

@@ -0,0 +1,74 @@
package com.linkedin.gms.factory.common;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CacheConfig {
@Value("${CACHE_TTL_SECONDS:600}")
private int cacheTtlSeconds;
@Value("${CACHE_MAX_SIZE:10000}")
private int cacheMaxSize;
@Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
private String hazelcastServiceName;
@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "caffeine")
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}
private Caffeine<Object, Object> caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(cacheMaxSize)
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS)
.recordStats();
}
@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast")
public CacheManager hazelcastCacheManager() {
Config config = new Config();
// TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, containsKey etc.
// is this behavior what we actually desire? Should we change it now?
MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds);
EvictionConfig evictionConfig = new EvictionConfig()
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(cacheMaxSize)
.setEvictionPolicy(EvictionPolicy.LFU);
mapConfig.setEvictionConfig(evictionConfig);
mapConfig.setName("default");
config.addMapConfig(mapConfig);
config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
.setProperty("service-dns", hazelcastServiceName);
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
return new HazelcastCacheManager(hazelcastInstance);
}
}
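
Note: the hazelcast branch above discovers peers via Kubernetes DNS (the service-dns property pointing at the configured service name), which is why multicast is disabled. Outside Kubernetes, e.g. for local experimentation, a comparable instance could be formed with Hazelcast's TCP/IP join instead; a hypothetical sketch using the standard Hazelcast API, not part of this change:

    Config config = new Config();
    config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
    config.getNetworkConfig().getJoin().getTcpIpConfig()
        .setEnabled(true)
        .addMember("127.0.0.1");  // assumed local member address; adjust per environment
    CacheManager cacheManager = new HazelcastCacheManager(Hazelcast.newHazelcastInstance(config));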

@@ -1,35 +0,0 @@
package com.linkedin.gms.factory.common;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CaffeineCacheConfig {
@Value("${CACHE_TTL_SECONDS:600}")
private int cacheTtlSeconds;
@Value("${CACHE_MAX_SIZE:10000}")
private int cacheMaxSize;
@Bean
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}
private Caffeine<Object, Object> caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(cacheMaxSize)
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS)
.recordStats();
}
}

@@ -81,6 +81,10 @@ graphService:
searchService:
resultBatchSize: ${SEARCH_SERVICE_BATCH_SIZE:100}
enableCache: ${SEARCH_SERVICE_ENABLE_CACHE:false}
cacheImplementation: ${SEARCH_SERVICE_CACHE_IMPLEMENTATION:caffeine}
cache:
hazelcast:
serviceName: ${SEARCH_SERVICE_HAZELCAST_SERVICE_NAME:hazelcast-service}
configEntityRegistry:
path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml}
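
Note: with these defaults the distributed cache stays off. To turn it on, an operator would set SEARCH_SERVICE_ENABLE_CACHE=true and SEARCH_SERVICE_CACHE_IMPLEMENTATION=hazelcast, and point SEARCH_SERVICE_HAZELCAST_SERVICE_NAME at the Kubernetes service used for Hazelcast member discovery (deployment specifics beyond these variable names are assumptions, not part of this change).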

@@ -0,0 +1,110 @@
package com.linkedin.gms.factory.search;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.LineageRelationshipArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchResultMetadata;
import com.linkedin.metadata.search.cache.CacheableSearcher;
import com.linkedin.metadata.search.cache.CachedEntityLineageResult;
import java.util.List;
import org.javatuples.Quintet;
import org.javatuples.Triplet;
import org.springframework.cache.Cache;
import org.testng.Assert;
import org.testng.annotations.Test;
import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;
public class CacheTest extends JetTestSupport {
HazelcastCacheManager cacheManager1;
HazelcastCacheManager cacheManager2;
HazelcastInstance instance1;
HazelcastInstance instance2;
public CacheTest() {
Config config = new Config();
instance1 = createHazelcastInstance(config);
instance2 = createHazelcastInstance(config);
cacheManager1 = new HazelcastCacheManager(instance1);
cacheManager2 = new HazelcastCacheManager(instance2);
}
@Test
public void hazelcastTest() {
CorpuserUrn corpuserUrn = new CorpuserUrn("user");
SearchEntity searchEntity = new SearchEntity().setEntity(corpuserUrn);
SearchResult searchResult = new SearchResult()
.setEntities(new SearchEntityArray(List.of(searchEntity)))
.setNumEntities(1)
.setFrom(0)
.setPageSize(1)
.setMetadata(new SearchResultMetadata());
Quintet<List<String>, String, Filter, SortCriterion, CacheableSearcher.QueryPagination>
quintet = Quintet.with(List.of(corpuserUrn.toString()), "*", null, null,
new CacheableSearcher.QueryPagination(0, 1));
CacheableSearcher<Quintet<List<String>, String, Filter, SortCriterion, CacheableSearcher.QueryPagination>> cacheableSearcher1 =
new CacheableSearcher<>(cacheManager1.getCache("test"), 10,
querySize -> searchResult,
querySize -> quintet, null, true);
CacheableSearcher<Quintet<List<String>, String, Filter, SortCriterion, CacheableSearcher.QueryPagination>> cacheableSearcher2 =
new CacheableSearcher<>(cacheManager2.getCache("test"), 10,
querySize -> searchResult,
querySize -> quintet, null, true);
// Cache result
SearchResult result = cacheableSearcher1.getSearchResults(0, 1);
Assert.assertNotEquals(result, null);
Assert.assertEquals(instance1.getMap("test").get(quintet), instance2.getMap("test").get(quintet));
Assert.assertEquals(cacheableSearcher1.getSearchResults(0, 1), searchResult);
Assert.assertEquals(cacheableSearcher1.getSearchResults(0, 1), cacheableSearcher2.getSearchResults(0, 1));
}
@Test
public void testLineageCaching() {
CorpuserUrn corpuserUrn = new CorpuserUrn("user");
EntityLineageResult lineageResult = new EntityLineageResult();
LineageRelationshipArray array = new LineageRelationshipArray();
LineageRelationship lineageRelationship = new LineageRelationship().setEntity(corpuserUrn).setType("type");
for (int i = 0; i < 10000; i++) {
array.add(lineageRelationship);
}
lineageResult.setRelationships(array).setCount(1).setStart(0).setTotal(1);
CachedEntityLineageResult cachedEntityLineageResult = new CachedEntityLineageResult(lineageResult,
System.currentTimeMillis());
Cache cache1 = cacheManager1.getCache("relationshipSearchService");
Cache cache2 = cacheManager2.getCache("relationshipSearchService");
Triplet<String, LineageDirection, Integer> triplet = Triplet.with(corpuserUrn.toString(), LineageDirection.DOWNSTREAM, 3);
cache1.put(triplet, cachedEntityLineageResult);
Assert.assertEquals(instance1.getMap("relationshipSearchService").get(triplet),
instance2.getMap("relationshipSearchService").get(triplet));
CachedEntityLineageResult cachedResult1 = cache1.get(triplet, CachedEntityLineageResult.class);
CachedEntityLineageResult cachedResult2 = cache2.get(triplet, CachedEntityLineageResult.class);
Assert.assertEquals(cachedResult1, cachedResult2);
Assert.assertEquals(cache1.get(triplet, CachedEntityLineageResult.class), cachedEntityLineageResult);
Assert.assertEquals(cache2.get(triplet, CachedEntityLineageResult.class).getEntityLineageResult(), lineageResult);
}
}