mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-03 14:16:28 +00:00
feat(graphql) data contract resolvers for graphql (#10618)
This commit is contained in:
parent
b542143901
commit
979ee074a6
@ -0,0 +1,32 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import com.datahub.authorization.ConjunctivePrivilegeGroup;
|
||||
import com.datahub.authorization.DisjunctivePrivilegeGroup;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
|
||||
import com.linkedin.metadata.authorization.PoliciesConfig;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
public class DataContractUtils {
|
||||
|
||||
public static boolean canEditDataContract(@Nonnull QueryContext context, Urn entityUrn) {
|
||||
final DisjunctivePrivilegeGroup orPrivilegeGroups =
|
||||
new DisjunctivePrivilegeGroup(
|
||||
ImmutableList.of(
|
||||
AuthorizationUtils.ALL_PRIVILEGES_GROUP,
|
||||
new ConjunctivePrivilegeGroup(
|
||||
ImmutableList.of(
|
||||
PoliciesConfig.EDIT_ENTITY_DATA_CONTRACT_PRIVILEGE.getType()))));
|
||||
|
||||
return AuthorizationUtils.isAuthorized(
|
||||
context.getAuthorizer(),
|
||||
context.getActorUrn(),
|
||||
entityUrn.getEntityType(),
|
||||
entityUrn.toString(),
|
||||
orPrivilegeGroups);
|
||||
}
|
||||
|
||||
private DataContractUtils() {}
|
||||
}
|
||||
@ -0,0 +1,96 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.EntityRelationship;
|
||||
import com.linkedin.common.EntityRelationships;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.Entity;
|
||||
import com.linkedin.datahub.graphql.types.datacontract.DataContractMapper;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.GraphClient;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class EntityDataContractResolver implements DataFetcher<CompletableFuture<DataContract>> {
|
||||
static final String CONTRACT_FOR_RELATIONSHIP = "ContractFor";
|
||||
|
||||
private final EntityClient _entityClient;
|
||||
private final GraphClient _graphClient;
|
||||
|
||||
public EntityDataContractResolver(
|
||||
final EntityClient entityClient, final GraphClient graphClient) {
|
||||
_entityClient = Objects.requireNonNull(entityClient, "entityClient must not be null");
|
||||
_graphClient = Objects.requireNonNull(graphClient, "graphClient must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DataContract> get(DataFetchingEnvironment environment) {
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> {
|
||||
final QueryContext context = environment.getContext();
|
||||
final String entityUrn = ((Entity) environment.getSource()).getUrn();
|
||||
|
||||
try {
|
||||
// Step 1: Fetch the contract associated with the dataset.
|
||||
final EntityRelationships relationships =
|
||||
_graphClient.getRelatedEntities(
|
||||
entityUrn,
|
||||
ImmutableList.of(CONTRACT_FOR_RELATIONSHIP),
|
||||
RelationshipDirection.INCOMING,
|
||||
0,
|
||||
1,
|
||||
context.getActorUrn());
|
||||
|
||||
// If we found multiple contracts for same entity, we have an invalid system state. Log
|
||||
// a warning.
|
||||
if (relationships.getTotal() > 1) {
|
||||
// Someone created 2 contracts for the same entity. Currently we do not handle this in
|
||||
// the UI.
|
||||
log.warn(
|
||||
String.format(
|
||||
"Unexpectedly found multiple contracts (%s) for entity with urn %s! This may lead to inconsistent behavior.",
|
||||
relationships.getRelationships(), entityUrn));
|
||||
}
|
||||
|
||||
final List<Urn> contractUrns =
|
||||
relationships.getRelationships().stream()
|
||||
.map(EntityRelationship::getEntity)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (contractUrns.size() >= 1) {
|
||||
final Urn contractUrn = contractUrns.get(0);
|
||||
|
||||
// Step 2: Hydrate the contract entities based on the urns from step 1
|
||||
final EntityResponse entityResponse =
|
||||
_entityClient.getV2(
|
||||
context.getOperationContext(),
|
||||
Constants.DATA_CONTRACT_ENTITY_NAME,
|
||||
contractUrn,
|
||||
null);
|
||||
|
||||
if (entityResponse != null) {
|
||||
// Step 4: Package and return result
|
||||
return DataContractMapper.mapContract(entityResponse);
|
||||
}
|
||||
}
|
||||
// No contract found
|
||||
return null;
|
||||
} catch (URISyntaxException | RemoteInvocationException e) {
|
||||
throw new RuntimeException("Failed to retrieve Assertion Run Events from GMS", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,278 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
|
||||
import static com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils.*;
|
||||
|
||||
import com.datahub.authentication.Authentication;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.EntityRelationships;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.datacontract.DataContractProperties;
|
||||
import com.linkedin.datacontract.DataContractState;
|
||||
import com.linkedin.datacontract.DataContractStatus;
|
||||
import com.linkedin.datacontract.DataQualityContract;
|
||||
import com.linkedin.datacontract.DataQualityContractArray;
|
||||
import com.linkedin.datacontract.FreshnessContract;
|
||||
import com.linkedin.datacontract.FreshnessContractArray;
|
||||
import com.linkedin.datacontract.SchemaContract;
|
||||
import com.linkedin.datacontract.SchemaContractArray;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.exception.AuthorizationException;
|
||||
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
|
||||
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.DataQualityContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.FreshnessContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.SchemaContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.UpsertDataContractInput;
|
||||
import com.linkedin.datahub.graphql.types.datacontract.DataContractMapper;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.GraphClient;
|
||||
import com.linkedin.metadata.key.DataContractKey;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class UpsertDataContractResolver implements DataFetcher<CompletableFuture<DataContract>> {
|
||||
|
||||
private static final DataContractState DEFAULT_CONTRACT_STATE = DataContractState.ACTIVE;
|
||||
private static final String CONTRACT_RELATIONSHIP_TYPE = "ContractFor";
|
||||
private final EntityClient _entityClient;
|
||||
private final GraphClient _graphClient;
|
||||
|
||||
public UpsertDataContractResolver(
|
||||
final EntityClient entityClient, final GraphClient graphClient) {
|
||||
_entityClient = Objects.requireNonNull(entityClient, "entityClient cannot be null");
|
||||
_graphClient = Objects.requireNonNull(graphClient, "graphClient cannot be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DataContract> get(final DataFetchingEnvironment environment)
|
||||
throws Exception {
|
||||
final QueryContext context = environment.getContext();
|
||||
final UpsertDataContractInput input =
|
||||
bindArgument(environment.getArgument("input"), UpsertDataContractInput.class);
|
||||
final Urn entityUrn = UrnUtils.getUrn(input.getEntityUrn());
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> {
|
||||
if (DataContractUtils.canEditDataContract(context, entityUrn)) {
|
||||
|
||||
// Verify that the provided contract, dataset, assertions all exist as valid entities.
|
||||
validateInput(entityUrn, input, context);
|
||||
|
||||
// First determine if there is an existing data contract
|
||||
final Urn maybeExistingContractUrn =
|
||||
getEntityContractUrn(entityUrn, context.getAuthentication());
|
||||
|
||||
final DataContractProperties newProperties = mapInputToProperties(entityUrn, input);
|
||||
final DataContractStatus newStatus = mapInputToStatus(input);
|
||||
|
||||
final Urn urn =
|
||||
maybeExistingContractUrn != null
|
||||
? maybeExistingContractUrn
|
||||
: EntityKeyUtils.convertEntityKeyToUrn(
|
||||
new DataContractKey()
|
||||
.setId(
|
||||
input.getId() != null
|
||||
? input.getId()
|
||||
: UUID.randomUUID().toString()),
|
||||
Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
|
||||
final MetadataChangeProposal propertiesProposal =
|
||||
buildMetadataChangeProposalWithUrn(
|
||||
urn, Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME, newProperties);
|
||||
|
||||
final MetadataChangeProposal statusProposal =
|
||||
buildMetadataChangeProposalWithUrn(
|
||||
urn, Constants.DATA_CONTRACT_STATUS_ASPECT_NAME, newStatus);
|
||||
|
||||
try {
|
||||
_entityClient.batchIngestProposals(
|
||||
context.getOperationContext(),
|
||||
ImmutableList.of(propertiesProposal, statusProposal),
|
||||
false);
|
||||
|
||||
// Hydrate the contract entities based on the urns from step 1
|
||||
final EntityResponse entityResponse =
|
||||
_entityClient.getV2(
|
||||
context.getOperationContext(),
|
||||
Constants.DATA_CONTRACT_ENTITY_NAME,
|
||||
urn,
|
||||
null);
|
||||
|
||||
// Package and return result
|
||||
return DataContractMapper.mapContract(entityResponse);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
String.format("Failed to perform update against input %s", input.toString()), e);
|
||||
}
|
||||
}
|
||||
throw new AuthorizationException(
|
||||
"Unauthorized to perform this action. Please contact your DataHub administrator.");
|
||||
});
|
||||
}
|
||||
|
||||
private void validateInput(
|
||||
@Nonnull final Urn entityUrn,
|
||||
@Nonnull final UpsertDataContractInput input,
|
||||
@Nonnull final QueryContext context) {
|
||||
try {
|
||||
|
||||
// Validate the target entity exists
|
||||
if (!_entityClient.exists(context.getOperationContext(), entityUrn)) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Provided entity with urn %s does not exist!", entityUrn),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
|
||||
// Verify Freshness assertions
|
||||
if (input.getFreshness() != null) {
|
||||
final List<FreshnessContractInput> freshnessInputs = input.getFreshness();
|
||||
for (FreshnessContractInput freshnessInput : freshnessInputs) {
|
||||
final Urn assertionUrn = UrnUtils.getUrn(freshnessInput.getAssertionUrn());
|
||||
if (!_entityClient.exists(context.getOperationContext(), assertionUrn)) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Provided assertion with urn %s does not exist!", assertionUrn),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify Schema assertions
|
||||
if (input.getSchema() != null) {
|
||||
final List<SchemaContractInput> schemaInputs = input.getSchema();
|
||||
for (SchemaContractInput schemaInput : schemaInputs) {
|
||||
final Urn assertionUrn = UrnUtils.getUrn(schemaInput.getAssertionUrn());
|
||||
if (!_entityClient.exists(context.getOperationContext(), assertionUrn)) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Provided assertion with urn %s does not exist!", assertionUrn),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify DQ assertions
|
||||
if (input.getDataQuality() != null) {
|
||||
final List<DataQualityContractInput> dqInputs = input.getDataQuality();
|
||||
for (DataQualityContractInput dqInput : dqInputs) {
|
||||
final Urn assertionUrn = UrnUtils.getUrn(dqInput.getAssertionUrn());
|
||||
if (!_entityClient.exists(context.getOperationContext(), assertionUrn)) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Provided assertion with urn %s does not exist!", assertionUrn),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (e instanceof DataHubGraphQLException) {
|
||||
throw (DataHubGraphQLException) e;
|
||||
} else {
|
||||
log.error(
|
||||
"Failed to validate inputs provided when upserting data contract! Failing the create.",
|
||||
e);
|
||||
throw new DataHubGraphQLException(
|
||||
"Failed to verify inputs. An unknown error occurred!",
|
||||
DataHubGraphQLErrorCode.SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Urn getEntityContractUrn(@Nonnull Urn entityUrn, @Nonnull Authentication authentication) {
|
||||
EntityRelationships relationships =
|
||||
_graphClient.getRelatedEntities(
|
||||
entityUrn.toString(),
|
||||
ImmutableList.of(CONTRACT_RELATIONSHIP_TYPE),
|
||||
RelationshipDirection.INCOMING,
|
||||
0,
|
||||
1,
|
||||
authentication.getActor().toUrnStr());
|
||||
|
||||
if (relationships.getTotal() > 1) {
|
||||
// Bad state - There are multiple contracts for a single entity! Cannot update.
|
||||
log.warn(
|
||||
String.format(
|
||||
"Unexpectedly found multiple contracts (%s) for entity with urn %s! This may lead to inconsistent behavior.",
|
||||
relationships.getRelationships(), entityUrn));
|
||||
}
|
||||
|
||||
if (relationships.getRelationships().size() == 1) {
|
||||
return relationships.getRelationships().get(0).getEntity();
|
||||
}
|
||||
// No Contract Found
|
||||
return null;
|
||||
}
|
||||
|
||||
private DataContractProperties mapInputToProperties(
|
||||
@Nonnull final Urn entityUrn, @Nonnull final UpsertDataContractInput input) {
|
||||
final DataContractProperties result = new DataContractProperties();
|
||||
result.setEntity(entityUrn);
|
||||
|
||||
// Construct the dataset contract.
|
||||
if (input.getFreshness() != null) {
|
||||
result.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
input.getFreshness().stream()
|
||||
.map(this::mapFreshnessInput)
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
if (input.getSchema() != null) {
|
||||
result.setSchema(
|
||||
new SchemaContractArray(
|
||||
input.getSchema().stream().map(this::mapSchemaInput).collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
if (input.getDataQuality() != null) {
|
||||
result.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
input.getDataQuality().stream()
|
||||
.map(this::mapDataQualityInput)
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private DataContractStatus mapInputToStatus(@Nonnull final UpsertDataContractInput input) {
|
||||
final DataContractStatus result = new DataContractStatus();
|
||||
if (input.getState() != null) {
|
||||
result.setState(DataContractState.valueOf(input.getState().toString()));
|
||||
} else {
|
||||
result.setState(DEFAULT_CONTRACT_STATE);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private FreshnessContract mapFreshnessInput(@Nonnull final FreshnessContractInput input) {
|
||||
final FreshnessContract result = new FreshnessContract();
|
||||
result.setAssertion(UrnUtils.getUrn(input.getAssertionUrn()));
|
||||
return result;
|
||||
}
|
||||
|
||||
private SchemaContract mapSchemaInput(@Nonnull final SchemaContractInput input) {
|
||||
final SchemaContract result = new SchemaContract();
|
||||
result.setAssertion(UrnUtils.getUrn(input.getAssertionUrn()));
|
||||
return result;
|
||||
}
|
||||
|
||||
private DataQualityContract mapDataQualityInput(@Nonnull final DataQualityContractInput input) {
|
||||
final DataQualityContract result = new DataQualityContract();
|
||||
result.setAssertion(UrnUtils.getUrn(input.getAssertionUrn()));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,112 @@
|
||||
package com.linkedin.datahub.graphql.types.datacontract;
|
||||
|
||||
import com.linkedin.datahub.graphql.generated.Assertion;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.DataContractProperties;
|
||||
import com.linkedin.datahub.graphql.generated.DataContractState;
|
||||
import com.linkedin.datahub.graphql.generated.DataContractStatus;
|
||||
import com.linkedin.datahub.graphql.generated.DataQualityContract;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.datahub.graphql.generated.FreshnessContract;
|
||||
import com.linkedin.datahub.graphql.generated.SchemaContract;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
public class DataContractMapper {
|
||||
|
||||
public static DataContract mapContract(@Nonnull final EntityResponse entityResponse) {
|
||||
final DataContract result = new DataContract();
|
||||
final EnvelopedAspectMap aspects = entityResponse.getAspects();
|
||||
|
||||
result.setUrn(entityResponse.getUrn().toString());
|
||||
result.setType(EntityType.DATA_CONTRACT);
|
||||
|
||||
final EnvelopedAspect dataContractProperties =
|
||||
aspects.get(Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME);
|
||||
if (dataContractProperties != null) {
|
||||
result.setProperties(
|
||||
mapProperties(
|
||||
new com.linkedin.datacontract.DataContractProperties(
|
||||
dataContractProperties.getValue().data())));
|
||||
} else {
|
||||
throw new RuntimeException(
|
||||
String.format("Data Contract does not exist!. urn: %s", entityResponse.getUrn()));
|
||||
}
|
||||
|
||||
final EnvelopedAspect dataContractStatus =
|
||||
aspects.get(Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
|
||||
if (dataContractStatus != null) {
|
||||
result.setStatus(
|
||||
mapStatus(
|
||||
new com.linkedin.datacontract.DataContractStatus(
|
||||
dataContractStatus.getValue().data())));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static DataContractProperties mapProperties(
|
||||
final com.linkedin.datacontract.DataContractProperties properties) {
|
||||
final DataContractProperties result = new DataContractProperties();
|
||||
result.setEntityUrn(properties.getEntity().toString());
|
||||
if (properties.hasSchema()) {
|
||||
result.setSchema(
|
||||
properties.getSchema().stream()
|
||||
.map(DataContractMapper::mapSchemaContract)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (properties.hasFreshness()) {
|
||||
result.setFreshness(
|
||||
properties.getFreshness().stream()
|
||||
.map(DataContractMapper::mapFreshnessContract)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (properties.hasDataQuality()) {
|
||||
result.setDataQuality(
|
||||
properties.getDataQuality().stream()
|
||||
.map(DataContractMapper::mapDataQualityContract)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static DataContractStatus mapStatus(
|
||||
final com.linkedin.datacontract.DataContractStatus status) {
|
||||
final DataContractStatus result = new DataContractStatus();
|
||||
result.setState(DataContractState.valueOf(status.getState().toString()));
|
||||
return result;
|
||||
}
|
||||
|
||||
private static SchemaContract mapSchemaContract(
|
||||
final com.linkedin.datacontract.SchemaContract schemaContract) {
|
||||
final SchemaContract result = new SchemaContract();
|
||||
final Assertion partialAssertion = new Assertion();
|
||||
partialAssertion.setUrn(schemaContract.getAssertion().toString());
|
||||
result.setAssertion(partialAssertion);
|
||||
return result;
|
||||
}
|
||||
|
||||
private static FreshnessContract mapFreshnessContract(
|
||||
final com.linkedin.datacontract.FreshnessContract freshnessContract) {
|
||||
final FreshnessContract result = new FreshnessContract();
|
||||
final Assertion partialAssertion = new Assertion();
|
||||
partialAssertion.setUrn(freshnessContract.getAssertion().toString());
|
||||
result.setAssertion(partialAssertion);
|
||||
return result;
|
||||
}
|
||||
|
||||
private static DataQualityContract mapDataQualityContract(
|
||||
final com.linkedin.datacontract.DataQualityContract qualityContract) {
|
||||
final DataQualityContract result = new DataQualityContract();
|
||||
final Assertion partialAssertion = new Assertion();
|
||||
partialAssertion.setUrn(qualityContract.getAssertion().toString());
|
||||
result.setAssertion(partialAssertion);
|
||||
return result;
|
||||
}
|
||||
|
||||
private DataContractMapper() {}
|
||||
}
|
||||
@ -0,0 +1,84 @@
|
||||
package com.linkedin.datahub.graphql.types.datacontract;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.Entity;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import graphql.execution.DataFetcherResult;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
public class DataContractType
|
||||
implements com.linkedin.datahub.graphql.types.EntityType<DataContract, String> {
|
||||
|
||||
static final Set<String> ASPECTS_TO_FETCH =
|
||||
ImmutableSet.of(
|
||||
Constants.DATA_CONTRACT_KEY_ASPECT_NAME,
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
|
||||
private final EntityClient _entityClient;
|
||||
|
||||
public DataContractType(final EntityClient entityClient) {
|
||||
_entityClient = Objects.requireNonNull(entityClient, "entityClient must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityType type() {
|
||||
return EntityType.DATA_CONTRACT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Entity, String> getKeyProvider() {
|
||||
return Entity::getUrn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<DataContract> objectClass() {
|
||||
return DataContract.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DataFetcherResult<DataContract>> batchLoad(
|
||||
@Nonnull List<String> urns, @Nonnull QueryContext context) throws Exception {
|
||||
final List<Urn> dataContractUrns =
|
||||
urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
final Map<Urn, EntityResponse> entities =
|
||||
_entityClient.batchGetV2(
|
||||
context.getOperationContext(),
|
||||
Constants.DATA_CONTRACT_ENTITY_NAME,
|
||||
new HashSet<>(dataContractUrns),
|
||||
ASPECTS_TO_FETCH);
|
||||
|
||||
final List<EntityResponse> gmsResults = new ArrayList<>();
|
||||
for (Urn urn : dataContractUrns) {
|
||||
gmsResults.add(entities.getOrDefault(urn, null));
|
||||
}
|
||||
return gmsResults.stream()
|
||||
.map(
|
||||
gmsResult ->
|
||||
gmsResult == null
|
||||
? null
|
||||
: DataFetcherResult.<DataContract>newResult()
|
||||
.data(DataContractMapper.mapContract(gmsResult))
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to batch load Data Contracts", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
183
datahub-graphql-core/src/main/resources/contract.graphql
Normal file
183
datahub-graphql-core/src/main/resources/contract.graphql
Normal file
@ -0,0 +1,183 @@
|
||||
extend type Mutation {
|
||||
"""
|
||||
Create or update a data contract for a given dataset. Requires the "Edit Data Contract" privilege for the provided dataset.
|
||||
"""
|
||||
upsertDataContract(urn: String, input: UpsertDataContractInput!): DataContract!
|
||||
}
|
||||
|
||||
extend type Dataset {
|
||||
"""
|
||||
An optional Data Contract defined for the Dataset.
|
||||
"""
|
||||
contract: DataContract
|
||||
}
|
||||
|
||||
"""
|
||||
A Data Contract Entity. A Data Contract is a verifiable group of assertions regarding various aspects of the data: its freshness (sla),
|
||||
schema, and data quality or validity. This group of assertions represents a data owner's commitment to producing data that confirms to the agreed
|
||||
upon contract. Each dataset can have a single contract. The contract can be in a "passing" or "violating" state, depending
|
||||
on whether the assertions that compose the contract are passing or failing.
|
||||
Note that the data contract entity is currently in early preview (beta).
|
||||
"""
|
||||
type DataContract implements Entity {
|
||||
"""
|
||||
A primary key of the data contract
|
||||
"""
|
||||
urn: String!
|
||||
|
||||
"""
|
||||
The standard entity type
|
||||
"""
|
||||
type: EntityType!
|
||||
|
||||
"""
|
||||
Properties describing the data contract
|
||||
"""
|
||||
properties: DataContractProperties
|
||||
|
||||
"""
|
||||
The status of the data contract
|
||||
"""
|
||||
status: DataContractStatus
|
||||
|
||||
"""
|
||||
List of relationships between the source Entity and some destination entities with a given types
|
||||
"""
|
||||
relationships(input: RelationshipsInput!): EntityRelationshipsResult
|
||||
}
|
||||
|
||||
type DataContractProperties {
|
||||
"""
|
||||
The urn of the related entity, e.g. the Dataset today. In the future, we may support additional contract entities.
|
||||
"""
|
||||
entityUrn: String!
|
||||
|
||||
"""
|
||||
The Freshness (SLA) portion of the contract.
|
||||
As of today, it is expected that there will not be more than 1 Freshness contract. If there are, only the first will be displayed.
|
||||
"""
|
||||
freshness: [FreshnessContract!]
|
||||
|
||||
"""
|
||||
The schema / structural portion of the contract.
|
||||
As of today, it is expected that there will not be more than 1 Schema contract. If there are, only the first will be displayed.
|
||||
"""
|
||||
schema: [SchemaContract!]
|
||||
|
||||
"""
|
||||
A set of data quality related contracts, e.g. table and column-level contract constraints.
|
||||
"""
|
||||
dataQuality: [DataQualityContract!]
|
||||
}
|
||||
|
||||
"""
|
||||
The state of the data contract
|
||||
"""
|
||||
enum DataContractState {
|
||||
"""
|
||||
The data contract is active.
|
||||
"""
|
||||
ACTIVE
|
||||
|
||||
"""
|
||||
The data contract is pending. Note that this symbol is currently experimental.
|
||||
"""
|
||||
PENDING
|
||||
}
|
||||
|
||||
type DataContractStatus {
|
||||
"""
|
||||
The state of the data contract
|
||||
"""
|
||||
state: DataContractState!
|
||||
}
|
||||
|
||||
type DataQualityContract {
|
||||
"""
|
||||
The assertion representing the schema contract.
|
||||
"""
|
||||
assertion: Assertion!
|
||||
}
|
||||
|
||||
type SchemaContract {
|
||||
"""
|
||||
The assertion representing the schema contract.
|
||||
"""
|
||||
assertion: Assertion!
|
||||
}
|
||||
|
||||
type FreshnessContract {
|
||||
"""
|
||||
The assertion representing the Freshness contract.
|
||||
"""
|
||||
assertion: Assertion!
|
||||
}
|
||||
|
||||
"""
|
||||
Input required to upsert a Data Contract entity for an asset
|
||||
"""
|
||||
input UpsertDataContractInput {
|
||||
"""
|
||||
The urn of the related entity. Dataset is the only entity type supported today.
|
||||
"""
|
||||
entityUrn: String!
|
||||
|
||||
"""
|
||||
The Freshness / Freshness portion of the contract. If not provided, this will be set to none.
|
||||
For Dataset Contracts, it is expected that there will not be more than 1 Freshness contract. If there are, only the first will be displayed.
|
||||
"""
|
||||
freshness: [FreshnessContractInput!]
|
||||
|
||||
"""
|
||||
The schema / structural portion of the contract. If not provided, this will be set to none.
|
||||
For Dataset Contracts, it is expected that there will not be more than 1 Schema contract. If there are, only the first will be displayed.
|
||||
"""
|
||||
schema: [SchemaContractInput!]
|
||||
|
||||
"""
|
||||
The data quality portion of the contract. If not provided, this will be set to none.
|
||||
"""
|
||||
dataQuality: [DataQualityContractInput!]
|
||||
|
||||
"""
|
||||
The state of the data contract. If not provided, it will be in ACTIVE mode by default.
|
||||
"""
|
||||
state: DataContractState
|
||||
|
||||
"""
|
||||
Optional ID of the contract you want to create. Only applicable if this is a create operation. If not provided, a random
|
||||
id will be generated for you.
|
||||
"""
|
||||
id: String
|
||||
}
|
||||
|
||||
"""
|
||||
Input required to create an Freshness contract
|
||||
"""
|
||||
input FreshnessContractInput {
|
||||
"""
|
||||
The assertion monitoring this part of the data contract. Assertion must be of type Freshness.
|
||||
"""
|
||||
assertionUrn: String!
|
||||
}
|
||||
|
||||
"""
|
||||
Input required to create a schema contract
|
||||
"""
|
||||
input SchemaContractInput {
|
||||
"""
|
||||
The assertion monitoring this part of the data contract. Assertion must be of type Data Schema.
|
||||
"""
|
||||
assertionUrn: String!
|
||||
}
|
||||
|
||||
"""
|
||||
Input required to create a data quality contract
|
||||
"""
|
||||
input DataQualityContractInput {
|
||||
"""
|
||||
The assertion monitoring this part of the data contract. Assertion must be of type Dataset, Volume, Field / Column, or Custom SQL.
|
||||
"""
|
||||
assertionUrn: String!
|
||||
}
|
||||
|
||||
@ -1158,6 +1158,11 @@ enum EntityType {
|
||||
"""
|
||||
ROLE
|
||||
|
||||
"""
|
||||
A data contract
|
||||
"""
|
||||
DATA_CONTRACT
|
||||
|
||||
""""
|
||||
An structured property on entities
|
||||
"""
|
||||
|
||||
@ -0,0 +1,63 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import com.datahub.authentication.Actor;
|
||||
import com.datahub.authentication.ActorType;
|
||||
import com.datahub.authentication.Authentication;
|
||||
import com.datahub.authorization.AuthorizationRequest;
|
||||
import com.datahub.authorization.AuthorizationResult;
|
||||
import com.datahub.authorization.EntitySpec;
|
||||
import com.datahub.plugins.auth.authorization.Authorizer;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import graphql.Assert;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import io.datahubproject.test.metadata.context.TestOperationContexts;
|
||||
import java.util.Optional;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class DataContractUtilsTest {
|
||||
|
||||
@Test
|
||||
public void testCanEditDataContract() {
|
||||
Urn testUrn = UrnUtils.getUrn("urn:li:dataContract:test");
|
||||
boolean result =
|
||||
DataContractUtils.canEditDataContract(
|
||||
new QueryContext() {
|
||||
@Override
|
||||
public boolean isAuthenticated() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Authentication getAuthentication() {
|
||||
Authentication auth = new Authentication(new Actor(ActorType.USER, "test"), "TEST");
|
||||
return auth;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Authorizer getAuthorizer() {
|
||||
Authorizer authorizer = mock(Authorizer.class);
|
||||
Mockito.when(authorizer.authorize(Mockito.any(AuthorizationRequest.class)))
|
||||
.thenReturn(
|
||||
new AuthorizationResult(
|
||||
new AuthorizationRequest(
|
||||
"TEST", "test", Optional.of(new EntitySpec("dataset", "test"))),
|
||||
AuthorizationResult.Type.ALLOW,
|
||||
"TEST"));
|
||||
return authorizer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperationContext getOperationContext() {
|
||||
return TestOperationContexts.userContextNoSearchAuthorization(
|
||||
getAuthorizer(), getAuthentication());
|
||||
}
|
||||
},
|
||||
testUrn);
|
||||
Assert.assertTrue(result);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,206 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import static com.linkedin.datahub.graphql.resolvers.datacontract.EntityDataContractResolver.*;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.datahub.authentication.Authentication;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.EntityRelationship;
|
||||
import com.linkedin.common.EntityRelationshipArray;
|
||||
import com.linkedin.common.EntityRelationships;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.datacontract.DataContractProperties;
|
||||
import com.linkedin.datacontract.DataContractState;
|
||||
import com.linkedin.datacontract.DataContractStatus;
|
||||
import com.linkedin.datacontract.DataQualityContract;
|
||||
import com.linkedin.datacontract.DataQualityContractArray;
|
||||
import com.linkedin.datacontract.FreshnessContract;
|
||||
import com.linkedin.datacontract.FreshnessContractArray;
|
||||
import com.linkedin.datacontract.SchemaContract;
|
||||
import com.linkedin.datacontract.SchemaContractArray;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.Dataset;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.GraphClient;
|
||||
import com.linkedin.metadata.key.DataContractKey;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class EntityDataContractResolverTest {
|
||||
|
||||
private static final Urn TEST_DATASET_URN =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
private static final Urn TEST_DATA_CONTRACT_URN = UrnUtils.getUrn("urn:li:dataContract:test");
|
||||
private static final Urn TEST_QUALITY_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:quality");
|
||||
private static final Urn TEST_FRESHNESS_ASSERTION_URN =
|
||||
UrnUtils.getUrn("urn:li:assertion:freshness");
|
||||
private static final Urn TEST_SCHEMA_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:schema");
|
||||
|
||||
@Test
|
||||
public void testGetSuccessOneContract() throws Exception {
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
|
||||
Mockito.when(
|
||||
mockGraphClient.getRelatedEntities(
|
||||
Mockito.eq(TEST_DATASET_URN.toString()),
|
||||
Mockito.eq(ImmutableList.of(CONTRACT_FOR_RELATIONSHIP)),
|
||||
Mockito.eq(RelationshipDirection.INCOMING),
|
||||
Mockito.eq(0),
|
||||
Mockito.eq(1),
|
||||
Mockito.anyString()))
|
||||
.thenReturn(
|
||||
new EntityRelationships()
|
||||
.setTotal(1)
|
||||
.setCount(1)
|
||||
.setStart(0)
|
||||
.setRelationships(
|
||||
new EntityRelationshipArray(
|
||||
ImmutableList.of(
|
||||
new EntityRelationship()
|
||||
.setType(CONTRACT_FOR_RELATIONSHIP)
|
||||
.setEntity(TEST_DATA_CONTRACT_URN)
|
||||
.setCreated(
|
||||
new AuditStamp()
|
||||
.setActor(UrnUtils.getUrn("urn:li:corpuser:test"))
|
||||
.setTime(0L))))));
|
||||
|
||||
Map<String, EnvelopedAspect> dataContractAspects = new HashMap<>();
|
||||
|
||||
// 1. Key Aspect
|
||||
dataContractAspects.put(
|
||||
Constants.DATA_CONTRACT_KEY_ASPECT_NAME,
|
||||
new com.linkedin.entity.EnvelopedAspect()
|
||||
.setValue(new Aspect(new DataContractKey().setId("test").data())));
|
||||
|
||||
// 2. Properties Aspect.
|
||||
DataContractProperties expectedProperties =
|
||||
new DataContractProperties()
|
||||
.setEntity(TEST_DATASET_URN)
|
||||
.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
ImmutableList.of(
|
||||
new DataQualityContract().setAssertion(TEST_QUALITY_ASSERTION_URN))))
|
||||
.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
ImmutableList.of(
|
||||
new FreshnessContract().setAssertion(TEST_FRESHNESS_ASSERTION_URN))))
|
||||
.setSchema(
|
||||
new SchemaContractArray(
|
||||
ImmutableList.of(
|
||||
new SchemaContract().setAssertion(TEST_SCHEMA_ASSERTION_URN))));
|
||||
|
||||
dataContractAspects.put(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(expectedProperties.data())));
|
||||
|
||||
// 3. Status Aspect
|
||||
DataContractStatus expectedStatus = new DataContractStatus().setState(DataContractState.ACTIVE);
|
||||
|
||||
dataContractAspects.put(
|
||||
Constants.DATA_CONTRACT_STATUS_ASPECT_NAME,
|
||||
new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(expectedStatus.data())));
|
||||
|
||||
Mockito.when(
|
||||
mockClient.getV2(
|
||||
nullable(OperationContext.class),
|
||||
Mockito.eq(Constants.DATA_CONTRACT_ENTITY_NAME),
|
||||
Mockito.eq(TEST_DATA_CONTRACT_URN),
|
||||
Mockito.eq(null)))
|
||||
.thenReturn(
|
||||
new EntityResponse()
|
||||
.setEntityName(Constants.DATA_CONTRACT_ENTITY_NAME)
|
||||
.setUrn(TEST_DATA_CONTRACT_URN)
|
||||
.setAspects(new EnvelopedAspectMap(dataContractAspects)));
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = Mockito.mock(QueryContext.class);
|
||||
Mockito.when(mockContext.getAuthentication()).thenReturn(Mockito.mock(Authentication.class));
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn("urn:li:corpuser:test");
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Dataset parentDataset = new Dataset();
|
||||
parentDataset.setUrn(TEST_DATASET_URN.toString());
|
||||
Mockito.when(mockEnv.getSource()).thenReturn(parentDataset);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
EntityDataContractResolver resolver =
|
||||
new EntityDataContractResolver(mockClient, mockGraphClient);
|
||||
DataContract result = resolver.get(mockEnv).get();
|
||||
|
||||
// Assert that the result we get matches the expectations.
|
||||
assertEquals(result.getUrn(), TEST_DATA_CONTRACT_URN.toString());
|
||||
assertEquals(result.getType(), EntityType.DATA_CONTRACT);
|
||||
|
||||
// Verify Properties
|
||||
assertEquals(result.getProperties().getDataQuality().size(), 1);
|
||||
assertEquals(result.getProperties().getFreshness().size(), 1);
|
||||
assertEquals(result.getProperties().getSchema().size(), 1);
|
||||
assertEquals(
|
||||
result.getProperties().getDataQuality().get(0).getAssertion().getUrn(),
|
||||
TEST_QUALITY_ASSERTION_URN.toString());
|
||||
assertEquals(
|
||||
result.getProperties().getFreshness().get(0).getAssertion().getUrn(),
|
||||
TEST_FRESHNESS_ASSERTION_URN.toString());
|
||||
assertEquals(
|
||||
result.getProperties().getSchema().get(0).getAssertion().getUrn(),
|
||||
TEST_SCHEMA_ASSERTION_URN.toString());
|
||||
|
||||
// Verify Status
|
||||
assertEquals(result.getStatus().getState().toString(), expectedStatus.getState().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSuccessNoContracts() throws Exception {
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
|
||||
Mockito.when(
|
||||
mockGraphClient.getRelatedEntities(
|
||||
Mockito.eq(TEST_DATASET_URN.toString()),
|
||||
Mockito.eq(ImmutableList.of(CONTRACT_FOR_RELATIONSHIP)),
|
||||
Mockito.eq(RelationshipDirection.INCOMING),
|
||||
Mockito.eq(0),
|
||||
Mockito.eq(1),
|
||||
Mockito.anyString()))
|
||||
.thenReturn(
|
||||
new EntityRelationships()
|
||||
.setTotal(0)
|
||||
.setCount(0)
|
||||
.setStart(0)
|
||||
.setRelationships(new EntityRelationshipArray(Collections.emptyList())));
|
||||
|
||||
EntityDataContractResolver resolver =
|
||||
new EntityDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = Mockito.mock(QueryContext.class);
|
||||
Mockito.when(mockContext.getAuthentication()).thenReturn(Mockito.mock(Authentication.class));
|
||||
Mockito.when(mockContext.getActorUrn()).thenReturn("urn:li:corpuser:test");
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Dataset parentDataset = new Dataset();
|
||||
parentDataset.setUrn(TEST_DATASET_URN.toString());
|
||||
Mockito.when(mockEnv.getSource()).thenReturn(parentDataset);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
DataContract result = resolver.get(mockEnv).get();
|
||||
|
||||
assertNull(result);
|
||||
Mockito.verifyNoMoreInteractions(mockClient);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,379 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.datacontract;
|
||||
|
||||
import static com.linkedin.datahub.graphql.TestUtils.*;
|
||||
import static com.linkedin.datahub.graphql.resolvers.datacontract.EntityDataContractResolver.*;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.EntityRelationship;
|
||||
import com.linkedin.common.EntityRelationshipArray;
|
||||
import com.linkedin.common.EntityRelationships;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.StringMap;
|
||||
import com.linkedin.datacontract.DataContractProperties;
|
||||
import com.linkedin.datacontract.DataContractStatus;
|
||||
import com.linkedin.datacontract.DataQualityContract;
|
||||
import com.linkedin.datacontract.DataQualityContractArray;
|
||||
import com.linkedin.datacontract.FreshnessContract;
|
||||
import com.linkedin.datacontract.FreshnessContractArray;
|
||||
import com.linkedin.datacontract.SchemaContract;
|
||||
import com.linkedin.datacontract.SchemaContractArray;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.DataContractState;
|
||||
import com.linkedin.datahub.graphql.generated.DataQualityContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.FreshnessContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.SchemaContractInput;
|
||||
import com.linkedin.datahub.graphql.generated.UpsertDataContractInput;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.AspectType;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.GraphClient;
|
||||
import com.linkedin.metadata.key.DataContractKey;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class UpsertDataContractResolverTest {
|
||||
|
||||
private static final Urn TEST_CONTRACT_URN = UrnUtils.getUrn("urn:li:dataContract:test-id");
|
||||
private static final Urn TEST_DATASET_URN =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
private static final Urn TEST_FRESHNESS_ASSERTION_URN =
|
||||
UrnUtils.getUrn("urn:li:assertion:freshness");
|
||||
private static final Urn TEST_SCHEMA_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:schema");
|
||||
private static final Urn TEST_QUALITY_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:quality");
|
||||
|
||||
private static final UpsertDataContractInput TEST_CREATE_INPUT =
|
||||
new UpsertDataContractInput(
|
||||
TEST_DATASET_URN.toString(),
|
||||
ImmutableList.of(new FreshnessContractInput(TEST_FRESHNESS_ASSERTION_URN.toString())),
|
||||
ImmutableList.of(new SchemaContractInput(TEST_SCHEMA_ASSERTION_URN.toString())),
|
||||
ImmutableList.of(new DataQualityContractInput(TEST_QUALITY_ASSERTION_URN.toString())),
|
||||
DataContractState.PENDING,
|
||||
"test-id");
|
||||
|
||||
private static final UpsertDataContractInput TEST_VALID_UPDATE_INPUT =
|
||||
new UpsertDataContractInput(
|
||||
TEST_DATASET_URN.toString(),
|
||||
ImmutableList.of(new FreshnessContractInput(TEST_FRESHNESS_ASSERTION_URN.toString())),
|
||||
ImmutableList.of(new SchemaContractInput(TEST_SCHEMA_ASSERTION_URN.toString())),
|
||||
ImmutableList.of(new DataQualityContractInput(TEST_QUALITY_ASSERTION_URN.toString())),
|
||||
DataContractState.ACTIVE,
|
||||
null);
|
||||
|
||||
private static final Urn TEST_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:test");
|
||||
|
||||
@Test
|
||||
public void testGetSuccessCreate() throws Exception {
|
||||
|
||||
// Expected results
|
||||
final DataContractKey key = new DataContractKey();
|
||||
key.setId("test-id");
|
||||
final Urn dataContractUrn =
|
||||
EntityKeyUtils.convertEntityKeyToUrn(key, Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
|
||||
final DataContractStatus status = new DataContractStatus();
|
||||
status.setState(com.linkedin.datacontract.DataContractState.PENDING);
|
||||
|
||||
final DataContractProperties props = new DataContractProperties();
|
||||
props.setEntity(TEST_DATASET_URN);
|
||||
props.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
ImmutableList.of(new DataQualityContract().setAssertion(TEST_QUALITY_ASSERTION_URN))));
|
||||
props.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
ImmutableList.of(new FreshnessContract().setAssertion(TEST_FRESHNESS_ASSERTION_URN))));
|
||||
props.setSchema(
|
||||
new SchemaContractArray(
|
||||
ImmutableList.of(new SchemaContract().setAssertion(TEST_SCHEMA_ASSERTION_URN))));
|
||||
|
||||
// Create resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
initMockGraphClient(mockGraphClient, null);
|
||||
initMockEntityClient(mockClient, null, props); // No existing contract
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_CREATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
DataContract result = resolver.get(mockEnv).get();
|
||||
|
||||
final MetadataChangeProposal propertiesProposal = new MetadataChangeProposal();
|
||||
propertiesProposal.setEntityUrn(dataContractUrn);
|
||||
propertiesProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
propertiesProposal.setSystemMetadata(
|
||||
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
|
||||
propertiesProposal.setAspectName(Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME);
|
||||
propertiesProposal.setAspect(GenericRecordUtils.serializeAspect(props));
|
||||
propertiesProposal.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
final MetadataChangeProposal statusProposal = new MetadataChangeProposal();
|
||||
statusProposal.setEntityUrn(dataContractUrn);
|
||||
statusProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
statusProposal.setSystemMetadata(
|
||||
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
|
||||
statusProposal.setAspectName(Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
|
||||
statusProposal.setAspect(GenericRecordUtils.serializeAspect(status));
|
||||
statusProposal.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.batchIngestProposals(
|
||||
any(OperationContext.class),
|
||||
Mockito.eq(ImmutableList.of(propertiesProposal, statusProposal)),
|
||||
Mockito.eq(false));
|
||||
|
||||
Assert.assertEquals(result.getUrn(), TEST_CONTRACT_URN.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSuccessUpdate() throws Exception {
|
||||
|
||||
DataContractProperties props = new DataContractProperties();
|
||||
props.setEntity(TEST_DATASET_URN);
|
||||
props.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
ImmutableList.of(new DataQualityContract().setAssertion(TEST_QUALITY_ASSERTION_URN))));
|
||||
props.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
ImmutableList.of(new FreshnessContract().setAssertion(TEST_FRESHNESS_ASSERTION_URN))));
|
||||
props.setSchema(
|
||||
new SchemaContractArray(
|
||||
ImmutableList.of(new SchemaContract().setAssertion(TEST_SCHEMA_ASSERTION_URN))));
|
||||
|
||||
DataContractStatus status = new DataContractStatus();
|
||||
status.setState(com.linkedin.datacontract.DataContractState.ACTIVE);
|
||||
|
||||
// Update resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
initMockGraphClient(mockGraphClient, TEST_CONTRACT_URN);
|
||||
initMockEntityClient(mockClient, TEST_CONTRACT_URN, props); // Contract Exists
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_VALID_UPDATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
DataContract result = resolver.get(mockEnv).get();
|
||||
|
||||
final MetadataChangeProposal propertiesProposal = new MetadataChangeProposal();
|
||||
propertiesProposal.setEntityUrn(TEST_CONTRACT_URN);
|
||||
propertiesProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
propertiesProposal.setSystemMetadata(
|
||||
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
|
||||
propertiesProposal.setAspectName(Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME);
|
||||
propertiesProposal.setAspect(GenericRecordUtils.serializeAspect(props));
|
||||
propertiesProposal.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
final MetadataChangeProposal statusProposal = new MetadataChangeProposal();
|
||||
statusProposal.setEntityUrn(TEST_CONTRACT_URN);
|
||||
statusProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
|
||||
statusProposal.setSystemMetadata(
|
||||
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
|
||||
statusProposal.setAspectName(Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
|
||||
statusProposal.setAspect(GenericRecordUtils.serializeAspect(status));
|
||||
statusProposal.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
Mockito.verify(mockClient, Mockito.times(1))
|
||||
.batchIngestProposals(
|
||||
any(OperationContext.class),
|
||||
Mockito.eq(ImmutableList.of(propertiesProposal, statusProposal)),
|
||||
Mockito.eq(false));
|
||||
|
||||
Assert.assertEquals(result.getUrn(), TEST_CONTRACT_URN.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFailureEntityDoesNotExist() throws Exception {
|
||||
// Update resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
initMockGraphClient(mockGraphClient, TEST_CONTRACT_URN);
|
||||
Mockito.when(mockClient.exists(any(OperationContext.class), Mockito.eq(TEST_DATASET_URN)))
|
||||
.thenReturn(false);
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_CREATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
Assert.assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFailureAssertionDoesNotExist() throws Exception {
|
||||
// Update resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
initMockGraphClient(mockGraphClient, TEST_CONTRACT_URN);
|
||||
Mockito.when(mockClient.exists(any(OperationContext.class), Mockito.eq(TEST_DATASET_URN)))
|
||||
.thenReturn(true);
|
||||
Mockito.when(
|
||||
mockClient.exists(
|
||||
any(OperationContext.class), Mockito.eq(TEST_FRESHNESS_ASSERTION_URN)))
|
||||
.thenReturn(false);
|
||||
Mockito.when(
|
||||
mockClient.exists(any(OperationContext.class), Mockito.eq(TEST_QUALITY_ASSERTION_URN)))
|
||||
.thenReturn(false);
|
||||
Mockito.when(
|
||||
mockClient.exists(any(OperationContext.class), Mockito.eq(TEST_SCHEMA_ASSERTION_URN)))
|
||||
.thenReturn(false);
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_CREATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
Assert.assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetUnauthorized() throws Exception {
|
||||
// Create resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
QueryContext mockContext = getMockDenyContext();
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_CREATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
|
||||
Mockito.verify(mockClient, Mockito.times(0))
|
||||
.ingestProposal(any(OperationContext.class), Mockito.any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntityClientException() throws Exception {
|
||||
// Create resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
GraphClient mockGraphClient = Mockito.mock(GraphClient.class);
|
||||
Mockito.doThrow(RemoteInvocationException.class)
|
||||
.when(mockClient)
|
||||
.ingestProposal(any(OperationContext.class), Mockito.any(), Mockito.eq(false));
|
||||
UpsertDataContractResolver resolver =
|
||||
new UpsertDataContractResolver(mockClient, mockGraphClient);
|
||||
|
||||
// Execute resolver
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_CREATE_INPUT);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
|
||||
}
|
||||
|
||||
private void initMockGraphClient(GraphClient client, Urn existingContractUrn) {
|
||||
if (existingContractUrn != null) {
|
||||
Mockito.when(
|
||||
client.getRelatedEntities(
|
||||
Mockito.eq(TEST_DATASET_URN.toString()),
|
||||
Mockito.eq(ImmutableList.of(CONTRACT_FOR_RELATIONSHIP)),
|
||||
Mockito.eq(RelationshipDirection.INCOMING),
|
||||
Mockito.eq(0),
|
||||
Mockito.eq(1),
|
||||
Mockito.anyString()))
|
||||
.thenReturn(
|
||||
new EntityRelationships()
|
||||
.setTotal(1)
|
||||
.setCount(1)
|
||||
.setStart(0)
|
||||
.setRelationships(
|
||||
new EntityRelationshipArray(
|
||||
ImmutableList.of(
|
||||
new EntityRelationship()
|
||||
.setEntity(existingContractUrn)
|
||||
.setType(CONTRACT_FOR_RELATIONSHIP)
|
||||
.setCreated(
|
||||
new AuditStamp().setActor(TEST_ACTOR_URN).setTime(0L))))));
|
||||
} else {
|
||||
Mockito.when(
|
||||
client.getRelatedEntities(
|
||||
Mockito.eq(TEST_DATASET_URN.toString()),
|
||||
Mockito.eq(ImmutableList.of(CONTRACT_FOR_RELATIONSHIP)),
|
||||
Mockito.eq(RelationshipDirection.INCOMING),
|
||||
Mockito.eq(0),
|
||||
Mockito.eq(1),
|
||||
Mockito.anyString()))
|
||||
.thenReturn(
|
||||
new EntityRelationships()
|
||||
.setTotal(0)
|
||||
.setCount(0)
|
||||
.setStart(0)
|
||||
.setRelationships(new EntityRelationshipArray(Collections.emptyList())));
|
||||
}
|
||||
}
|
||||
|
||||
private void initMockEntityClient(
|
||||
EntityClient client, Urn existingContractUrn, DataContractProperties newContractProperties)
|
||||
throws Exception {
|
||||
if (existingContractUrn != null) {
|
||||
Mockito.when(client.exists(any(OperationContext.class), Mockito.eq(existingContractUrn)))
|
||||
.thenReturn(true);
|
||||
}
|
||||
Mockito.when(client.exists(any(OperationContext.class), Mockito.eq(TEST_DATASET_URN)))
|
||||
.thenReturn(true);
|
||||
Mockito.when(client.exists(any(OperationContext.class), Mockito.eq(TEST_QUALITY_ASSERTION_URN)))
|
||||
.thenReturn(true);
|
||||
Mockito.when(
|
||||
client.exists(any(OperationContext.class), Mockito.eq(TEST_FRESHNESS_ASSERTION_URN)))
|
||||
.thenReturn(true);
|
||||
Mockito.when(client.exists(any(OperationContext.class), Mockito.eq(TEST_SCHEMA_ASSERTION_URN)))
|
||||
.thenReturn(true);
|
||||
|
||||
Mockito.when(
|
||||
client.getV2(
|
||||
any(OperationContext.class),
|
||||
Mockito.eq(Constants.DATA_CONTRACT_ENTITY_NAME),
|
||||
Mockito.eq(TEST_CONTRACT_URN),
|
||||
Mockito.eq(null)))
|
||||
.thenReturn(
|
||||
new EntityResponse()
|
||||
.setUrn(TEST_CONTRACT_URN)
|
||||
.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
ImmutableMap.of(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
new EnvelopedAspect()
|
||||
.setType(AspectType.VERSIONED)
|
||||
.setName(Constants.DATA_CONTRACT_ENTITY_NAME)
|
||||
.setValue(new Aspect(newContractProperties.data()))))));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,180 @@
|
||||
package com.linkedin.datahub.graphql.types.datacontract;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertNull;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.data.template.StringMap;
|
||||
import com.linkedin.datacontract.DataContractProperties;
|
||||
import com.linkedin.datacontract.DataContractState;
|
||||
import com.linkedin.datacontract.DataContractStatus;
|
||||
import com.linkedin.datacontract.DataQualityContract;
|
||||
import com.linkedin.datacontract.DataQualityContractArray;
|
||||
import com.linkedin.datacontract.FreshnessContract;
|
||||
import com.linkedin.datacontract.FreshnessContractArray;
|
||||
import com.linkedin.datacontract.SchemaContract;
|
||||
import com.linkedin.datacontract.SchemaContractArray;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import java.util.Collections;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class DataContractMapperTest {
|
||||
|
||||
@Test
|
||||
public void testMapAllFields() throws Exception {
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
Urn urn = Urn.createFromString("urn:li:dataContract:1");
|
||||
Urn dataQualityAssertionUrn = Urn.createFromString("urn:li:assertion:quality");
|
||||
Urn dataQualityAssertionUrn2 = Urn.createFromString("urn:li:assertion:quality2");
|
||||
|
||||
Urn freshnessAssertionUrn = Urn.createFromString("urn:li:assertion:freshness");
|
||||
Urn schemaAssertionUrn = Urn.createFromString("urn:li:assertion:schema");
|
||||
Urn datasetUrn =
|
||||
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
entityResponse.setUrn(urn);
|
||||
|
||||
EnvelopedAspect envelopedDataContractProperties = new EnvelopedAspect();
|
||||
DataContractProperties dataContractProperties = new DataContractProperties();
|
||||
dataContractProperties.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
ImmutableList.of(
|
||||
new DataQualityContract().setAssertion(dataQualityAssertionUrn),
|
||||
new DataQualityContract().setAssertion(dataQualityAssertionUrn2))));
|
||||
dataContractProperties.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
ImmutableList.of(new FreshnessContract().setAssertion(freshnessAssertionUrn))));
|
||||
dataContractProperties.setSchema(
|
||||
new SchemaContractArray(
|
||||
ImmutableList.of(new SchemaContract().setAssertion(schemaAssertionUrn))));
|
||||
|
||||
dataContractProperties.setEntity(datasetUrn);
|
||||
|
||||
envelopedDataContractProperties.setValue(new Aspect(dataContractProperties.data()));
|
||||
|
||||
EnvelopedAspect envelopedDataContractStatus = new EnvelopedAspect();
|
||||
DataContractStatus status = new DataContractStatus();
|
||||
status.setState(DataContractState.PENDING);
|
||||
status.setCustomProperties(new StringMap(ImmutableMap.of("key", "value")));
|
||||
|
||||
envelopedDataContractStatus.setValue(new Aspect(status.data()));
|
||||
entityResponse.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
Collections.singletonMap(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME, envelopedDataContractProperties)));
|
||||
|
||||
entityResponse.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
ImmutableMap.of(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
envelopedDataContractProperties,
|
||||
Constants.DATA_CONTRACT_STATUS_ASPECT_NAME,
|
||||
envelopedDataContractStatus)));
|
||||
|
||||
DataContract dataContract = DataContractMapper.mapContract(entityResponse);
|
||||
assertNotNull(dataContract);
|
||||
assertEquals(dataContract.getUrn(), urn.toString());
|
||||
assertEquals(dataContract.getType(), EntityType.DATA_CONTRACT);
|
||||
assertEquals(
|
||||
dataContract.getStatus().getState(),
|
||||
com.linkedin.datahub.graphql.generated.DataContractState.PENDING);
|
||||
assertEquals(dataContract.getProperties().getEntityUrn(), datasetUrn.toString());
|
||||
assertEquals(dataContract.getProperties().getDataQuality().size(), 2);
|
||||
assertEquals(
|
||||
dataContract.getProperties().getDataQuality().get(0).getAssertion().getUrn(),
|
||||
dataQualityAssertionUrn.toString());
|
||||
assertEquals(
|
||||
dataContract.getProperties().getDataQuality().get(1).getAssertion().getUrn(),
|
||||
dataQualityAssertionUrn2.toString());
|
||||
assertEquals(dataContract.getProperties().getFreshness().size(), 1);
|
||||
assertEquals(
|
||||
dataContract.getProperties().getFreshness().get(0).getAssertion().getUrn(),
|
||||
freshnessAssertionUrn.toString());
|
||||
assertEquals(dataContract.getProperties().getSchema().size(), 1);
|
||||
assertEquals(
|
||||
dataContract.getProperties().getSchema().get(0).getAssertion().getUrn(),
|
||||
schemaAssertionUrn.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapRequiredFields() throws Exception {
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
Urn urn = Urn.createFromString("urn:li:dataContract:1");
|
||||
Urn datasetUrn =
|
||||
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
entityResponse.setUrn(urn);
|
||||
|
||||
EnvelopedAspect envelopedDataContractProperties = new EnvelopedAspect();
|
||||
DataContractProperties dataContractProperties = new DataContractProperties();
|
||||
dataContractProperties.setEntity(datasetUrn);
|
||||
envelopedDataContractProperties.setValue(new Aspect(dataContractProperties.data()));
|
||||
|
||||
EnvelopedAspect envelopedDataContractStatus = new EnvelopedAspect();
|
||||
DataContractStatus status = new DataContractStatus();
|
||||
status.setState(DataContractState.PENDING);
|
||||
status.setCustomProperties(new StringMap(ImmutableMap.of("key", "value")));
|
||||
|
||||
envelopedDataContractStatus.setValue(new Aspect(status.data()));
|
||||
entityResponse.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
Collections.singletonMap(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME, envelopedDataContractProperties)));
|
||||
|
||||
entityResponse.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
ImmutableMap.of(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
envelopedDataContractProperties,
|
||||
Constants.DATA_CONTRACT_STATUS_ASPECT_NAME,
|
||||
envelopedDataContractStatus)));
|
||||
|
||||
DataContract dataContract = DataContractMapper.mapContract(entityResponse);
|
||||
assertNotNull(dataContract);
|
||||
assertEquals(dataContract.getUrn(), urn.toString());
|
||||
assertEquals(dataContract.getType(), EntityType.DATA_CONTRACT);
|
||||
assertEquals(
|
||||
dataContract.getStatus().getState(),
|
||||
com.linkedin.datahub.graphql.generated.DataContractState.PENDING);
|
||||
assertEquals(dataContract.getProperties().getEntityUrn(), datasetUrn.toString());
|
||||
assertNull(dataContract.getProperties().getDataQuality());
|
||||
assertNull(dataContract.getProperties().getSchema());
|
||||
assertNull(dataContract.getProperties().getFreshness());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapNoStatus() throws Exception {
|
||||
EntityResponse entityResponse = new EntityResponse();
|
||||
Urn urn = Urn.createFromString("urn:li:dataContract:1");
|
||||
Urn datasetUrn =
|
||||
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
entityResponse.setUrn(urn);
|
||||
|
||||
EnvelopedAspect envelopedDataContractProperties = new EnvelopedAspect();
|
||||
DataContractProperties dataContractProperties = new DataContractProperties();
|
||||
dataContractProperties.setEntity(datasetUrn);
|
||||
envelopedDataContractProperties.setValue(new Aspect(dataContractProperties.data()));
|
||||
|
||||
entityResponse.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
ImmutableMap.of(
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME, envelopedDataContractProperties)));
|
||||
|
||||
DataContract dataContract = DataContractMapper.mapContract(entityResponse);
|
||||
assertNotNull(dataContract);
|
||||
assertEquals(dataContract.getUrn(), urn.toString());
|
||||
assertEquals(dataContract.getType(), EntityType.DATA_CONTRACT);
|
||||
assertNull(dataContract.getStatus());
|
||||
assertEquals(dataContract.getProperties().getEntityUrn(), datasetUrn.toString());
|
||||
assertNull(dataContract.getProperties().getDataQuality());
|
||||
assertNull(dataContract.getProperties().getSchema());
|
||||
assertNull(dataContract.getProperties().getFreshness());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,152 @@
|
||||
package com.linkedin.datahub.graphql.types.datacontract;
|
||||
|
||||
import static com.linkedin.datahub.graphql.TestUtils.*;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.datahub.authentication.Authentication;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.datacontract.DataContractProperties;
|
||||
import com.linkedin.datacontract.DataContractState;
|
||||
import com.linkedin.datacontract.DataContractStatus;
|
||||
import com.linkedin.datacontract.DataQualityContract;
|
||||
import com.linkedin.datacontract.DataQualityContractArray;
|
||||
import com.linkedin.datacontract.FreshnessContract;
|
||||
import com.linkedin.datacontract.FreshnessContractArray;
|
||||
import com.linkedin.datacontract.SchemaContract;
|
||||
import com.linkedin.datacontract.SchemaContractArray;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.DataContract;
|
||||
import com.linkedin.datahub.graphql.generated.EntityType;
|
||||
import com.linkedin.entity.Aspect;
|
||||
import com.linkedin.entity.EntityResponse;
|
||||
import com.linkedin.entity.EnvelopedAspect;
|
||||
import com.linkedin.entity.EnvelopedAspectMap;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.key.DataContractKey;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.execution.DataFetcherResult;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class DataContractTypeTest {
|
||||
|
||||
private static final Urn TEST_DATASET_URN =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test,PROD)");
|
||||
private static final Urn DATA_QUALITY_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:quality");
|
||||
private static final Urn FRESHNESS_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:freshness");
|
||||
private static final Urn SCHEMA_ASSERTION_URN = UrnUtils.getUrn("urn:li:assertion:schema");
|
||||
private static final String TEST_DATA_CONTRACT_1_URN = "urn:li:dataContract:id-1";
|
||||
private static final DataContractKey TEST_DATA_CONTRACT_1_KEY =
|
||||
new DataContractKey().setId("id-1");
|
||||
private static final DataContractProperties TEST_DATA_CONTRACT_1_PROPERTIES =
|
||||
new DataContractProperties()
|
||||
.setEntity(TEST_DATASET_URN)
|
||||
.setDataQuality(
|
||||
new DataQualityContractArray(
|
||||
ImmutableList.of(
|
||||
new DataQualityContract().setAssertion(DATA_QUALITY_ASSERTION_URN))))
|
||||
.setFreshness(
|
||||
new FreshnessContractArray(
|
||||
ImmutableList.of(new FreshnessContract().setAssertion(FRESHNESS_ASSERTION_URN))))
|
||||
.setSchema(
|
||||
new SchemaContractArray(
|
||||
ImmutableList.of(new SchemaContract().setAssertion(SCHEMA_ASSERTION_URN))));
|
||||
private static final DataContractStatus TEST_DATA_CONTRACT_1_STATUS =
|
||||
new DataContractStatus().setState(DataContractState.ACTIVE);
|
||||
|
||||
private static final String TEST_DATA_CONTRACT_2_URN = "urn:li:dataContract:id-2";
|
||||
|
||||
@Test
|
||||
public void testBatchLoad() throws Exception {
|
||||
|
||||
EntityClient client = Mockito.mock(EntityClient.class);
|
||||
|
||||
Urn dataContractUrn1 = Urn.createFromString(TEST_DATA_CONTRACT_1_URN);
|
||||
Urn dataContractUrn2 = Urn.createFromString(TEST_DATA_CONTRACT_2_URN);
|
||||
|
||||
Mockito.when(
|
||||
client.batchGetV2(
|
||||
any(OperationContext.class),
|
||||
Mockito.eq(Constants.DATA_CONTRACT_ENTITY_NAME),
|
||||
Mockito.eq(new HashSet<>(ImmutableSet.of(dataContractUrn1, dataContractUrn2))),
|
||||
Mockito.eq(DataContractType.ASPECTS_TO_FETCH)))
|
||||
.thenReturn(
|
||||
ImmutableMap.of(
|
||||
dataContractUrn1,
|
||||
new EntityResponse()
|
||||
.setEntityName(Constants.DATA_CONTRACT_ENTITY_NAME)
|
||||
.setUrn(dataContractUrn1)
|
||||
.setAspects(
|
||||
new EnvelopedAspectMap(
|
||||
ImmutableMap.of(
|
||||
Constants.DATA_CONTRACT_KEY_ASPECT_NAME,
|
||||
new EnvelopedAspect()
|
||||
.setValue(new Aspect(TEST_DATA_CONTRACT_1_KEY.data())),
|
||||
Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME,
|
||||
new EnvelopedAspect()
|
||||
.setValue(new Aspect(TEST_DATA_CONTRACT_1_PROPERTIES.data())),
|
||||
Constants.DATA_CONTRACT_STATUS_ASPECT_NAME,
|
||||
new EnvelopedAspect()
|
||||
.setValue(new Aspect(TEST_DATA_CONTRACT_1_STATUS.data())))))));
|
||||
|
||||
DataContractType type = new DataContractType(client);
|
||||
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
List<DataFetcherResult<DataContract>> result =
|
||||
type.batchLoad(
|
||||
ImmutableList.of(TEST_DATA_CONTRACT_1_URN, TEST_DATA_CONTRACT_2_URN), mockContext);
|
||||
|
||||
// Verify response
|
||||
Mockito.verify(client, Mockito.times(1))
|
||||
.batchGetV2(
|
||||
any(OperationContext.class),
|
||||
Mockito.eq(Constants.DATA_CONTRACT_ENTITY_NAME),
|
||||
Mockito.eq(ImmutableSet.of(dataContractUrn1, dataContractUrn2)),
|
||||
Mockito.eq(DataContractType.ASPECTS_TO_FETCH));
|
||||
|
||||
assertEquals(result.size(), 2);
|
||||
|
||||
DataContract dataContract1 = result.get(0).getData();
|
||||
assertEquals(dataContract1.getUrn(), TEST_DATA_CONTRACT_1_URN);
|
||||
assertEquals(dataContract1.getType(), EntityType.DATA_CONTRACT);
|
||||
assertEquals(dataContract1.getProperties().getEntityUrn(), TEST_DATASET_URN.toString());
|
||||
assertEquals(dataContract1.getProperties().getDataQuality().size(), 1);
|
||||
assertEquals(dataContract1.getProperties().getSchema().size(), 1);
|
||||
assertEquals(dataContract1.getProperties().getFreshness().size(), 1);
|
||||
|
||||
// Assert second element is null.
|
||||
assertNull(result.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchLoadClientException() throws Exception {
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
Mockito.doThrow(RemoteInvocationException.class)
|
||||
.when(mockClient)
|
||||
.batchGetV2(
|
||||
nullable(OperationContext.class),
|
||||
Mockito.anyString(),
|
||||
Mockito.anySet(),
|
||||
Mockito.anySet());
|
||||
DataContractType type = new DataContractType(mockClient);
|
||||
|
||||
// Execute Batch load
|
||||
QueryContext context = Mockito.mock(QueryContext.class);
|
||||
Mockito.when(context.getAuthentication()).thenReturn(Mockito.mock(Authentication.class));
|
||||
assertThrows(
|
||||
RuntimeException.class,
|
||||
() ->
|
||||
type.batchLoad(
|
||||
ImmutableList.of(TEST_DATA_CONTRACT_1_URN, TEST_DATA_CONTRACT_2_URN), context));
|
||||
}
|
||||
}
|
||||
@ -363,6 +363,12 @@ public class Constants {
|
||||
public static final String DATAHUB_CONNECTION_ENTITY_NAME = "dataHubConnection";
|
||||
public static final String DATAHUB_CONNECTION_DETAILS_ASPECT_NAME = "dataHubConnectionDetails";
|
||||
|
||||
// Data Contracts
|
||||
public static final String DATA_CONTRACT_ENTITY_NAME = "dataContract";
|
||||
public static final String DATA_CONTRACT_PROPERTIES_ASPECT_NAME = "dataContractProperties";
|
||||
public static final String DATA_CONTRACT_KEY_ASPECT_NAME = "dataContractKey";
|
||||
public static final String DATA_CONTRACT_STATUS_ASPECT_NAME = "dataContractStatus";
|
||||
|
||||
// Relationships
|
||||
public static final String IS_MEMBER_OF_GROUP_RELATIONSHIP_NAME = "IsMemberOfGroup";
|
||||
public static final String IS_MEMBER_OF_NATIVE_GROUP_RELATIONSHIP_NAME = "IsMemberOfNativeGroup";
|
||||
|
||||
@ -15,7 +15,7 @@ record FreshnessAssertionSchedule {
|
||||
*/
|
||||
type: enum FreshnessAssertionScheduleType {
|
||||
/**
|
||||
* An highly configurable recurring schedule which describes the times of events described
|
||||
* A highly configurable recurring schedule which describes the times of events described
|
||||
* by a CRON schedule, with the evaluation schedule assuming to be matching the cron schedule.
|
||||
*
|
||||
* In a CRON schedule type, we compute the look-back window to be the time between the last scheduled event
|
||||
@ -45,12 +45,30 @@ record FreshnessAssertionSchedule {
|
||||
* to be evaluated each hour, we'd compute the result as follows:
|
||||
*
|
||||
* 1. Subtract the fixed interval from the current time (Evaluation time) to compute the bounds of a fixed look-back window.
|
||||
* 2. Verify that the target event has occurred within the CRON-interval window.
|
||||
* 2. Verify that the target event has occurred within the look-back window.
|
||||
* 3. If the target event has occurred within the time window, then assertion passes.
|
||||
* 4. If the target event has not occurred within the time window, then the assertion fails.
|
||||
*
|
||||
*/
|
||||
FIXED_INTERVAL
|
||||
/**
|
||||
* A stateful check that takes the last time this check ran to determine the look-back window.
|
||||
*
|
||||
* To compute the valid look-back- window, we start at the time the monitor last evaluated this assertion,
|
||||
* and we end at the point in time the check is currently running.
|
||||
*
|
||||
* For example, let's say a Freshness assertion is of type SINCE_THE_LAST_CHECK, and the monitor is configured to
|
||||
* run every day at 12:00am. Let's assume this assertion was last evaluated yesterday at 12:04am. We'd compute
|
||||
* the result as follows:
|
||||
*
|
||||
* 1. Get the timestamp for the latest run event on this assertion.
|
||||
* 2. look_back_window_start_time = latest_run_event.timestampMillis [12:04a yesterday]
|
||||
* 3. look_back_window_end_time = nowMillis [12:02a today]
|
||||
* 4. If the target event has occurred within the window [12:04a yday to 12:02a today], then assertion passes.
|
||||
* 5. If the target event has not occurred within the window, then the assertion fails.
|
||||
*
|
||||
*/
|
||||
SINCE_THE_LAST_CHECK
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -345,6 +345,12 @@ public class PoliciesConfig {
|
||||
"Edit Dataset Queries",
|
||||
"The ability to edit the Queries for a Dataset.");
|
||||
|
||||
public static final Privilege EDIT_ENTITY_DATA_CONTRACT_PRIVILEGE =
|
||||
Privilege.of(
|
||||
"EDIT_ENTITY_DATA_CONTRACT",
|
||||
"Edit Data Contract",
|
||||
"The ability to edit the Data Contract for an entity.");
|
||||
|
||||
// Tag Privileges
|
||||
public static final Privilege EDIT_TAG_COLOR_PRIVILEGE =
|
||||
Privilege.of("EDIT_TAG_COLOR", "Edit Tag Color", "The ability to change the color of a Tag.");
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user