diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 92960bc922..97cb186ce9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -336,17 +336,10 @@ public class ESGraphQueryDAO { Collectors.toMap( Function.identity(), entityType -> lineageRegistry.getLineageRelationships(entityType, direction))); - BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); - // Get all relation types relevant to the set of urns to hop from - urnsPerEntityType.forEach( - (entityType, urns) -> - finalQuery.should( - getQueryForLineage( - urns, - edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()), - graphFilters, - startTimeMillis, - endTimeMillis))); + + QueryBuilder finalQuery = + getLineageQuery( + urnsPerEntityType, edgesPerEntityType, graphFilters, startTimeMillis, endTimeMillis); SearchResponse response = executeSearchQuery(finalQuery, 0, graphQueryConfiguration.getMaxResult()); Set entityUrnSet = new HashSet<>(entityUrns); @@ -361,18 +354,53 @@ public class ESGraphQueryDAO { entityUrnSet, response, validEdges, visitedEntities, numHops, existingPaths); } - // Get search query for given list of edges and source urns @VisibleForTesting - public static QueryBuilder getQueryForLineage( - @Nonnull List urns, - @Nonnull List lineageEdges, + public static QueryBuilder getLineageQuery( + @Nonnull Map> urnsPerEntityType, + @Nonnull Map> edgesPerEntityType, @Nonnull GraphFilters graphFilters, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) { - BoolQueryBuilder query = QueryBuilders.boolQuery(); - if (lineageEdges.isEmpty()) { - return query; + BoolQueryBuilder entityTypeQueries = QueryBuilders.boolQuery(); + // Get all relation types relevant to the set of urns to hop from + urnsPerEntityType.forEach( + (entityType, urns) -> { + if (edgesPerEntityType.containsKey(entityType) + && !edgesPerEntityType.get(entityType).isEmpty()) { + entityTypeQueries.should( + getLineageQueryForEntityType( + urns, edgesPerEntityType.get(entityType), graphFilters)); + } + }); + + BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); + + finalQuery.filter(entityTypeQueries); + finalQuery.filter(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); + finalQuery.filter(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); + + /* + * Optional - Add edge filtering based on time windows. + */ + if (startTimeMillis != null && endTimeMillis != null) { + finalQuery.filter(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis)); + } else { + log.debug( + String.format( + "Empty time filter range provided: start time %s, end time: %s. Skipping application of time filters", + startTimeMillis, endTimeMillis)); } + + return finalQuery; + } + + // Get search query for given list of edges and source urns + @VisibleForTesting + public static QueryBuilder getLineageQueryForEntityType( + @Nonnull List urns, + @Nonnull List lineageEdges, + @Nonnull GraphFilters graphFilters) { + BoolQueryBuilder query = QueryBuilders.boolQuery(); Map> edgesByDirection = lineageEdges.stream().collect(Collectors.groupingBy(EdgeInfo::getDirection)); @@ -388,18 +416,6 @@ public class ESGraphQueryDAO { query.should(getIncomingEdgeQuery(urns, incomingEdges, graphFilters)); } - /* - * Optional - Add edge filtering based on time windows. - */ - if (startTimeMillis != null && endTimeMillis != null) { - query.must(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis)); - } else { - log.debug( - String.format( - "Empty time filter range provided: start time %s, end time: %s. Skipping application of time filters", - startTimeMillis, endTimeMillis)); - } - return query; } @@ -601,9 +617,6 @@ public class ESGraphQueryDAO { BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery(); outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE)); outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges)); - outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); - outgoingEdgeQuery.must( - buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); return outgoingEdgeQuery; } @@ -612,9 +625,6 @@ public class ESGraphQueryDAO { BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery(); incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION)); incomingEdgeQuery.must(buildEdgeFilters(incomingEdges)); - incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); - incomingEdgeQuery.must( - buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); return incomingEdgeQuery; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java index 9fc9490bfd..5b7f880e6d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java @@ -23,16 +23,40 @@ import org.testng.annotations.Test; public class ESGraphQueryDAOTest { - private static final String TEST_QUERY_FILE = - "elasticsearch/sample_filters/lineage_query_filters_1.json"; + private static final String TEST_QUERY_FILE_LIMITED = + "elasticsearch/sample_filters/lineage_query_filters_limited.json"; + private static final String TEST_QUERY_FILE_FULL = + "elasticsearch/sample_filters/lineage_query_filters_full.json"; + private static final String TEST_QUERY_FILE_FULL_EMPTY_FILTERS = + "elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json"; + private static final String TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS = + "elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json"; @Test private static void testGetQueryForLineageFullArguments() throws Exception { - URL url = Resources.getResource(TEST_QUERY_FILE); - String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8); + URL urlLimited = Resources.getResource(TEST_QUERY_FILE_LIMITED); + String expectedQueryLimited = Resources.toString(urlLimited, StandardCharsets.UTF_8); + URL urlFull = Resources.getResource(TEST_QUERY_FILE_FULL); + String expectedQueryFull = Resources.toString(urlFull, StandardCharsets.UTF_8); + URL urlFullEmptyFilters = Resources.getResource(TEST_QUERY_FILE_FULL_EMPTY_FILTERS); + String expectedQueryFullEmptyFilters = + Resources.toString(urlFullEmptyFilters, StandardCharsets.UTF_8); + URL urlFullMultipleFilters = Resources.getResource(TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS); + String expectedQueryFullMultipleFilters = + Resources.toString(urlFullMultipleFilters, StandardCharsets.UTF_8); - List urns = new ArrayList<>(); + List urns = List.of(Urn.createFromString("urn:li:dataset:test-urn")); + List urnsMultiple1 = + ImmutableList.of( + UrnUtils.getUrn("urn:li:dataset:test-urn"), + UrnUtils.getUrn("urn:li:dataset:test-urn2"), + UrnUtils.getUrn("urn:li:dataset:test-urn3")); + List urnsMultiple2 = + ImmutableList.of( + UrnUtils.getUrn("urn:li:chart:test-urn"), + UrnUtils.getUrn("urn:li:chart:test-urn2"), + UrnUtils.getUrn("urn:li:chart:test-urn3")); List edgeInfos = new ArrayList<>( ImmutableList.of( @@ -40,14 +64,64 @@ public class ESGraphQueryDAOTest { "DownstreamOf", RelationshipDirection.INCOMING, Constants.DATASET_ENTITY_NAME))); + List edgeInfosMultiple1 = + ImmutableList.of( + new LineageRegistry.EdgeInfo( + "DownstreamOf", RelationshipDirection.OUTGOING, Constants.DATASET_ENTITY_NAME), + new LineageRegistry.EdgeInfo( + "Consumes", RelationshipDirection.OUTGOING, Constants.DATASET_ENTITY_NAME)); + List edgeInfosMultiple2 = + ImmutableList.of( + new LineageRegistry.EdgeInfo( + "DownstreamOf", RelationshipDirection.OUTGOING, Constants.DATA_JOB_ENTITY_NAME), + new LineageRegistry.EdgeInfo( + "Consumes", RelationshipDirection.OUTGOING, Constants.DATA_JOB_ENTITY_NAME)); + String entityType = "testEntityType"; + Map> urnsPerEntityType = Map.of(entityType, urns); + Map> urnsPerEntityTypeMultiple = + Map.of( + Constants.DATASET_ENTITY_NAME, + urnsMultiple1, + Constants.CHART_ENTITY_NAME, + urnsMultiple2); + Map> edgesPerEntityType = Map.of(entityType, edgeInfos); + Map> edgesPerEntityTypeMultiple = + Map.of( + Constants.DATASET_ENTITY_NAME, edgeInfosMultiple1, + Constants.DATA_JOB_ENTITY_NAME, edgeInfosMultiple2); GraphFilters graphFilters = new GraphFilters(ImmutableList.of(Constants.DATASET_ENTITY_NAME)); + GraphFilters graphFiltersMultiple = + new GraphFilters( + ImmutableList.of( + Constants.DATASET_ENTITY_NAME, + Constants.DASHBOARD_ENTITY_NAME, + Constants.DATA_JOB_ENTITY_NAME)); Long startTime = 0L; Long endTime = 1L; - QueryBuilder builder = - ESGraphQueryDAO.getQueryForLineage(urns, edgeInfos, graphFilters, startTime, endTime); + QueryBuilder limitedBuilder = + ESGraphQueryDAO.getLineageQueryForEntityType(urns, edgeInfos, graphFilters); - Assert.assertEquals(builder.toString(), expectedQuery); + QueryBuilder fullBuilder = + ESGraphQueryDAO.getLineageQuery( + urnsPerEntityType, edgesPerEntityType, graphFilters, startTime, endTime); + + QueryBuilder fullBuilderEmptyFilters = + ESGraphQueryDAO.getLineageQuery( + urnsPerEntityType, edgesPerEntityType, GraphFilters.emptyGraphFilters, null, null); + + QueryBuilder fullBuilderMultipleFilters = + ESGraphQueryDAO.getLineageQuery( + urnsPerEntityTypeMultiple, + edgesPerEntityTypeMultiple, + graphFiltersMultiple, + startTime, + endTime); + + Assert.assertEquals(limitedBuilder.toString(), expectedQueryLimited); + Assert.assertEquals(fullBuilder.toString(), expectedQueryFull); + Assert.assertEquals(fullBuilderEmptyFilters.toString(), expectedQueryFullEmptyFilters); + Assert.assertEquals(fullBuilderMultipleFilters.toString(), expectedQueryFullMultipleFilters); } @Test diff --git a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full.json similarity index 81% rename from metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json rename to metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full.json index eb84638f0c..0a1cee0841 100644 --- a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full.json @@ -1,6 +1,62 @@ { "bool" : { - "must" : [ + "filter" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "terms" : { + "destination.urn" : [ + "urn:li:dataset:test-urn" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "relationshipType" : [ + "DownstreamOf" + ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "terms" : { + "source.entityType" : [ + "dataset" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "destination.entityType" : [ + "dataset" + ], + "boost" : 1.0 + } + }, { "bool" : { "should" : [ @@ -160,46 +216,6 @@ } } ], - "should" : [ - { - "bool" : { - "must" : [ - { - "terms" : { - "destination.urn" : [ ], - "boost" : 1.0 - } - }, - { - "terms" : { - "relationshipType" : [ - "DownstreamOf" - ], - "boost" : 1.0 - } - }, - { - "terms" : { - "source.entityType" : [ - "dataset" - ], - "boost" : 1.0 - } - }, - { - "terms" : { - "destination.entityType" : [ - "dataset" - ], - "boost" : 1.0 - } - } - ], - "adjust_pure_negative" : true, - "boost" : 1.0 - } - } - ], "adjust_pure_negative" : true, "boost" : 1.0 } diff --git a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json new file mode 100644 index 0000000000..ab2841d660 --- /dev/null +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json @@ -0,0 +1,60 @@ +{ + "bool" : { + "filter" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "terms" : { + "destination.urn" : [ + "urn:li:dataset:test-urn" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "relationshipType" : [ + "DownstreamOf" + ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "terms" : { + "source.entityType" : [ ], + "boost" : 1.0 + } + }, + { + "terms" : { + "destination.entityType" : [ ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } +} \ No newline at end of file diff --git a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json new file mode 100644 index 0000000000..39f595e0e8 --- /dev/null +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json @@ -0,0 +1,229 @@ +{ + "bool" : { + "filter" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "terms" : { + "source.urn" : [ + "urn:li:dataset:test-urn", + "urn:li:dataset:test-urn2", + "urn:li:dataset:test-urn3" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "relationshipType" : [ + "DownstreamOf", + "Consumes" + ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "terms" : { + "source.entityType" : [ + "dataset", + "dashboard", + "dataJob" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "destination.entityType" : [ + "dataset", + "dashboard", + "dataJob" + ], + "boost" : 1.0 + } + }, + { + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + }, + { + "range" : { + "createdOn" : { + "from" : 0, + "to" : 1, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : "updatedOn", + "boost" : 1.0 + } + }, + { + "range" : { + "updatedOn" : { + "from" : 0, + "to" : 1, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "createdOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "updatedOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "updatedOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "term" : { + "properties.source" : { + "value" : "UI", + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } +} \ No newline at end of file diff --git a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_limited.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_limited.json new file mode 100644 index 0000000000..95d468ec3d --- /dev/null +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_limited.json @@ -0,0 +1,32 @@ +{ + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "terms" : { + "destination.urn" : [ + "urn:li:dataset:test-urn" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "relationshipType" : [ + "DownstreamOf" + ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } +} \ No newline at end of file