perf(lineage): Rewrite lineage query for Elastic graph store (#9552)

This commit is contained in:
Shirshanka Das 2024-01-03 12:21:06 -08:00 committed by GitHub
parent 83b904e379
commit 186b6f942d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 506 additions and 85 deletions

View File

@ -336,17 +336,10 @@ public class ESGraphQueryDAO {
Collectors.toMap(
Function.identity(),
entityType -> lineageRegistry.getLineageRelationships(entityType, direction)));
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
// Get all relation types relevant to the set of urns to hop from
urnsPerEntityType.forEach(
(entityType, urns) ->
finalQuery.should(
getQueryForLineage(
urns,
edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()),
graphFilters,
startTimeMillis,
endTimeMillis)));
QueryBuilder finalQuery =
getLineageQuery(
urnsPerEntityType, edgesPerEntityType, graphFilters, startTimeMillis, endTimeMillis);
SearchResponse response =
executeSearchQuery(finalQuery, 0, graphQueryConfiguration.getMaxResult());
Set<Urn> entityUrnSet = new HashSet<>(entityUrns);
@ -361,18 +354,53 @@ public class ESGraphQueryDAO {
entityUrnSet, response, validEdges, visitedEntities, numHops, existingPaths);
}
// Get search query for given list of edges and source urns
@VisibleForTesting
public static QueryBuilder getQueryForLineage(
@Nonnull List<Urn> urns,
@Nonnull List<EdgeInfo> lineageEdges,
public static QueryBuilder getLineageQuery(
@Nonnull Map<String, List<Urn>> urnsPerEntityType,
@Nonnull Map<String, List<EdgeInfo>> edgesPerEntityType,
@Nonnull GraphFilters graphFilters,
@Nullable Long startTimeMillis,
@Nullable Long endTimeMillis) {
BoolQueryBuilder query = QueryBuilders.boolQuery();
if (lineageEdges.isEmpty()) {
return query;
BoolQueryBuilder entityTypeQueries = QueryBuilders.boolQuery();
// Get all relation types relevant to the set of urns to hop from
urnsPerEntityType.forEach(
(entityType, urns) -> {
if (edgesPerEntityType.containsKey(entityType)
&& !edgesPerEntityType.get(entityType).isEmpty()) {
entityTypeQueries.should(
getLineageQueryForEntityType(
urns, edgesPerEntityType.get(entityType), graphFilters));
}
});
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
finalQuery.filter(entityTypeQueries);
finalQuery.filter(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
finalQuery.filter(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
/*
* Optional - Add edge filtering based on time windows.
*/
if (startTimeMillis != null && endTimeMillis != null) {
finalQuery.filter(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis));
} else {
log.debug(
String.format(
"Empty time filter range provided: start time %s, end time: %s. Skipping application of time filters",
startTimeMillis, endTimeMillis));
}
return finalQuery;
}
// Get search query for given list of edges and source urns
@VisibleForTesting
public static QueryBuilder getLineageQueryForEntityType(
@Nonnull List<Urn> urns,
@Nonnull List<EdgeInfo> lineageEdges,
@Nonnull GraphFilters graphFilters) {
BoolQueryBuilder query = QueryBuilders.boolQuery();
Map<RelationshipDirection, List<EdgeInfo>> edgesByDirection =
lineageEdges.stream().collect(Collectors.groupingBy(EdgeInfo::getDirection));
@ -388,18 +416,6 @@ public class ESGraphQueryDAO {
query.should(getIncomingEdgeQuery(urns, incomingEdges, graphFilters));
}
/*
* Optional - Add edge filtering based on time windows.
*/
if (startTimeMillis != null && endTimeMillis != null) {
query.must(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis));
} else {
log.debug(
String.format(
"Empty time filter range provided: start time %s, end time: %s. Skipping application of time filters",
startTimeMillis, endTimeMillis));
}
return query;
}
@ -601,9 +617,6 @@ public class ESGraphQueryDAO {
BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery();
outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE));
outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges));
outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
outgoingEdgeQuery.must(
buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
return outgoingEdgeQuery;
}
@ -612,9 +625,6 @@ public class ESGraphQueryDAO {
BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery();
incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION));
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
incomingEdgeQuery.must(
buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
return incomingEdgeQuery;
}

View File

@ -23,16 +23,40 @@ import org.testng.annotations.Test;
public class ESGraphQueryDAOTest {
private static final String TEST_QUERY_FILE =
"elasticsearch/sample_filters/lineage_query_filters_1.json";
private static final String TEST_QUERY_FILE_LIMITED =
"elasticsearch/sample_filters/lineage_query_filters_limited.json";
private static final String TEST_QUERY_FILE_FULL =
"elasticsearch/sample_filters/lineage_query_filters_full.json";
private static final String TEST_QUERY_FILE_FULL_EMPTY_FILTERS =
"elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json";
private static final String TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS =
"elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json";
@Test
private static void testGetQueryForLineageFullArguments() throws Exception {
URL url = Resources.getResource(TEST_QUERY_FILE);
String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8);
URL urlLimited = Resources.getResource(TEST_QUERY_FILE_LIMITED);
String expectedQueryLimited = Resources.toString(urlLimited, StandardCharsets.UTF_8);
URL urlFull = Resources.getResource(TEST_QUERY_FILE_FULL);
String expectedQueryFull = Resources.toString(urlFull, StandardCharsets.UTF_8);
URL urlFullEmptyFilters = Resources.getResource(TEST_QUERY_FILE_FULL_EMPTY_FILTERS);
String expectedQueryFullEmptyFilters =
Resources.toString(urlFullEmptyFilters, StandardCharsets.UTF_8);
URL urlFullMultipleFilters = Resources.getResource(TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS);
String expectedQueryFullMultipleFilters =
Resources.toString(urlFullMultipleFilters, StandardCharsets.UTF_8);
List<Urn> urns = new ArrayList<>();
List<Urn> urns = List.of(Urn.createFromString("urn:li:dataset:test-urn"));
List<Urn> urnsMultiple1 =
ImmutableList.of(
UrnUtils.getUrn("urn:li:dataset:test-urn"),
UrnUtils.getUrn("urn:li:dataset:test-urn2"),
UrnUtils.getUrn("urn:li:dataset:test-urn3"));
List<Urn> urnsMultiple2 =
ImmutableList.of(
UrnUtils.getUrn("urn:li:chart:test-urn"),
UrnUtils.getUrn("urn:li:chart:test-urn2"),
UrnUtils.getUrn("urn:li:chart:test-urn3"));
List<LineageRegistry.EdgeInfo> edgeInfos =
new ArrayList<>(
ImmutableList.of(
@ -40,14 +64,64 @@ public class ESGraphQueryDAOTest {
"DownstreamOf",
RelationshipDirection.INCOMING,
Constants.DATASET_ENTITY_NAME)));
List<LineageRegistry.EdgeInfo> edgeInfosMultiple1 =
ImmutableList.of(
new LineageRegistry.EdgeInfo(
"DownstreamOf", RelationshipDirection.OUTGOING, Constants.DATASET_ENTITY_NAME),
new LineageRegistry.EdgeInfo(
"Consumes", RelationshipDirection.OUTGOING, Constants.DATASET_ENTITY_NAME));
List<LineageRegistry.EdgeInfo> edgeInfosMultiple2 =
ImmutableList.of(
new LineageRegistry.EdgeInfo(
"DownstreamOf", RelationshipDirection.OUTGOING, Constants.DATA_JOB_ENTITY_NAME),
new LineageRegistry.EdgeInfo(
"Consumes", RelationshipDirection.OUTGOING, Constants.DATA_JOB_ENTITY_NAME));
String entityType = "testEntityType";
Map<String, List<Urn>> urnsPerEntityType = Map.of(entityType, urns);
Map<String, List<Urn>> urnsPerEntityTypeMultiple =
Map.of(
Constants.DATASET_ENTITY_NAME,
urnsMultiple1,
Constants.CHART_ENTITY_NAME,
urnsMultiple2);
Map<String, List<LineageRegistry.EdgeInfo>> edgesPerEntityType = Map.of(entityType, edgeInfos);
Map<String, List<LineageRegistry.EdgeInfo>> edgesPerEntityTypeMultiple =
Map.of(
Constants.DATASET_ENTITY_NAME, edgeInfosMultiple1,
Constants.DATA_JOB_ENTITY_NAME, edgeInfosMultiple2);
GraphFilters graphFilters = new GraphFilters(ImmutableList.of(Constants.DATASET_ENTITY_NAME));
GraphFilters graphFiltersMultiple =
new GraphFilters(
ImmutableList.of(
Constants.DATASET_ENTITY_NAME,
Constants.DASHBOARD_ENTITY_NAME,
Constants.DATA_JOB_ENTITY_NAME));
Long startTime = 0L;
Long endTime = 1L;
QueryBuilder builder =
ESGraphQueryDAO.getQueryForLineage(urns, edgeInfos, graphFilters, startTime, endTime);
QueryBuilder limitedBuilder =
ESGraphQueryDAO.getLineageQueryForEntityType(urns, edgeInfos, graphFilters);
Assert.assertEquals(builder.toString(), expectedQuery);
QueryBuilder fullBuilder =
ESGraphQueryDAO.getLineageQuery(
urnsPerEntityType, edgesPerEntityType, graphFilters, startTime, endTime);
QueryBuilder fullBuilderEmptyFilters =
ESGraphQueryDAO.getLineageQuery(
urnsPerEntityType, edgesPerEntityType, GraphFilters.emptyGraphFilters, null, null);
QueryBuilder fullBuilderMultipleFilters =
ESGraphQueryDAO.getLineageQuery(
urnsPerEntityTypeMultiple,
edgesPerEntityTypeMultiple,
graphFiltersMultiple,
startTime,
endTime);
Assert.assertEquals(limitedBuilder.toString(), expectedQueryLimited);
Assert.assertEquals(fullBuilder.toString(), expectedQueryFull);
Assert.assertEquals(fullBuilderEmptyFilters.toString(), expectedQueryFullEmptyFilters);
Assert.assertEquals(fullBuilderMultipleFilters.toString(), expectedQueryFullMultipleFilters);
}
@Test

View File

@ -1,6 +1,62 @@
{
"bool" : {
"must" : [
"filter" : [
{
"bool" : {
"should" : [
{
"bool" : {
"should" : [
{
"bool" : {
"must" : [
{
"terms" : {
"destination.urn" : [
"urn:li:dataset:test-urn"
],
"boost" : 1.0
}
},
{
"terms" : {
"relationshipType" : [
"DownstreamOf"
],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"terms" : {
"source.entityType" : [
"dataset"
],
"boost" : 1.0
}
},
{
"terms" : {
"destination.entityType" : [
"dataset"
],
"boost" : 1.0
}
},
{
"bool" : {
"should" : [
@ -160,46 +216,6 @@
}
}
],
"should" : [
{
"bool" : {
"must" : [
{
"terms" : {
"destination.urn" : [ ],
"boost" : 1.0
}
},
{
"terms" : {
"relationshipType" : [
"DownstreamOf"
],
"boost" : 1.0
}
},
{
"terms" : {
"source.entityType" : [
"dataset"
],
"boost" : 1.0
}
},
{
"terms" : {
"destination.entityType" : [
"dataset"
],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}

View File

@ -0,0 +1,60 @@
{
"bool" : {
"filter" : [
{
"bool" : {
"should" : [
{
"bool" : {
"should" : [
{
"bool" : {
"must" : [
{
"terms" : {
"destination.urn" : [
"urn:li:dataset:test-urn"
],
"boost" : 1.0
}
},
{
"terms" : {
"relationshipType" : [
"DownstreamOf"
],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"terms" : {
"source.entityType" : [ ],
"boost" : 1.0
}
},
{
"terms" : {
"destination.entityType" : [ ],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}

View File

@ -0,0 +1,229 @@
{
"bool" : {
"filter" : [
{
"bool" : {
"should" : [
{
"bool" : {
"should" : [
{
"bool" : {
"must" : [
{
"terms" : {
"source.urn" : [
"urn:li:dataset:test-urn",
"urn:li:dataset:test-urn2",
"urn:li:dataset:test-urn3"
],
"boost" : 1.0
}
},
{
"terms" : {
"relationshipType" : [
"DownstreamOf",
"Consumes"
],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"terms" : {
"source.entityType" : [
"dataset",
"dashboard",
"dataJob"
],
"boost" : 1.0
}
},
{
"terms" : {
"destination.entityType" : [
"dataset",
"dashboard",
"dataJob"
],
"boost" : 1.0
}
},
{
"bool" : {
"should" : [
{
"bool" : {
"should" : [
{
"bool" : {
"must" : [
{
"exists" : {
"field" : "createdOn",
"boost" : 1.0
}
},
{
"range" : {
"createdOn" : {
"from" : 0,
"to" : 1,
"include_lower" : true,
"include_upper" : true,
"boost" : 1.0
}
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"bool" : {
"must" : [
{
"exists" : {
"field" : "updatedOn",
"boost" : 1.0
}
},
{
"range" : {
"updatedOn" : {
"from" : 0,
"to" : 1,
"include_lower" : true,
"include_upper" : true,
"boost" : 1.0
}
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"bool" : {
"must" : [
{
"bool" : {
"should" : [
{
"bool" : {
"must_not" : [
{
"exists" : {
"field" : "createdOn",
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"bool" : {
"must" : [
{
"term" : {
"createdOn" : {
"value" : 0,
"boost" : 1.0
}
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"bool" : {
"should" : [
{
"bool" : {
"must_not" : [
{
"exists" : {
"field" : "updatedOn",
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"bool" : {
"must" : [
{
"term" : {
"updatedOn" : {
"value" : 0,
"boost" : 1.0
}
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
{
"term" : {
"properties.source" : {
"value" : "UI",
"boost" : 1.0
}
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}

View File

@ -0,0 +1,32 @@
{
"bool" : {
"should" : [
{
"bool" : {
"must" : [
{
"terms" : {
"destination.urn" : [
"urn:li:dataset:test-urn"
],
"boost" : 1.0
}
},
{
"terms" : {
"relationshipType" : [
"DownstreamOf"
],
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"adjust_pure_negative" : true,
"boost" : 1.0
}
}