diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java
index 1394c0f6e6..e26ad3c48d 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java
@@ -144,6 +144,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.AddTagResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.AddTagsResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.AddTermResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.AddTermsResolver;
+import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeBatchResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddOwnersResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTagsResolver;
 import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver;
@@ -681,6 +682,7 @@ public class GmsGraphQLEngine {
     private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
         builder.type("Mutation", typeWiring -> typeWiring
             .dataFetcher("updateDataset", new MutableTypeResolver<>(datasetType))
+            .dataFetcher("updateDatasets", new MutableTypeBatchResolver<>(datasetType))
             .dataFetcher("createTag", new CreateTagResolver(this.entityClient))
             .dataFetcher("updateTag", new MutableTypeResolver<>(tagType))
             .dataFetcher("setTagColor", new SetTagColorResolver(entityClient, entityService))
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolver.java
new file mode 100644
index 0000000000..30bd940a7d
--- /dev/null
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolver.java
@@ -0,0 +1,53 @@
+package com.linkedin.datahub.graphql.resolvers.mutate;
+
+import com.codahale.metrics.Timer;
+import com.linkedin.datahub.graphql.exception.AuthorizationException;
+import com.linkedin.datahub.graphql.types.BatchMutableType;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import graphql.schema.DataFetcher;
+import graphql.schema.DataFetchingEnvironment;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
+
+
+/**
+ * Generic GraphQL resolver responsible for performing updates against particular types.
+ *
+ * @param <I> the generated GraphQL POJO corresponding to the input type.
+ * @param <T> the generated GraphQL POJO corresponding to the return type.
+ */
+public class MutableTypeBatchResolver<I, B, T> implements DataFetcher<CompletableFuture<List<T>>> {
+
+    private static final Logger _logger = LoggerFactory.getLogger(MutableTypeBatchResolver.class.getName());
+
+    private final BatchMutableType<I, B, T> _batchMutableType;
+
+    public MutableTypeBatchResolver(final BatchMutableType<I, B, T> batchMutableType) {
+        _batchMutableType = batchMutableType;
+    }
+
+    @Override
+    public CompletableFuture<List<T>> get(DataFetchingEnvironment environment) throws Exception {
+        final B[] input = bindArgument(environment.getArgument("input"), _batchMutableType.batchInputClass());
+
+        return CompletableFuture.supplyAsync(() -> {
+            Timer.Context timer = MetricUtils.timer(this.getClass(), "batchMutate").time();
+
+            try {
+                return _batchMutableType.batchUpdate(input, environment.getContext());
+            } catch (AuthorizationException e) {
+                throw e;
+            } catch (Exception e) {
+                _logger.error("Failed to perform batchUpdate", e);
+                throw new IllegalArgumentException(e);
+            } finally {
+                timer.stop();
+            }
+        });
+    }
+}
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/BatchMutableType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/BatchMutableType.java
new file mode 100644
index 0000000000..3bd8719a37
--- /dev/null
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/BatchMutableType.java
@@ -0,0 +1,16 @@
+package com.linkedin.datahub.graphql.types;
+
+import com.linkedin.datahub.graphql.QueryContext;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+
+public interface BatchMutableType<I, B, T> extends MutableType<I, T> {
+    default Class<B[]> batchInputClass() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchInputClass method");
+    }
+
+    default List<T> batchUpdate(@Nonnull final B[] updateInput, QueryContext context) throws Exception {
+        throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchUpdate method");
+    }
+}
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/MutableType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/MutableType.java
index 6b5a349e73..94f1200d3a 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/MutableType.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/MutableType.java
@@ -1,6 +1,7 @@
 package com.linkedin.datahub.graphql.types;
 
 import com.linkedin.datahub.graphql.QueryContext;
+
 import javax.annotation.Nonnull;
 
 /**
@@ -9,12 +10,11 @@ import javax.annotation.Nonnull;
  * @param <I>: The input type corresponding to the write.
  */
 public interface MutableType<I, T> {
-
     /**
      * Returns generated GraphQL class associated with the input type
      */
-    Class<I> inputClass();
+    Class<I> inputClass();
 
     /**
      * Update an entity by urn
      *
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java
index 9fd5c96cc9..87b96f91ae 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java
@@ -11,18 +11,19 @@ import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
 import com.linkedin.datahub.graphql.authorization.ConjunctivePrivilegeGroup;
 import com.linkedin.datahub.graphql.authorization.DisjunctivePrivilegeGroup;
 import com.linkedin.datahub.graphql.exception.AuthorizationException;
-import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
-import com.linkedin.datahub.graphql.generated.BrowsePath;
-import com.linkedin.datahub.graphql.generated.BrowseResults;
-import com.linkedin.datahub.graphql.generated.Dataset;
 import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
-import com.linkedin.datahub.graphql.generated.Entity;
-import com.linkedin.datahub.graphql.generated.EntityType;
+import com.linkedin.datahub.graphql.generated.Dataset;
 import com.linkedin.datahub.graphql.generated.FacetFilterInput;
+import com.linkedin.datahub.graphql.generated.EntityType;
+import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
+import com.linkedin.datahub.graphql.generated.BrowseResults;
+import com.linkedin.datahub.graphql.generated.BrowsePath;
+import com.linkedin.datahub.graphql.generated.Entity;
 import com.linkedin.datahub.graphql.generated.SearchResults;
+import com.linkedin.datahub.graphql.generated.BatchDatasetUpdateInput;
 import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
+import com.linkedin.datahub.graphql.types.BatchMutableType;
 import com.linkedin.datahub.graphql.types.BrowsableEntityType;
-import com.linkedin.datahub.graphql.types.MutableType;
 import com.linkedin.datahub.graphql.types.SearchableEntityType;
 import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetMapper;
 import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetUpdateInputMapper;
@@ -40,7 +41,9 @@ import com.linkedin.metadata.search.SearchResult;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.r2.RemoteInvocationException;
 import graphql.execution.DataFetcherResult;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -56,7 +59,7 @@ import static com.linkedin.metadata.Constants.*;
 
 public class DatasetType implements SearchableEntityType<Dataset>, BrowsableEntityType<Dataset>,
-        MutableType<DatasetUpdateInput, Dataset> {
+        BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> {
 
     private static final Set<String> ASPECTS_TO_RESOLVE = ImmutableSet.of(
         DATASET_KEY_ASPECT_NAME,
@@ -99,6 +102,11 @@ public class DatasetType implements SearchableEntityType<Dataset>, Brows
         return DatasetUpdateInput.class;
     }
 
+    @Override
+    public Class<BatchDatasetUpdateInput[]> batchInputClass() {
+        return BatchDatasetUpdateInput[].class;
+    }
+
     @Override
     public EntityType type() {
         return EntityType.DATASET;
@@ -184,6 +192,30 @@ public class DatasetType implements SearchableEntityType<Dataset>, Brows
         return BrowsePathsMapper.map(result);
     }
 
+    @Override
+    public List<Dataset> batchUpdate(@Nonnull BatchDatasetUpdateInput[] input, @Nonnull QueryContext context) throws Exception {
+        final Urn actor = Urn.createFromString(context.getAuthentication().getActor().toUrnStr());
+
+        final Collection<MetadataChangeProposal> proposals = Arrays.stream(input).map(updateInput -> {
+            if (isAuthorized(updateInput.getUrn(), updateInput.getUpdate(), context)) {
+                Collection<MetadataChangeProposal> datasetProposals = DatasetUpdateInputMapper.map(updateInput.getUpdate(), actor);
+                datasetProposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(updateInput.getUrn())));
+                return datasetProposals;
+            }
+            throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
+        }).flatMap(Collection::stream).collect(Collectors.toList());
+
+        final List<String> urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList());
+
+        try {
+            _entityClient.batchIngestProposals(proposals, context.getAuthentication());
+        } catch (RemoteInvocationException e) {
+            throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e);
+        }
+
+        return batchLoad(urns, context).stream().map(DataFetcherResult::getData).collect(Collectors.toList());
+    }
+
     @Override
     public Dataset update(@Nonnull String urn, @Nonnull DatasetUpdateInput input, @Nonnull QueryContext context) throws Exception {
         if (isAuthorized(urn, input, context)) {
diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql
index 59f6bb5c0f..8e0ebf7b2c 100644
--- a/datahub-graphql-core/src/main/resources/entity.graphql
+++ b/datahub-graphql-core/src/main/resources/entity.graphql
@@ -185,6 +185,11 @@ type Mutation {
     """
     updateDataset(urn: String!, input: DatasetUpdateInput!): Dataset
 
+    """
+    Update the metadata about a batch of Datasets
+    """
+    updateDatasets(input: [BatchDatasetUpdateInput!]!): [Dataset]
+
     """
     Update the metadata about a particular Chart
     """
@@ -3596,6 +3601,23 @@ input DatasetUpdateInput {
     editableProperties: DatasetEditablePropertiesUpdate
 }
 
+"""
+Arguments provided to batch update Dataset entities
+"""
+input BatchDatasetUpdateInput {
+
+    """
+    Primary key of the Dataset to which the update will be applied
+    """
+    urn: String!
+
+    """
+    Arguments provided to update the Dataset
+    """
+    update: DatasetUpdateInput!
+}
+
+
 """
 Update to editable schema metadata of the dataset
 """
diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java
new file mode 100644
index 0000000000..04ed772033
--- /dev/null
+++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java
@@ -0,0 +1,174 @@
+package com.linkedin.datahub.graphql.resolvers.mutate;
+
+import com.datahub.authentication.Actor;
+import com.datahub.authentication.ActorType;
+import com.datahub.authentication.Authentication;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.linkedin.common.Deprecation;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.datahub.graphql.QueryContext;
+import com.linkedin.datahub.graphql.generated.BatchDatasetUpdateInput;
+import com.linkedin.datahub.graphql.generated.Dataset;
+import com.linkedin.datahub.graphql.generated.DatasetDeprecationUpdate;
+import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
+import com.linkedin.datahub.graphql.types.BatchMutableType;
+import com.linkedin.datahub.graphql.types.dataset.DatasetType;
+import com.linkedin.entity.EntityResponse;
+import com.linkedin.entity.EnvelopedAspect;
+import com.linkedin.entity.EnvelopedAspectMap;
+import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.RestliEntityClient;
+import com.linkedin.metadata.Constants;
+import com.linkedin.mxe.MetadataChangeProposal;
+import graphql.schema.DataFetchingEnvironment;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+import com.linkedin.entity.Aspect;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+
+import static com.linkedin.datahub.graphql.TestUtils.*;
+import static org.testng.Assert.*;
+
+public class MutableTypeBatchResolverTest {
+
+    private static final String TEST_DATASET_1_URN = "urn:li:dataset:id-1";
+    private static final String TEST_DATASET_2_URN = "urn:li:dataset:id-2";
+    private static final boolean TEST_DATASET_1_IS_DEPRECATED = true;
+    private static final boolean TEST_DATASET_2_IS_DEPRECATED = false;
+    private static final String TEST_DATASET_1_DEPRECATION_NOTE = "Test Deprecation Note";
+    private static final String TEST_DATASET_2_DEPRECATION_NOTE = "";
+    private static final Deprecation TEST_DATASET_1_DEPRECATION;
+
+    static {
+        try {
+            TEST_DATASET_1_DEPRECATION = new Deprecation()
+                .setDeprecated(TEST_DATASET_1_IS_DEPRECATED)
+                .setNote(TEST_DATASET_1_DEPRECATION_NOTE)
+                .setActor(Urn.createFromString("urn:li:corpuser:datahub"));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final Deprecation TEST_DATASET_2_DEPRECATION;
+
+    static {
+        try {
+            TEST_DATASET_2_DEPRECATION = new Deprecation()
+                .setDeprecated(TEST_DATASET_2_IS_DEPRECATED)
+                .setNote(TEST_DATASET_2_DEPRECATION_NOTE)
+                .setActor(Urn.createFromString("urn:li:corpuser:datahub"));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testGetSuccess() throws Exception {
+        EntityClient mockClient = Mockito.mock(RestliEntityClient.class);
+        BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> batchMutableType = new DatasetType(mockClient);
+
+        MutableTypeBatchResolver<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> resolver = new MutableTypeBatchResolver<>(batchMutableType);
+
+        List<BatchDatasetUpdateInput> mockInputs = Arrays.asList(
+            new BatchDatasetUpdateInput.Builder()
+                .setUrn(TEST_DATASET_1_URN)
+                .setUpdate(
+                    new DatasetUpdateInput.Builder()
+                        .setDeprecation(
+                            new DatasetDeprecationUpdate.Builder()
+                                .setDeprecated(TEST_DATASET_1_IS_DEPRECATED)
+                                .setNote(TEST_DATASET_1_DEPRECATION_NOTE)
+                                .build()
+                        )
+                        .build()
+                )
+                .build(),
+            new BatchDatasetUpdateInput.Builder()
+                .setUrn(TEST_DATASET_2_URN)
+                .setUpdate(
+                    new DatasetUpdateInput.Builder()
+                        .setDeprecation(
+                            new DatasetDeprecationUpdate.Builder()
+                                .setDeprecated(TEST_DATASET_2_IS_DEPRECATED)
+                                .setNote(TEST_DATASET_2_DEPRECATION_NOTE)
+                                .build()
+                        )
+                        .build()
+                )
+                .build()
+        );
+
+        DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
+        Mockito.when(mockEnv.getArgument("input")).thenReturn(mockInputs);
+        QueryContext mockContext = getMockAllowContext();
+        Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
+        Authentication mockAuth = Mockito.mock(Authentication.class);
+        Mockito.when(mockContext.getAuthentication()).thenReturn(mockAuth);
+        Mockito.when(mockAuth.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
+
+        Urn datasetUrn1 = Urn.createFromString(TEST_DATASET_1_URN);
+        Urn datasetUrn2 = Urn.createFromString(TEST_DATASET_2_URN);
+
+        Mockito.when(mockClient.batchGetV2(Mockito.eq(Constants.DATASET_ENTITY_NAME),
+            Mockito.eq(new HashSet<>(ImmutableSet.of(datasetUrn1, datasetUrn2))),
+            Mockito.any(),
+            Mockito.any(Authentication.class)))
+            .thenReturn(ImmutableMap.of(
+                datasetUrn1,
+                new EntityResponse()
+                    .setEntityName(Constants.DATASET_ENTITY_NAME)
+                    .setUrn(datasetUrn1)
+                    .setAspects(new EnvelopedAspectMap(ImmutableMap.of(
+                        Constants.DATASET_DEPRECATION_ASPECT_NAME,
+                        new EnvelopedAspect().setValue(new Aspect(TEST_DATASET_1_DEPRECATION.data()))
+                    ))),
+                datasetUrn2,
+                new EntityResponse()
+                    .setEntityName(Constants.DATASET_ENTITY_NAME)
+                    .setUrn(datasetUrn2)
+                    .setAspects(new EnvelopedAspectMap(ImmutableMap.of(
+                        Constants.DATASET_DEPRECATION_ASPECT_NAME,
+                        new EnvelopedAspect().setValue(new Aspect(TEST_DATASET_2_DEPRECATION.data()))
+                    )))
+            ));
+
+        List<Dataset> result = resolver.get(mockEnv).join();
+
+        ArgumentCaptor<Collection<MetadataChangeProposal>> changeProposalCaptor = ArgumentCaptor.forClass((Class) Collection.class);
+        Mockito.verify(mockClient, Mockito.times(1)).batchIngestProposals(changeProposalCaptor.capture(), Mockito.any());
+        Mockito.verify(mockClient, Mockito.times(1)).batchGetV2(
+            Mockito.eq(Constants.DATASET_ENTITY_NAME),
+            Mockito.eq(ImmutableSet.of(datasetUrn1, datasetUrn2)),
+            // Dataset aspects to fetch are private, but aren't important for this test
+            Mockito.any(),
+            Mockito.any(Authentication.class)
+        );
+        Collection<MetadataChangeProposal> changeProposals = changeProposalCaptor.getValue();
+
+        assertEquals(changeProposals.size(), 2);
+        assertEquals(result.size(), 2);
+    }
+
+    @Test
+    public void testGetFailureUnauthorized() throws Exception {
+        EntityClient mockClient = Mockito.mock(RestliEntityClient.class);
+        BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> batchMutableType = new DatasetType(mockClient);
+
+        MutableTypeBatchResolver<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> resolver = new MutableTypeBatchResolver<>(batchMutableType);
+
+        DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
+        QueryContext mockContext = getMockDenyContext();
+        Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
+
+        assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
+    }
+}