diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/RelationshipAnnotation.java b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/RelationshipAnnotation.java index bbdf0fa071..34de3f0eb4 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/RelationshipAnnotation.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/RelationshipAnnotation.java @@ -20,11 +20,19 @@ public class RelationshipAnnotation { private static final String ENTITY_TYPES_FIELD = "entityTypes"; private static final String IS_UPSTREAM_FIELD = "isUpstream"; private static final String IS_LINEAGE_FIELD = "isLineage"; + private static final String CREATED_ON = "createdOn"; + private static final String CREATED_ACTOR = "createdActor"; + private static final String UPDATED_ON = "updatedOn"; + private static final String UPDATED_ACTOR = "updatedActor"; String name; List validDestinationTypes; boolean isUpstream; boolean isLineage; + String createdOn; + String createdActor; + String updatedOn; + String updatedActor; @Nonnull public static RelationshipAnnotation fromPegasusAnnotationObject( @@ -70,7 +78,19 @@ public class RelationshipAnnotation { final Optional isUpstream = AnnotationUtils.getField(map, IS_UPSTREAM_FIELD, Boolean.class); final Optional isLineage = AnnotationUtils.getField(map, IS_LINEAGE_FIELD, Boolean.class); + final Optional createdOn = AnnotationUtils.getField(map, CREATED_ON, String.class); + final Optional createdActor = AnnotationUtils.getField(map, CREATED_ACTOR, String.class); + final Optional updatedOn = AnnotationUtils.getField(map, UPDATED_ON, String.class); + final Optional updatedActor = AnnotationUtils.getField(map, UPDATED_ACTOR, String.class); - return new RelationshipAnnotation(name.get(), entityTypes, isUpstream.orElse(true), isLineage.orElse(false)); - } + return new RelationshipAnnotation( + name.get(), + entityTypes, + isUpstream.orElse(true), + isLineage.orElse(false), + createdOn.orElse(null), + createdActor.orElse(null), + updatedOn.orElse(null), + updatedActor.orElse(null) + ); } } \ No newline at end of file diff --git a/entity-registry/src/test/java/com/linkedin/metadata/models/registry/LineageRegistryTest.java b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/LineageRegistryTest.java index db9c3caa17..cb1af11e97 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/models/registry/LineageRegistryTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/LineageRegistryTest.java @@ -65,7 +65,7 @@ public class LineageRegistryTest { boolean isUpstream, boolean isLineage) { RelationshipFieldSpec spec = mock(RelationshipFieldSpec.class); when(spec.getRelationshipAnnotation()).thenReturn( - new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage)); + new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage, null, null, null, null)); return spec; } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/Edge.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/Edge.java index 853f98ef9d..11e93b233b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/Edge.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/Edge.java @@ -11,4 +11,8 @@ public class Edge { private Urn source; private Urn destination; private String relationshipType; + private Long createdOn; + private Urn createdActor; + private Long updatedOn; + private Urn updatedActor; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index a32ff7e8ef..074d552418 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -76,6 +76,18 @@ public class ElasticSearchGraphService implements GraphService { searchDocument.set("source", sourceObject); searchDocument.set("destination", destinationObject); searchDocument.put("relationshipType", edge.getRelationshipType()); + if (edge.getCreatedOn() != null) { + searchDocument.put("createdOn", edge.getCreatedOn()); + } + if (edge.getCreatedActor() != null) { + searchDocument.put("createdActor", edge.getCreatedActor().toString()); + } + if (edge.getUpdatedOn() != null) { + searchDocument.put("updatedOn", edge.getUpdatedOn()); + } + if (edge.getUpdatedActor() != null) { + searchDocument.put("updatedActor", edge.getUpdatedActor().toString()); + } return searchDocument.toString(); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index f0cbdfdad3..7e08cf12f9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -202,17 +202,17 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe GraphService service = getGraphService(); List edges = Arrays.asList( - new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf), - new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf), - new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf), + new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null), + new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null), - new Edge(datasetOneUrn, userOneUrn, hasOwner), - new Edge(datasetTwoUrn, userOneUrn, hasOwner), - new Edge(datasetThreeUrn, userTwoUrn, hasOwner), - new Edge(datasetFourUrn, userTwoUrn, hasOwner), + new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null), + new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null), + new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null), + new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null), - new Edge(userOneUrn, userTwoUrn, knowsUser), - new Edge(userTwoUrn, userOneUrn, knowsUser) + new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null), + new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null) ); edges.forEach(service::addEdge); @@ -225,25 +225,25 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe GraphService service = getGraphService(); List edges = Arrays.asList( - new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf), - new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf), - new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf), + new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null), + new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null), - new Edge(datasetOneUrn, userOneUrn, hasOwner), - new Edge(datasetTwoUrn, userOneUrn, hasOwner), - new Edge(datasetThreeUrn, userTwoUrn, hasOwner), - new Edge(datasetFourUrn, userTwoUrn, hasOwner), + new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null), + new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null), + new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null), + new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null), - new Edge(userOneUrn, userTwoUrn, knowsUser), - new Edge(userTwoUrn, userOneUrn, knowsUser), + new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null), + new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null), - new Edge(dataJobOneUrn, datasetOneUrn, consumes), - new Edge(dataJobOneUrn, datasetTwoUrn, consumes), - new Edge(dataJobOneUrn, datasetThreeUrn, produces), - new Edge(dataJobOneUrn, datasetFourUrn, produces), - new Edge(dataJobTwoUrn, datasetOneUrn, consumes), - new Edge(dataJobTwoUrn, datasetTwoUrn, consumes), - new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf) + new Edge(dataJobOneUrn, datasetOneUrn, consumes, null, null, null, null), + new Edge(dataJobOneUrn, datasetTwoUrn, consumes, null, null, null, null), + new Edge(dataJobOneUrn, datasetThreeUrn, produces, null, null, null, null), + new Edge(dataJobOneUrn, datasetFourUrn, produces, null, null, null, null), + new Edge(dataJobTwoUrn, datasetOneUrn, consumes, null, null, null, null), + new Edge(dataJobTwoUrn, datasetTwoUrn, consumes, null, null, null, null), + new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf, null, null, null, null) ); edges.forEach(service::addEdge); @@ -295,24 +295,24 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe Arrays.asList() }, new Object[]{ - Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf)), + Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null)), Arrays.asList(downstreamOfDatasetTwoRelatedEntity), Arrays.asList(downstreamOfDatasetOneRelatedEntity) }, new Object[]{ Arrays.asList( - new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf), - new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf) + new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null), + new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf, null, null, null, null) ), Arrays.asList(downstreamOfDatasetTwoRelatedEntity, downstreamOfDatasetThreeRelatedEntity), Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity) }, new Object[]{ Arrays.asList( - new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf), - new Edge(datasetOneUrn, userOneUrn, hasOwner), - new Edge(datasetTwoUrn, userTwoUrn, hasOwner), - new Edge(userOneUrn, userTwoUrn, knowsUser) + new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null), + new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null), + new Edge(datasetTwoUrn, userTwoUrn, hasOwner, null, null, null, null), + new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null) ), Arrays.asList( downstreamOfDatasetTwoRelatedEntity, @@ -328,9 +328,9 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe }, new Object[]{ Arrays.asList( - new Edge(userOneUrn, userOneUrn, knowsUser), - new Edge(userOneUrn, userOneUrn, knowsUser), - new Edge(userOneUrn, userOneUrn, knowsUser) + new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null), + new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null), + new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null) ), Arrays.asList(knowsUserOneRelatedEntity), Arrays.asList(knowsUserOneRelatedEntity) @@ -922,12 +922,12 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service); - service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf)); + service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null)); syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity); - service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf)); + service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null)); syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity); @@ -944,12 +944,12 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service); - service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf)); + service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null)); syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity); - service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf)); + service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null)); syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity); @@ -1424,7 +1424,7 @@ abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTe int destinationType = destinationNode % 3; Urn destination = createFromString("urn:li:type" + destinationType + ":(urn:li:node" + destinationNode + ")"); - edges.add(new Edge(source, destination, relationship)); + edges.add(new Edge(source, destination, relationship, null, null, null, null)); } } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java index 9f812dbf65..b210bedbff 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java @@ -188,7 +188,7 @@ public class ElasticSearchGraphServiceTest extends GraphServiceTestBase { public void testRemoveEdge() throws Exception { DatasetUrn datasetUrn = new DatasetUrn(new DataPlatformUrn("snowflake"), "test", FabricType.TEST); TagUrn tagUrn = new TagUrn("newTag"); - Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP); + Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP, null, null, null, null); getGraphService().addEdge(edge); syncAfterWrite(); RelatedEntitiesResult result = getGraphService().findRelatedEntities(Collections.singletonList(datasetType), diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java new file mode 100644 index 0000000000..9f071bc467 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java @@ -0,0 +1,129 @@ +package com.linkedin.metadata.kafka.hook; + +import com.datahub.util.RecordUtils; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.schema.PathSpec; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.graph.Edge; +import com.linkedin.metadata.models.RelationshipFieldSpec; +import com.linkedin.mxe.MetadataChangeLog; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Slf4j +public class GraphIndexUtils { + + private GraphIndexUtils() { } + + @Nullable + private static List getActorList(@Nullable final String path, @Nonnull final RecordTemplate aspect) { + if (path == null) { + return null; + } + final PathSpec actorPathSpec = new PathSpec(path.split("/")); + final Optional value = RecordUtils.getFieldValue(aspect, actorPathSpec); + return (List) value.orElse(null); + } + + @Nullable + private static List getTimestampList(@Nullable final String path, @Nonnull final RecordTemplate aspect) { + if (path == null) { + return null; + } + final PathSpec timestampPathSpec = new PathSpec(path.split("/")); + final Optional value = RecordUtils.getFieldValue(aspect, timestampPathSpec); + return (List) value.orElse(null); + } + + @Nullable + private static boolean isValueListValid(@Nullable final List entryList, final int valueListSize) { + if (entryList == null) { + log.warn("Unable to get entry as entryList is null"); + return false; + } + if (valueListSize != entryList.size()) { + log.warn("Unable to get entry for graph edge as values list and entry list have differing sizes"); + return false; + } + return true; + } + + @Nullable + private static Long getTimestamp(@Nullable final List timestampList, final int index, final int valueListSize) { + if (isValueListValid(timestampList, valueListSize)) { + return timestampList.get(index); + } + return null; + } + + @Nullable + private static Urn getActor(@Nullable final List actorList, final int index, final int valueListSize) { + if (isValueListValid(actorList, valueListSize)) { + return actorList.get(index); + } + return null; + } + + /** + * Used to create new edges for the graph db, adding all the metadata associated with each edge based on the aspect. + * Returns a list of Edges to be consumed by the graph service. + */ + @Nonnull + public static List extractGraphEdges( + @Nonnull final Map.Entry> extractedFieldsEntry, + @Nonnull final RecordTemplate aspect, + @Nonnull final Urn urn, + @Nonnull final MetadataChangeLog event + ) { + final List edgesToAdd = new ArrayList<>(); + final String createdOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedOn(); + final String createdActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedActor(); + final String updatedOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedOn(); + final String updatedActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedActor(); + + final List createdOnList = getTimestampList(createdOnPath, aspect); + final List createdActorList = getActorList(createdActorPath, aspect); + final List updatedOnList = getTimestampList(updatedOnPath, aspect); + final List updatedActorList = getActorList(updatedActorPath, aspect); + + int index = 0; + for (Object fieldValue : extractedFieldsEntry.getValue()) { + Long createdOn = getTimestamp(createdOnList, index, extractedFieldsEntry.getValue().size()); + Urn createdActor = getActor(createdActorList, index, extractedFieldsEntry.getValue().size()); + final Long updatedOn = getTimestamp(updatedOnList, index, extractedFieldsEntry.getValue().size()); + final Urn updatedActor = getActor(updatedActorList, index, extractedFieldsEntry.getValue().size()); + + if (createdOn == null && event.hasSystemMetadata()) { + createdOn = event.getSystemMetadata().getLastObserved(); + } + if (createdActor == null && event.hasCreated()) { + createdActor = event.getCreated().getActor(); + } + + try { + edgesToAdd.add( + new Edge( + urn, + Urn.createFromString(fieldValue.toString()), + extractedFieldsEntry.getKey().getRelationshipName(), + createdOn, + createdActor, + updatedOn, + updatedActor + ) + ); + } catch (URISyntaxException e) { + log.error("Invalid destination urn: {}", fieldValue.toString(), e); + } + index++; + } + return edgesToAdd; + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 9ec1d755a7..bf06e99560 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -43,7 +43,6 @@ import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; import java.io.UnsupportedEncodingException; -import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.HashMap; @@ -170,9 +169,9 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { // Step 2. For all aspects, attempt to update Graph if (_diffMode) { - updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect); + updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event); } else { - updateGraphService(urn, aspectSpec, aspect); + updateGraphService(urn, aspectSpec, aspect, event); } } @@ -212,11 +211,12 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { if (!aspectSpec.isTimeseries()) { deleteSystemMetadata(urn, aspectSpec, isDeletingKey); - deleteGraphData(urn, aspectSpec, aspect, isDeletingKey); + deleteGraphData(urn, aspectSpec, aspect, isDeletingKey, event); deleteSearchData(urn, entitySpec.getName(), aspectSpec, aspect, isDeletingKey); } } + // TODO: remove this method once we implement sourceOverride when creating graph edges private void updateFineGrainedEdgesAndRelationships( RecordTemplate aspect, List edgesToAdd, @@ -231,7 +231,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { // for every downstream, create an edge with each of the upstreams for (Urn downstream : fineGrainedLineage.getDownstreams()) { for (Urn upstream : fineGrainedLineage.getUpstreams()) { - edgesToAdd.add(new Edge(downstream, upstream, DOWNSTREAM_OF)); + // TODO: add edges uniformly across aspects + edgesToAdd.add(new Edge(downstream, upstream, DOWNSTREAM_OF, null, null, null, null)); Set relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(downstream, new HashSet<>()); relationshipTypes.add(DOWNSTREAM_OF); urnToRelationshipTypesBeingAdded.put(downstream, relationshipTypes); @@ -248,6 +249,7 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME); } + // TODO: remove this method once we implement sourceOverride and update inputFields aspect private void updateInputFieldEdgesAndRelationships( @Nonnull final Urn urn, @Nonnull final InputFields inputFields, @@ -258,7 +260,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { for (final InputField field : inputFields.getFields()) { if (field.hasSchemaFieldUrn() && field.hasSchemaField() && field.getSchemaField().hasFieldPath()) { final Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath()); - edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF)); + // TODO: add edges uniformly across aspects + edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF, null, null, null, null)); final Set relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>()); relationshipTypes.add(DOWNSTREAM_OF); urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes); @@ -267,7 +270,12 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { } } - private Pair, HashMap>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, @Nonnull RecordTemplate aspect) { + private Pair, HashMap>> getEdgesAndRelationshipTypesFromAspect( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final MetadataChangeLog event + ) { final List edgesToAdd = new ArrayList<>(); final HashMap> urnToRelationshipTypesBeingAdded = new HashMap<>(); @@ -288,14 +296,8 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { Set relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>()); relationshipTypes.add(entry.getKey().getRelationshipName()); urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes); - for (Object fieldValue : entry.getValue()) { - try { - edgesToAdd.add( - new Edge(urn, Urn.createFromString(fieldValue.toString()), entry.getKey().getRelationshipName())); - } catch (URISyntaxException e) { - log.error("Invalid destination urn: {}", fieldValue.toString(), e); - } - } + final List newEdges = GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event); + edgesToAdd.addAll(newEdges); } return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded); } @@ -303,9 +305,14 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { /** * Process snapshot and update graph index */ - private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { + private void updateGraphService( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final MetadataChangeLog event + ) { Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect); + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event); final List edgesToAdd = edgeAndRelationTypes.getFirst(); final HashMap> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond(); @@ -320,17 +327,23 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { } } - private void updateGraphServiceDiff(Urn urn, AspectSpec aspectSpec, @Nullable RecordTemplate oldAspect, @Nonnull RecordTemplate newAspect) { + private void updateGraphServiceDiff( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nullable final RecordTemplate oldAspect, + @Nonnull final RecordTemplate newAspect, + @Nonnull final MetadataChangeLog event + ) { Pair, HashMap>> oldEdgeAndRelationTypes = null; if (oldAspect != null) { - oldEdgeAndRelationTypes = getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect); + oldEdgeAndRelationTypes = getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event); } final List oldEdges = oldEdgeAndRelationTypes != null ? oldEdgeAndRelationTypes.getFirst() : Collections.emptyList(); final Set oldEdgeSet = new HashSet<>(oldEdges); Pair, HashMap>> newEdgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect); + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event); final List newEdges = newEdgeAndRelationTypes.getFirst(); final Set newEdgeSet = new HashSet<>(newEdges); @@ -419,14 +432,20 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { } } - private void deleteGraphData(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) { + private void deleteGraphData( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final Boolean isKeyAspect, + @Nonnull final MetadataChangeLog event + ) { if (isKeyAspect) { _graphService.removeNode(urn); return; } Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect); + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event); final HashMap> urnToRelationshipTypesBeingAdded = edgeAndRelationTypes.getSecond(); if (urnToRelationshipTypesBeingAdded.size() > 0) { diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/GraphIndexUtilsTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/GraphIndexUtilsTest.java new file mode 100644 index 0000000000..40b0670d73 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/GraphIndexUtilsTest.java @@ -0,0 +1,119 @@ +package com.linkedin.metadata.kafka.hook; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.DatasetLineageType; +import com.linkedin.dataset.Upstream; +import com.linkedin.dataset.UpstreamArray; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.graph.Edge; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.RelationshipFieldSpec; +import com.linkedin.metadata.models.extractor.FieldExtractor; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeLog; +import com.linkedin.mxe.SystemMetadata; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class GraphIndexUtilsTest { + + private static final String UPSTREAM_RELATIONSHIP_PATH = "/upstreams/*/dataset"; + private static final long CREATED_EVENT_TIME = 123L; + private static final long UPDATED_EVENT_TIME = 234L; + private Urn _datasetUrn; + private DatasetUrn _upstreamDataset1; + private DatasetUrn _upstreamDataset2; + private static final String CREATED_ACTOR_URN = "urn:li:corpuser:creating"; + private static final String UPDATED_ACTOR_URN = "urn:li:corpuser:updating"; + private EntityRegistry _mockRegistry; + private Urn _createdActorUrn; + private Urn _updatedActorUrn; + + @BeforeMethod + public void setupTest() { + _createdActorUrn = UrnUtils.getUrn(CREATED_ACTOR_URN); + _updatedActorUrn = UrnUtils.getUrn(UPDATED_ACTOR_URN); + _datasetUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"); + _upstreamDataset1 = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); + _upstreamDataset2 = UrnUtils.toDatasetUrn("snowflake", "test2", "DEV"); + _mockRegistry = new ConfigEntityRegistry( + UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml")); + } + + @Test + public void testExtractGraphEdgesDefault() { + UpstreamLineage upstreamLineage = createUpstreamLineage(); + MetadataChangeLog event = createMCL(upstreamLineage); + + EntitySpec entitySpec = _mockRegistry.getEntitySpec(event.getEntityType()); + AspectSpec aspectSpec = entitySpec.getAspectSpec(event.getAspectName()); + + Map> extractedFields = + FieldExtractor.extractFields(upstreamLineage, aspectSpec.getRelationshipFieldSpecs()); + + for (Map.Entry> entry : extractedFields.entrySet()) { + // check specifically for the upstreams relationship entry + if (entry.getKey().getPath().toString().equals(UPSTREAM_RELATIONSHIP_PATH)) { + List edgesToAdd = GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, _datasetUrn, event); + List expectedEdgesToAdd = new ArrayList<>(); + // edges contain default created event time and created actor from system metadata + Edge edge1 = new Edge(_datasetUrn, _upstreamDataset1, entry.getKey().getRelationshipName(), CREATED_EVENT_TIME, _createdActorUrn, null, null); + Edge edge2 = new Edge(_datasetUrn, _upstreamDataset2, entry.getKey().getRelationshipName(), CREATED_EVENT_TIME, _createdActorUrn, null, null); + expectedEdgesToAdd.add(edge1); + expectedEdgesToAdd.add(edge2); + Assert.assertEquals(expectedEdgesToAdd.size(), edgesToAdd.size()); + Assert.assertTrue(edgesToAdd.containsAll(expectedEdgesToAdd)); + Assert.assertTrue(expectedEdgesToAdd.containsAll(edgesToAdd)); + } + } + } + + private UpstreamLineage createUpstreamLineage() { + UpstreamLineage upstreamLineage = new UpstreamLineage(); + UpstreamArray upstreams = new UpstreamArray(); + Upstream upstream1 = new Upstream(); + upstream1.setDataset(_upstreamDataset1); + upstream1.setAuditStamp(new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME)); + upstream1.setType(DatasetLineageType.TRANSFORMED); + Upstream upstream2 = new Upstream(); + upstream2.setDataset(_upstreamDataset2); + upstream2.setAuditStamp(new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME)); + upstream2.setType(DatasetLineageType.TRANSFORMED); + upstreams.add(upstream1); + upstreams.add(upstream2); + upstreamLineage.setUpstreams(upstreams); + + return upstreamLineage; + } + + private MetadataChangeLog createMCL(RecordTemplate aspect) { + MetadataChangeLog event = new MetadataChangeLog(); + event.setEntityType(Constants.DATASET_ENTITY_NAME); + event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); + event.setChangeType(ChangeType.UPSERT); + + event.setAspect(GenericRecordUtils.serializeAspect(aspect)); + event.setEntityUrn(_datasetUrn); + + SystemMetadata systemMetadata = new SystemMetadata(); + systemMetadata.setLastObserved(CREATED_EVENT_TIME); + event.setSystemMetadata(systemMetadata); + event.setCreated(new AuditStamp().setActor(_createdActorUrn).setTime(CREATED_EVENT_TIME)); + + return event; + } +} diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index d9feb9c1dc..1aafd854b8 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -89,7 +89,7 @@ public class UpdateIndicesHookTest { MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn); _updateIndicesHook.invoke(event); - Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF); + Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null); Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode( Mockito.eq(downstreamUrn), @@ -117,7 +117,7 @@ public class UpdateIndicesHookTest { Urn downstreamUrn = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath)); - Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF); + Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null); Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode( Mockito.eq(downstreamUrn),