mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-02 11:49:23 +00:00
feat(graphql): Support logical models (#14588)
This commit is contained in:
parent
12e2e83505
commit
0aaa8c6987
@ -32,6 +32,7 @@ public class Constants {
|
||||
public static final String VERSION_SCHEMA_FILE = "versioning.graphql";
|
||||
public static final String OPERATIONS_SCHEMA_FILE = "operations.graphql";
|
||||
public static final String TIMESERIES_SCHEMA_FILE = "timeseries.graphql";
|
||||
public static final String LOGICAL_SCHEMA_FILE = "logical.graphql";
|
||||
|
||||
public static final String QUERY_SCHEMA_FILE = "query.graphql";
|
||||
public static final String TEMPLATE_SCHEMA_FILE = "template.graphql";
|
||||
|
||||
@ -141,6 +141,7 @@ import com.linkedin.datahub.graphql.resolvers.load.LoadableTypeResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.load.OwnerTypeBatchResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.load.OwnerTypeResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.load.TimeSeriesAspectResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.logical.SetLogicalParentResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.module.DeletePageModuleResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.module.UpsertPageModuleResolver;
|
||||
import com.linkedin.datahub.graphql.resolvers.mutate.AddLinkResolver;
|
||||
@ -359,6 +360,7 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
@ -829,6 +831,7 @@ public class GmsGraphQLEngine {
|
||||
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(COMMON_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(LOGICAL_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
|
||||
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE))
|
||||
@ -1392,6 +1395,7 @@ public class GmsGraphQLEngine {
|
||||
"deletePageTemplate", new DeletePageTemplateResolver(this.pageTemplateService))
|
||||
.dataFetcher("upsertPageModule", new UpsertPageModuleResolver(this.pageModuleService))
|
||||
.dataFetcher("deletePageModule", new DeletePageModuleResolver(this.pageModuleService))
|
||||
.dataFetcher("setLogicalParent", new SetLogicalParentResolver(this.entityClient))
|
||||
.dataFetcher(
|
||||
"updateDocPropagationSettings",
|
||||
new UpdateDocPropagationSettingsResolver(this.settingsService))
|
||||
@ -1753,7 +1757,15 @@ public class GmsGraphQLEngine {
|
||||
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
|
||||
.dataFetcher(
|
||||
"siblingsSearch",
|
||||
new SiblingsSearchResolver(this.entityClient, this.viewService)))
|
||||
new SiblingsSearchResolver(this.entityClient, this.viewService))
|
||||
.dataFetcher(
|
||||
"logicalParent",
|
||||
new EntityTypeResolver(
|
||||
entityTypes,
|
||||
(env) ->
|
||||
Optional.ofNullable((Dataset) env.getSource())
|
||||
.map(Dataset::getLogicalParent)
|
||||
.orElse(null))))
|
||||
.type(
|
||||
"Owner",
|
||||
typeWiring ->
|
||||
@ -1919,6 +1931,14 @@ public class GmsGraphQLEngine {
|
||||
"parent",
|
||||
new EntityTypeResolver(
|
||||
entityTypes, (env) -> ((SchemaFieldEntity) env.getSource()).getParent()))
|
||||
.dataFetcher(
|
||||
"logicalParent",
|
||||
new EntityTypeResolver(
|
||||
entityTypes,
|
||||
(env) ->
|
||||
Optional.ofNullable((Dataset) env.getSource())
|
||||
.map(Dataset::getLogicalParent)
|
||||
.orElse(null)))
|
||||
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
|
||||
.dataFetcher(
|
||||
"lineage",
|
||||
@ -2342,6 +2362,15 @@ public class GmsGraphQLEngine {
|
||||
browsableTypes.stream()
|
||||
.map(graphType -> (EntityType<?, ?>) graphType)
|
||||
.collect(Collectors.toList()))))
|
||||
.type(
|
||||
"HasLogicalParent",
|
||||
typeWiring ->
|
||||
typeWiring.typeResolver(
|
||||
new EntityInterfaceTypeResolver(
|
||||
loadableTypes.stream()
|
||||
.filter(graphType -> graphType instanceof EntityType)
|
||||
.map(graphType -> (EntityType<?, ?>) graphType)
|
||||
.collect(Collectors.toList()))))
|
||||
.type(
|
||||
"OwnerType",
|
||||
typeWiring ->
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.logical;
|
||||
|
||||
import static com.linkedin.data.template.SetMode.*;
|
||||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
|
||||
import static com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils.*;
|
||||
import static com.linkedin.metadata.Constants.*;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.Edge;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
|
||||
import com.linkedin.datahub.graphql.generated.SetLogicalParentInput;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class SetLogicalParentResolver implements DataFetcher<CompletableFuture<Boolean>> {
|
||||
private final EntityClient _entityClient;
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
|
||||
// Note: No validation on existence of parent urn
|
||||
|
||||
final QueryContext context = environment.getContext();
|
||||
final SetLogicalParentInput input =
|
||||
bindArgument(environment.getArgument("input"), SetLogicalParentInput.class);
|
||||
final Urn entityUrn = Urn.createFromString(input.getResourceUrn());
|
||||
@Nullable final String parent = input.getParentUrn();
|
||||
LogicalParent logicalParent = createLogicalParent(parent, context);
|
||||
final MetadataChangeProposal proposal =
|
||||
buildMetadataChangeProposalWithUrn(entityUrn, LOGICAL_PARENT_ASPECT_NAME, logicalParent);
|
||||
|
||||
return GraphQLConcurrencyUtils.supplyAsync(
|
||||
() -> {
|
||||
try {
|
||||
_entityClient.ingestProposal(context.getOperationContext(), proposal, false);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error(
|
||||
"Failed to set Logical Parent on entity urn {} to {}: {}",
|
||||
entityUrn,
|
||||
parent,
|
||||
e.getMessage());
|
||||
throw new RuntimeException(
|
||||
String.format("Failed to set Logical Parent on entity %s to %s", entityUrn, parent),
|
||||
e);
|
||||
}
|
||||
},
|
||||
this.getClass().getSimpleName(),
|
||||
"get");
|
||||
}
|
||||
|
||||
private LogicalParent createLogicalParent(@Nullable String parent, QueryContext context)
|
||||
throws URISyntaxException {
|
||||
if (parent == null) {
|
||||
return new LogicalParent().setParent(null, REMOVE_IF_NULL);
|
||||
}
|
||||
|
||||
Urn parentUrn = Urn.createFromString(parent);
|
||||
Urn actor = Urn.createFromString(context.getActorUrn());
|
||||
long now = System.currentTimeMillis();
|
||||
Edge edge =
|
||||
new Edge()
|
||||
.setDestinationUrn(parentUrn)
|
||||
.setCreated(new AuditStamp().setTime(now).setActor(actor))
|
||||
.setLastModified(new AuditStamp().setTime(now).setActor(actor));
|
||||
|
||||
return new LogicalParent().setParent(edge);
|
||||
}
|
||||
}
|
||||
@ -91,7 +91,8 @@ public class DatasetType
|
||||
FORMS_ASPECT_NAME,
|
||||
SUB_TYPES_ASPECT_NAME,
|
||||
APPLICATION_MEMBERSHIP_ASPECT_NAME,
|
||||
VERSION_PROPERTIES_ASPECT_NAME);
|
||||
VERSION_PROPERTIES_ASPECT_NAME,
|
||||
LOGICAL_PARENT_ASPECT_NAME);
|
||||
|
||||
private static final Set<String> FACET_FIELDS = ImmutableSet.of("origin", "platform");
|
||||
private static final String ENTITY_NAME = "dataset";
|
||||
|
||||
@ -42,6 +42,7 @@ import com.linkedin.datahub.graphql.types.common.mappers.SiblingsMapper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.SubTypesMapper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.UpstreamLineagesMapper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.util.MappingHelper;
|
||||
import com.linkedin.datahub.graphql.types.common.mappers.util.SystemMetadataUtils;
|
||||
import com.linkedin.datahub.graphql.types.domain.DomainAssociationMapper;
|
||||
@ -60,10 +61,12 @@ import com.linkedin.dataset.ViewProperties;
|
||||
import com.linkedin.domain.Domains;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.metadata.key.DatasetKey;
|
||||
import com.linkedin.schema.EditableSchemaMetadata;
|
||||
import com.linkedin.schema.SchemaMetadata;
|
||||
import com.linkedin.structured.StructuredProperties;
|
||||
import java.util.Optional;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -197,6 +200,15 @@ public class DatasetMapper implements ModelMapper<EntityResponse, Dataset> {
|
||||
(entity, dataMap) ->
|
||||
entity.setVersionProperties(
|
||||
VersionPropertiesMapper.map(context, new VersionProperties(dataMap))));
|
||||
mappingHelper.mapToResult(
|
||||
LOGICAL_PARENT_ASPECT_NAME,
|
||||
(entity, dataMap) ->
|
||||
entity.setLogicalParent(
|
||||
Optional.ofNullable(new LogicalParent(dataMap).getParent())
|
||||
.map(
|
||||
logicalParent ->
|
||||
UrnToEntityMapper.map(context, logicalParent.getDestinationUrn()))
|
||||
.orElse(null)));
|
||||
|
||||
if (context != null && !canView(context.getOperationContext(), entityUrn)) {
|
||||
return AuthorizationUtils.restrictEntity(mappingHelper.getResult(), Dataset.class);
|
||||
|
||||
@ -24,7 +24,9 @@ import com.linkedin.datahub.graphql.types.structuredproperty.StructuredPropertie
|
||||
import com.linkedin.datahub.graphql.types.tag.mappers.GlobalTagsMapper;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.structured.StructuredProperties;
|
||||
import java.util.Optional;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@ -77,6 +79,15 @@ public class SchemaFieldMapper implements ModelMapper<EntityResponse, SchemaFiel
|
||||
(schemaField, dataMap) ->
|
||||
schemaField.setGlossaryTerms(
|
||||
GlossaryTermsMapper.map(context, new GlossaryTerms(dataMap), entityUrn)));
|
||||
mappingHelper.mapToResult(
|
||||
LOGICAL_PARENT_ASPECT_NAME,
|
||||
(entity, dataMap) ->
|
||||
entity.setLogicalParent(
|
||||
Optional.ofNullable(new LogicalParent(dataMap).getParent())
|
||||
.map(
|
||||
logicalParent ->
|
||||
UrnToEntityMapper.map(context, logicalParent.getDestinationUrn()))
|
||||
.orElse(null)));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -37,7 +37,8 @@ public class SchemaFieldType
|
||||
DOCUMENTATION_ASPECT_NAME,
|
||||
STATUS_ASPECT_NAME,
|
||||
GLOBAL_TAGS_ASPECT_NAME,
|
||||
GLOSSARY_TERMS_ASPECT_NAME);
|
||||
GLOSSARY_TERMS_ASPECT_NAME,
|
||||
LOGICAL_PARENT_ASPECT_NAME);
|
||||
|
||||
private final EntityClient _entityClient;
|
||||
private final FeatureFlags _featureFlags;
|
||||
|
||||
32
datahub-graphql-core/src/main/resources/logical.graphql
Normal file
32
datahub-graphql-core/src/main/resources/logical.graphql
Normal file
@ -0,0 +1,32 @@
|
||||
interface HasLogicalParent {
|
||||
"""
|
||||
If this entity represents a physical asset, this is its logical parent, from which metadata can propagate.
|
||||
"""
|
||||
logicalParent: Entity
|
||||
}
|
||||
|
||||
extend type Dataset implements HasLogicalParent {
|
||||
"""
|
||||
If this entity represents a physical asset, this is its logical parent, from which metadata can propagate.
|
||||
"""
|
||||
logicalParent: Entity
|
||||
}
|
||||
|
||||
extend type SchemaFieldEntity implements HasLogicalParent {
|
||||
"""
|
||||
If this entity represents a physical asset, this is its logical parent, from which metadata can propagate.
|
||||
"""
|
||||
logicalParent: Entity
|
||||
}
|
||||
|
||||
input SetLogicalParentInput {
|
||||
resourceUrn: String!
|
||||
parentUrn: String
|
||||
}
|
||||
|
||||
extend type Mutation {
|
||||
"""
|
||||
Set or unset an entity's logical parent
|
||||
"""
|
||||
setLogicalParent(input: SetLogicalParentInput!): Boolean
|
||||
}
|
||||
@ -0,0 +1,178 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.logical;
|
||||
|
||||
import static com.linkedin.datahub.graphql.TestUtils.*;
|
||||
import static com.linkedin.metadata.Constants.*;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.linkedin.common.Edge;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.SetLogicalParentInput;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class SetLogicalParentResolverTest {
|
||||
|
||||
private static final String TEST_ENTITY_URN =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)";
|
||||
private static final String TEST_PARENT_URN =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-parent,PROD)";
|
||||
private static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
|
||||
|
||||
@Test
|
||||
public void testGetSuccessSetParent() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
SetLogicalParentResolver resolver = new SetLogicalParentResolver(mockClient);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn(TEST_ACTOR_URN);
|
||||
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
SetLogicalParentInput input = new SetLogicalParentInput();
|
||||
input.setResourceUrn(TEST_ENTITY_URN);
|
||||
input.setParentUrn(TEST_PARENT_URN);
|
||||
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertTrue(resolver.get(mockEnv).get());
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.ingestProposal(any(), any(MetadataChangeProposal.class), anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSuccessRemoveParent() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
SetLogicalParentResolver resolver = new SetLogicalParentResolver(mockClient);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn(TEST_ACTOR_URN);
|
||||
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
SetLogicalParentInput input = new SetLogicalParentInput();
|
||||
input.setResourceUrn(TEST_ENTITY_URN);
|
||||
input.setParentUrn(null);
|
||||
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertTrue(resolver.get(mockEnv).get());
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.ingestProposal(any(), any(MetadataChangeProposal.class), anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntityClientException() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
Mockito.doThrow(RemoteInvocationException.class)
|
||||
.when(mockClient)
|
||||
.ingestProposal(any(), any(MetadataChangeProposal.class), anyBoolean());
|
||||
|
||||
SetLogicalParentResolver resolver = new SetLogicalParentResolver(mockClient);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn(TEST_ACTOR_URN);
|
||||
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
SetLogicalParentInput input = new SetLogicalParentInput();
|
||||
input.setResourceUrn(TEST_ENTITY_URN);
|
||||
input.setParentUrn(TEST_PARENT_URN);
|
||||
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLogicalParentWithParent() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
SetLogicalParentResolver resolver = new SetLogicalParentResolver(mockClient);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn(TEST_ACTOR_URN);
|
||||
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
SetLogicalParentInput input = new SetLogicalParentInput();
|
||||
input.setResourceUrn(TEST_ENTITY_URN);
|
||||
input.setParentUrn(TEST_PARENT_URN);
|
||||
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
resolver.get(mockEnv).get();
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.ingestProposal(
|
||||
any(),
|
||||
Mockito.argThat(
|
||||
proposal -> {
|
||||
try {
|
||||
LogicalParent logicalParent =
|
||||
GenericRecordUtils.deserializeAspect(
|
||||
proposal.getAspect().getValue(),
|
||||
proposal.getAspect().getContentType(),
|
||||
LogicalParent.class);
|
||||
if (logicalParent.getParent() == null) {
|
||||
return false;
|
||||
}
|
||||
Edge edge = logicalParent.getParent();
|
||||
return edge.getDestinationUrn().toString().equals(TEST_PARENT_URN)
|
||||
&& edge.getCreated() != null
|
||||
&& edge.getLastModified() != null
|
||||
&& edge.getCreated().getActor().toString().equals(TEST_ACTOR_URN)
|
||||
&& edge.getLastModified().getActor().toString().equals(TEST_ACTOR_URN);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}),
|
||||
anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLogicalParentWithoutParent() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
SetLogicalParentResolver resolver = new SetLogicalParentResolver(mockClient);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn(TEST_ACTOR_URN);
|
||||
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
SetLogicalParentInput input = new SetLogicalParentInput();
|
||||
input.setResourceUrn(TEST_ENTITY_URN);
|
||||
input.setParentUrn(null);
|
||||
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
resolver.get(mockEnv).get();
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.ingestProposal(
|
||||
any(),
|
||||
Mockito.argThat(
|
||||
proposal -> {
|
||||
try {
|
||||
LogicalParent logicalParent =
|
||||
GenericRecordUtils.deserializeAspect(
|
||||
proposal.getAspect().getValue(),
|
||||
proposal.getAspect().getContentType(),
|
||||
LogicalParent.class);
|
||||
return logicalParent.getParent() == null;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}),
|
||||
anyBoolean());
|
||||
}
|
||||
}
|
||||
@ -1,13 +1,15 @@
|
||||
package com.linkedin.datahub.graphql.types.dataset.mappers;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.Edge;
|
||||
import com.linkedin.common.TimeStamp;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.datahub.graphql.generated.AuditStamp;
|
||||
import com.linkedin.datahub.graphql.generated.Dataset;
|
||||
import com.linkedin.datahub.graphql.generated.DatasetProperties;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -22,6 +24,10 @@ public class DatasetMapperTest {
|
||||
Urn.createFromTuple(Constants.CORP_USER_ENTITY_NAME, "created");
|
||||
private static final Urn TEST_LAST_MODIFIED_ACTOR_URN =
|
||||
Urn.createFromTuple(Constants.CORP_USER_ENTITY_NAME, "lastmodified");
|
||||
private static final Urn TEST_PARENT_URN =
|
||||
Urn.createFromTuple(Constants.DATASET_ENTITY_NAME, "parent");
|
||||
private static final Urn TEST_ACTOR_URN =
|
||||
Urn.createFromTuple(Constants.CORP_USER_ENTITY_NAME, "actor");
|
||||
|
||||
@Test
|
||||
public void testDatasetPropertiesMapperWithCreatedAndLastModified() {
|
||||
@ -60,7 +66,8 @@ public class DatasetMapperTest {
|
||||
expectedDatasetProperties.setLastModifiedActor(TEST_LAST_MODIFIED_ACTOR_URN.toString());
|
||||
expectedDatasetProperties.setCreatedActor(TEST_CREATED_ACTOR_URN.toString());
|
||||
expectedDatasetProperties.setLastModified(
|
||||
new AuditStamp(20L, TEST_LAST_MODIFIED_ACTOR_URN.toString()));
|
||||
new com.linkedin.datahub.graphql.generated.AuditStamp(
|
||||
20L, TEST_LAST_MODIFIED_ACTOR_URN.toString()));
|
||||
expectedDatasetProperties.setCreated(10L);
|
||||
expected.setProperties(expectedDatasetProperties);
|
||||
|
||||
@ -108,7 +115,8 @@ public class DatasetMapperTest {
|
||||
expectedDatasetProperties.setName("Test");
|
||||
expectedDatasetProperties.setLastModifiedActor(null);
|
||||
expectedDatasetProperties.setCreatedActor(null);
|
||||
expectedDatasetProperties.setLastModified(new AuditStamp(0L, null));
|
||||
expectedDatasetProperties.setLastModified(
|
||||
new com.linkedin.datahub.graphql.generated.AuditStamp(0L, null));
|
||||
expectedDatasetProperties.setCreated(null);
|
||||
expected.setProperties(expectedDatasetProperties);
|
||||
|
||||
@ -162,7 +170,8 @@ public class DatasetMapperTest {
|
||||
expectedDatasetProperties.setName("Test");
|
||||
expectedDatasetProperties.setLastModifiedActor(null);
|
||||
expectedDatasetProperties.setCreatedActor(null);
|
||||
expectedDatasetProperties.setLastModified(new AuditStamp(20L, null));
|
||||
expectedDatasetProperties.setLastModified(
|
||||
new com.linkedin.datahub.graphql.generated.AuditStamp(20L, null));
|
||||
expectedDatasetProperties.setCreated(10L);
|
||||
expected.setProperties(expectedDatasetProperties);
|
||||
|
||||
@ -183,4 +192,66 @@ public class DatasetMapperTest {
|
||||
Assert.assertEquals(
|
||||
actual.getProperties().getCreatedActor(), expected.getProperties().getCreatedActor());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetMapperWithLogicalParent() {
|
||||
final LogicalParent input = new LogicalParent();
|
||||
final Edge edge = new Edge();
|
||||
edge.setDestinationUrn(TEST_PARENT_URN);
|
||||
edge.setCreated(new AuditStamp().setTime(10L).setActor(TEST_ACTOR_URN));
|
||||
edge.setLastModified(new AuditStamp().setTime(20L).setActor(TEST_ACTOR_URN));
|
||||
input.setParent(edge);
|
||||
|
||||
final Map<String, com.linkedin.entity.EnvelopedAspect> aspects = new HashMap<>();
|
||||
aspects.put(
|
||||
Constants.LOGICAL_PARENT_ASPECT_NAME,
|
||||
new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(input.data())));
|
||||
|
||||
final EntityResponse response =
|
||||
new EntityResponse()
|
||||
.setEntityName(Constants.DATASET_ENTITY_NAME)
|
||||
.setUrn(TEST_DATASET_URN)
|
||||
.setAspects(new EnvelopedAspectMap(aspects));
|
||||
|
||||
final Dataset actual = DatasetMapper.map(null, response);
|
||||
|
||||
Assert.assertNotNull(actual.getLogicalParent());
|
||||
Assert.assertEquals(actual.getLogicalParent().getUrn(), TEST_PARENT_URN.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetMapperWithNullLogicalParent() {
|
||||
final LogicalParent input = new LogicalParent();
|
||||
// Don't set parent - leave it as default (null)
|
||||
|
||||
final Map<String, com.linkedin.entity.EnvelopedAspect> aspects = new HashMap<>();
|
||||
aspects.put(
|
||||
Constants.LOGICAL_PARENT_ASPECT_NAME,
|
||||
new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(input.data())));
|
||||
|
||||
final EntityResponse response =
|
||||
new EntityResponse()
|
||||
.setEntityName(Constants.DATASET_ENTITY_NAME)
|
||||
.setUrn(TEST_DATASET_URN)
|
||||
.setAspects(new EnvelopedAspectMap(aspects));
|
||||
|
||||
final Dataset actual = DatasetMapper.map(null, response);
|
||||
|
||||
Assert.assertNull(actual.getLogicalParent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetMapperWithoutLogicalParent() {
|
||||
final Map<String, com.linkedin.entity.EnvelopedAspect> aspects = new HashMap<>();
|
||||
|
||||
final EntityResponse response =
|
||||
new EntityResponse()
|
||||
.setEntityName(Constants.DATASET_ENTITY_NAME)
|
||||
.setUrn(TEST_DATASET_URN)
|
||||
.setAspects(new EnvelopedAspectMap(aspects));
|
||||
|
||||
final Dataset actual = DatasetMapper.map(null, response);
|
||||
|
||||
Assert.assertNull(actual.getLogicalParent());
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,143 @@
|
||||
package com.linkedin.datahub.graphql.types.schemafield;
|
||||
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.Edge;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.logical.LogicalParent;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class SchemaFieldMapperTest {
|
||||
|
||||
private static final String TEST_DATASET_URN =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)";
|
||||
private static final String TEST_FIELD_PATH = "field1";
|
||||
private static final String TEST_SCHEMA_FIELD_URN =
|
||||
"urn:li:schemaField:(" + TEST_DATASET_URN + "," + TEST_FIELD_PATH + ")";
|
||||
private static final String TEST_PARENT_URN =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-parent,PROD)";
|
||||
private static final String TEST_ACTOR_URN = "urn:li:corpuser:actor";
|
||||
|
||||
@Test
|
||||
public void testSchemaFieldMapperBasic() throws Exception {
|
||||
// Create basic schema field key
|
||||
Urn schemaFieldUrn = Urn.createFromString(TEST_SCHEMA_FIELD_URN);
|
||||
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
entityResponse.setUrn(schemaFieldUrn);
|
||||
entityResponse.setAspects(new EnvelopedAspectMap(new HashMap<>()));
|
||||
|
||||
SchemaFieldEntity result = SchemaFieldMapper.map(null, entityResponse);
|
||||
|
||||
assertNotNull(result);
|
||||
assertEquals(result.getUrn(), TEST_SCHEMA_FIELD_URN);
|
||||
assertEquals(result.getType(), EntityType.SCHEMA_FIELD);
|
||||
assertEquals(result.getFieldPath(), TEST_FIELD_PATH);
|
||||
assertNotNull(result.getParent());
|
||||
assertEquals(result.getParent().getUrn(), TEST_DATASET_URN);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaFieldMapperWithLogicalParent() throws Exception {
|
||||
Urn schemaFieldUrn = Urn.createFromString(TEST_SCHEMA_FIELD_URN);
|
||||
Urn parentUrn = Urn.createFromString(TEST_PARENT_URN);
|
||||
Urn actorUrn = Urn.createFromString(TEST_ACTOR_URN);
|
||||
|
||||
// Create logical parent aspect
|
||||
LogicalParent logicalParent = new LogicalParent();
|
||||
Edge edge = new Edge();
|
||||
edge.setDestinationUrn(parentUrn);
|
||||
edge.setCreated(new AuditStamp().setTime(10L).setActor(actorUrn));
|
||||
edge.setLastModified(new AuditStamp().setTime(20L).setActor(actorUrn));
|
||||
logicalParent.setParent(edge);
|
||||
|
||||
Map<String, EnvelopedAspect> aspects = new HashMap<>();
|
||||
aspects.put(
|
||||
Constants.LOGICAL_PARENT_ASPECT_NAME,
|
||||
new EnvelopedAspect().setValue(new Aspect(logicalParent.data())));
|
||||
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
entityResponse.setUrn(schemaFieldUrn);
|
||||
entityResponse.setAspects(new EnvelopedAspectMap(aspects));
|
||||
|
||||
SchemaFieldEntity result = SchemaFieldMapper.map(null, entityResponse);
|
||||
|
||||
assertNotNull(result);
|
||||
assertEquals(result.getUrn(), TEST_SCHEMA_FIELD_URN);
|
||||
assertEquals(result.getType(), EntityType.SCHEMA_FIELD);
|
||||
assertEquals(result.getFieldPath(), TEST_FIELD_PATH);
|
||||
|
||||
// Verify the schema field parent (from URN parsing)
|
||||
assertNotNull(result.getParent());
|
||||
assertEquals(result.getParent().getUrn(), TEST_DATASET_URN);
|
||||
|
||||
// Verify the logical parent aspect
|
||||
assertNotNull(result.getLogicalParent());
|
||||
assertEquals(result.getLogicalParent().getUrn(), TEST_PARENT_URN);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaFieldMapperWithNullLogicalParent() throws Exception {
|
||||
Urn schemaFieldUrn = Urn.createFromString(TEST_SCHEMA_FIELD_URN);
|
||||
|
||||
// Create logical parent aspect with null parent (default)
|
||||
LogicalParent logicalParent = new LogicalParent();
|
||||
// Don't set parent - leave it as default (null)
|
||||
|
||||
Map<String, EnvelopedAspect> aspects = new HashMap<>();
|
||||
aspects.put(
|
||||
Constants.LOGICAL_PARENT_ASPECT_NAME,
|
||||
new EnvelopedAspect().setValue(new Aspect(logicalParent.data())));
|
||||
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
entityResponse.setUrn(schemaFieldUrn);
|
||||
entityResponse.setAspects(new EnvelopedAspectMap(aspects));
|
||||
|
||||
SchemaFieldEntity result = SchemaFieldMapper.map(null, entityResponse);
|
||||
|
||||
assertNotNull(result);
|
||||
assertEquals(result.getUrn(), TEST_SCHEMA_FIELD_URN);
|
||||
assertEquals(result.getType(), EntityType.SCHEMA_FIELD);
|
||||
assertEquals(result.getFieldPath(), TEST_FIELD_PATH);
|
||||
|
||||
// Verify the schema field parent (from URN parsing) is still set
|
||||
assertNotNull(result.getParent());
|
||||
assertEquals(result.getParent().getUrn(), TEST_DATASET_URN);
|
||||
|
||||
// Verify the logical parent aspect is null
|
||||
assertNull(result.getLogicalParent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaFieldMapperWithoutLogicalParent() throws Exception {
|
||||
Urn schemaFieldUrn = Urn.createFromString(TEST_SCHEMA_FIELD_URN);
|
||||
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
entityResponse.setUrn(schemaFieldUrn);
|
||||
entityResponse.setAspects(new EnvelopedAspectMap(new HashMap<>()));
|
||||
|
||||
SchemaFieldEntity result = SchemaFieldMapper.map(null, entityResponse);
|
||||
|
||||
assertNotNull(result);
|
||||
assertEquals(result.getUrn(), TEST_SCHEMA_FIELD_URN);
|
||||
assertEquals(result.getType(), EntityType.SCHEMA_FIELD);
|
||||
assertEquals(result.getFieldPath(), TEST_FIELD_PATH);
|
||||
|
||||
// Verify the schema field parent (from URN parsing) is still set
|
||||
assertNotNull(result.getParent());
|
||||
assertEquals(result.getParent().getUrn(), TEST_DATASET_URN);
|
||||
|
||||
// Verify no logical parent aspect
|
||||
assertNull(result.getLogicalParent());
|
||||
}
|
||||
}
|
||||
@ -24,7 +24,7 @@ public class FieldExtractor {
|
||||
|
||||
private FieldExtractor() {}
|
||||
|
||||
private static long getNumArrayWildcards(PathSpec pathSpec) {
|
||||
public static long getNumArrayWildcards(PathSpec pathSpec) {
|
||||
return pathSpec.getPathComponents().stream().filter(ARRAY_WILDCARD::equals).count();
|
||||
}
|
||||
|
||||
|
||||
@ -518,6 +518,9 @@ public class Constants {
|
||||
public static final String VERSION_SET_FIELD_NAME = "versionSet";
|
||||
public static final String VERSION_LABEL_FIELD_NAME = "version";
|
||||
|
||||
// Logical
|
||||
public static final String LOGICAL_PARENT_ASPECT_NAME = "logicalParent";
|
||||
|
||||
public static final String DISPLAY_PROPERTIES_ASPECT_NAME = "displayProperties";
|
||||
|
||||
// Config
|
||||
|
||||
@ -9,6 +9,7 @@ import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.data.template.StringMap;
|
||||
import com.linkedin.dataset.DatasetLineageType;
|
||||
import com.linkedin.dataset.Upstream;
|
||||
import com.linkedin.dataset.UpstreamArray;
|
||||
@ -36,28 +37,26 @@ import org.testng.annotations.Test;
|
||||
public class GraphIndexUtilsTest {
|
||||
|
||||
private static final String UPSTREAM_RELATIONSHIP_PATH = "/upstreams/*/dataset";
|
||||
private static final long DEFAULT_CREATED_TIME = 1L;
|
||||
private static final long CREATED_EVENT_TIME = 123L;
|
||||
private static final long UPDATED_EVENT_TIME_1 = 234L;
|
||||
private static final long UPDATED_EVENT_TIME_2 = 345L;
|
||||
private Urn _datasetUrn;
|
||||
private DatasetUrn _upstreamDataset1;
|
||||
private DatasetUrn _upstreamDataset2;
|
||||
private static final String CREATED_ACTOR_URN = "urn:li:corpuser:creating";
|
||||
private static final String UPDATED_ACTOR_URN = "urn:li:corpuser:updating";
|
||||
private static final Urn DATASET_URN =
|
||||
UrnUtils.getUrn(
|
||||
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)");
|
||||
private static final DatasetUrn UPSTREAM_DATASET_1 =
|
||||
UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
|
||||
private static final DatasetUrn UPSTREAM_DATASET_2 =
|
||||
UrnUtils.toDatasetUrn("snowflake", "test2", "DEV");
|
||||
private static final Urn QUERY_URN = UrnUtils.getUrn("urn:li:query:queryid");
|
||||
private static final Urn CREATED_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:creating");
|
||||
private static final Urn UPDATED_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:updating");
|
||||
private static final Urn DATAHUB_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:datahub");
|
||||
private static final String DOWNSTREAM_RELATIONSHIP_TYPE = "DownstreamOf";
|
||||
private EntityRegistry _mockRegistry;
|
||||
private Urn _createdActorUrn;
|
||||
private Urn _updatedActorUrn;
|
||||
|
||||
@BeforeMethod
|
||||
public void setupTest() {
|
||||
_createdActorUrn = UrnUtils.getUrn(CREATED_ACTOR_URN);
|
||||
_updatedActorUrn = UrnUtils.getUrn(UPDATED_ACTOR_URN);
|
||||
_datasetUrn =
|
||||
UrnUtils.getUrn(
|
||||
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)");
|
||||
_upstreamDataset1 = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
|
||||
_upstreamDataset2 = UrnUtils.toDatasetUrn("snowflake", "test2", "DEV");
|
||||
_mockRegistry = ENTITY_REGISTRY;
|
||||
}
|
||||
|
||||
@ -76,34 +75,36 @@ public class GraphIndexUtilsTest {
|
||||
// check specifically for the upstreams relationship entry
|
||||
if (entry.getKey().getPath().toString().equals(UPSTREAM_RELATIONSHIP_PATH)) {
|
||||
List<Edge> edgesToAdd =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, _datasetUrn, event, true);
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, DATASET_URN, event, true);
|
||||
List<Edge> expectedEdgesToAdd = new ArrayList<>();
|
||||
// edges contain default created event time and created actor from system metadata
|
||||
Edge edge1 =
|
||||
new Edge(
|
||||
_datasetUrn,
|
||||
_upstreamDataset1,
|
||||
DATASET_URN,
|
||||
UPSTREAM_DATASET_1,
|
||||
entry.getKey().getRelationshipName(),
|
||||
CREATED_EVENT_TIME,
|
||||
_createdActorUrn,
|
||||
CREATED_ACTOR_URN,
|
||||
UPDATED_EVENT_TIME_1,
|
||||
_updatedActorUrn,
|
||||
null);
|
||||
UPDATED_ACTOR_URN,
|
||||
Map.of("foo", "bar"),
|
||||
null,
|
||||
QUERY_URN);
|
||||
Edge edge2 =
|
||||
new Edge(
|
||||
_datasetUrn,
|
||||
_upstreamDataset2,
|
||||
DATASET_URN,
|
||||
UPSTREAM_DATASET_2,
|
||||
entry.getKey().getRelationshipName(),
|
||||
CREATED_EVENT_TIME,
|
||||
_createdActorUrn,
|
||||
DEFAULT_CREATED_TIME,
|
||||
DATAHUB_ACTOR_URN,
|
||||
UPDATED_EVENT_TIME_2,
|
||||
_updatedActorUrn,
|
||||
UPDATED_ACTOR_URN,
|
||||
null);
|
||||
expectedEdgesToAdd.add(edge1);
|
||||
expectedEdgesToAdd.add(edge2);
|
||||
assertEquals(expectedEdgesToAdd.size(), edgesToAdd.size());
|
||||
Assert.assertTrue(edgesToAdd.containsAll(expectedEdgesToAdd));
|
||||
Assert.assertTrue(expectedEdgesToAdd.containsAll(edgesToAdd));
|
||||
Assert.assertTrue(edgesToAdd.contains(edge1));
|
||||
Assert.assertTrue(edgesToAdd.contains(edge2));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,30 +113,30 @@ public class GraphIndexUtilsTest {
|
||||
public void testMergeEdges() {
|
||||
final Edge edge1 =
|
||||
new Edge(
|
||||
_datasetUrn,
|
||||
_upstreamDataset1,
|
||||
DATASET_URN,
|
||||
UPSTREAM_DATASET_1,
|
||||
DOWNSTREAM_RELATIONSHIP_TYPE,
|
||||
CREATED_EVENT_TIME,
|
||||
_createdActorUrn,
|
||||
CREATED_ACTOR_URN,
|
||||
UPDATED_EVENT_TIME_1,
|
||||
_updatedActorUrn,
|
||||
UPDATED_ACTOR_URN,
|
||||
Collections.singletonMap("foo", "bar"));
|
||||
final Edge edge2 =
|
||||
new Edge(
|
||||
_datasetUrn,
|
||||
_upstreamDataset1,
|
||||
DATASET_URN,
|
||||
UPSTREAM_DATASET_1,
|
||||
DOWNSTREAM_RELATIONSHIP_TYPE,
|
||||
UPDATED_EVENT_TIME_2,
|
||||
_updatedActorUrn,
|
||||
UPDATED_ACTOR_URN,
|
||||
UPDATED_EVENT_TIME_2,
|
||||
_updatedActorUrn,
|
||||
UPDATED_ACTOR_URN,
|
||||
Collections.singletonMap("foo", "baz"));
|
||||
final Edge edge3 = mergeEdges(edge1, edge2);
|
||||
assertEquals(edge3.getSource(), edge1.getSource());
|
||||
assertEquals(edge3.getDestination(), edge1.getDestination());
|
||||
assertEquals(edge3.getRelationshipType(), edge1.getRelationshipType());
|
||||
assertEquals(edge3.getCreatedOn(), null);
|
||||
assertEquals(edge3.getCreatedActor(), null);
|
||||
assertNull(edge3.getCreatedOn());
|
||||
assertNull(edge3.getCreatedActor());
|
||||
assertEquals(edge3.getUpdatedOn(), edge2.getUpdatedOn());
|
||||
assertEquals(edge3.getUpdatedActor(), edge2.getUpdatedActor());
|
||||
assertEquals(edge3.getProperties(), edge2.getProperties());
|
||||
@ -145,15 +146,19 @@ public class GraphIndexUtilsTest {
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
UpstreamArray upstreams = new UpstreamArray();
|
||||
Upstream upstream1 = new Upstream();
|
||||
upstream1.setDataset(_upstreamDataset1);
|
||||
upstream1.setDataset(UPSTREAM_DATASET_1);
|
||||
upstream1.setAuditStamp(
|
||||
new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME_1));
|
||||
new AuditStamp().setActor(UPDATED_ACTOR_URN).setTime(UPDATED_EVENT_TIME_1));
|
||||
upstream1.setCreated(new AuditStamp().setActor(CREATED_ACTOR_URN).setTime(CREATED_EVENT_TIME));
|
||||
upstream1.setProperties(new StringMap(Map.of("foo", "bar")));
|
||||
upstream1.setQuery(QUERY_URN);
|
||||
upstream1.setType(DatasetLineageType.TRANSFORMED);
|
||||
|
||||
Upstream upstream2 = new Upstream();
|
||||
upstream2.setDataset(_upstreamDataset2);
|
||||
upstream2.setDataset(UPSTREAM_DATASET_2);
|
||||
upstream2.setAuditStamp(
|
||||
new AuditStamp().setActor(_updatedActorUrn).setTime(UPDATED_EVENT_TIME_1));
|
||||
upstream2.setType(DatasetLineageType.TRANSFORMED);
|
||||
new AuditStamp().setActor(UPDATED_ACTOR_URN).setTime(UPDATED_EVENT_TIME_1));
|
||||
upstream2.setType(DatasetLineageType.COPY);
|
||||
upstreams.add(upstream1);
|
||||
upstreams.add(upstream2);
|
||||
upstreamLineage.setUpstreams(upstreams);
|
||||
@ -168,12 +173,12 @@ public class GraphIndexUtilsTest {
|
||||
event.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
event.setAspect(GenericRecordUtils.serializeAspect(aspect));
|
||||
event.setEntityUrn(_datasetUrn);
|
||||
event.setEntityUrn(DATASET_URN);
|
||||
|
||||
SystemMetadata systemMetadata = new SystemMetadata();
|
||||
systemMetadata.setLastObserved(CREATED_EVENT_TIME);
|
||||
event.setSystemMetadata(systemMetadata);
|
||||
event.setCreated(new AuditStamp().setActor(_createdActorUrn).setTime(CREATED_EVENT_TIME));
|
||||
event.setCreated(new AuditStamp().setActor(DATAHUB_ACTOR_URN).setTime(DEFAULT_CREATED_TIME));
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
@ -27,5 +27,5 @@ record LogicalParent {
|
||||
"filterNameOverride": "Physical Instance Of"
|
||||
}
|
||||
}
|
||||
parent: Edge
|
||||
parent: optional Edge
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import com.linkedin.data.schema.PathSpec;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.metadata.aspect.models.graph.Edge;
|
||||
import com.linkedin.metadata.models.RelationshipFieldSpec;
|
||||
import com.linkedin.metadata.models.extractor.FieldExtractor;
|
||||
import com.linkedin.mxe.MetadataChangeLog;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
import java.net.URISyntaxException;
|
||||
@ -22,95 +23,33 @@ public class GraphIndexUtils {
|
||||
private GraphIndexUtils() {}
|
||||
|
||||
@Nullable
|
||||
private static List<Urn> getActorList(
|
||||
private static <T> List<T> getList(
|
||||
@Nullable final String path, @Nonnull final RecordTemplate aspect) {
|
||||
if (path == null) {
|
||||
return null;
|
||||
}
|
||||
final PathSpec actorPathSpec = new PathSpec(path.split("/"));
|
||||
final Object value = RecordUtils.getNullableFieldValue(aspect, actorPathSpec);
|
||||
return (List<Urn>) value;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static List<Long> getTimestampList(
|
||||
@Nullable final String path, @Nonnull final RecordTemplate aspect) {
|
||||
if (path == null) {
|
||||
return null;
|
||||
final PathSpec pathSpec = new PathSpec(path.split("/"));
|
||||
final Object value = RecordUtils.getNullableFieldValue(aspect, pathSpec);
|
||||
if (FieldExtractor.getNumArrayWildcards(pathSpec) > 0) {
|
||||
return (List<T>) value;
|
||||
} else {
|
||||
return value != null ? List.of((T) value) : null;
|
||||
}
|
||||
final PathSpec timestampPathSpec = new PathSpec(path.split("/"));
|
||||
final Object value = RecordUtils.getNullableFieldValue(aspect, timestampPathSpec);
|
||||
return (List<Long>) value;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static List<Map<String, Object>> getPropertiesList(
|
||||
@Nullable final String path, @Nonnull final RecordTemplate aspect) {
|
||||
if (path == null) {
|
||||
return null;
|
||||
}
|
||||
final PathSpec propertiesPathSpec = new PathSpec(path.split("/"));
|
||||
final Object value = RecordUtils.getNullableFieldValue(aspect, propertiesPathSpec);
|
||||
return (List<Map<String, Object>>) value;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static List<Urn> getViaList(
|
||||
@Nullable final String path, @Nonnull final RecordTemplate aspect) {
|
||||
if (path == null) {
|
||||
return null;
|
||||
}
|
||||
final PathSpec viaPathSpec = new PathSpec(path.split("/"));
|
||||
final Object value = RecordUtils.getNullableFieldValue(aspect, viaPathSpec);
|
||||
return (List<Urn>) value;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static boolean isValueListValid(
|
||||
@Nullable final List<?> entryList, final int valueListSize) {
|
||||
if (entryList == null) {
|
||||
return false;
|
||||
}
|
||||
if (valueListSize != entryList.size()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return valueListSize == entryList.size();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Long getTimestamp(
|
||||
@Nullable final List<Long> timestampList, final int index, final int valueListSize) {
|
||||
if (isValueListValid(timestampList, valueListSize)) {
|
||||
return timestampList.get(index);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Urn getActor(
|
||||
@Nullable final List<Urn> actorList, final int index, final int valueListSize) {
|
||||
if (isValueListValid(actorList, valueListSize)) {
|
||||
return actorList.get(index);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Map<String, Object> getProperties(
|
||||
@Nullable final List<Map<String, Object>> propertiesList,
|
||||
final int index,
|
||||
final int valueListSize) {
|
||||
if (isValueListValid(propertiesList, valueListSize)) {
|
||||
return propertiesList.get(index);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Urn getVia(
|
||||
@Nullable final List<Urn> viaList, final int index, final int valueListSize) {
|
||||
if (isValueListValid(viaList, valueListSize)) {
|
||||
return viaList.get(index);
|
||||
private static <T> T getValue(
|
||||
@Nullable final List<T> list, final int index, final int valueListSize) {
|
||||
if (isValueListValid(list, valueListSize)) {
|
||||
return list.get(index);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -139,39 +78,39 @@ public class GraphIndexUtils {
|
||||
extractedFieldsEntry.getKey().getRelationshipAnnotation().getProperties();
|
||||
final String viaNodePath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getVia();
|
||||
|
||||
final List<Long> createdOnList = getTimestampList(createdOnPath, aspect);
|
||||
final List<Urn> createdActorList = getActorList(createdActorPath, aspect);
|
||||
final List<Long> updatedOnList = getTimestampList(updatedOnPath, aspect);
|
||||
final List<Urn> updatedActorList = getActorList(updatedActorPath, aspect);
|
||||
final List<Map<String, Object>> propertiesList = getPropertiesList(propertiesPath, aspect);
|
||||
final List<Urn> viaList = getViaList(viaNodePath, aspect);
|
||||
final List<Long> createdOnList = getList(createdOnPath, aspect);
|
||||
final List<Urn> createdActorList = getList(createdActorPath, aspect);
|
||||
final List<Long> updatedOnList = getList(updatedOnPath, aspect);
|
||||
final List<Urn> updatedActorList = getList(updatedActorPath, aspect);
|
||||
final List<Map<String, Object>> propertiesList = getList(propertiesPath, aspect);
|
||||
final List<Urn> viaList = getList(viaNodePath, aspect);
|
||||
|
||||
int index = 0;
|
||||
for (Object fieldValue : extractedFieldsEntry.getValue()) {
|
||||
Long createdOn =
|
||||
createdOnList != null
|
||||
? getTimestamp(createdOnList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(createdOnList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
Urn createdActor =
|
||||
createdActorList != null
|
||||
? getActor(createdActorList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(createdActorList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
Long updatedOn =
|
||||
updatedOnList != null
|
||||
? getTimestamp(updatedOnList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(updatedOnList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
Urn updatedActor =
|
||||
updatedActorList != null
|
||||
? getActor(updatedActorList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(updatedActorList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
final Map<String, Object> properties =
|
||||
propertiesList != null
|
||||
? getProperties(propertiesList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(propertiesList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
|
||||
Urn viaNode =
|
||||
viaNodePath != null
|
||||
? getVia(viaList, index, extractedFieldsEntry.getValue().size())
|
||||
? getValue(viaList, index, extractedFieldsEntry.getValue().size())
|
||||
: null;
|
||||
|
||||
SystemMetadata systemMetadata;
|
||||
|
||||
@ -0,0 +1,500 @@
|
||||
package com.linkedin.metadata.graph;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.dataset.DatasetLineageType;
|
||||
import com.linkedin.dataset.Upstream;
|
||||
import com.linkedin.dataset.UpstreamArray;
|
||||
import com.linkedin.dataset.UpstreamLineage;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.aspect.models.graph.Edge;
|
||||
import com.linkedin.metadata.models.RelationshipFieldSpec;
|
||||
import com.linkedin.metadata.models.annotation.RelationshipAnnotation;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
import com.linkedin.mxe.MetadataChangeLog;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class GraphIndexUtilsTest {
|
||||
|
||||
private static final String DATASET_URN_STRING =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,test.table,PROD)";
|
||||
private static final String UPSTREAM_URN_STRING =
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,upstream.table,PROD)";
|
||||
private static final String ACTOR_URN_STRING = "urn:li:corpuser:testuser";
|
||||
private static final String RELATIONSHIP_NAME = "DownstreamOf";
|
||||
|
||||
private Urn datasetUrn;
|
||||
private Urn upstreamUrn;
|
||||
private Urn actorUrn;
|
||||
|
||||
@BeforeMethod
|
||||
public void setUp() {
|
||||
datasetUrn = UrnUtils.getUrn(DATASET_URN_STRING);
|
||||
upstreamUrn = UrnUtils.getUrn(UPSTREAM_URN_STRING);
|
||||
actorUrn = UrnUtils.getUrn(ACTOR_URN_STRING);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesBasic() {
|
||||
// Create test data
|
||||
UpstreamLineage upstreamLineage = createBasicUpstreamLineage();
|
||||
MetadataChangeLog event = createBasicEvent(upstreamLineage, true);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
assertEquals(edge.getSource(), datasetUrn);
|
||||
assertEquals(edge.getDestination(), upstreamUrn);
|
||||
assertEquals(edge.getRelationshipType(), RELATIONSHIP_NAME);
|
||||
assertNotNull(edge.getCreatedOn());
|
||||
assertNotNull(edge.getCreatedActor());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithMultipleDestinations() {
|
||||
// Create test data with multiple destinations
|
||||
Urn secondUpstreamUrn =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:mysql,upstream2.table,PROD)");
|
||||
|
||||
UpstreamLineage upstreamLineage = createUpstreamLineageWithMultiple();
|
||||
MetadataChangeLog event = createBasicEvent(upstreamLineage, true);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn, secondUpstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 2);
|
||||
assertTrue(edges.stream().anyMatch(edge -> edge.getDestination().equals(upstreamUrn)));
|
||||
assertTrue(edges.stream().anyMatch(edge -> edge.getDestination().equals(secondUpstreamUrn)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithSystemMetadataFallback() {
|
||||
// Create test data without audit stamps in the aspect
|
||||
UpstreamLineage upstreamLineage = createUpstreamLineageWithoutAuditStamp();
|
||||
MetadataChangeLog event = createEventWithSystemMetadata(upstreamLineage);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
assertEquals(edge.getCreatedOn(), Long.valueOf(12345L)); // From system metadata
|
||||
assertEquals(edge.getUpdatedOn(), Long.valueOf(12345L)); // From system metadata
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithEventCreatedActor() {
|
||||
// Create test data without created actor in aspect
|
||||
UpstreamLineage upstreamLineage = createUpstreamLineageWithoutCreatedActor();
|
||||
MetadataChangeLog event = createEventWithCreatedActor(upstreamLineage);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
assertEquals(edge.getCreatedActor(), actorUrn); // From event
|
||||
assertEquals(edge.getUpdatedActor(), actorUrn); // From event
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesEmptyDestinations() {
|
||||
// Test with empty destination list
|
||||
UpstreamLineage upstreamLineage = createBasicUpstreamLineage();
|
||||
MetadataChangeLog event = createBasicEvent(upstreamLineage, true);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(); // Empty list
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify - should return empty list for empty destinations
|
||||
assertEquals(edges.size(), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithPreviousSystemMetadata() {
|
||||
// Test when isNewAspectVersion is false
|
||||
UpstreamLineage upstreamLineage = createBasicUpstreamLineage();
|
||||
MetadataChangeLog event = createEventWithPreviousSystemMetadata(upstreamLineage);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createBasicRelationshipSpec();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute with isNewAspectVersion = false
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, false);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
// Should use previous system metadata timestamp
|
||||
assertEquals(edge.getCreatedOn(), Long.valueOf(54321L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithProperties() {
|
||||
// Test with null properties path (common case for aspects like UpstreamLineage)
|
||||
UpstreamLineage upstreamLineage = createBasicUpstreamLineage();
|
||||
MetadataChangeLog event = createBasicEvent(upstreamLineage, true);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createRelationshipSpecWithProperties();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
// Since properties path is null, edge properties should be null
|
||||
assertNull(edge.getProperties());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractGraphEdgesWithViaNode() {
|
||||
// Test with null via path (common case for aspects like UpstreamLineage)
|
||||
UpstreamLineage upstreamLineage = createBasicUpstreamLineage();
|
||||
MetadataChangeLog event = createBasicEvent(upstreamLineage, true);
|
||||
|
||||
RelationshipFieldSpec relationshipSpec = createRelationshipSpecWithVia();
|
||||
List<Object> destinationUrns = Arrays.asList(upstreamUrn);
|
||||
|
||||
Map.Entry<RelationshipFieldSpec, List<Object>> entry =
|
||||
new AbstractMap.SimpleEntry<>(relationshipSpec, destinationUrns);
|
||||
|
||||
// Execute
|
||||
List<Edge> edges =
|
||||
GraphIndexUtils.extractGraphEdges(entry, upstreamLineage, datasetUrn, event, true);
|
||||
|
||||
// Verify
|
||||
assertEquals(edges.size(), 1);
|
||||
Edge edge = edges.get(0);
|
||||
// Since via path is null, edge via should be null
|
||||
assertNull(edge.getVia());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeEdgesBasic() {
|
||||
// Create old and new edges
|
||||
Edge oldEdge =
|
||||
new Edge(
|
||||
datasetUrn,
|
||||
upstreamUrn,
|
||||
RELATIONSHIP_NAME,
|
||||
1000L, // createdOn
|
||||
actorUrn, // createdActor
|
||||
2000L, // updatedOn
|
||||
actorUrn, // updatedActor
|
||||
Collections.singletonMap("oldKey", "oldValue"), // properties
|
||||
null, // lifecycleOwner
|
||||
null // via
|
||||
);
|
||||
|
||||
Urn newActorUrn = UrnUtils.getUrn("urn:li:corpuser:newuser");
|
||||
Edge newEdge =
|
||||
new Edge(
|
||||
datasetUrn,
|
||||
upstreamUrn,
|
||||
RELATIONSHIP_NAME,
|
||||
3000L, // createdOn (will be ignored)
|
||||
newActorUrn, // createdActor (will be ignored)
|
||||
4000L, // updatedOn
|
||||
newActorUrn, // updatedActor
|
||||
Collections.singletonMap("newKey", "newValue"), // properties
|
||||
null, // lifecycleOwner
|
||||
null // via
|
||||
);
|
||||
|
||||
// Execute
|
||||
Edge mergedEdge = GraphIndexUtils.mergeEdges(oldEdge, newEdge);
|
||||
|
||||
// Verify
|
||||
assertEquals(mergedEdge.getSource(), oldEdge.getSource());
|
||||
assertEquals(mergedEdge.getDestination(), oldEdge.getDestination());
|
||||
assertEquals(mergedEdge.getRelationshipType(), oldEdge.getRelationshipType());
|
||||
|
||||
// Created fields should be null (not copied from either edge)
|
||||
assertNull(mergedEdge.getCreatedOn());
|
||||
assertNull(mergedEdge.getCreatedActor());
|
||||
|
||||
// Updated fields should come from new edge
|
||||
assertEquals(mergedEdge.getUpdatedOn(), newEdge.getUpdatedOn());
|
||||
assertEquals(mergedEdge.getUpdatedActor(), newEdge.getUpdatedActor());
|
||||
assertEquals(mergedEdge.getProperties(), newEdge.getProperties());
|
||||
|
||||
// Other fields should come from old edge
|
||||
assertEquals(mergedEdge.getLifecycleOwner(), oldEdge.getLifecycleOwner());
|
||||
assertEquals(mergedEdge.getVia(), oldEdge.getVia());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeEdgesWithAllFieldsFromOldEdge() {
|
||||
// Create edges with all optional fields populated
|
||||
Urn lifecycleOwnerUrn = UrnUtils.getUrn("urn:li:corpuser:owner");
|
||||
Urn viaUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:mysql,via.table,PROD)");
|
||||
|
||||
Edge oldEdge =
|
||||
new Edge(
|
||||
datasetUrn, // source
|
||||
upstreamUrn, // destination
|
||||
RELATIONSHIP_NAME, // relationshipType
|
||||
1000L, // createdOn
|
||||
actorUrn, // createdActor
|
||||
2000L, // updatedOn
|
||||
actorUrn, // updatedActor
|
||||
Collections.singletonMap("oldKey", "oldValue"), // properties
|
||||
lifecycleOwnerUrn, // lifecycleOwner
|
||||
viaUrn, // via
|
||||
true, // sourceStatus
|
||||
true, // destinationStatus
|
||||
true, // viaStatus
|
||||
true // lifecycleOwnerStatus
|
||||
);
|
||||
|
||||
Edge newEdge =
|
||||
new Edge(
|
||||
datasetUrn,
|
||||
upstreamUrn,
|
||||
RELATIONSHIP_NAME,
|
||||
3000L,
|
||||
UrnUtils.getUrn("urn:li:corpuser:newuser"),
|
||||
4000L,
|
||||
UrnUtils.getUrn("urn:li:corpuser:newuser"),
|
||||
Collections.singletonMap("newKey", "newValue"),
|
||||
null, // Different lifecycle owner (will be ignored)
|
||||
null // Different via (will be ignored)
|
||||
);
|
||||
|
||||
// Execute
|
||||
Edge mergedEdge = GraphIndexUtils.mergeEdges(oldEdge, newEdge);
|
||||
|
||||
// Verify that all old edge fields are preserved
|
||||
assertEquals(mergedEdge.getLifecycleOwner(), oldEdge.getLifecycleOwner());
|
||||
assertEquals(mergedEdge.getVia(), oldEdge.getVia());
|
||||
assertEquals(mergedEdge.getViaStatus(), oldEdge.getViaStatus());
|
||||
assertEquals(mergedEdge.getLifecycleOwnerStatus(), oldEdge.getLifecycleOwnerStatus());
|
||||
assertEquals(mergedEdge.getSourceStatus(), oldEdge.getSourceStatus());
|
||||
assertEquals(mergedEdge.getDestinationStatus(), oldEdge.getDestinationStatus());
|
||||
}
|
||||
|
||||
// Helper methods for creating test data
|
||||
|
||||
private UpstreamLineage createBasicUpstreamLineage() {
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
UpstreamArray upstreams = new UpstreamArray();
|
||||
|
||||
Upstream upstream = new Upstream();
|
||||
upstream.setDataset(UrnUtils.toDatasetUrn("mysql", "upstream.table", "PROD"));
|
||||
upstream.setType(DatasetLineageType.TRANSFORMED);
|
||||
upstream.setAuditStamp(new AuditStamp().setTime(12345L).setActor(actorUrn));
|
||||
|
||||
upstreams.add(upstream);
|
||||
upstreamLineage.setUpstreams(upstreams);
|
||||
|
||||
return upstreamLineage;
|
||||
}
|
||||
|
||||
private UpstreamLineage createUpstreamLineageWithMultiple() {
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
UpstreamArray upstreams = new UpstreamArray();
|
||||
|
||||
Upstream upstream1 = new Upstream();
|
||||
upstream1.setDataset(UrnUtils.toDatasetUrn("mysql", "upstream.table", "PROD"));
|
||||
upstream1.setType(DatasetLineageType.TRANSFORMED);
|
||||
upstream1.setAuditStamp(new AuditStamp().setTime(12345L).setActor(actorUrn));
|
||||
|
||||
Upstream upstream2 = new Upstream();
|
||||
upstream2.setDataset(UrnUtils.toDatasetUrn("mysql", "upstream2.table", "PROD"));
|
||||
upstream2.setType(DatasetLineageType.TRANSFORMED);
|
||||
upstream2.setAuditStamp(new AuditStamp().setTime(12346L).setActor(actorUrn));
|
||||
|
||||
upstreams.add(upstream1);
|
||||
upstreams.add(upstream2);
|
||||
upstreamLineage.setUpstreams(upstreams);
|
||||
|
||||
return upstreamLineage;
|
||||
}
|
||||
|
||||
private UpstreamLineage createUpstreamLineageWithoutAuditStamp() {
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
UpstreamArray upstreams = new UpstreamArray();
|
||||
|
||||
Upstream upstream = new Upstream();
|
||||
upstream.setDataset(UrnUtils.toDatasetUrn("mysql", "upstream.table", "PROD"));
|
||||
upstream.setType(DatasetLineageType.TRANSFORMED);
|
||||
// No audit stamp set
|
||||
|
||||
upstreams.add(upstream);
|
||||
upstreamLineage.setUpstreams(upstreams);
|
||||
|
||||
return upstreamLineage;
|
||||
}
|
||||
|
||||
private UpstreamLineage createUpstreamLineageWithoutCreatedActor() {
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
UpstreamArray upstreams = new UpstreamArray();
|
||||
|
||||
Upstream upstream = new Upstream();
|
||||
upstream.setDataset(UrnUtils.toDatasetUrn("mysql", "upstream.table", "PROD"));
|
||||
upstream.setType(DatasetLineageType.TRANSFORMED);
|
||||
upstream.setAuditStamp(new AuditStamp().setTime(12345L)); // No actor
|
||||
|
||||
upstreams.add(upstream);
|
||||
upstreamLineage.setUpstreams(upstreams);
|
||||
|
||||
return upstreamLineage;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createBasicEvent(RecordTemplate aspect, boolean isNew) {
|
||||
MetadataChangeLog event = new MetadataChangeLog();
|
||||
event.setEntityUrn(datasetUrn);
|
||||
event.setEntityType(Constants.DATASET_ENTITY_NAME);
|
||||
event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
|
||||
event.setChangeType(ChangeType.UPSERT);
|
||||
event.setAspect(GenericRecordUtils.serializeAspect(aspect));
|
||||
|
||||
// Add system metadata for timestamp fallback
|
||||
SystemMetadata systemMetadata = new SystemMetadata();
|
||||
systemMetadata.setLastObserved(12345L);
|
||||
event.setSystemMetadata(systemMetadata);
|
||||
|
||||
// Add created audit stamp for actor fallback
|
||||
com.linkedin.common.AuditStamp createdStamp = new com.linkedin.common.AuditStamp();
|
||||
createdStamp.setTime(12345L);
|
||||
createdStamp.setActor(actorUrn);
|
||||
event.setCreated(createdStamp);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createEventWithSystemMetadata(RecordTemplate aspect) {
|
||||
MetadataChangeLog event = createBasicEvent(aspect, true);
|
||||
|
||||
SystemMetadata systemMetadata = new SystemMetadata();
|
||||
systemMetadata.setLastObserved(12345L);
|
||||
event.setSystemMetadata(systemMetadata);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createEventWithPreviousSystemMetadata(RecordTemplate aspect) {
|
||||
MetadataChangeLog event = createBasicEvent(aspect, false);
|
||||
|
||||
SystemMetadata previousSystemMetadata = new SystemMetadata();
|
||||
previousSystemMetadata.setLastObserved(54321L);
|
||||
event.setPreviousSystemMetadata(previousSystemMetadata);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createEventWithCreatedActor(RecordTemplate aspect) {
|
||||
MetadataChangeLog event = createBasicEvent(aspect, true);
|
||||
|
||||
AuditStamp createdStamp = new AuditStamp();
|
||||
createdStamp.setTime(12345L);
|
||||
createdStamp.setActor(actorUrn);
|
||||
event.setCreated(createdStamp);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private RelationshipFieldSpec createBasicRelationshipSpec() {
|
||||
RelationshipFieldSpec spec = mock(RelationshipFieldSpec.class);
|
||||
RelationshipAnnotation annotation = mock(RelationshipAnnotation.class);
|
||||
|
||||
when(spec.getRelationshipName()).thenReturn(RELATIONSHIP_NAME);
|
||||
when(spec.getRelationshipAnnotation()).thenReturn(annotation);
|
||||
|
||||
// Mock annotation methods to return null for optional paths
|
||||
when(annotation.getCreatedOn()).thenReturn(null);
|
||||
when(annotation.getCreatedActor()).thenReturn(null);
|
||||
when(annotation.getUpdatedOn()).thenReturn(null);
|
||||
when(annotation.getUpdatedActor()).thenReturn(null);
|
||||
when(annotation.getProperties()).thenReturn(null);
|
||||
when(annotation.getVia()).thenReturn(null);
|
||||
|
||||
return spec;
|
||||
}
|
||||
|
||||
private RelationshipFieldSpec createRelationshipSpecWithProperties() {
|
||||
RelationshipFieldSpec spec = createBasicRelationshipSpec();
|
||||
RelationshipAnnotation annotation = spec.getRelationshipAnnotation();
|
||||
|
||||
// For UpstreamLineage, there's no properties field, so we test with null path
|
||||
// This tests that the code handles null properties path gracefully
|
||||
when(annotation.getProperties()).thenReturn(null);
|
||||
|
||||
return spec;
|
||||
}
|
||||
|
||||
private RelationshipFieldSpec createRelationshipSpecWithVia() {
|
||||
RelationshipFieldSpec spec = createBasicRelationshipSpec();
|
||||
RelationshipAnnotation annotation = spec.getRelationshipAnnotation();
|
||||
|
||||
// For UpstreamLineage, there's no via field, so we test with null path
|
||||
// This tests that the code handles null via path gracefully
|
||||
when(annotation.getVia()).thenReturn(null);
|
||||
|
||||
return spec;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user