feat(graphql): add MutableTypeBatchResolver (#4976)

This commit is contained in:
NoahFournier 2022-08-05 19:20:29 +01:00 committed by GitHub
parent da77752f0d
commit cfcc1cf760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 309 additions and 10 deletions

View File

@ -144,6 +144,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.AddTagResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddOwnersResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver;
@ -681,6 +682,7 @@ public class GmsGraphQLEngine {
private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
builder.type("Mutation", typeWiring -> typeWiring
.dataFetcher("updateDataset", new MutableTypeResolver<>(datasetType))
.dataFetcher("updateDatasets", new MutableTypeBatchResolver<>(datasetType))
.dataFetcher("createTag", new CreateTagResolver(this.entityClient))
.dataFetcher("updateTag", new MutableTypeResolver<>(tagType))
.dataFetcher("setTagColor", new SetTagColorResolver(entityClient, entityService))

View File

@ -0,0 +1,53 @@
package com.linkedin.datahub.graphql.resolvers.mutate;
import com.codahale.metrics.Timer;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.types.BatchMutableType;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
/**
* Generic GraphQL resolver responsible for performing updates against particular types.
*
* @param <I> the generated GraphQL POJO corresponding to the input type.
* @param <T> the generated GraphQL POJO corresponding to the return type.
*/
public class MutableTypeBatchResolver<I, B, T> implements DataFetcher<CompletableFuture<List<T>>> {
private static final Logger _logger = LoggerFactory.getLogger(MutableTypeBatchResolver.class.getName());
private final BatchMutableType<I, B, T> _batchMutableType;
public MutableTypeBatchResolver(final BatchMutableType<I, B, T> batchMutableType) {
_batchMutableType = batchMutableType;
}
@Override
public CompletableFuture<List<T>> get(DataFetchingEnvironment environment) throws Exception {
final B[] input = bindArgument(environment.getArgument("input"), _batchMutableType.batchInputClass());
return CompletableFuture.supplyAsync(() -> {
Timer.Context timer = MetricUtils.timer(this.getClass(), "batchMutate").time();
try {
return _batchMutableType.batchUpdate(input, environment.getContext());
} catch (AuthorizationException e) {
throw e;
} catch (Exception e) {
_logger.error("Failed to perform batchUpdate", e);
throw new IllegalArgumentException(e);
} finally {
timer.stop();
}
});
}
}

View File

@ -0,0 +1,16 @@
package com.linkedin.datahub.graphql.types;
import com.linkedin.datahub.graphql.QueryContext;
import javax.annotation.Nonnull;
import java.util.List;
public interface BatchMutableType<I, B, T> extends MutableType<I, T> {
default Class<B[]> batchInputClass() throws UnsupportedOperationException {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchInputClass method");
}
default List<T> batchUpdate(@Nonnull final B[] updateInput, QueryContext context) throws Exception {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchUpdate method");
}
}

View File

@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.types;
import com.linkedin.datahub.graphql.QueryContext;
import javax.annotation.Nonnull;
/**
@ -9,12 +10,11 @@ import javax.annotation.Nonnull;
* @param <I>: The input type corresponding to the write.
*/
public interface MutableType<I, T> {
/**
* Returns generated GraphQL class associated with the input type
*/
Class<I> inputClass();
Class<I> inputClass();
/**
* Update an entity by urn

View File

@ -11,18 +11,19 @@ import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.authorization.ConjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.authorization.DisjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.BrowsePath;
import com.linkedin.datahub.graphql.generated.BrowseResults;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.BrowseResults;
import com.linkedin.datahub.graphql.generated.BrowsePath;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.generated.BatchDatasetUpdateInput;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.BatchMutableType;
import com.linkedin.datahub.graphql.types.BrowsableEntityType;
import com.linkedin.datahub.graphql.types.MutableType;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetUpdateInputMapper;
@ -40,7 +41,9 @@ import com.linkedin.metadata.search.SearchResult;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import graphql.execution.DataFetcherResult;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -56,7 +59,7 @@ import static com.linkedin.metadata.Constants.*;
public class DatasetType implements SearchableEntityType<Dataset, String>, BrowsableEntityType<Dataset, String>,
MutableType<DatasetUpdateInput, Dataset> {
BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> {
private static final Set<String> ASPECTS_TO_RESOLVE = ImmutableSet.of(
DATASET_KEY_ASPECT_NAME,
@ -99,6 +102,11 @@ public class DatasetType implements SearchableEntityType<Dataset, String>, Brows
return DatasetUpdateInput.class;
}
@Override
public Class<BatchDatasetUpdateInput[]> batchInputClass() {
return BatchDatasetUpdateInput[].class;
}
@Override
public EntityType type() {
return EntityType.DATASET;
@ -184,6 +192,30 @@ public class DatasetType implements SearchableEntityType<Dataset, String>, Brows
return BrowsePathsMapper.map(result);
}
@Override
public List<Dataset> batchUpdate(@Nonnull BatchDatasetUpdateInput[] input, @Nonnull QueryContext context) throws Exception {
final Urn actor = Urn.createFromString(context.getAuthentication().getActor().toUrnStr());
final Collection<MetadataChangeProposal> proposals = Arrays.stream(input).map(updateInput -> {
if (isAuthorized(updateInput.getUrn(), updateInput.getUpdate(), context)) {
Collection<MetadataChangeProposal> datasetProposals = DatasetUpdateInputMapper.map(updateInput.getUpdate(), actor);
datasetProposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(updateInput.getUrn())));
return datasetProposals;
}
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}).flatMap(Collection::stream).collect(Collectors.toList());
final List<String> urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList());
try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e);
}
return batchLoad(urns, context).stream().map(DataFetcherResult::getData).collect(Collectors.toList());
}
@Override
public Dataset update(@Nonnull String urn, @Nonnull DatasetUpdateInput input, @Nonnull QueryContext context) throws Exception {
if (isAuthorized(urn, input, context)) {

View File

@ -185,6 +185,11 @@ type Mutation {
"""
updateDataset(urn: String!, input: DatasetUpdateInput!): Dataset
"""
Update the metadata about a batch of Datasets
"""
updateDatasets(input: [BatchDatasetUpdateInput!]!): [Dataset]
"""
Update the metadata about a particular Chart
"""
@ -3596,6 +3601,23 @@ input DatasetUpdateInput {
editableProperties: DatasetEditablePropertiesUpdate
}
"""
Arguments provided to batch update Dataset entities
"""
input BatchDatasetUpdateInput {
"""
Primary key of the Dataset to which the update will be applied
"""
urn: String!
"""
Arguments provided to update the Dataset
"""
update: DatasetUpdateInput!
}
"""
Update to editable schema metadata of the dataset
"""

View File

@ -0,0 +1,174 @@
package com.linkedin.datahub.graphql.resolvers.mutate;
import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.Deprecation;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.BatchDatasetUpdateInput;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetDeprecationUpdate;
import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
import com.linkedin.datahub.graphql.types.BatchMutableType;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import com.linkedin.entity.Aspect;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletionException;
import static com.linkedin.datahub.graphql.TestUtils.*;
import static org.testng.Assert.*;
public class MutableTypeBatchResolverTest {
private static final String TEST_DATASET_1_URN = "urn:li:dataset:id-1";
private static final String TEST_DATASET_2_URN = "urn:li:dataset:id-2";
private static final boolean TEST_DATASET_1_IS_DEPRECATED = true;
private static final boolean TEST_DATASET_2_IS_DEPRECATED = false;
private static final String TEST_DATASET_1_DEPRECATION_NOTE = "Test Deprecation Note";
private static final String TEST_DATASET_2_DEPRECATION_NOTE = "";
private static final Deprecation TEST_DATASET_1_DEPRECATION;
static {
try {
TEST_DATASET_1_DEPRECATION = new Deprecation()
.setDeprecated(TEST_DATASET_1_IS_DEPRECATED)
.setNote(TEST_DATASET_1_DEPRECATION_NOTE)
.setActor(Urn.createFromString("urn:li:corpuser:datahub"));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
private static final Deprecation TEST_DATASET_2_DEPRECATION;
static {
try {
TEST_DATASET_2_DEPRECATION = new Deprecation()
.setDeprecated(TEST_DATASET_2_IS_DEPRECATED)
.setNote(TEST_DATASET_2_DEPRECATION_NOTE)
.setActor(Urn.createFromString("urn:li:corpuser:datahub"));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
@Test
public void testGetSuccess() throws Exception {
EntityClient mockClient = Mockito.mock(RestliEntityClient.class);
BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> batchMutableType = new DatasetType(mockClient);
MutableTypeBatchResolver<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> resolver = new MutableTypeBatchResolver<>(batchMutableType);
List<BatchDatasetUpdateInput> mockInputs = Arrays.asList(
new BatchDatasetUpdateInput.Builder()
.setUrn(TEST_DATASET_1_URN)
.setUpdate(
new DatasetUpdateInput.Builder()
.setDeprecation(
new DatasetDeprecationUpdate.Builder()
.setDeprecated(TEST_DATASET_1_IS_DEPRECATED)
.setNote(TEST_DATASET_1_DEPRECATION_NOTE)
.build()
)
.build()
)
.build(),
new BatchDatasetUpdateInput.Builder()
.setUrn(TEST_DATASET_2_URN)
.setUpdate(
new DatasetUpdateInput.Builder()
.setDeprecation(
new DatasetDeprecationUpdate.Builder()
.setDeprecated(TEST_DATASET_2_IS_DEPRECATED)
.setNote(TEST_DATASET_2_DEPRECATION_NOTE)
.build()
)
.build()
)
.build()
);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument("input")).thenReturn(mockInputs);
QueryContext mockContext = getMockAllowContext();
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
Authentication mockAuth = Mockito.mock(Authentication.class);
Mockito.when(mockContext.getAuthentication()).thenReturn(mockAuth);
Mockito.when(mockAuth.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
Urn datasetUrn1 = Urn.createFromString(TEST_DATASET_1_URN);
Urn datasetUrn2 = Urn.createFromString(TEST_DATASET_2_URN);
Mockito.when(mockClient.batchGetV2(Mockito.eq(Constants.DATASET_ENTITY_NAME),
Mockito.eq(new HashSet<>(ImmutableSet.of(datasetUrn1, datasetUrn2))),
Mockito.any(),
Mockito.any(Authentication.class)))
.thenReturn(ImmutableMap.of(
datasetUrn1,
new EntityResponse()
.setEntityName(Constants.DATASET_ENTITY_NAME)
.setUrn(datasetUrn1)
.setAspects(new EnvelopedAspectMap(ImmutableMap.of(
Constants.DATASET_DEPRECATION_ASPECT_NAME,
new EnvelopedAspect().setValue(new Aspect(TEST_DATASET_1_DEPRECATION.data()))
))),
datasetUrn2,
new EntityResponse()
.setEntityName(Constants.DATASET_ENTITY_NAME)
.setUrn(datasetUrn2)
.setAspects(new EnvelopedAspectMap(ImmutableMap.of(
Constants.DATASET_DEPRECATION_ASPECT_NAME,
new EnvelopedAspect().setValue(new Aspect(TEST_DATASET_2_DEPRECATION.data()))
)))
));
List<Dataset> result = resolver.get(mockEnv).join();
ArgumentCaptor<Collection<MetadataChangeProposal>> changeProposalCaptor = ArgumentCaptor.forClass((Class) Collection.class);
Mockito.verify(mockClient, Mockito.times(1)).batchIngestProposals(changeProposalCaptor.capture(), Mockito.any());
Mockito.verify(mockClient, Mockito.times(1)).batchGetV2(
Mockito.eq(Constants.DATASET_ENTITY_NAME),
Mockito.eq(ImmutableSet.of(datasetUrn1, datasetUrn2)),
// Dataset aspects to fetch are private, but aren't important for this test
Mockito.any(),
Mockito.any(Authentication.class)
);
Collection<MetadataChangeProposal> changeProposals = changeProposalCaptor.getValue();
assertEquals(changeProposals.size(), 2);
assertEquals(result.size(), 2);
}
@Test
public void testGetFailureUnauthorized() throws Exception {
EntityClient mockClient = Mockito.mock(RestliEntityClient.class);
BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> batchMutableType = new DatasetType(mockClient);
MutableTypeBatchResolver<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> resolver = new MutableTypeBatchResolver<>(batchMutableType);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockDenyContext();
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
}
}