mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-23 08:38:02 +00:00
Fix lineage (#4326)
This commit is contained in:
parent
7efc88e389
commit
424d357cfd
@ -52,14 +52,14 @@ public class LineageRegistry {
|
||||
// A produces B : A -> downstream (produces, OUTGOING), B -> upstream (produces, INCOMING)
|
||||
for (LineageEdge edge : lineageEdges) {
|
||||
if (edge.isUpstream()) {
|
||||
upstreamPerEntity.computeIfAbsent(edge.sourceEntity, (k) -> new HashSet<>())
|
||||
upstreamPerEntity.computeIfAbsent(edge.sourceEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING));
|
||||
downstreamPerEntity.computeIfAbsent(edge.destEntity, (k) -> new HashSet<>())
|
||||
downstreamPerEntity.computeIfAbsent(edge.destEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING));
|
||||
} else {
|
||||
downstreamPerEntity.computeIfAbsent(edge.sourceEntity, (k) -> new HashSet<>())
|
||||
downstreamPerEntity.computeIfAbsent(edge.sourceEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.OUTGOING));
|
||||
upstreamPerEntity.computeIfAbsent(edge.destEntity, (k) -> new HashSet<>())
|
||||
upstreamPerEntity.computeIfAbsent(edge.destEntity.toLowerCase(), (k) -> new HashSet<>())
|
||||
.add(new EdgeInfo(edge.type, RelationshipDirection.INCOMING));
|
||||
}
|
||||
}
|
||||
|
@ -426,6 +426,28 @@ abstract public class GraphServiceTestBase {
|
||||
downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 1);
|
||||
assertEquals(downstreamLineage.getTotal().intValue(), 0);
|
||||
assertEquals(downstreamLineage.getRelationships().size(), 0);
|
||||
|
||||
upstreamLineage = service.getLineage(dataJobOneUrn, LineageDirection.UPSTREAM, 0, 1000, 1);
|
||||
assertEquals(upstreamLineage.getTotal().intValue(), 2);
|
||||
assertEquals(upstreamLineage.getRelationships().size(), 2);
|
||||
relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
|
||||
Function.identity()));
|
||||
assertTrue(relationships.containsKey(datasetOneUrn));
|
||||
assertEquals(relationships.get(datasetOneUrn).getType(), consumes);
|
||||
assertTrue(relationships.containsKey(datasetTwoUrn));
|
||||
assertEquals(relationships.get(datasetTwoUrn).getType(), consumes);
|
||||
|
||||
downstreamLineage = service.getLineage(dataJobOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 1);
|
||||
assertEquals(downstreamLineage.getTotal().intValue(), 3);
|
||||
assertEquals(downstreamLineage.getRelationships().size(), 3);
|
||||
relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
|
||||
Function.identity()));
|
||||
assertTrue(relationships.containsKey(datasetThreeUrn));
|
||||
assertEquals(relationships.get(datasetThreeUrn).getType(), produces);
|
||||
assertTrue(relationships.containsKey(datasetFourUrn));
|
||||
assertEquals(relationships.get(datasetFourUrn).getType(), produces);
|
||||
assertTrue(relationships.containsKey(dataJobTwoUrn));
|
||||
assertEquals(relationships.get(dataJobTwoUrn).getType(), downstreamOf);
|
||||
}
|
||||
|
||||
@DataProvider(name = "FindRelatedEntitiesSourceEntityFilterTests")
|
||||
|
Loading…
x
Reference in New Issue
Block a user