mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 02:17:53 +00:00
fix(graph service): only query for entities that should have lineage [Breaking Change] (#5539)
* allow filtering multiple entity types in graph service * fixing up tests * fixing imports * lint fix * another lint fix * fixing another lint * responding to comments * fixing entity registry test * always fetch yourself
This commit is contained in:
parent
3ea0005fcb
commit
2e332d8c09
@ -147,7 +147,7 @@ export const OperationsTab = () => {
|
||||
inputs: run?.inputs?.relationships.map((relationship) => relationship.entity),
|
||||
outputs: run?.outputs?.relationships.map((relationship) => relationship.entity),
|
||||
externalUrl: run?.externalUrl,
|
||||
parentTemplate: run?.parentTemplate?.relationships?.[0].entity,
|
||||
parentTemplate: run?.parentTemplate?.relationships?.[0]?.entity,
|
||||
}));
|
||||
|
||||
return (
|
||||
|
||||
@ -19,6 +19,8 @@ export const CompactEntityNameList = ({ entities, onClick, linkUrlParams, showTo
|
||||
return (
|
||||
<>
|
||||
{entities.map((entity, index) => {
|
||||
if (!entity) return <></>;
|
||||
|
||||
const genericProps = entityRegistry.getGenericEntityProperties(entity.type, entity);
|
||||
const platformLogoUrl = genericProps?.platform?.properties?.logoUrl;
|
||||
const displayName = entityRegistry.getDisplayName(entity.type, entity);
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package com.linkedin.metadata.models.registry;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.linkedin.metadata.graph.LineageDirection;
|
||||
import com.linkedin.metadata.models.EntitySpec;
|
||||
import com.linkedin.metadata.models.annotation.RelationshipAnnotation;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import java.util.ArrayList;
|
||||
@ -25,9 +27,11 @@ import org.apache.commons.lang3.tuple.Triple;
|
||||
public class LineageRegistry {
|
||||
|
||||
private final Map<String, LineageSpec> _lineageSpecMap;
|
||||
private final EntityRegistry _entityRegistry;
|
||||
|
||||
public LineageRegistry(EntityRegistry entityRegistry) {
|
||||
_lineageSpecMap = buildLineageSpecs(entityRegistry);
|
||||
_entityRegistry = entityRegistry;
|
||||
}
|
||||
|
||||
private Map<String, LineageSpec> buildLineageSpecs(EntityRegistry entityRegistry) {
|
||||
@ -53,14 +57,14 @@ public class LineageRegistry {
|
||||
for (LineageEdge edge : lineageEdges) {
|
||||
if (edge.isUpstream()) {
|
||||
upstreamPerEntity.computeIfAbsent(edge.sourceEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING));
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING, edge.destEntity));
|
||||
downstreamPerEntity.computeIfAbsent(edge.destEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING));
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING, edge.sourceEntity));
|
||||
} else {
|
||||
downstreamPerEntity.computeIfAbsent(edge.sourceEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING));
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING, edge.destEntity));
|
||||
upstreamPerEntity.computeIfAbsent(edge.destEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING));
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING, edge.sourceEntity));
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,6 +90,17 @@ public class LineageRegistry {
|
||||
return _lineageSpecMap.get(entityName.toLowerCase());
|
||||
}
|
||||
|
||||
public Set<String> getEntitiesWithLineageToEntityType(String entityType) {
|
||||
Map<String, EntitySpec> specs = _entityRegistry.getEntitySpecs();
|
||||
Set<String> result = Streams.concat(_lineageSpecMap.get(entityType.toLowerCase()).getDownstreamEdges().stream(),
|
||||
_lineageSpecMap.get(entityType.toLowerCase()).getUpstreamEdges().stream())
|
||||
.map(EdgeInfo::getOpposingEntityType)
|
||||
.map(entity -> specs.get(entity.toLowerCase()).getName())
|
||||
.collect(Collectors.toSet());
|
||||
result.add(entityType);
|
||||
return result;
|
||||
}
|
||||
|
||||
public List<EdgeInfo> getLineageRelationships(String entityName, LineageDirection direction) {
|
||||
LineageSpec spec = getLineageSpec(entityName);
|
||||
if (spec == null) {
|
||||
@ -116,5 +131,6 @@ public class LineageRegistry {
|
||||
public static class EdgeInfo {
|
||||
String type;
|
||||
RelationshipDirection direction;
|
||||
String opposingEntityType;
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,14 +51,14 @@ public class LineageRegistryTest {
|
||||
LineageRegistry.LineageSpec lineageSpec = lineageRegistry.getLineageSpec("dataset");
|
||||
assertEquals(lineageSpec.getUpstreamEdges().size(), 2);
|
||||
assertTrue(lineageSpec.getUpstreamEdges()
|
||||
.contains(new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.OUTGOING)));
|
||||
.contains(new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.OUTGOING, "dataset")));
|
||||
assertTrue(lineageSpec.getUpstreamEdges()
|
||||
.contains(new LineageRegistry.EdgeInfo("Produces", RelationshipDirection.INCOMING)));
|
||||
.contains(new LineageRegistry.EdgeInfo("Produces", RelationshipDirection.INCOMING, "dataJob")));
|
||||
assertEquals(lineageSpec.getDownstreamEdges().size(), 2);
|
||||
assertTrue(lineageSpec.getDownstreamEdges()
|
||||
.contains(new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.INCOMING)));
|
||||
.contains(new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.INCOMING, "dataset")));
|
||||
assertTrue(lineageSpec.getDownstreamEdges()
|
||||
.contains(new LineageRegistry.EdgeInfo("Consumes", RelationshipDirection.INCOMING)));
|
||||
.contains(new LineageRegistry.EdgeInfo("Consumes", RelationshipDirection.INCOMING, "dataJob")));
|
||||
}
|
||||
|
||||
private RelationshipFieldSpec buildSpec(String relationshipType, List<String> destinationEntityTypes,
|
||||
|
||||
@ -0,0 +1,18 @@
|
||||
package com.linkedin.metadata.graph;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import java.util.List;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class GraphFilters {
|
||||
// entity types you want to allow in your result set
|
||||
public List<String> allowedEntityTypes;
|
||||
|
||||
public static GraphFilters emptyGraphFilters = new GraphFilters(ImmutableList.of());
|
||||
|
||||
public static GraphFilters defaultGraphFilters = new GraphFilters(ImmutableList.of());
|
||||
}
|
||||
@ -7,6 +7,7 @@ import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import com.linkedin.metadata.query.filter.RelationshipFilter;
|
||||
import com.linkedin.metadata.search.utils.QueryUtils;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -79,11 +80,12 @@ public interface GraphService {
|
||||
* - RelatedEntity("DownstreamOf", "dataset three")
|
||||
*/
|
||||
@Nonnull
|
||||
RelatedEntitiesResult findRelatedEntities(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
|
||||
RelatedEntitiesResult findRelatedEntities(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter,
|
||||
final int offset, final int count);
|
||||
|
||||
|
||||
/**
|
||||
* Traverse from the entityUrn towards the input direction up to maxHops number of hops
|
||||
* Abstracts away the concept of relationship types
|
||||
@ -93,6 +95,26 @@ public interface GraphService {
|
||||
@Nonnull
|
||||
default EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
|
||||
int count, int maxHops) {
|
||||
return getLineage(
|
||||
entityUrn,
|
||||
direction,
|
||||
new GraphFilters(new ArrayList(getLineageRegistry().getEntitiesWithLineageToEntityType(entityUrn.getEntityType()))),
|
||||
offset,
|
||||
count,
|
||||
maxHops
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverse from the entityUrn towards the input direction up to maxHops number of hops. If entityTypes is not empty,
|
||||
* will only return edges to entities that are within the entity types set.
|
||||
* Abstracts away the concept of relationship types
|
||||
*
|
||||
* Unless overridden, it uses the lineage registry to fetch valid edge types and queries for them
|
||||
*/
|
||||
@Nonnull
|
||||
default EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
|
||||
GraphFilters graphFilters, int offset, int count, int maxHops) {
|
||||
if (maxHops > 1) {
|
||||
maxHops = 1;
|
||||
}
|
||||
@ -109,10 +131,11 @@ public interface GraphService {
|
||||
// Outgoing edges
|
||||
if (!CollectionUtils.isEmpty(edgesByDirection.get(true))) {
|
||||
List<String> relationshipTypes =
|
||||
edgesByDirection.get(true).stream().map(LineageRegistry.EdgeInfo::getType).collect(Collectors.toList());
|
||||
new ArrayList(edgesByDirection.get(true).stream().map(LineageRegistry.EdgeInfo::getType).collect(Collectors.toSet()));
|
||||
// Fetch outgoing edges
|
||||
RelatedEntitiesResult outgoingEdges =
|
||||
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), null, QueryUtils.EMPTY_FILTER,
|
||||
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), graphFilters.getAllowedEntityTypes(),
|
||||
QueryUtils.EMPTY_FILTER,
|
||||
relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, RelationshipDirection.OUTGOING), offset,
|
||||
count);
|
||||
|
||||
@ -137,7 +160,8 @@ public interface GraphService {
|
||||
List<String> relationshipTypes =
|
||||
edgesByDirection.get(false).stream().map(LineageRegistry.EdgeInfo::getType).collect(Collectors.toList());
|
||||
RelatedEntitiesResult incomingEdges =
|
||||
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), null, QueryUtils.EMPTY_FILTER,
|
||||
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), graphFilters.getAllowedEntityTypes(),
|
||||
QueryUtils.EMPTY_FILTER,
|
||||
relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, RelationshipDirection.INCOMING), offset,
|
||||
count);
|
||||
result.setTotal(result.getTotal() + incomingEdges.getTotal());
|
||||
|
||||
@ -38,10 +38,9 @@ public class JavaGraphClient implements GraphClient {
|
||||
count = count == null ? DEFAULT_PAGE_SIZE : count;
|
||||
|
||||
RelatedEntitiesResult relatedEntitiesResult =
|
||||
_graphService.findRelatedEntities(
|
||||
"",
|
||||
_graphService.findRelatedEntities(null,
|
||||
QueryUtils.newFilter("urn", rawUrn),
|
||||
"",
|
||||
null,
|
||||
EMPTY_FILTER,
|
||||
relationshipTypes,
|
||||
QueryUtils.newRelationshipFilter(EMPTY_FILTER, direction),
|
||||
|
||||
@ -40,8 +40,8 @@ public class SiblingGraphService {
|
||||
* Unless overridden, it uses the lineage registry to fetch valid edge types and queries for them
|
||||
*/
|
||||
@Nonnull
|
||||
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
|
||||
int count, int maxHops, boolean separateSiblings) {
|
||||
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
|
||||
int offset, int count, int maxHops, boolean separateSiblings) {
|
||||
if (separateSiblings) {
|
||||
return _graphService.getLineage(entityUrn, direction, offset, count, maxHops);
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -242,12 +243,13 @@ public class DgraphGraphService implements GraphService {
|
||||
}
|
||||
}
|
||||
|
||||
return relationships;
|
||||
// we need to remove duplicates in order to not cause invalid queries in dgraph
|
||||
return new ArrayList<>(new LinkedHashSet(relationships));
|
||||
}
|
||||
|
||||
protected static String getQueryForRelatedEntities(@Nullable String sourceType,
|
||||
protected static String getQueryForRelatedEntities(@Nullable List<String> sourceTypes,
|
||||
@Nonnull Filter sourceEntityFilter,
|
||||
@Nullable String destinationType,
|
||||
@Nullable List<String> destinationTypes,
|
||||
@Nonnull Filter destinationEntityFilter,
|
||||
@Nonnull List<String> relationshipTypes,
|
||||
@Nonnull RelationshipFilter relationshipFilter,
|
||||
@ -291,16 +293,20 @@ public class DgraphGraphService implements GraphService {
|
||||
List<String> destinationFilterNames = new ArrayList<>();
|
||||
List<String> relationshipTypeFilterNames = new ArrayList<>();
|
||||
|
||||
if (sourceType != null) {
|
||||
if (sourceTypes != null && sourceTypes.size() > 0) {
|
||||
sourceTypeFilterName = "sourceType";
|
||||
// TODO: escape string value
|
||||
filters.add(String.format("%s as var(func: eq(<type>, \"%s\"))", sourceTypeFilterName, sourceType));
|
||||
final StringJoiner joiner = new StringJoiner("\",\"", "[\"", "\"]");
|
||||
sourceTypes.forEach(type -> joiner.add(type));
|
||||
filters.add(String.format("%s as var(func: eq(<type>, %s))", sourceTypeFilterName, joiner.toString()));
|
||||
}
|
||||
|
||||
if (destinationType != null) {
|
||||
if (destinationTypes != null && destinationTypes.size() > 0) {
|
||||
destinationTypeFilterName = "destinationType";
|
||||
final StringJoiner joiner = new StringJoiner("\",\"", "[\"", "\"]");
|
||||
destinationTypes.forEach(type -> joiner.add(type));
|
||||
// TODO: escape string value
|
||||
filters.add(String.format("%s as var(func: eq(<type>, \"%s\"))", destinationTypeFilterName, destinationType));
|
||||
filters.add(String.format("%s as var(func: eq(<type>, %s))", destinationTypeFilterName, joiner.toString()));
|
||||
}
|
||||
|
||||
//noinspection ConstantConditions
|
||||
@ -381,21 +387,25 @@ public class DgraphGraphService implements GraphService {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public RelatedEntitiesResult findRelatedEntities(@Nullable String sourceType,
|
||||
public RelatedEntitiesResult findRelatedEntities(@Nullable List<String> sourceTypes,
|
||||
@Nonnull Filter sourceEntityFilter,
|
||||
@Nullable String destinationType,
|
||||
@Nullable List<String> destinationTypes,
|
||||
@Nonnull Filter destinationEntityFilter,
|
||||
@Nonnull List<String> relationshipTypes,
|
||||
@Nonnull RelationshipFilter relationshipFilter,
|
||||
int offset,
|
||||
int count) {
|
||||
|
||||
if (sourceTypes != null && sourceTypes.isEmpty() || destinationTypes != null && destinationTypes.isEmpty()) {
|
||||
return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList());
|
||||
}
|
||||
if (relationshipTypes.isEmpty() || relationshipTypes.stream().noneMatch(relationship -> get_schema().hasField(relationship))) {
|
||||
return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList());
|
||||
}
|
||||
|
||||
String query = getQueryForRelatedEntities(
|
||||
sourceType, sourceEntityFilter,
|
||||
destinationType, destinationEntityFilter,
|
||||
sourceTypes, sourceEntityFilter,
|
||||
destinationTypes, destinationEntityFilter,
|
||||
relationshipTypes.stream().filter(get_schema()::hasField).collect(Collectors.toList()),
|
||||
relationshipFilter,
|
||||
offset, count
|
||||
|
||||
@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.metadata.graph.GraphFilters;
|
||||
import com.linkedin.metadata.graph.LineageDirection;
|
||||
import com.linkedin.metadata.models.registry.LineageRegistry;
|
||||
import com.linkedin.metadata.models.registry.LineageRegistry.EdgeInfo;
|
||||
@ -107,19 +108,19 @@ public class ESGraphQueryDAO {
|
||||
}
|
||||
}
|
||||
|
||||
public SearchResponse getSearchResponse(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
|
||||
public SearchResponse getSearchResponse(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter,
|
||||
final int offset, final int count) {
|
||||
BoolQueryBuilder finalQuery =
|
||||
buildQuery(sourceType, sourceEntityFilter, destinationType, destinationEntityFilter, relationshipTypes,
|
||||
buildQuery(sourceTypes, sourceEntityFilter, destinationTypes, destinationEntityFilter, relationshipTypes,
|
||||
relationshipFilter);
|
||||
|
||||
return executeSearchQuery(finalQuery, offset, count);
|
||||
}
|
||||
|
||||
public static BoolQueryBuilder buildQuery(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
|
||||
public static BoolQueryBuilder buildQuery(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) {
|
||||
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
|
||||
|
||||
@ -127,15 +128,15 @@ public class ESGraphQueryDAO {
|
||||
|
||||
// set source filter
|
||||
String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? SOURCE : DESTINATION;
|
||||
if (sourceType != null && sourceType.length() > 0) {
|
||||
finalQuery.must(QueryBuilders.termQuery(sourceNode + ".entityType", sourceType));
|
||||
if (sourceTypes != null && sourceTypes.size() > 0) {
|
||||
finalQuery.must(QueryBuilders.termsQuery(sourceNode + ".entityType", sourceTypes));
|
||||
}
|
||||
addFilterToQueryBuilder(sourceEntityFilter, sourceNode, finalQuery);
|
||||
|
||||
// set destination filter
|
||||
String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? DESTINATION : SOURCE;
|
||||
if (destinationType != null && destinationType.length() > 0) {
|
||||
finalQuery.must(QueryBuilders.termQuery(destinationNode + ".entityType", destinationType));
|
||||
if (destinationTypes != null && destinationTypes.size() > 0) {
|
||||
finalQuery.must(QueryBuilders.termsQuery(destinationNode + ".entityType", destinationTypes));
|
||||
}
|
||||
addFilterToQueryBuilder(destinationEntityFilter, destinationNode, finalQuery);
|
||||
|
||||
@ -150,7 +151,7 @@ public class ESGraphQueryDAO {
|
||||
}
|
||||
|
||||
@WithSpan
|
||||
public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset, int count,
|
||||
public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int offset, int count,
|
||||
int maxHops) {
|
||||
List<LineageRelationship> result = new ArrayList<>();
|
||||
long currentTime = System.currentTimeMillis();
|
||||
@ -175,7 +176,7 @@ public class ESGraphQueryDAO {
|
||||
|
||||
// Do one hop on the lineage graph
|
||||
List<LineageRelationship> oneHopRelationships =
|
||||
getLineageRelationshipsInBatches(currentLevel, direction, visitedEntities, i + 1, remainingTime);
|
||||
getLineageRelationshipsInBatches(currentLevel, direction, graphFilters, visitedEntities, i + 1, remainingTime);
|
||||
result.addAll(oneHopRelationships);
|
||||
currentLevel = oneHopRelationships.stream().map(LineageRelationship::getEntity).collect(Collectors.toList());
|
||||
currentTime = System.currentTimeMillis();
|
||||
@ -196,11 +197,11 @@ public class ESGraphQueryDAO {
|
||||
// Get 1-hop lineage relationships asynchronously in batches with timeout
|
||||
@WithSpan
|
||||
public List<LineageRelationship> getLineageRelationshipsInBatches(@Nonnull List<Urn> entityUrns,
|
||||
@Nonnull LineageDirection direction, Set<Urn> visitedEntities, int numHops, long remainingTime) {
|
||||
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops, long remainingTime) {
|
||||
List<List<Urn>> batches = Lists.partition(entityUrns, BATCH_SIZE);
|
||||
return ConcurrencyUtils.getAllCompleted(batches.stream()
|
||||
.map(batchUrns -> CompletableFuture.supplyAsync(
|
||||
() -> getLineageRelationships(batchUrns, direction, visitedEntities, numHops)))
|
||||
() -> getLineageRelationships(batchUrns, direction, graphFilters, visitedEntities, numHops)))
|
||||
.collect(Collectors.toList()), remainingTime, TimeUnit.MILLISECONDS)
|
||||
.stream()
|
||||
.flatMap(List::stream)
|
||||
@ -210,7 +211,7 @@ public class ESGraphQueryDAO {
|
||||
// Get 1-hop lineage relationships
|
||||
@WithSpan
|
||||
private List<LineageRelationship> getLineageRelationships(@Nonnull List<Urn> entityUrns,
|
||||
@Nonnull LineageDirection direction, Set<Urn> visitedEntities, int numHops) {
|
||||
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops) {
|
||||
Map<String, List<Urn>> urnsPerEntityType = entityUrns.stream().collect(Collectors.groupingBy(Urn::getEntityType));
|
||||
Map<String, List<EdgeInfo>> edgesPerEntityType = urnsPerEntityType.keySet()
|
||||
.stream()
|
||||
@ -219,7 +220,7 @@ public class ESGraphQueryDAO {
|
||||
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
|
||||
// Get all relation types relevant to the set of urns to hop from
|
||||
urnsPerEntityType.forEach((entityType, urns) -> finalQuery.should(
|
||||
getQueryForLineage(urns, edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()))));
|
||||
getQueryForLineage(urns, edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()), graphFilters)));
|
||||
SearchResponse response = executeSearchQuery(finalQuery, 0, MAX_ELASTIC_RESULT);
|
||||
Set<Urn> entityUrnSet = new HashSet<>(entityUrns);
|
||||
// Get all valid edges given the set of urns to hop from
|
||||
@ -248,7 +249,7 @@ public class ESGraphQueryDAO {
|
||||
// Skip if already visited
|
||||
// Skip if edge is not a valid outgoing edge
|
||||
if (!visitedEntities.contains(destinationUrn) && validEdges.contains(
|
||||
Pair.of(sourceUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.OUTGOING)))) {
|
||||
Pair.of(sourceUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.OUTGOING, destinationUrn.getEntityType().toLowerCase())))) {
|
||||
visitedEntities.add(destinationUrn);
|
||||
result.add(new LineageRelationship().setType(type).setEntity(destinationUrn).setDegree(numHops));
|
||||
}
|
||||
@ -259,7 +260,7 @@ public class ESGraphQueryDAO {
|
||||
// Skip if already visited
|
||||
// Skip if edge is not a valid outgoing edge
|
||||
if (!visitedEntities.contains(sourceUrn) && validEdges.contains(
|
||||
Pair.of(destinationUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.INCOMING)))) {
|
||||
Pair.of(destinationUrn.getEntityType(), new EdgeInfo(type, RelationshipDirection.INCOMING, sourceUrn.getEntityType().toLowerCase())))) {
|
||||
visitedEntities.add(sourceUrn);
|
||||
result.add(new LineageRelationship().setType(type).setEntity(sourceUrn).setDegree(numHops));
|
||||
}
|
||||
@ -268,8 +269,29 @@ public class ESGraphQueryDAO {
|
||||
return result;
|
||||
}
|
||||
|
||||
BoolQueryBuilder getOutGoingEdgeQuery(List<Urn> urns, List<EdgeInfo> outgoingEdges) {
|
||||
BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery();
|
||||
outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE));
|
||||
outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges));
|
||||
return outgoingEdgeQuery;
|
||||
}
|
||||
|
||||
BoolQueryBuilder getIncomingEdgeQuery(List<Urn> urns, List<EdgeInfo> incomingEdges) {
|
||||
BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery();
|
||||
incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION));
|
||||
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
|
||||
return incomingEdgeQuery;
|
||||
}
|
||||
|
||||
BoolQueryBuilder getAllowedEntityTypesFilter(GraphFilters graphFilters) {
|
||||
BoolQueryBuilder allowedEntityTypesFilter = QueryBuilders.boolQuery();
|
||||
allowedEntityTypesFilter.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
|
||||
allowedEntityTypesFilter.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
|
||||
return allowedEntityTypesFilter;
|
||||
}
|
||||
|
||||
// Get search query for given list of edges and source urns
|
||||
public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdges) {
|
||||
public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdges, GraphFilters graphFilters) {
|
||||
BoolQueryBuilder query = QueryBuilders.boolQuery();
|
||||
if (lineageEdges.isEmpty()) {
|
||||
return query;
|
||||
@ -280,23 +302,27 @@ public class ESGraphQueryDAO {
|
||||
List<EdgeInfo> outgoingEdges =
|
||||
edgesByDirection.getOrDefault(RelationshipDirection.OUTGOING, Collections.emptyList());
|
||||
if (!outgoingEdges.isEmpty()) {
|
||||
BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery();
|
||||
outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE));
|
||||
outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges));
|
||||
query.should(outgoingEdgeQuery);
|
||||
query.should(getOutGoingEdgeQuery(urns, outgoingEdges));
|
||||
}
|
||||
|
||||
List<EdgeInfo> incomingEdges =
|
||||
edgesByDirection.getOrDefault(RelationshipDirection.INCOMING, Collections.emptyList());
|
||||
if (!incomingEdges.isEmpty()) {
|
||||
BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery();
|
||||
incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION));
|
||||
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
|
||||
query.should(incomingEdgeQuery);
|
||||
query.should(getIncomingEdgeQuery(urns, incomingEdges));
|
||||
}
|
||||
|
||||
if (graphFilters != null) {
|
||||
if (graphFilters.getAllowedEntityTypes() != null && !graphFilters.getAllowedEntityTypes().isEmpty()) {
|
||||
query.must(getAllowedEntityTypesFilter(graphFilters));
|
||||
}
|
||||
}
|
||||
return query;
|
||||
}
|
||||
|
||||
public QueryBuilder buildEntityTypesFilter(List<String> entityTypes, String prefix) {
|
||||
return QueryBuilders.termsQuery(prefix + ".entityType", entityTypes.stream().map(Object::toString).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public QueryBuilder buildUrnFilters(List<Urn> urns, String prefix) {
|
||||
return QueryBuilders.termsQuery(prefix + ".urn", urns.stream().map(Object::toString).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.graph.elastic;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.metadata.query.filter.Filter;
|
||||
import com.linkedin.metadata.query.filter.RelationshipFilter;
|
||||
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
|
||||
@ -50,8 +51,9 @@ public class ESGraphWriteDAO {
|
||||
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) {
|
||||
BoolQueryBuilder finalQuery =
|
||||
buildQuery(sourceType, sourceEntityFilter, destinationType, destinationEntityFilter, relationshipTypes,
|
||||
relationshipFilter);
|
||||
buildQuery(sourceType == null ? ImmutableList.of() : ImmutableList.of(sourceType), sourceEntityFilter,
|
||||
destinationType == null ? ImmutableList.of() : ImmutableList.of(destinationType), destinationEntityFilter,
|
||||
relationshipTypes, relationshipFilter);
|
||||
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
|
||||
|
||||
@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.metadata.graph.Edge;
|
||||
import com.linkedin.metadata.graph.EntityLineageResult;
|
||||
import com.linkedin.metadata.graph.GraphFilters;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.graph.LineageDirection;
|
||||
import com.linkedin.metadata.models.registry.LineageRegistry;
|
||||
@ -108,22 +109,25 @@ public class ElasticSearchGraphService implements GraphService {
|
||||
|
||||
@Nonnull
|
||||
public RelatedEntitiesResult findRelatedEntities(
|
||||
@Nullable final String sourceType,
|
||||
@Nullable final List<String> sourceTypes,
|
||||
@Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final String destinationType,
|
||||
@Nullable final List<String> destinationTypes,
|
||||
@Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes,
|
||||
@Nonnull final RelationshipFilter relationshipFilter,
|
||||
final int offset,
|
||||
final int count) {
|
||||
if (sourceTypes != null && sourceTypes.isEmpty() || destinationTypes != null && destinationTypes.isEmpty()) {
|
||||
return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList());
|
||||
}
|
||||
|
||||
final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
|
||||
String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source";
|
||||
|
||||
SearchResponse response = _graphReadDAO.getSearchResponse(
|
||||
sourceType,
|
||||
sourceTypes,
|
||||
sourceEntityFilter,
|
||||
destinationType,
|
||||
destinationTypes,
|
||||
destinationEntityFilter,
|
||||
relationshipTypes,
|
||||
relationshipFilter,
|
||||
@ -160,10 +164,12 @@ public class ElasticSearchGraphService implements GraphService {
|
||||
@Nonnull
|
||||
@WithSpan
|
||||
@Override
|
||||
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
|
||||
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
|
||||
GraphFilters graphFilters,
|
||||
int offset,
|
||||
int count, int maxHops) {
|
||||
ESGraphQueryDAO.LineageResponse lineageResponse =
|
||||
_graphReadDAO.getLineage(entityUrn, direction, offset, count, maxHops);
|
||||
_graphReadDAO.getLineage(entityUrn, direction, graphFilters, offset, count, maxHops);
|
||||
return new EntityLineageResult().setRelationships(
|
||||
new LineageRelationshipArray(lineageResponse.getLineageRelationships()))
|
||||
.setStart(offset)
|
||||
|
||||
@ -23,6 +23,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.AllArgsConstructor;
|
||||
@ -94,9 +95,9 @@ public class Neo4jGraphService implements GraphService {
|
||||
|
||||
@Nonnull
|
||||
public RelatedEntitiesResult findRelatedEntities(
|
||||
@Nullable final String sourceType,
|
||||
@Nullable final List<String> sourceTypes,
|
||||
@Nonnull final Filter sourceEntityFilter,
|
||||
@Nullable final String destinationType,
|
||||
@Nullable final List<String> destinationTypes,
|
||||
@Nonnull final Filter destinationEntityFilter,
|
||||
@Nonnull final List<String> relationshipTypes,
|
||||
@Nonnull final RelationshipFilter relationshipFilter,
|
||||
@ -105,7 +106,7 @@ public class Neo4jGraphService implements GraphService {
|
||||
|
||||
log.debug(
|
||||
String.format("Finding related Neo4j nodes sourceType: %s, sourceEntityFilter: %s, destinationType: %s, ",
|
||||
sourceType, sourceEntityFilter, destinationType)
|
||||
sourceTypes, sourceEntityFilter, destinationTypes)
|
||||
+ String.format(
|
||||
"destinationEntityFilter: %s, relationshipTypes: %s, relationshipFilter: %s, ",
|
||||
destinationEntityFilter, relationshipTypes, relationshipFilter)
|
||||
@ -114,20 +115,24 @@ public class Neo4jGraphService implements GraphService {
|
||||
offset, count)
|
||||
);
|
||||
|
||||
final String srcCriteria = filterToCriteria(sourceEntityFilter);
|
||||
final String destCriteria = filterToCriteria(destinationEntityFilter);
|
||||
if (sourceTypes != null && sourceTypes.isEmpty() || destinationTypes != null && destinationTypes.isEmpty()) {
|
||||
return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList());
|
||||
}
|
||||
|
||||
final String srcCriteria = filterToCriteria(sourceEntityFilter).trim();
|
||||
final String destCriteria = filterToCriteria(destinationEntityFilter).trim();
|
||||
final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter);
|
||||
|
||||
final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
|
||||
|
||||
String matchTemplate = "MATCH (src%s %s)-[r%s %s]-(dest%s %s)";
|
||||
String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s";
|
||||
if (relationshipDirection == RelationshipDirection.INCOMING) {
|
||||
matchTemplate = "MATCH (src%s %s)<-[r%s %s]-(dest%s %s)";
|
||||
matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s";
|
||||
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
|
||||
matchTemplate = "MATCH (src%s %s)-[r%s %s]->(dest%s %s)";
|
||||
matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s";
|
||||
}
|
||||
|
||||
final String returnNodes = String.format("RETURN dest%s, type(r)", destinationType); // Return both related entity and the relationship type.
|
||||
final String returnNodes = String.format("RETURN dest, type(r)"); // Return both related entity and the relationship type.
|
||||
final String returnCount = "RETURN count(*)"; // For getting the total results.
|
||||
|
||||
String relationshipTypeFilter = "";
|
||||
@ -135,10 +140,13 @@ public class Neo4jGraphService implements GraphService {
|
||||
relationshipTypeFilter = ":" + StringUtils.join(relationshipTypes, "|");
|
||||
}
|
||||
|
||||
// Build Statement strings
|
||||
String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes);
|
||||
|
||||
// Build Statement strings
|
||||
String baseStatementString =
|
||||
String.format(matchTemplate, sourceType, srcCriteria, relationshipTypeFilter, edgeCriteria,
|
||||
destinationType, destCriteria);
|
||||
String.format(matchTemplate, srcCriteria, relationshipTypeFilter, edgeCriteria, destCriteria, whereClause);
|
||||
|
||||
log.info(baseStatementString);
|
||||
|
||||
final String resultStatementString = String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes);
|
||||
final String countStatementString = String.format("%s %s", baseStatementString, returnCount);
|
||||
@ -156,6 +164,29 @@ public class Neo4jGraphService implements GraphService {
|
||||
return new RelatedEntitiesResult(offset, relatedEntities.size(), totalCount, relatedEntities);
|
||||
}
|
||||
|
||||
private String computeEntityTypeWhereClause(@Nonnull final List<String> sourceTypes,
|
||||
@Nonnull final List<String> destinationTypes) {
|
||||
String whereClause = "";
|
||||
|
||||
Boolean hasSourceTypes = sourceTypes != null && !sourceTypes.isEmpty();
|
||||
Boolean hasDestTypes = destinationTypes != null && !destinationTypes.isEmpty();
|
||||
if (hasSourceTypes && hasDestTypes) {
|
||||
whereClause = String.format(" WHERE %s AND %s",
|
||||
sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR ")),
|
||||
destinationTypes.stream().map(type -> "dest:" + type).collect(Collectors.joining(" OR "))
|
||||
);
|
||||
} else if (hasSourceTypes) {
|
||||
whereClause = String.format(" WHERE %s",
|
||||
sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR "))
|
||||
);
|
||||
} else if (hasDestTypes) {
|
||||
whereClause = String.format(" WHERE %s",
|
||||
destinationTypes.stream().map(type -> "dest:" + type).collect(Collectors.joining(" OR "))
|
||||
);
|
||||
}
|
||||
return whereClause;
|
||||
}
|
||||
|
||||
public void removeNode(@Nonnull final Urn urn) {
|
||||
|
||||
log.debug(String.format("Removing Neo4j node with urn: %s", urn));
|
||||
@ -299,6 +330,15 @@ public class Neo4jGraphService implements GraphService {
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static String toCriterionWhereString(@Nonnull String key, @Nonnull Object value) {
|
||||
if (ClassUtils.isPrimitiveOrWrapper(value.getClass())) {
|
||||
return key + " = " + value;
|
||||
}
|
||||
|
||||
return key + " = \"" + value.toString() + "\"";
|
||||
}
|
||||
|
||||
// Returns "key:value" String, if value is not primitive, then use toString() and double quote it
|
||||
@Nonnull
|
||||
private static String toCriterionString(@Nonnull String key, @Nonnull Object value) {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.graph;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.urn.DataFlowUrn;
|
||||
import com.linkedin.common.urn.DataJobUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
@ -144,7 +145,7 @@ abstract public class GraphServiceTestBase {
|
||||
/**
|
||||
* Any source and destination type value.
|
||||
*/
|
||||
protected static @Nullable String anyType = null;
|
||||
protected static @Nullable List<String> anyType = null;
|
||||
|
||||
/**
|
||||
* Timeout used to test concurrent ops in doTestConcurrentOp.
|
||||
@ -735,12 +736,12 @@ abstract public class GraphServiceTestBase {
|
||||
}
|
||||
|
||||
@Test(dataProvider = "FindRelatedEntitiesSourceTypeTests")
|
||||
public void testFindRelatedEntitiesSourceType(String datasetType,
|
||||
public void testFindRelatedEntitiesSourceType(String entityTypeFilter,
|
||||
List<String> relationshipTypes,
|
||||
RelationshipFilter relationships,
|
||||
List<RelatedEntity> expectedRelatedEntities) throws Exception {
|
||||
doTestFindRelatedEntities(
|
||||
datasetType,
|
||||
entityTypeFilter != null ? ImmutableList.of(entityTypeFilter) : null,
|
||||
anyType,
|
||||
relationshipTypes,
|
||||
relationships,
|
||||
@ -861,13 +862,13 @@ abstract public class GraphServiceTestBase {
|
||||
}
|
||||
|
||||
@Test(dataProvider = "FindRelatedEntitiesDestinationTypeTests")
|
||||
public void testFindRelatedEntitiesDestinationType(String datasetType,
|
||||
public void testFindRelatedEntitiesDestinationType(String entityTypeFilter,
|
||||
List<String> relationshipTypes,
|
||||
RelationshipFilter relationships,
|
||||
List<RelatedEntity> expectedRelatedEntities) throws Exception {
|
||||
doTestFindRelatedEntities(
|
||||
anyType,
|
||||
datasetType,
|
||||
entityTypeFilter != null ? ImmutableList.of(entityTypeFilter) : null,
|
||||
relationshipTypes,
|
||||
relationships,
|
||||
expectedRelatedEntities
|
||||
@ -875,8 +876,8 @@ abstract public class GraphServiceTestBase {
|
||||
}
|
||||
|
||||
private void doTestFindRelatedEntities(
|
||||
final String sourceType,
|
||||
final String destinationType,
|
||||
final List<String> sourceType,
|
||||
final List<String> destinationType,
|
||||
final List<String> relationshipTypes,
|
||||
final RelationshipFilter relationshipFilter,
|
||||
List<RelatedEntity> expectedRelatedEntities
|
||||
@ -893,8 +894,8 @@ abstract public class GraphServiceTestBase {
|
||||
assertEqualsAnyOrder(relatedEntities, expectedRelatedEntities);
|
||||
}
|
||||
|
||||
private void doTestFindRelatedEntitiesEntityType(@Nullable String sourceType,
|
||||
@Nullable String destinationType,
|
||||
private void doTestFindRelatedEntitiesEntityType(@Nullable List<String> sourceType,
|
||||
@Nullable List<String> destinationType,
|
||||
@Nonnull String relationshipType,
|
||||
@Nonnull RelationshipFilter relationshipFilter,
|
||||
@Nonnull GraphService service,
|
||||
@ -916,17 +917,17 @@ abstract public class GraphServiceTestBase {
|
||||
assertNotNull(nullUrn);
|
||||
RelatedEntity nullRelatedEntity = new RelatedEntity(downstreamOf, nullUrn.toString());
|
||||
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);
|
||||
|
||||
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
|
||||
syncAfterWrite();
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);
|
||||
|
||||
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
|
||||
syncAfterWrite();
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service, nullRelatedEntity);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
|
||||
}
|
||||
|
||||
@ -938,17 +939,17 @@ abstract public class GraphServiceTestBase {
|
||||
assertNotNull(nullUrn);
|
||||
RelatedEntity nullRelatedEntity = new RelatedEntity(downstreamOf, nullUrn.toString());
|
||||
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);
|
||||
|
||||
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
|
||||
syncAfterWrite();
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);
|
||||
|
||||
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
|
||||
syncAfterWrite();
|
||||
doTestFindRelatedEntitiesEntityType(anyType, "null", downstreamOf, outgoingRelationships, service, nullRelatedEntity);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
|
||||
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
|
||||
}
|
||||
|
||||
@ -1038,8 +1039,8 @@ abstract public class GraphServiceTestBase {
|
||||
GraphService service = getPopulatedGraphService();
|
||||
|
||||
RelatedEntitiesResult relatedEntities = service.findRelatedEntities(
|
||||
datasetType, newFilter("urn", datasetOneUrnString),
|
||||
userType, newFilter("urn", userOneUrnString),
|
||||
ImmutableList.of(datasetType), newFilter("urn", datasetOneUrnString),
|
||||
ImmutableList.of(userType), newFilter("urn", userOneUrnString),
|
||||
Arrays.asList(hasOwner), outgoingRelationships,
|
||||
0, 10
|
||||
);
|
||||
@ -1047,8 +1048,8 @@ abstract public class GraphServiceTestBase {
|
||||
assertEquals(relatedEntities.entities, Arrays.asList(hasOwnerUserOneRelatedEntity));
|
||||
|
||||
relatedEntities = service.findRelatedEntities(
|
||||
datasetType, newFilter("urn", datasetOneUrnString),
|
||||
userType, newFilter("urn", userTwoUrnString),
|
||||
ImmutableList.of(datasetType), newFilter("urn", datasetOneUrnString),
|
||||
ImmutableList.of(userType), newFilter("urn", userTwoUrnString),
|
||||
Arrays.asList(hasOwner), incomingRelationships,
|
||||
0, 10
|
||||
);
|
||||
@ -1056,13 +1057,36 @@ abstract public class GraphServiceTestBase {
|
||||
assertEquals(relatedEntities.entities, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindRelatedEntitiesMultipleEntityTypes() throws Exception {
|
||||
GraphService service = getPopulatedGraphService();
|
||||
|
||||
RelatedEntitiesResult relatedEntities = service.findRelatedEntities(
|
||||
ImmutableList.of(datasetType, userType), newFilter("urn", datasetOneUrnString),
|
||||
ImmutableList.of(datasetType, userType), newFilter("urn", userOneUrnString),
|
||||
Arrays.asList(hasOwner), outgoingRelationships,
|
||||
0, 10
|
||||
);
|
||||
|
||||
assertEquals(relatedEntities.entities, Arrays.asList(hasOwnerUserOneRelatedEntity));
|
||||
|
||||
relatedEntities = service.findRelatedEntities(
|
||||
ImmutableList.of(datasetType, userType), newFilter("urn", datasetOneUrnString),
|
||||
ImmutableList.of(datasetType, userType), newFilter("urn", userTwoUrnString),
|
||||
Arrays.asList(hasOwner), incomingRelationships,
|
||||
0, 10
|
||||
);
|
||||
|
||||
assertEquals(relatedEntities.entities, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindRelatedEntitiesOffsetAndCount() throws Exception {
|
||||
GraphService service = getPopulatedGraphService();
|
||||
|
||||
// populated graph asserted in testPopulatedGraphService
|
||||
RelatedEntitiesResult allRelatedEntities = service.findRelatedEntities(
|
||||
datasetType, EMPTY_FILTER,
|
||||
ImmutableList.of(datasetType), EMPTY_FILTER,
|
||||
anyType, EMPTY_FILTER,
|
||||
Arrays.asList(downstreamOf, hasOwner, knowsUser), outgoingRelationships,
|
||||
0, 100
|
||||
@ -1072,7 +1096,7 @@ abstract public class GraphServiceTestBase {
|
||||
IntStream.range(0, allRelatedEntities.entities.size())
|
||||
.forEach(idx -> individualRelatedEntities.addAll(
|
||||
service.findRelatedEntities(
|
||||
datasetType, EMPTY_FILTER,
|
||||
ImmutableList.of(datasetType), EMPTY_FILTER,
|
||||
anyType, EMPTY_FILTER,
|
||||
Arrays.asList(downstreamOf, hasOwner, knowsUser), outgoingRelationships,
|
||||
idx, 1
|
||||
@ -1360,7 +1384,7 @@ abstract public class GraphServiceTestBase {
|
||||
// assert the modified graph: check all nodes related to upstreamOf and nextVersionOf edges again
|
||||
assertEqualsAnyOrder(
|
||||
service.findRelatedEntities(
|
||||
datasetType, EMPTY_FILTER,
|
||||
ImmutableList.of(datasetType), EMPTY_FILTER,
|
||||
anyType, EMPTY_FILTER,
|
||||
Arrays.asList(downstreamOf), outgoingRelationships,
|
||||
0, 100
|
||||
@ -1369,7 +1393,7 @@ abstract public class GraphServiceTestBase {
|
||||
);
|
||||
assertEqualsAnyOrder(
|
||||
service.findRelatedEntities(
|
||||
userType, EMPTY_FILTER,
|
||||
ImmutableList.of(userType), EMPTY_FILTER,
|
||||
anyType, EMPTY_FILTER,
|
||||
Arrays.asList(hasOwner), outgoingRelationships,
|
||||
0, 100
|
||||
@ -1379,7 +1403,7 @@ abstract public class GraphServiceTestBase {
|
||||
assertEqualsAnyOrder(
|
||||
service.findRelatedEntities(
|
||||
anyType, EMPTY_FILTER,
|
||||
userType, EMPTY_FILTER,
|
||||
ImmutableList.of(userType), EMPTY_FILTER,
|
||||
Arrays.asList(knowsUser), outgoingRelationships,
|
||||
0, 100
|
||||
),
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.graph.dgraph;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.graph.GraphServiceTestBase;
|
||||
import com.linkedin.metadata.graph.RelatedEntity;
|
||||
@ -532,8 +533,8 @@ public class DgraphGraphServiceTest extends GraphServiceTestBase {
|
||||
public void testGetQueryForRelatedEntitiesOutgoing() {
|
||||
doTestGetQueryForRelatedEntitiesDirection(RelationshipDirection.OUTGOING,
|
||||
"query {\n"
|
||||
+ " sourceType as var(func: eq(<type>, \"sourceType\"))\n"
|
||||
+ " destinationType as var(func: eq(<type>, \"destinationType\"))\n"
|
||||
+ " sourceType as var(func: eq(<type>, [\"sourceType\"]))\n"
|
||||
+ " destinationType as var(func: eq(<type>, [\"destinationType\"]))\n"
|
||||
+ " sourceFilter1 as var(func: eq(<urn>, \"urn:ns:type:source-key\"))\n"
|
||||
+ " sourceFilter2 as var(func: eq(<key>, \"source-key\"))\n"
|
||||
+ " destinationFilter1 as var(func: eq(<urn>, \"urn:ns:type:dest-key\"))\n"
|
||||
@ -565,8 +566,8 @@ public class DgraphGraphServiceTest extends GraphServiceTestBase {
|
||||
public void testGetQueryForRelatedEntitiesIncoming() {
|
||||
doTestGetQueryForRelatedEntitiesDirection(RelationshipDirection.INCOMING,
|
||||
"query {\n"
|
||||
+ " sourceType as var(func: eq(<type>, \"sourceType\"))\n"
|
||||
+ " destinationType as var(func: eq(<type>, \"destinationType\"))\n"
|
||||
+ " sourceType as var(func: eq(<type>, [\"sourceType\"]))\n"
|
||||
+ " destinationType as var(func: eq(<type>, [\"destinationType\"]))\n"
|
||||
+ " sourceFilter1 as var(func: eq(<urn>, \"urn:ns:type:source-key\"))\n"
|
||||
+ " sourceFilter2 as var(func: eq(<key>, \"source-key\"))\n"
|
||||
+ " destinationFilter1 as var(func: eq(<urn>, \"urn:ns:type:dest-key\"))\n"
|
||||
@ -598,8 +599,8 @@ public class DgraphGraphServiceTest extends GraphServiceTestBase {
|
||||
public void testGetQueryForRelatedEntitiesUndirected() {
|
||||
doTestGetQueryForRelatedEntitiesDirection(RelationshipDirection.UNDIRECTED,
|
||||
"query {\n"
|
||||
+ " sourceType as var(func: eq(<type>, \"sourceType\"))\n"
|
||||
+ " destinationType as var(func: eq(<type>, \"destinationType\"))\n"
|
||||
+ " sourceType as var(func: eq(<type>, [\"sourceType\"]))\n"
|
||||
+ " destinationType as var(func: eq(<type>, [\"destinationType\"]))\n"
|
||||
+ " sourceFilter1 as var(func: eq(<urn>, \"urn:ns:type:source-key\"))\n"
|
||||
+ " sourceFilter2 as var(func: eq(<key>, \"source-key\"))\n"
|
||||
+ " destinationFilter1 as var(func: eq(<urn>, \"urn:ns:type:dest-key\"))\n"
|
||||
@ -638,12 +639,12 @@ public class DgraphGraphServiceTest extends GraphServiceTestBase {
|
||||
private void doTestGetQueryForRelatedEntitiesDirection(@Nonnull RelationshipDirection direction, @Nonnull String expectedQuery) {
|
||||
assertEquals(
|
||||
DgraphGraphService.getQueryForRelatedEntities(
|
||||
"sourceType",
|
||||
ImmutableList.of("sourceType"),
|
||||
newFilter(new HashMap<String, String>() {{
|
||||
put("urn", "urn:ns:type:source-key");
|
||||
put("key", "source-key");
|
||||
}}),
|
||||
"destinationType",
|
||||
ImmutableList.of("destinationType"),
|
||||
newFilter(new HashMap<String, String>() {{
|
||||
put("urn", "urn:ns:type:dest-key");
|
||||
put("key", "dest-key");
|
||||
|
||||
@ -67,7 +67,7 @@ public final class Relationships extends SimpleResourceTemplate<EntityRelationsh
|
||||
start = start == null ? 0 : start;
|
||||
count = count == null ? MAX_DOWNSTREAM_CNT : count;
|
||||
|
||||
return _graphService.findRelatedEntities("", newFilter("urn", rawUrn), "", QueryUtils.EMPTY_FILTER,
|
||||
return _graphService.findRelatedEntities(null, newFilter("urn", rawUrn), null, QueryUtils.EMPTY_FILTER,
|
||||
relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, direction), start, count);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user