mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-07 15:27:05 +00:00
feat(gms): Write back lineage search results to in-memory cache bound to feature flag (#6006)
Co-authored-by: Piyush Narang <piyushn@stripe.com>
This commit is contained in:
parent
2c03005fc2
commit
2c659214ab
@ -6,4 +6,5 @@ import lombok.Data;
|
||||
/**
 * Holder for boolean feature flags. Every flag declared here defaults to {@code false}.
 * Accessors are not written by hand; the Lombok {@code @Data} annotation is relied on for them.
 */
@Data
|
||||
public class FeatureFlags {
|
||||
// Flag for showing the simplified homepage by default; off unless set.
private boolean showSimplifiedHomepageByDefault = false;
|
||||
// Flag for the lineage search cache; off unless set.
private boolean lineageSearchCacheEnabled = false;
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
|
||||
import com.linkedin.metadata.query.filter.Criterion;
|
||||
import com.linkedin.metadata.query.filter.Filter;
|
||||
import com.linkedin.metadata.query.filter.SortCriterion;
|
||||
import com.linkedin.metadata.search.cache.CachedEntityLineageResult;
|
||||
import com.linkedin.metadata.search.utils.FilterUtils;
|
||||
import com.linkedin.metadata.search.utils.QueryUtils;
|
||||
import com.linkedin.metadata.search.utils.SearchUtils;
|
||||
@ -30,16 +31,20 @@ import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.cache.Cache;
|
||||
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class LineageSearchService {
|
||||
private final SearchService _searchService;
|
||||
private final GraphService _graphService;
|
||||
@Nullable
|
||||
private final Cache cache;
|
||||
private final boolean cacheEnabled;
|
||||
|
||||
private static final String DEGREE_FILTER = "degree";
|
||||
private static final String DEGREE_FILTER_INPUT = "degree.keyword";
|
||||
@ -50,6 +55,7 @@ public class LineageSearchService {
|
||||
private static final int MAX_RELATIONSHIPS = 1000000;
|
||||
private static final int MAX_TERMS = 50000;
|
||||
private static final SearchFlags SKIP_CACHE = new SearchFlags().setSkipCache(true);
|
||||
private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* Gets a list of documents that match given search request that is related to the input entity
|
||||
@ -71,10 +77,20 @@ public class LineageSearchService {
|
||||
@Nonnull List<String> entities, @Nullable String input, @Nullable Integer maxHops, @Nullable Filter inputFilters,
|
||||
@Nullable SortCriterion sortCriterion, int from, int size) {
|
||||
// Cache multihop result for faster performance
|
||||
EntityLineageResult lineageResult = cache.get(Pair.of(sourceUrn, direction), EntityLineageResult.class);
|
||||
if (lineageResult == null) {
|
||||
CachedEntityLineageResult cachedLineageResult = cacheEnabled
|
||||
? cache.get(Pair.of(sourceUrn, direction), CachedEntityLineageResult.class) : null;
|
||||
EntityLineageResult lineageResult;
|
||||
if (cachedLineageResult == null) {
|
||||
maxHops = maxHops != null ? maxHops : 1000;
|
||||
lineageResult = _graphService.getLineage(sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops);
|
||||
if (cacheEnabled) {
|
||||
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
|
||||
}
|
||||
} else {
|
||||
lineageResult = cachedLineageResult.getEntityLineageResult();
|
||||
if (System.currentTimeMillis() - cachedLineageResult.getTimestamp() > DAY_IN_MS) {
|
||||
log.warn("Cached lineage entry for: {} is older than one day.", sourceUrn);
|
||||
}
|
||||
}
|
||||
|
||||
// Filter hopped result based on the set of entities to return and inputFilters before sending to search
|
||||
|
||||
11
metadata-io/src/main/java/com/linkedin/metadata/search/cache/CachedEntityLineageResult.java
vendored
Normal file
11
metadata-io/src/main/java/com/linkedin/metadata/search/cache/CachedEntityLineageResult.java
vendored
Normal file
@ -0,0 +1,11 @@
|
||||
package com.linkedin.metadata.search.cache;
|
||||
|
||||
import com.linkedin.metadata.graph.EntityLineageResult;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
/**
 * Cache value that pairs an {@link EntityLineageResult} with a timestamp, so that the age of a
 * cached entry can be checked when it is read back.
 */
@Data
|
||||
public class CachedEntityLineageResult {
|
||||
// The lineage result stored in the cache entry.
private final EntityLineageResult entityLineageResult;
|
||||
// Timestamp in milliseconds; presumably the time the entry was created — confirm against callers.
private final long timestamp;
|
||||
}
|
||||
@ -87,10 +87,10 @@ public class LineageSearchServiceTest {
|
||||
_elasticSearchService.configure();
|
||||
_cacheManager = new ConcurrentMapCacheManager();
|
||||
_graphService = mock(GraphService.class);
|
||||
resetService();
|
||||
resetService(true);
|
||||
}
|
||||
|
||||
private void resetService() {
|
||||
private void resetService(boolean withCache) {
|
||||
CachingEntitySearchService cachingEntitySearchService = new CachingEntitySearchService(_cacheManager, _elasticSearchService, 100, true);
|
||||
_lineageSearchService = new LineageSearchService(
|
||||
new SearchService(
|
||||
@ -102,7 +102,7 @@ public class LineageSearchServiceTest {
|
||||
100,
|
||||
true),
|
||||
new SimpleRanker()),
|
||||
_graphService, _cacheManager.getCache("test"));
|
||||
_graphService, _cacheManager.getCache("test"), withCache);
|
||||
}
|
||||
|
||||
@BeforeMethod
|
||||
@ -126,7 +126,7 @@ public class LineageSearchServiceTest {
|
||||
|
||||
private void clearCache() {
|
||||
_cacheManager.getCacheNames().forEach(cache -> _cacheManager.getCache(cache).clear());
|
||||
resetService();
|
||||
resetService(true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
||||
@ -1,13 +1,12 @@
|
||||
package com.linkedin.gms.factory.search;
|
||||
|
||||
import com.linkedin.gms.factory.common.GraphServiceFactory;
|
||||
import com.linkedin.gms.factory.config.ConfigurationProvider;
|
||||
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.search.LineageSearchService;
|
||||
import com.linkedin.metadata.search.SearchService;
|
||||
import javax.annotation.Nonnull;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -21,22 +20,13 @@ import org.springframework.context.annotation.PropertySource;
|
||||
@PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
|
||||
public class LineageSearchServiceFactory {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("searchService")
|
||||
private SearchService searchService;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("graphService")
|
||||
private GraphService graphService;
|
||||
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
|
||||
@Bean(name = "relationshipSearchService")
|
||||
@Primary
|
||||
@Nonnull
|
||||
protected LineageSearchService getInstance() {
|
||||
protected LineageSearchService getInstance(CacheManager cacheManager, GraphService graphService,
|
||||
SearchService searchService, ConfigurationProvider configurationProvider) {
|
||||
boolean cacheEnabled = configurationProvider.getFeatureFlags().isLineageSearchCacheEnabled();
|
||||
return new LineageSearchService(searchService, graphService,
|
||||
cacheManager.getCache("relationshipSearchService"));
|
||||
cacheEnabled ? cacheManager.getCache("relationshipSearchService") : null, cacheEnabled);
|
||||
}
|
||||
}
|
||||
|
||||
@ -204,6 +204,7 @@ bootstrap:
|
||||
|
||||
featureFlags:
|
||||
showSimplifiedHomepageByDefault: ${SHOW_SIMPLIFIED_HOMEPAGE_BY_DEFAULT:false} # Shows users a simplified homepage with just datasets, charts and dashboards by default. This can be configured in user settings.
|
||||
lineageSearchCacheEnabled: ${LINEAGE_SEARCH_CACHE_ENABLED:false} # Enables the in-memory cache for searchAcrossLineage queries. Disabled by default to prevent unexpected update delays.
|
||||
|
||||
entityChangeEvents:
|
||||
enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user