feat(lineage): removing dataset<>dataset edge in job index builder (#2501)

This commit is contained in:
Gabe Lyons 2021-05-06 07:52:19 -07:00 committed by GitHub
parent 3edbdcb3e9
commit 095ca397eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 39 deletions

View File

@ -3,7 +3,6 @@ package com.linkedin.metadata.builders.graph.relationship;
import com.linkedin.datajob.DataJobInputOutput; import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.builders.graph.GraphBuilder; import com.linkedin.metadata.builders.graph.GraphBuilder;
import com.linkedin.metadata.relationship.DownstreamOf;
import com.linkedin.metadata.relationship.Consumes; import com.linkedin.metadata.relationship.Consumes;
import com.linkedin.metadata.relationship.Produces; import com.linkedin.metadata.relationship.Produces;
@ -23,12 +22,6 @@ public class RelationshipBuilderFromDataJobInputOutput extends BaseRelationshipB
@Nonnull @Nonnull
@Override @Override
public List<GraphBuilder.RelationshipUpdates> buildRelationships(@Nonnull Urn urn, @Nonnull DataJobInputOutput inputOutput) { public List<GraphBuilder.RelationshipUpdates> buildRelationships(@Nonnull Urn urn, @Nonnull DataJobInputOutput inputOutput) {
final List<DownstreamOf> downstreamEdges = inputOutput.getInputDatasets()
.stream()
.flatMap(upstreamDataset -> inputOutput.getOutputDatasets().stream()
.map(downstreamDataset -> new DownstreamOf().setSource(downstreamDataset).setDestination(upstreamDataset)))
.collect(Collectors.toList());
final List<Consumes> inputsList = inputOutput.getInputDatasets() final List<Consumes> inputsList = inputOutput.getInputDatasets()
.stream() .stream()
.map(inputDataset -> new Consumes().setSource(urn).setDestination(inputDataset)) .map(inputDataset -> new Consumes().setSource(urn).setDestination(inputDataset))
@ -40,7 +33,6 @@ public class RelationshipBuilderFromDataJobInputOutput extends BaseRelationshipB
.collect(Collectors.toList()); .collect(Collectors.toList());
return Arrays.asList( return Arrays.asList(
new GraphBuilder.RelationshipUpdates(downstreamEdges, REMOVE_ALL_EDGES_FROM_SOURCE),
new GraphBuilder.RelationshipUpdates(inputsList, REMOVE_ALL_EDGES_FROM_SOURCE), new GraphBuilder.RelationshipUpdates(inputsList, REMOVE_ALL_EDGES_FROM_SOURCE),
new GraphBuilder.RelationshipUpdates(outputsList, REMOVE_ALL_EDGES_FROM_SOURCE)); new GraphBuilder.RelationshipUpdates(outputsList, REMOVE_ALL_EDGES_FROM_SOURCE));
} }

View File

@ -26,50 +26,26 @@ public class RelationshipBuilderFromDataJobInputOutputTest {
List<GraphBuilder.RelationshipUpdates> operations = List<GraphBuilder.RelationshipUpdates> operations =
new RelationshipBuilderFromDataJobInputOutput().buildRelationships(job, inputOutput); new RelationshipBuilderFromDataJobInputOutput().buildRelationships(job, inputOutput);
assertEquals(operations.size(), 3); assertEquals(operations.size(), 2);
assertEquals(operations.get(0).getRelationships().size(), 4); assertEquals(operations.get(0).getRelationships().size(), 2);
assertEquals( assertEquals(
operations.get(0).getRelationships().get(0), operations.get(0).getRelationships().get(0),
makeDownstreamOf( makeConsumes(job, makeDatasetUrn("input1")));
makeDatasetUrn("output1"),
makeDatasetUrn("input1")));
assertEquals( assertEquals(
operations.get(0).getRelationships().get(1), operations.get(0).getRelationships().get(1),
makeDownstreamOf( makeConsumes(job, makeDatasetUrn("input2")));
makeDatasetUrn("output2"),
makeDatasetUrn("input1")));
assertEquals(
operations.get(0).getRelationships().get(2),
makeDownstreamOf(
makeDatasetUrn("output1"),
makeDatasetUrn("input2")));
assertEquals(
operations.get(0).getRelationships().get(3),
makeDownstreamOf(
makeDatasetUrn("output2"),
makeDatasetUrn("input2")));
assertEquals(operations.get(0).getPreUpdateOperation(), assertEquals(operations.get(0).getPreUpdateOperation(),
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE);
assertEquals(operations.get(1).getRelationships().size(), 2); assertEquals(operations.get(1).getRelationships().size(), 2);
assertEquals( assertEquals(
operations.get(1).getRelationships().get(0), operations.get(1).getRelationships().get(0),
makeConsumes(job, makeDatasetUrn("input1")));
assertEquals(
operations.get(1).getRelationships().get(1),
makeConsumes(job, makeDatasetUrn("input2")));
assertEquals(operations.get(1).getPreUpdateOperation(),
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE);
assertEquals(operations.get(2).getRelationships().size(), 2);
assertEquals(
operations.get(2).getRelationships().get(0),
makeProduces(job, makeDatasetUrn("output1"))); makeProduces(job, makeDatasetUrn("output1")));
assertEquals( assertEquals(
operations.get(2).getRelationships().get(1), operations.get(1).getRelationships().get(1),
makeProduces(job, makeDatasetUrn("output2"))); makeProduces(job, makeDatasetUrn("output2")));
assertEquals(operations.get(2).getPreUpdateOperation(), assertEquals(operations.get(1).getPreUpdateOperation(),
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE);
} }
@ -93,4 +69,3 @@ public class RelationshipBuilderFromDataJobInputOutputTest {
} }
} }