feat(graph) Add createdOn, createdActor, updatedOn, updatedActor to graph edges (#6615)

This commit is contained in:
Chris Collins 2022-12-05 15:56:28 -06:00 committed by GitHub
parent 4dd66be654
commit b6887d23bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 371 additions and 68 deletions

View File

@ -20,11 +20,19 @@ public class RelationshipAnnotation {
private static final String ENTITY_TYPES_FIELD = "entityTypes";
private static final String IS_UPSTREAM_FIELD = "isUpstream";
private static final String IS_LINEAGE_FIELD = "isLineage";
private static final String CREATED_ON = "createdOn";
private static final String CREATED_ACTOR = "createdActor";
private static final String UPDATED_ON = "updatedOn";
private static final String UPDATED_ACTOR = "updatedActor";
String name;
List<String> validDestinationTypes;
boolean isUpstream;
boolean isLineage;
String createdOn;
String createdActor;
String updatedOn;
String updatedActor;
@Nonnull
public static RelationshipAnnotation fromPegasusAnnotationObject(
@ -70,7 +78,19 @@ public class RelationshipAnnotation {
final Optional<Boolean> isUpstream = AnnotationUtils.getField(map, IS_UPSTREAM_FIELD, Boolean.class);
final Optional<Boolean> isLineage = AnnotationUtils.getField(map, IS_LINEAGE_FIELD, Boolean.class);
final Optional<String> createdOn = AnnotationUtils.getField(map, CREATED_ON, String.class);
final Optional<String> createdActor = AnnotationUtils.getField(map, CREATED_ACTOR, String.class);
final Optional<String> updatedOn = AnnotationUtils.getField(map, UPDATED_ON, String.class);
final Optional<String> updatedActor = AnnotationUtils.getField(map, UPDATED_ACTOR, String.class);
return new RelationshipAnnotation(name.get(), entityTypes, isUpstream.orElse(true), isLineage.orElse(false));
}
return new RelationshipAnnotation(
name.get(),
entityTypes,
isUpstream.orElse(true),
isLineage.orElse(false),
createdOn.orElse(null),
createdActor.orElse(null),
updatedOn.orElse(null),
updatedActor.orElse(null)
); }
}

View File

@ -65,7 +65,7 @@ public class LineageRegistryTest {
boolean isUpstream, boolean isLineage) {
RelationshipFieldSpec spec = mock(RelationshipFieldSpec.class);
when(spec.getRelationshipAnnotation()).thenReturn(
new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage));
new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage, null, null, null, null));
return spec;
}
}

View File

@ -11,4 +11,8 @@ public class Edge {
private Urn source;
private Urn destination;
private String relationshipType;
private Long createdOn;
private Urn createdActor;
private Long updatedOn;
private Urn updatedActor;
}

View File

@ -76,6 +76,18 @@ public class ElasticSearchGraphService implements GraphService {
searchDocument.set("source", sourceObject);
searchDocument.set("destination", destinationObject);
searchDocument.put("relationshipType", edge.getRelationshipType());
if (edge.getCreatedOn() != null) {
searchDocument.put("createdOn", edge.getCreatedOn());
}
if (edge.getCreatedActor() != null) {
searchDocument.put("createdActor", edge.getCreatedActor().toString());
}
if (edge.getUpdatedOn() != null) {
searchDocument.put("updatedOn", edge.getUpdatedOn());
}
if (edge.getUpdatedActor() != null) {
searchDocument.put("updatedActor", edge.getUpdatedActor().toString());
}
return searchDocument.toString();
}

View File

@ -202,17 +202,17 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
GraphService service = getGraphService();
List<Edge> edges = Arrays.asList(
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userOneUrn, hasOwner),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner),
new Edge(datasetFourUrn, userTwoUrn, hasOwner),
new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(userOneUrn, userTwoUrn, knowsUser),
new Edge(userTwoUrn, userOneUrn, knowsUser)
new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null),
new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null)
);
edges.forEach(service::addEdge);
@ -225,25 +225,25 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
GraphService service = getGraphService();
List<Edge> edges = Arrays.asList(
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userOneUrn, hasOwner),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner),
new Edge(datasetFourUrn, userTwoUrn, hasOwner),
new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(userOneUrn, userTwoUrn, knowsUser),
new Edge(userTwoUrn, userOneUrn, knowsUser),
new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null),
new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null),
new Edge(dataJobOneUrn, datasetOneUrn, consumes),
new Edge(dataJobOneUrn, datasetTwoUrn, consumes),
new Edge(dataJobOneUrn, datasetThreeUrn, produces),
new Edge(dataJobOneUrn, datasetFourUrn, produces),
new Edge(dataJobTwoUrn, datasetOneUrn, consumes),
new Edge(dataJobTwoUrn, datasetTwoUrn, consumes),
new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf)
new Edge(dataJobOneUrn, datasetOneUrn, consumes, null, null, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, consumes, null, null, null, null),
new Edge(dataJobOneUrn, datasetThreeUrn, produces, null, null, null, null),
new Edge(dataJobOneUrn, datasetFourUrn, produces, null, null, null, null),
new Edge(dataJobTwoUrn, datasetOneUrn, consumes, null, null, null, null),
new Edge(dataJobTwoUrn, datasetTwoUrn, consumes, null, null, null, null),
new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf, null, null, null, null)
);
edges.forEach(service::addEdge);
@ -295,24 +295,24 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
Arrays.asList()
},
new Object[]{
Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf)),
Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null)),
Arrays.asList(downstreamOfDatasetTwoRelatedEntity),
Arrays.asList(downstreamOfDatasetOneRelatedEntity)
},
new Object[]{
Arrays.asList(
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf)
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf, null, null, null, null)
),
Arrays.asList(downstreamOfDatasetTwoRelatedEntity, downstreamOfDatasetThreeRelatedEntity),
Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity)
},
new Object[]{
Arrays.asList(
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userTwoUrn, hasOwner),
new Edge(userOneUrn, userTwoUrn, knowsUser)
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null)
),
Arrays.asList(
downstreamOfDatasetTwoRelatedEntity,
@ -328,9 +328,9 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
},
new Object[]{
Arrays.asList(
new Edge(userOneUrn, userOneUrn, knowsUser),
new Edge(userOneUrn, userOneUrn, knowsUser),
new Edge(userOneUrn, userOneUrn, knowsUser)
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null),
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null),
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null)
),
Arrays.asList(knowsUserOneRelatedEntity),
Arrays.asList(knowsUserOneRelatedEntity)
@ -922,12 +922,12 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
@ -944,12 +944,12 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
@ -1424,7 +1424,7 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe
int destinationType = destinationNode % 3;
Urn destination = createFromString("urn:li:type" + destinationType + ":(urn:li:node" + destinationNode + ")");
edges.add(new Edge(source, destination, relationship));
edges.add(new Edge(source, destination, relationship, null, null, null, null));
}
}
}

View File

@ -188,7 +188,7 @@ public class ElasticSearchGraphServiceTest extends GraphServiceTestBase {
public void testRemoveEdge() throws Exception {
DatasetUrn datasetUrn = new DatasetUrn(new DataPlatformUrn("snowflake"), "test", FabricType.TEST);
TagUrn tagUrn = new TagUrn("newTag");
Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP);
Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP, null, null, null, null);
getGraphService().addEdge(edge);
syncAfterWrite();
RelatedEntitiesResult result = getGraphService().findRelatedEntities(Collections.singletonList(datasetType),

View File

@ -0,0 +1,129 @@
package com.linkedin.metadata.kafka.hook;
import com.datahub.util.RecordUtils;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.PathSpec;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.mxe.MetadataChangeLog;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Slf4j
public class GraphIndexUtils {
private GraphIndexUtils() { }
@Nullable
private static List<Urn> getActorList(@Nullable final String path, @Nonnull final RecordTemplate aspect) {
if (path == null) {
return null;
}
final PathSpec actorPathSpec = new PathSpec(path.split("/"));
final Optional<Object> value = RecordUtils.getFieldValue(aspect, actorPathSpec);
return (List<Urn>) value.orElse(null);
}
@Nullable
private static List<Long> getTimestampList(@Nullable final String path, @Nonnull final RecordTemplate aspect) {
if (path == null) {
return null;
}
final PathSpec timestampPathSpec = new PathSpec(path.split("/"));
final Optional<Object> value = RecordUtils.getFieldValue(aspect, timestampPathSpec);
return (List<Long>) value.orElse(null);
}
@Nullable
private static boolean isValueListValid(@Nullable final List<?> entryList, final int valueListSize) {
if (entryList == null) {
log.warn("Unable to get entry as entryList is null");
return false;
}
if (valueListSize != entryList.size()) {
log.warn("Unable to get entry for graph edge as values list and entry list have differing sizes");
return false;
}
return true;
}
@Nullable
private static Long getTimestamp(@Nullable final List<Long> timestampList, final int index, final int valueListSize) {
if (isValueListValid(timestampList, valueListSize)) {
return timestampList.get(index);
}
return null;
}
@Nullable
private static Urn getActor(@Nullable final List<Urn> actorList, final int index, final int valueListSize) {
if (isValueListValid(actorList, valueListSize)) {
return actorList.get(index);
}
return null;
}
/**
* Used to create new edges for the graph db, adding all the metadata associated with each edge based on the aspect.
* Returns a list of Edges to be consumed by the graph service.
*/
@Nonnull
public static List<Edge> extractGraphEdges(
@Nonnull final Map.Entry<RelationshipFieldSpec, List<Object>> extractedFieldsEntry,
@Nonnull final RecordTemplate aspect,
@Nonnull final Urn urn,
@Nonnull final MetadataChangeLog event
) {
final List<Edge> edgesToAdd = new ArrayList<>();
final String createdOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedOn();
final String createdActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedActor();
final String updatedOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedOn();
final String updatedActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedActor();
final List<Long> createdOnList = getTimestampList(createdOnPath, aspect);
final List<Urn> createdActorList = getActorList(createdActorPath, aspect);
final List<Long> updatedOnList = getTimestampList(updatedOnPath, aspect);
final List<Urn> updatedActorList = getActorList(updatedActorPath, aspect);
int index = 0;
for (Object fieldValue : extractedFieldsEntry.getValue()) {
Long createdOn = getTimestamp(createdOnList, index, extractedFieldsEntry.getValue().size());
Urn createdActor = getActor(createdActorList, index, extractedFieldsEntry.getValue().size());
final Long updatedOn = getTimestamp(updatedOnList, index, extractedFieldsEntry.getValue().size());
final Urn updatedActor = getActor(updatedActorList, index, extractedFieldsEntry.getValue().size());
if (createdOn == null && event.hasSystemMetadata()) {
createdOn = event.getSystemMetadata().getLastObserved();
}
if (createdActor == null && event.hasCreated()) {
createdActor = event.getCreated().getActor();
}
try {
edgesToAdd.add(
new Edge(
urn,
Urn.createFromString(fieldValue.toString()),
extractedFieldsEntry.getKey().getRelationshipName(),
createdOn,
createdActor,
updatedOn,
updatedActor
)
);
} catch (URISyntaxException e) {
log.error("Invalid destination urn: {}", fieldValue.toString(), e);
}
index++;
}
return edgesToAdd;
}
}

View File

@ -43,7 +43,6 @@ import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
@ -170,9 +169,9 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
// Step 2. For all aspects, attempt to update Graph
if (_diffMode) {
updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect);
updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event);
} else {
updateGraphService(urn, aspectSpec, aspect);
updateGraphService(urn, aspectSpec, aspect, event);
}
}
@ -212,11 +211,12 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
if (!aspectSpec.isTimeseries()) {
deleteSystemMetadata(urn, aspectSpec, isDeletingKey);
deleteGraphData(urn, aspectSpec, aspect, isDeletingKey);
deleteGraphData(urn, aspectSpec, aspect, isDeletingKey, event);
deleteSearchData(urn, entitySpec.getName(), aspectSpec, aspect, isDeletingKey);
}
}
// TODO: remove this method once we implement sourceOverride when creating graph edges
private void updateFineGrainedEdgesAndRelationships(
RecordTemplate aspect,
List<Edge> edgesToAdd,
@ -231,7 +231,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
// for every downstream, create an edge with each of the upstreams
for (Urn downstream : fineGrainedLineage.getDownstreams()) {
for (Urn upstream : fineGrainedLineage.getUpstreams()) {
edgesToAdd.add(new Edge(downstream, upstream, DOWNSTREAM_OF));
// TODO: add edges uniformly across aspects
edgesToAdd.add(new Edge(downstream, upstream, DOWNSTREAM_OF, null, null, null, null));
Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(downstream, new HashSet<>());
relationshipTypes.add(DOWNSTREAM_OF);
urnToRelationshipTypesBeingAdded.put(downstream, relationshipTypes);
@ -248,6 +249,7 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME);
}
// TODO: remove this method once we implement sourceOverride and update inputFields aspect
private void updateInputFieldEdgesAndRelationships(
@Nonnull final Urn urn,
@Nonnull final InputFields inputFields,
@ -258,7 +260,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
for (final InputField field : inputFields.getFields()) {
if (field.hasSchemaFieldUrn() && field.hasSchemaField() && field.getSchemaField().hasFieldPath()) {
final Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath());
edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF));
// TODO: add edges uniformly across aspects
edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF, null, null, null, null));
final Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>());
relationshipTypes.add(DOWNSTREAM_OF);
urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes);
@ -267,7 +270,12 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
}
private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, @Nonnull RecordTemplate aspect) {
private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypesFromAspect(
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event
) {
final List<Edge> edgesToAdd = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
@ -288,14 +296,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
for (Object fieldValue : entry.getValue()) {
try {
edgesToAdd.add(
new Edge(urn, Urn.createFromString(fieldValue.toString()), entry.getKey().getRelationshipName()));
} catch (URISyntaxException e) {
log.error("Invalid destination urn: {}", fieldValue.toString(), e);
}
}
final List<Edge> newEdges = GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event);
edgesToAdd.addAll(newEdges);
}
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
}
@ -303,9 +305,14 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
/**
* Process snapshot and update graph index
*/
private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
private void updateGraphService(
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event
) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event);
final List<Edge> edgesToAdd = edgeAndRelationTypes.getFirst();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
@ -320,17 +327,23 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
}
private void updateGraphServiceDiff(Urn urn, AspectSpec aspectSpec, @Nullable RecordTemplate oldAspect, @Nonnull RecordTemplate newAspect) {
private void updateGraphServiceDiff(
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nullable final RecordTemplate oldAspect,
@Nonnull final RecordTemplate newAspect,
@Nonnull final MetadataChangeLog event
) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> oldEdgeAndRelationTypes = null;
if (oldAspect != null) {
oldEdgeAndRelationTypes = getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect);
oldEdgeAndRelationTypes = getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event);
}
final List<Edge> oldEdges = oldEdgeAndRelationTypes != null ? oldEdgeAndRelationTypes.getFirst() : Collections.emptyList();
final Set<Edge> oldEdgeSet = new HashSet<>(oldEdges);
Pair<List<Edge>, HashMap<Urn, Set<String>>> newEdgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect);
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event);
final List<Edge> newEdges = newEdgeAndRelationTypes.getFirst();
final Set<Edge> newEdgeSet = new HashSet<>(newEdges);
@ -419,14 +432,20 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
}
}
private void deleteGraphData(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) {
private void deleteGraphData(
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nonnull final Boolean isKeyAspect,
@Nonnull final MetadataChangeLog event
) {
if (isKeyAspect) {
_graphService.removeNode(urn);
return;
}
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect);
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event);
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond();
if (urnToRelationshipTypesBeingAdded.size() > 0) {

View File

@ -0,0 +1,119 @@
package com.linkedin.metadata.kafka.hook;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.Upstream;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.metadata.models.extractor.FieldExtractor;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class GraphIndexUtilsTest {
private static final String UPSTREAM_RELATIONSHIP_PATH = "/upstreams/*/dataset";
private static final long CREATED_EVENT_TIME = 123L;
private static final long UPDATED_EVENT_TIME = 234L;
private Urn _datasetUrn;
private DatasetUrn _upstreamDataset1;
private DatasetUrn _upstreamDataset2;
private static final String CREATED_ACTOR_URN = "urn:li:corpuser:creating";
private static final String UPDATED_ACTOR_URN = "urn:li:corpuser:updating";
private EntityRegistry _mockRegistry;
private Urn _createdActorUrn;
private Urn _updatedActorUrn;
@BeforeMethod
public void setupTest() {
_createdActorUrn = UrnUtils.getUrn(CREATED_ACTOR_URN);
_updatedActorUrn = UrnUtils.getUrn(UPDATED_ACTOR_URN);
_datasetUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)");
_upstreamDataset1 = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
_upstreamDataset2 = UrnUtils.toDatasetUrn("snowflake", "test2", "DEV");
_mockRegistry = new ConfigEntityRegistry(
UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml"));
}
@Test
public void testExtractGraphEdgesDefault() {
UpstreamLineage upstreamLineage = createUpstreamLineage();
MetadataChangeLog event = createMCL(upstreamLineage);
EntitySpec entitySpec = _mockRegistry.getEntitySpec(event.getEntityType());
AspectSpec aspectSpec = entitySpec.getAspectSpec(event.getAspectName());
Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(upstreamLineage, aspectSpec.getRelationshipFieldSpecs());
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
// check specifically for the upstreams relationship entry
if (entry.getKey().getPath().toString().equals(UPSTREAM_RELATIONSHIP_PATH)) {
List<Edge> edgesToAdd = GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, _datasetUrn, event);
List<Edge> expectedEdgesToAdd = new ArrayList<>();
// edges contain default created event time and created actor from system metadata
Edge edge1 = new Edge(_datasetUrn, _upstreamDataset1, entry.getKey().getRelationshipName(), CREATED_EVENT_TIME, _createdActorUrn, null, null);
Edge edge2 = new Edge(_datasetUrn, _upstreamDataset2, entry.getKey().getRelationshipName(), CREATED_EVENT_TIME, _createdActorUrn, null, null);
expectedEdgesToAdd.add(edge1);
expectedEdgesToAdd.add(edge2);
Assert.assertEquals(expectedEdgesToAdd.size(), edgesToAdd.size());
Assert.assertTrue(edgesToAdd.containsAll(expectedEdgesToAdd));
Assert.assertTrue(expectedEdgesToAdd.containsAll(edgesToAdd));
}
}
}
private UpstreamLineage createUpstreamLineage() {
UpstreamLineage upstreamLineage = new UpstreamLineage();
UpstreamArray upstreams = new UpstreamArray();
Upstream upstream1 = new Upstream();
upstream1.setDataset(_upstreamDataset1);
upstream1.setAuditStamp(new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME));
upstream1.setType(DatasetLineageType.TRANSFORMED);
Upstream upstream2 = new Upstream();
upstream2.setDataset(_upstreamDataset2);
upstream2.setAuditStamp(new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME));
upstream2.setType(DatasetLineageType.TRANSFORMED);
upstreams.add(upstream1);
upstreams.add(upstream2);
upstreamLineage.setUpstreams(upstreams);
return upstreamLineage;
}
private MetadataChangeLog createMCL(RecordTemplate aspect) {
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(Constants.DATASET_ENTITY_NAME);
event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
event.setAspect(GenericRecordUtils.serializeAspect(aspect));
event.setEntityUrn(_datasetUrn);
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setLastObserved(CREATED_EVENT_TIME);
event.setSystemMetadata(systemMetadata);
event.setCreated(new AuditStamp().setActor(_createdActorUrn).setTime(CREATED_EVENT_TIME));
return event;
}
}

View File

@ -89,7 +89,7 @@ public class UpdateIndicesHookTest {
MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
_updateIndicesHook.invoke(event);
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null);
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
Mockito.eq(downstreamUrn),
@ -117,7 +117,7 @@ public class UpdateIndicesHookTest {
Urn downstreamUrn = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath));
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null);
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
Mockito.eq(downstreamUrn),