fix(metadata-io): in Neo4j service use proper algorithm to get lineage (#8687)

Co-authored-by: RyanHolstien <RyanHolstien@users.noreply.github.com>
Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
This commit is contained in:
Xuelei Li 2023-11-10 17:58:38 +01:00 committed by GitHub
parent 9c0f4de382
commit 179f103412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 266 additions and 114 deletions

View File

@ -7,6 +7,8 @@ buildscript {
ext.springBootVersion = '2.7.14'
ext.openTelemetryVersion = '1.18.0'
ext.neo4jVersion = '4.4.9'
ext.neo4jTestVersion = '4.4.25'
ext.neo4jApocVersion = '4.4.0.20:all'
ext.testContainersVersion = '1.17.4'
ext.elasticsearchVersion = '2.9.0' // ES 7.10, Opensearch 1.x, 2.x
ext.jacksonVersion = '2.15.2'
@ -154,8 +156,10 @@ project.ext.externalDependency = [
'mockServer': 'org.mock-server:mockserver-netty:5.11.2',
'mockServerClient': 'org.mock-server:mockserver-client-java:5.11.2',
'mysqlConnector': 'mysql:mysql-connector-java:8.0.20',
'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jVersion,
'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jTestVersion,
'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion,
'neo4jTestJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jTestVersion,
'neo4jApoc': 'org.neo4j.procedure:apoc:' + neo4jApocVersion,
'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion,
'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion,
'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15',
@ -218,7 +222,7 @@ project.ext.externalDependency = [
'common': 'commons-io:commons-io:2.7',
'jline':'jline:jline:1.4.1',
'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0'
]
allprojects {

View File

@ -1,3 +1,4 @@
NEO4J_AUTH=neo4j/datahub
NEO4J_dbms_default__database=graph.db
NEO4J_dbms_allow__upgrade=true
NEO4JLABS_PLUGINS="[\"apoc\"]"

View File

@ -253,6 +253,7 @@ services:
- NEO4J_AUTH=neo4j/datahub
- NEO4J_dbms_default__database=graph.db
- NEO4J_dbms_allow__upgrade=true
- NEO4JLABS_PLUGINS=["apoc"]
healthcheck:
interval: 1s
retries: 5

View File

@ -253,6 +253,7 @@ services:
- NEO4J_AUTH=neo4j/datahub
- NEO4J_dbms_default__database=graph.db
- NEO4J_dbms_allow__upgrade=true
- NEO4JLABS_PLUGINS=["apoc"]
healthcheck:
interval: 1s
retries: 5

View File

@ -16,6 +16,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes
- #8687 (datahub-helm #365 #353) - If Helm is used for installation and Neo4j is enabled, update the prerequisites Helm chart to version >=0.1.2 and adjust your value overrides in the `neo4j:` section according to the new structure.
- #9044 - GraphQL APIs for adding ownership now expect either an `ownershipTypeUrn` referencing a customer ownership type or a (deprecated) `type`. Where before adding an ownership without a concrete type was allowed, this is no longer the case. For simplicity you can use the `type` parameter which will get translated to a custom ownership type internally if one exists for the type being added.
- #9010 - In Redshift source's config `incremental_lineage` is set default to off.
- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.

View File

@ -57,6 +57,9 @@ dependencies {
testImplementation externalDependency.h2
testImplementation externalDependency.mysqlConnector
testImplementation externalDependency.neo4jHarness
testImplementation (externalDependency.neo4jApoc) {
exclude group: 'org.yaml', module: 'snakeyaml'
}
testImplementation externalDependency.mockito
testImplementation externalDependency.mockitoInline
testImplementation externalDependency.iStackCommons

View File

@ -5,6 +5,7 @@ import com.datahub.util.Statement;
import com.datahub.util.exception.RetryLimitReached;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.linkedin.common.UrnArray;
import com.linkedin.common.UrnArrayArray;
import com.linkedin.common.urn.Urn;
@ -25,17 +26,20 @@ import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.util.Pair;
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
@ -50,8 +54,7 @@ import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.InternalRelationship;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.types.Relationship;
@Slf4j
@ -62,9 +65,6 @@ public class Neo4jGraphService implements GraphService {
private final Driver _driver;
private SessionConfig _sessionConfig;
private static final String SOURCE = "source";
private static final String UI = "UI";
public Neo4jGraphService(@Nonnull LineageRegistry lineageRegistry, @Nonnull Driver driver) {
this(lineageRegistry, driver, SessionConfig.defaultConfig());
}
@ -234,53 +234,36 @@ public class Neo4jGraphService implements GraphService {
@Nullable Long endTimeMillis) {
log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops));
final String statement =
generateLineageStatement(entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis);
final var statementAndParams =
generateLineageStatementAndParameters(entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis);
final var statement = statementAndParams.getFirst();
final var parameters = statementAndParams.getSecond();
List<Record> neo4jResult =
statement != null ? runQuery(buildStatement(statement, new HashMap<>())).list() : new ArrayList<>();
// It is possible to have more than 1 path from node A to node B in the graph and previous query returns all the paths.
// We convert the List into Map with only the shortest paths. "item.get(i).size()" is the path size between two nodes in relation.
// The key for mapping is the destination node as the source node is always the same, and it is defined by parameter.
neo4jResult = neo4jResult.stream()
.collect(Collectors.toMap(item -> item.values().get(2).asNode().get("urn").asString(), Function.identity(),
(item1, item2) -> item1.get(1).size() < item2.get(1).size() ? item1 : item2))
.values()
.stream()
.collect(Collectors.toList());
statement != null ? runQuery(buildStatement(statement, parameters)).list() : new ArrayList<>();
LineageRelationshipArray relations = new LineageRelationshipArray();
neo4jResult.stream().skip(offset).limit(count).forEach(item -> {
String urn = item.values().get(2).asNode().get("urn").asString();
String relationType = ((InternalRelationship) item.get(1).asList().get(0)).type().split("r_")[1];
int numHops = item.get(1).size();
try {
// Generate path from r in neo4jResult
List<Urn> pathFromRelationships =
item.values().get(1).asList(Collections.singletonList(new ArrayList<Node>())).stream().map(t -> createFromString(
// Get real upstream node/downstream node by direction
((InternalRelationship) t).get(direction == LineageDirection.UPSTREAM ? "startUrn" : "endUrn")
.asString())).collect(Collectors.toList());
if (direction == LineageDirection.UPSTREAM) {
// For ui to show path correctly, reverse path for UPSTREAM direction
Collections.reverse(pathFromRelationships);
// Add missing original node to the end since we generate path from relationships
pathFromRelationships.add(Urn.createFromString(item.values().get(0).asNode().get("urn").asString()));
} else {
// Add missing original node to the beginning since we generate path from relationships
pathFromRelationships.add(0, Urn.createFromString(item.values().get(0).asNode().get("urn").asString()));
}
final var path = item.get(1).asPath();
final List<Urn> nodeListAsPath = StreamSupport.stream(
path.nodes().spliterator(), false)
.map(node -> createFromString(node.get("urn").asString()))
.collect(Collectors.toList());
final var firstRelationship = Optional.ofNullable(Iterables.getFirst(path.relationships(), null));
relations.add(new LineageRelationship().setEntity(Urn.createFromString(urn))
.setType(relationType)
.setDegree(numHops)
.setPaths(new UrnArrayArray(new UrnArray(pathFromRelationships))));
// although firstRelationship should never be absent, provide "" as fallback value
.setType(firstRelationship.map(Relationship::type).orElse(""))
.setDegree(path.length())
.setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath))));
} catch (URISyntaxException ignored) {
log.warn(String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage()));
}
});
EntityLineageResult result = new EntityLineageResult().setStart(offset)
.setCount(relations.size())
.setRelationships(relations)
@ -290,31 +273,104 @@ public class Neo4jGraphService implements GraphService {
return result;
}
private String generateLineageStatement(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
GraphFilters graphFilters, int maxHops, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) {
String statement;
final String allowedEntityTypes = String.join(" OR b:", graphFilters.getAllowedEntityTypes());
private String getPathFindingLabelFilter(List<String> entityNames) {
return entityNames.stream().map(x -> String.format("+%s", x)).collect(Collectors.joining("|"));
}
final String multiHopMatchTemplateIndirect = "MATCH p = shortestPath((a {urn: '%s'})<-[r*1..%d]-(b)) ";
final String multiHopMatchTemplateDirect = "MATCH p = shortestPath((a {urn: '%s'})-[r*1..%d]->(b)) ";
// directionFilterTemplate should apply to all condition.
final String multiHopMatchTemplate =
direction == LineageDirection.UPSTREAM ? multiHopMatchTemplateIndirect : multiHopMatchTemplateDirect;
final String fullQueryTemplate = generateFullQueryTemplate(multiHopMatchTemplate, startTimeMillis, endTimeMillis);
if (startTimeMillis != null && endTimeMillis != null) {
statement =
String.format(fullQueryTemplate, startTimeMillis, endTimeMillis, entityUrn, maxHops, allowedEntityTypes,
entityUrn);
} else if (startTimeMillis != null) {
statement = String.format(fullQueryTemplate, startTimeMillis, entityUrn, maxHops, allowedEntityTypes, entityUrn);
} else if (endTimeMillis != null) {
statement = String.format(fullQueryTemplate, endTimeMillis, entityUrn, maxHops, allowedEntityTypes, entityUrn);
} else {
statement = String.format(fullQueryTemplate, entityUrn, maxHops, allowedEntityTypes, entityUrn);
private String getPathFindingRelationshipFilter(@Nonnull List<String> entityNames, @Nullable LineageDirection direction) {
// relationshipFilter supports mixing different directions for various relation types,
// so simply transform entries lineage registry into format of filter
final var filterComponents = new HashSet<String>();
for (final var entityName : entityNames) {
if (direction != null) {
for (final var edgeInfo : _lineageRegistry.getLineageRelationships(entityName, direction)) {
final var type = edgeInfo.getType();
if (edgeInfo.getDirection() == RelationshipDirection.INCOMING) {
filterComponents.add("<" + type);
} else {
filterComponents.add(type + ">");
}
}
} else {
// return disjunctive combination of edge types regardless of direction
for (final var direction1 : List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) {
for (final var edgeInfo : _lineageRegistry.getLineageRelationships(entityName, direction1)) {
filterComponents.add(edgeInfo.getType());
}
}
}
}
return String.join("|", filterComponents);
}
return statement;
private Pair<String, Map<String, Object>> generateLineageStatementAndParameters(
@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
GraphFilters graphFilters, int maxHops,
@Nullable Long startTimeMillis, @Nullable Long endTimeMillis) {
final var parameterMap = new HashMap<String, Object>(Map.of(
"urn", entityUrn.toString(),
"labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()),
"relationshipFilter", getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), direction),
"maxHops", maxHops
));
if (startTimeMillis == null && endTimeMillis == null) {
// if no time filtering required, simply find all expansion paths to other nodes
final var statement = "MATCH (a {urn: $urn}) "
+ "CALL apoc.path.spanningTree(a, { "
+ " relationshipFilter: $relationshipFilter, "
+ " labelFilter: $labelFilter, "
+ " minLevel: 1, "
+ " maxLevel: $maxHops "
+ "}) "
+ "YIELD path "
+ "WITH a, path AS path "
+ "RETURN a, path, last(nodes(path));";
return Pair.of(statement, parameterMap);
} else {
// when needing time filtering, possibility on multiple paths between two
// nodes must be considered, and we need to construct more complex query
// use r_ edges until they are no longer useful
final var relationFilter = getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null)
.replaceAll("(\\w+)", "r_$1");
final var relationshipPattern =
String.format(
(direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"),
relationFilter, maxHops);
// two steps:
// 1. find list of nodes reachable within maxHops
// 2. find the shortest paths from start node to every other node in these nodes
// (note: according to the docs of shortestPath, WHERE conditions are applied during path exploration, not
// after path exploration is done)
final var statement = "MATCH (a {urn: $urn}) "
+ "CALL apoc.path.subgraphNodes(a, { "
+ " relationshipFilter: $relationshipFilter, "
+ " labelFilter: $labelFilter, "
+ " minLevel: 1, "
+ " maxLevel: $maxHops "
+ "}) "
+ "YIELD node AS b "
+ "WITH a, b "
+ "MATCH path = shortestPath((a)" + relationshipPattern + "(b)) "
+ "WHERE a <> b "
+ " AND ALL(rt IN relationships(path) WHERE "
+ " (EXISTS(rt.source) AND rt.source = 'UI') OR "
+ " (NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR "
+ " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR "
+ " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) "
+ " ) "
+ "RETURN a, path, b;";
// provide dummy start/end time when not provided, so no need to
// format clause differently if either of them is missing
parameterMap.put("startTimeMillis", startTimeMillis == null ? 0 : startTimeMillis);
parameterMap.put("endTimeMillis", endTimeMillis == null ? System.currentTimeMillis() : endTimeMillis);
return Pair.of(statement, parameterMap);
}
}
@Nonnull
@ -583,15 +639,6 @@ public class Neo4jGraphService implements GraphService {
}
}
@Nonnull
private static String toCriterionWhereString(@Nonnull String key, @Nonnull Object value) {
if (ClassUtils.isPrimitiveOrWrapper(value.getClass())) {
return key + " = " + value;
}
return key + " = \"" + value.toString() + "\"";
}
// Returns "key:value" String, if value is not primitive, then use toString() and double quote it
@Nonnull
private static String toCriterionString(@Nonnull String key, @Nonnull Object value) {
@ -715,44 +762,4 @@ public class Neo4jGraphService implements GraphService {
return null;
}
}
private String generateFullQueryTemplate(@Nonnull String multiHopMatchTemplate, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) {
final String sourceUiCheck = String.format("(EXISTS(rt.%s) AND rt.%s = '%s') ", SOURCE, SOURCE, UI);
final String whereTemplate = "WHERE (b:%s) AND b.urn <> '%s' ";
final String returnTemplate = "RETURN a,r,b";
String withTimeTemplate = "";
String timeFilterConditionTemplate = "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')";
if (startTimeMillis != null && endTimeMillis != null) {
withTimeTemplate = "WITH %d as startTimeMillis, %d as endTimeMillis ";
timeFilterConditionTemplate =
"AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR "
+ "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR "
+ "((rt.createdOn >= startTimeMillis AND rt.createdOn <= endTimeMillis) OR "
+ "(rt.updatedOn >= startTimeMillis AND rt.updatedOn <= endTimeMillis))) "
+ "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')";
} else if (startTimeMillis != null) {
withTimeTemplate = "WITH %d as startTimeMillis ";
timeFilterConditionTemplate =
"AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR "
+ "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR "
+ "(rt.createdOn >= startTimeMillis OR rt.updatedOn >= startTimeMillis)) "
+ "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')";
} else if (endTimeMillis != null) {
withTimeTemplate = "WITH %d as endTimeMillis ";
timeFilterConditionTemplate =
"AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR "
+ "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR "
+ "(rt.createdOn <= endTimeMillis OR rt.updatedOn <= endTimeMillis)) "
+ "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')";
}
final StringJoiner fullQueryTemplateJoiner = new StringJoiner(" ");
fullQueryTemplateJoiner.add(withTimeTemplate);
fullQueryTemplateJoiner.add(multiHopMatchTemplate);
fullQueryTemplateJoiner.add(whereTemplate);
fullQueryTemplateJoiner.add(timeFilterConditionTemplate);
fullQueryTemplateJoiner.add(returnTemplate);
return fullQueryTemplateJoiner.toString();
}
}

View File

@ -1,6 +1,7 @@
package com.linkedin.metadata.graph.neo4j;
import com.linkedin.common.FabricType;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.TagUrn;
@ -17,6 +18,7 @@ import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import java.util.Arrays;
import java.util.Collections;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.testng.SkipException;
@ -29,6 +31,8 @@ import javax.annotation.Nonnull;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static com.linkedin.metadata.search.utils.QueryUtils.*;
import static org.testng.Assert.assertEquals;
@ -194,11 +198,82 @@ public class Neo4jGraphServiceTest extends GraphServiceTestBase {
assertEquals(result.getTotal(), 0);
}
private Set<UrnArray> getPathUrnArraysFromLineageResult(EntityLineageResult result) {
return result.getRelationships()
.stream()
.map(x -> x.getPaths().get(0))
.collect(Collectors.toSet());
}
@Test
public void testGetLineage() {
GraphService service = getGraphService();
List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d5
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),
new Edge(datasetFiveUrn, datasetThreeUrn, downstreamOf, 11L, null, null, null, null),
// another path between d2 and d5 which is shorter
// d1 <-DownstreamOf- d4 <-DownstreamOf- d5
new Edge(datasetFourUrn, datasetOneUrn, downstreamOf, 13L, null, 13L, null, null),
new Edge(datasetFiveUrn, datasetFourUrn, downstreamOf, 13L, null, 13L, null, null)
);
edges.forEach(service::addEdge);
// simple path finding
final var upstreamLineageDataset3Hop3 = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3);
assertEquals(upstreamLineageDataset3Hop3.getTotal().intValue(), 3);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDataset3Hop3),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));
// simple path finding
final var upstreamLineageDatasetFiveHop2 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineageDatasetFiveHop2.getTotal().intValue(), 4);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDatasetFiveHop2),
Set.of(
new UrnArray(datasetFiveUrn, datasetThreeUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn)));
// there are two paths from p5 to p1, one longer and one shorter, and the longer one is discarded from result
final var upstreamLineageDataset5Hop5 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 5);
assertEquals(upstreamLineageDataset5Hop5.getTotal().intValue(), 5);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDataset5Hop5),
Set.of(
new UrnArray(datasetFiveUrn, datasetThreeUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn)));
// downstream lookup
final var downstreamLineageDataset1Hop2 = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineageDataset1Hop2.getTotal().intValue(), 4);
assertEquals(
getPathUrnArraysFromLineageResult(downstreamLineageDataset1Hop2),
Set.of(
new UrnArray(datasetOneUrn, dataJobOneUrn),
new UrnArray(datasetOneUrn, dataJobOneUrn, datasetTwoUrn),
new UrnArray(datasetOneUrn, datasetFourUrn),
new UrnArray(datasetOneUrn, datasetFourUrn, datasetFiveUrn)));
}
@Test
public void testGetLineageTimeFilterQuery() throws Exception {
GraphService service = getGraphService();
List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d4
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),
@ -206,21 +281,76 @@ public class Neo4jGraphServiceTest extends GraphServiceTestBase {
);
edges.forEach(service::addEdge);
// no time filtering
EntityLineageResult upstreamLineageTwoHops = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineageTwoHops.getTotal().intValue(), 2);
assertEquals(upstreamLineageTwoHops.getRelationships().size(), 2);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTwoHops),
Set.of(
new UrnArray(datasetFourUrn, datasetThreeUrn),
new UrnArray(datasetFourUrn, datasetThreeUrn, datasetTwoUrn)));
// with time filtering
EntityLineageResult upstreamLineageTwoHopsWithTimeFilter = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2, 10L, 12L);
assertEquals(upstreamLineageTwoHopsWithTimeFilter.getTotal().intValue(), 1);
assertEquals(upstreamLineageTwoHopsWithTimeFilter.getRelationships().size(), 1);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTwoHopsWithTimeFilter),
Set.of(
new UrnArray(datasetFourUrn, datasetThreeUrn)));
// with time filtering
EntityLineageResult upstreamLineageTimeFilter = service.getLineage(datasetTwoUrn, LineageDirection.UPSTREAM, 0, 1000, 4, 2L, 6L);
assertEquals(upstreamLineageTimeFilter.getTotal().intValue(), 2);
assertEquals(upstreamLineageTimeFilter.getRelationships().size(), 2);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTimeFilter),
Set.of(
new UrnArray(datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));
// with time filtering
EntityLineageResult downstreamLineageTimeFilter = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 4, 0L, 4L);
assertEquals(downstreamLineageTimeFilter.getTotal().intValue(), 1);
assertEquals(downstreamLineageTimeFilter.getRelationships().size(), 1);
assertEquals(
getPathUrnArraysFromLineageResult(downstreamLineageTimeFilter),
Set.of(
new UrnArray(datasetOneUrn, dataJobOneUrn)));
}
@Test
public void testGetLineageTimeFilteringSkipsShorterButNonMatchingPaths() {
GraphService service = getGraphService();
List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 5L, null, 5L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 7L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),
// d1 <-DownstreamOf- d3 (shorter path from d3 to d1, but with very old time)
new Edge(datasetThreeUrn, datasetOneUrn, downstreamOf, 1L, null, 2L, null, null)
);
edges.forEach(service::addEdge);
// no time filtering, shorter path from d3 to d1 is returned
EntityLineageResult upstreamLineageNoTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageNoTimeFiltering),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetOneUrn)));
// with time filtering, shorter path from d3 to d1 is excluded so longer path is returned
EntityLineageResult upstreamLineageTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3, 3L, 17L);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTimeFiltering),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));
}
}

View File

@ -2,6 +2,8 @@ package com.linkedin.metadata.graph.neo4j;
import java.io.File;
import java.net.URI;
import apoc.path.PathExplorer;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.harness.Neo4j;
import org.neo4j.harness.Neo4jBuilder;
@ -17,7 +19,9 @@ public class Neo4jTestServerBuilder {
}
public Neo4jTestServerBuilder() {
this(new InProcessNeo4jBuilder());
this(new InProcessNeo4jBuilder()
.withProcedure(PathExplorer.class)
);
}
public Neo4jTestServerBuilder(File workingDirectory) {