mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-13 18:04:55 +00:00
fix(search): fix lightning cache enable logic (#8522)
This commit is contained in:
parent
89f23d3c36
commit
fa05aae959
@ -189,11 +189,14 @@ public class LineageSearchService {
|
||||
long numEntities = 0;
|
||||
String codePath = null;
|
||||
try {
|
||||
if (canDoLightning(lineageRelationships, input, inputFilters, sortCriterion)) {
|
||||
Filter reducedFilters =
|
||||
SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT));
|
||||
|
||||
if (canDoLightning(lineageRelationships, input, reducedFilters, sortCriterion)) {
|
||||
codePath = "lightning";
|
||||
// use lightning approach to return lineage search results
|
||||
LineageSearchResult lineageSearchResult = getLightningSearchResult(lineageRelationships,
|
||||
inputFilters, from, size, new HashSet<>(entities));
|
||||
reducedFilters, from, size, new HashSet<>(entities));
|
||||
if (!lineageSearchResult.getEntities().isEmpty()) {
|
||||
log.debug("Lightning Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString());
|
||||
}
|
||||
@ -202,7 +205,7 @@ public class LineageSearchService {
|
||||
} else {
|
||||
codePath = "tortoise";
|
||||
LineageSearchResult lineageSearchResult = getSearchResultInBatches(lineageRelationships, input,
|
||||
inputFilters, sortCriterion, from, size, finalFlags);
|
||||
reducedFilters, sortCriterion, from, size, finalFlags);
|
||||
if (!lineageSearchResult.getEntities().isEmpty()) {
|
||||
log.debug("Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString());
|
||||
}
|
||||
@ -513,15 +516,13 @@ public class LineageSearchService {
|
||||
if (inputFilters == null) {
|
||||
return QueryUtils.newFilter(urnMatchCriterion);
|
||||
}
|
||||
Filter reducedFilters =
|
||||
SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT));
|
||||
|
||||
// Add urn match criterion to each or clause
|
||||
if (!CollectionUtils.isEmpty(reducedFilters.getOr())) {
|
||||
for (ConjunctiveCriterion conjunctiveCriterion : reducedFilters.getOr()) {
|
||||
if (!CollectionUtils.isEmpty(inputFilters.getOr())) {
|
||||
for (ConjunctiveCriterion conjunctiveCriterion : inputFilters.getOr()) {
|
||||
conjunctiveCriterion.getAnd().add(urnMatchCriterion);
|
||||
}
|
||||
return reducedFilters;
|
||||
return inputFilters;
|
||||
}
|
||||
return QueryUtils.newFilter(urnMatchCriterion);
|
||||
}
|
||||
|
@ -120,9 +120,8 @@ public class SearchUtils {
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public static Filter removeCriteria(@Nonnull Filter originalFilter, Predicate<Criterion> shouldRemove) {
|
||||
if (originalFilter.getOr() != null) {
|
||||
public static Filter removeCriteria(@Nullable Filter originalFilter, Predicate<Criterion> shouldRemove) {
|
||||
if (originalFilter != null && originalFilter.getOr() != null) {
|
||||
return new Filter().setOr(new ConjunctiveCriterionArray(originalFilter.getOr()
|
||||
.stream()
|
||||
.map(criteria -> removeCriteria(criteria, shouldRemove))
|
||||
|
@ -132,7 +132,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
searchLineageCacheConfiguration.setTtlSeconds(600L);
|
||||
searchLineageCacheConfiguration.setLightningThreshold(withLightingCache ? -1 : 300);
|
||||
|
||||
_lineageSearchService = new LineageSearchService(
|
||||
_lineageSearchService = spy(new LineageSearchService(
|
||||
new SearchService(
|
||||
new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration),
|
||||
cachingEntitySearchService,
|
||||
@ -143,7 +143,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
100,
|
||||
true),
|
||||
new SimpleRanker()),
|
||||
_graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration);
|
||||
_graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration));
|
||||
}
|
||||
|
||||
@BeforeMethod
|
||||
@ -371,16 +371,19 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
|
||||
assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1);
|
||||
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "1"), testStar);
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
|
||||
assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1);
|
||||
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "2"), testStar);
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
clearCache(true);
|
||||
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
clearCache(true); // resets spy
|
||||
|
||||
Urn urn2 = new TestEntityUrn("test2", "urn2", "VALUE_2");
|
||||
ObjectNode document2 = JsonNodeFactory.instance.objectNode();
|
||||
@ -394,6 +397,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
searchResult = searchAcrossLineage(null, testStar);
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
|
||||
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
clearCache(true);
|
||||
|
||||
when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
@ -402,10 +406,12 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
searchResult = searchAcrossLineage(null, testStar);
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
assertEquals(searchResult.getEntities().size(), 1);
|
||||
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
clearCache(true);
|
||||
|
||||
// Test Cache Behavior
|
||||
Mockito.reset(_graphService);
|
||||
reset(_graphService);
|
||||
reset(_lineageSearchService);
|
||||
|
||||
// Case 1: Use the maxHops in the cache.
|
||||
when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
@ -421,16 +427,18 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
new SearchFlags().setSkipCache(false));
|
||||
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(null), eq(null));
|
||||
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
// Hit the cache on second attempt
|
||||
searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
|
||||
"*", 1000, null, null, 0, 10, null, null,
|
||||
new SearchFlags().setSkipCache(false));
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(null), eq(null));
|
||||
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
|
||||
// Case 2: Use the start and end time in the cache.
|
||||
@ -447,20 +455,24 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
new SearchFlags().setSkipCache(false));
|
||||
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(0L), eq(1L));
|
||||
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
// Hit the cache on second attempt
|
||||
searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
|
||||
"*", null, null, null, 0, 10, 0L, 1L,
|
||||
new SearchFlags().setSkipCache(false));
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 1);
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(0L), eq(1L));
|
||||
verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
/*
|
||||
* Test filtering
|
||||
*/
|
||||
reset(_lineageSearchService);
|
||||
|
||||
// Entity
|
||||
searchResult =
|
||||
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(DATASET_ENTITY_NAME),
|
||||
@ -468,6 +480,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
new SearchFlags().setSkipCache(false));
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
// Cached
|
||||
searchResult =
|
||||
@ -476,25 +489,35 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
new SearchFlags().setSkipCache(false));
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(0L), eq(1L));
|
||||
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
|
||||
// Platform
|
||||
Filter filter = QueryUtils.newFilter("platform", "urn:li:dataPlatform:kafka");
|
||||
ConjunctiveCriterionArray conCritArr = new ConjunctiveCriterionArray();
|
||||
Criterion platform1Crit = new Criterion().setField("platform").setValue("urn:li:dataPlatform:kafka").setCondition(Condition.EQUAL);
|
||||
CriterionArray critArr = new CriterionArray(ImmutableList.of(platform1Crit));
|
||||
conCritArr.add(new ConjunctiveCriterion().setAnd(critArr));
|
||||
Criterion degreeCrit = new Criterion().setField("degree.keyword").setValue("2").setCondition(Condition.EQUAL);
|
||||
conCritArr.add(new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(degreeCrit))));
|
||||
Filter filter = new Filter().setOr(conCritArr);
|
||||
|
||||
searchResult =
|
||||
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
|
||||
"*", 1000, filter, null, 0, 10, null, null,
|
||||
new SearchFlags().setSkipCache(false));
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
// Cached
|
||||
searchResult =
|
||||
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
|
||||
"*", 1000, filter, null, 0, 10, null, null,
|
||||
new SearchFlags().setSkipCache(false));
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(0L), eq(1L));
|
||||
verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
|
||||
@ -506,14 +529,16 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
new SearchFlags().setSkipCache(false));
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
verify(_lineageSearchService, times(5)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
|
||||
// Cached
|
||||
searchResult =
|
||||
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
|
||||
"*", 1000, originFilter, null, 0, 10, null, null,
|
||||
new SearchFlags().setSkipCache(false));
|
||||
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
|
||||
eq(1000), eq(0L), eq(1L));
|
||||
verify(_lineageSearchService, times(6)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
|
||||
assertEquals(searchResult.getNumEntities().intValue(), 0);
|
||||
assertEquals(searchResult.getEntities().size(), 0);
|
||||
|
||||
@ -749,7 +774,5 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests {
|
||||
size = 10;
|
||||
filter = new Filter().setOr(conCritArr);
|
||||
Assert.assertTrue(_lineageSearchService.canDoLightning(lineageRelationships, "*", filter, null));
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user