diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index 5fb1ab0889..930a55c56c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -189,11 +189,14 @@ public class LineageSearchService { long numEntities = 0; String codePath = null; try { - if (canDoLightning(lineageRelationships, input, inputFilters, sortCriterion)) { + Filter reducedFilters = + SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT)); + + if (canDoLightning(lineageRelationships, input, reducedFilters, sortCriterion)) { codePath = "lightning"; // use lightning approach to return lineage search results LineageSearchResult lineageSearchResult = getLightningSearchResult(lineageRelationships, - inputFilters, from, size, new HashSet<>(entities)); + reducedFilters, from, size, new HashSet<>(entities)); if (!lineageSearchResult.getEntities().isEmpty()) { log.debug("Lightning Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString()); } @@ -202,7 +205,7 @@ public class LineageSearchService { } else { codePath = "tortoise"; LineageSearchResult lineageSearchResult = getSearchResultInBatches(lineageRelationships, input, - inputFilters, sortCriterion, from, size, finalFlags); + reducedFilters, sortCriterion, from, size, finalFlags); if (!lineageSearchResult.getEntities().isEmpty()) { log.debug("Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString()); } @@ -513,15 +516,13 @@ public class LineageSearchService { if (inputFilters == null) { return QueryUtils.newFilter(urnMatchCriterion); } - Filter reducedFilters = - SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT)); // Add urn match criterion to each or clause - if (!CollectionUtils.isEmpty(reducedFilters.getOr())) { - for (ConjunctiveCriterion conjunctiveCriterion : reducedFilters.getOr()) { + if (!CollectionUtils.isEmpty(inputFilters.getOr())) { + for (ConjunctiveCriterion conjunctiveCriterion : inputFilters.getOr()) { conjunctiveCriterion.getAnd().add(urnMatchCriterion); } - return reducedFilters; + return inputFilters; } return QueryUtils.newFilter(urnMatchCriterion); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java index 38bdef5bd3..5b9ce7444a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java @@ -120,9 +120,8 @@ public class SearchUtils { } } - @Nonnull - public static Filter removeCriteria(@Nonnull Filter originalFilter, Predicate shouldRemove) { - if (originalFilter.getOr() != null) { + public static Filter removeCriteria(@Nullable Filter originalFilter, Predicate shouldRemove) { + if (originalFilter != null && originalFilter.getOr() != null) { return new Filter().setOr(new ConjunctiveCriterionArray(originalFilter.getOr() .stream() .map(criteria -> removeCriteria(criteria, shouldRemove)) diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java index e887269177..e5396ffaeb 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java @@ -132,7 +132,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { searchLineageCacheConfiguration.setTtlSeconds(600L); searchLineageCacheConfiguration.setLightningThreshold(withLightingCache ? -1 : 300); - _lineageSearchService = new LineageSearchService( + _lineageSearchService = spy(new LineageSearchService( new SearchService( new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration), cachingEntitySearchService, @@ -143,7 +143,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { 100, true), new SimpleRanker()), - _graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration); + _graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration)); } @BeforeMethod @@ -371,16 +371,19 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1); + verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "1"), testStar); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1); + verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "2"), testStar); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); - clearCache(true); + verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); + clearCache(true); // resets spy Urn urn2 = new TestEntityUrn("test2", "urn2", "VALUE_2"); ObjectNode document2 = JsonNodeFactory.instance.objectNode(); @@ -394,6 +397,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { searchResult = searchAcrossLineage(null, testStar); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); + verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); clearCache(true); when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), @@ -402,10 +406,12 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { searchResult = searchAcrossLineage(null, testStar); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().size(), 1); + verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); clearCache(true); // Test Cache Behavior - Mockito.reset(_graphService); + reset(_graphService); + reset(_lineageSearchService); // Case 1: Use the maxHops in the cache. when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), @@ -421,16 +427,18 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 1); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(null), eq(null)); + verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Hit the cache on second attempt searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME), "*", 1000, null, null, 0, 10, null, null, new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 1); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(null), eq(null)); + verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Case 2: Use the start and end time in the cache. @@ -447,20 +455,24 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 1); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(0L), eq(1L)); + verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Hit the cache on second attempt searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME), "*", null, null, null, 0, 10, 0L, 1L, new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 1); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(0L), eq(1L)); + verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); /* * Test filtering */ + reset(_lineageSearchService); + // Entity searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(DATASET_ENTITY_NAME), @@ -468,6 +480,7 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); + verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Cached searchResult = @@ -476,25 +489,35 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { new SearchFlags().setSkipCache(false)); Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(0L), eq(1L)); + verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); // Platform - Filter filter = QueryUtils.newFilter("platform", "urn:li:dataPlatform:kafka"); + ConjunctiveCriterionArray conCritArr = new ConjunctiveCriterionArray(); + Criterion platform1Crit = new Criterion().setField("platform").setValue("urn:li:dataPlatform:kafka").setCondition(Condition.EQUAL); + CriterionArray critArr = new CriterionArray(ImmutableList.of(platform1Crit)); + conCritArr.add(new ConjunctiveCriterion().setAnd(critArr)); + Criterion degreeCrit = new Criterion().setField("degree.keyword").setValue("2").setCondition(Condition.EQUAL); + conCritArr.add(new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(degreeCrit)))); + Filter filter = new Filter().setOr(conCritArr); + searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME), "*", 1000, filter, null, 0, 10, null, null, new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); + verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Cached searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME), "*", 1000, filter, null, 0, 10, null, null, new SearchFlags().setSkipCache(false)); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(0L), eq(1L)); + verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); @@ -506,14 +529,16 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { new SearchFlags().setSkipCache(false)); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); + verify(_lineageSearchService, times(5)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); // Cached searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME), "*", 1000, originFilter, null, 0, 10, null, null, new SearchFlags().setSkipCache(false)); - Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), + verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), eq(1000), eq(0L), eq(1L)); + verify(_lineageSearchService, times(6)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet()); assertEquals(searchResult.getNumEntities().intValue(), 0); assertEquals(searchResult.getEntities().size(), 0); @@ -749,7 +774,5 @@ public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { size = 10; filter = new Filter().setOr(conCritArr); Assert.assertTrue(_lineageSearchService.canDoLightning(lineageRelationships, "*", filter, null)); - - } }