mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-31 21:03:03 +00:00
fix(lineage) Fix lineage source/dest filtering with explored per hop limit (#10879)
This commit is contained in:
parent
aa92a99130
commit
d77d565ba9
@ -177,13 +177,18 @@ public class ESGraphQueryDAO {
|
|||||||
sourceFilterQuery.minimumShouldMatch(1);
|
sourceFilterQuery.minimumShouldMatch(1);
|
||||||
validEdges.stream()
|
validEdges.stream()
|
||||||
.filter(pair -> RelationshipDirection.OUTGOING.equals(pair.getValue().getDirection()))
|
.filter(pair -> RelationshipDirection.OUTGOING.equals(pair.getValue().getDirection()))
|
||||||
.forEach(pair -> sourceFilterQuery.should(getAggregationFilter(pair)));
|
.forEach(
|
||||||
|
pair ->
|
||||||
|
sourceFilterQuery.should(
|
||||||
|
getAggregationFilter(pair, RelationshipDirection.OUTGOING)));
|
||||||
|
|
||||||
BoolQueryBuilder destFilterQuery = QueryBuilders.boolQuery();
|
BoolQueryBuilder destFilterQuery = QueryBuilders.boolQuery();
|
||||||
destFilterQuery.minimumShouldMatch(1);
|
destFilterQuery.minimumShouldMatch(1);
|
||||||
validEdges.stream()
|
validEdges.stream()
|
||||||
.filter(pair -> RelationshipDirection.INCOMING.equals(pair.getValue().getDirection()))
|
.filter(pair -> RelationshipDirection.INCOMING.equals(pair.getValue().getDirection()))
|
||||||
.forEach(pair -> destFilterQuery.should(getAggregationFilter(pair)));
|
.forEach(
|
||||||
|
pair ->
|
||||||
|
destFilterQuery.should(getAggregationFilter(pair, RelationshipDirection.INCOMING)));
|
||||||
|
|
||||||
FilterAggregationBuilder sourceRelationshipTypeFilters =
|
FilterAggregationBuilder sourceRelationshipTypeFilters =
|
||||||
AggregationBuilders.filter(FILTER_BY_SOURCE_RELATIONSHIP, sourceFilterQuery);
|
AggregationBuilders.filter(FILTER_BY_SOURCE_RELATIONSHIP, sourceFilterQuery);
|
||||||
@ -226,17 +231,28 @@ public class ESGraphQueryDAO {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private BoolQueryBuilder getAggregationFilter(Pair<String, EdgeInfo> pair) {
|
private BoolQueryBuilder getAggregationFilter(
|
||||||
|
Pair<String, EdgeInfo> pair, RelationshipDirection direction) {
|
||||||
BoolQueryBuilder subFilter = QueryBuilders.boolQuery();
|
BoolQueryBuilder subFilter = QueryBuilders.boolQuery();
|
||||||
TermQueryBuilder relationshipTypeTerm =
|
TermQueryBuilder relationshipTypeTerm =
|
||||||
QueryBuilders.termQuery(RELATIONSHIP_TYPE, pair.getValue().getType());
|
QueryBuilders.termQuery(RELATIONSHIP_TYPE, pair.getValue().getType()).caseInsensitive(true);
|
||||||
subFilter.must(relationshipTypeTerm);
|
subFilter.must(relationshipTypeTerm);
|
||||||
|
|
||||||
|
String sourceType;
|
||||||
|
String destinationType;
|
||||||
|
if (direction.equals(RelationshipDirection.OUTGOING)) {
|
||||||
|
sourceType = pair.getKey();
|
||||||
|
destinationType = pair.getValue().getOpposingEntityType();
|
||||||
|
} else {
|
||||||
|
sourceType = pair.getValue().getOpposingEntityType();
|
||||||
|
destinationType = pair.getKey();
|
||||||
|
}
|
||||||
|
|
||||||
TermQueryBuilder sourceTypeTerm =
|
TermQueryBuilder sourceTypeTerm =
|
||||||
QueryBuilders.termQuery(SOURCE + ".entityType", pair.getKey());
|
QueryBuilders.termQuery(SOURCE + ".entityType", sourceType).caseInsensitive(true);
|
||||||
subFilter.must(sourceTypeTerm);
|
subFilter.must(sourceTypeTerm);
|
||||||
TermQueryBuilder destinationTypeTerm =
|
TermQueryBuilder destinationTypeTerm =
|
||||||
QueryBuilders.termQuery(
|
QueryBuilders.termQuery(DESTINATION + ".entityType", destinationType).caseInsensitive(true);
|
||||||
DESTINATION + ".entityType", pair.getValue().getOpposingEntityType());
|
|
||||||
subFilter.must(destinationTypeTerm);
|
subFilter.must(destinationTypeTerm);
|
||||||
return subFilter;
|
return subFilter;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user