From bfb903cfb874db2c19a53e0eb2d3c1c69725dba8 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 3 Oct 2022 19:56:19 -0500 Subject: [PATCH] feat(ingest): add async option to ingest proposal endpoint (#6097) * feat(ingest): add async option to ingest proposal endpoint * small tweak to validate before write to K, also keep existing path for timeseries aspects * avoid double convert Co-authored-by: Shirshanka Das --- .../resolvers/mutate/MutationUtils.java | 2 +- .../mutate/UpdateUserSettingResolver.java | 2 +- .../resolvers/mutate/util/DeleteUtils.java | 2 +- .../mutate/util/DeprecationUtils.java | 2 +- .../resolvers/mutate/util/DomainUtils.java | 2 +- .../resolvers/mutate/util/LabelUtils.java | 2 +- .../resolvers/mutate/util/OwnerUtils.java | 2 +- .../graphql/types/chart/ChartType.java | 2 +- .../types/dashboard/DashboardType.java | 2 +- .../graphql/types/dataflow/DataFlowType.java | 2 +- .../graphql/types/datajob/DataJobType.java | 2 +- .../graphql/types/dataset/DatasetType.java | 4 +- .../graphql/types/notebook/NotebookType.java | 2 +- .../datahub/graphql/types/tag/TagType.java | 2 +- .../linkedin/datahub/graphql/TestUtils.java | 25 ++++++++++ .../BatchUpdateSoftDeletedResolverTest.java | 30 +++-------- .../BatchUpdateDeprecationResolverTest.java | 30 +++-------- .../domain/BatchSetDomainResolverTest.java | 44 ++++------------ .../glossary/AddRelatedTermsResolverTest.java | 42 +++------------- .../RemoveRelatedTermsResolverTest.java | 25 ++-------- .../glossary/UpdateNameResolverTest.java | 21 ++------ .../UpdateParentNodeResolverTest.java | 24 +++------ .../mutate/MutableTypeBatchResolverTest.java | 2 +- .../mutate/UpdateUserSettingResolverTest.java | 6 +-- .../owner/AddOwnersResolverTest.java | 25 +++------- .../owner/BatchAddOwnersResolverTest.java | 24 +++------ .../owner/BatchRemoveOwnersResolverTest.java | 21 ++------ .../resolvers/tag/AddTagsResolverTest.java | 24 +++------ .../tag/BatchAddTagsResolverTest.java | 28 +++-------- .../tag/BatchRemoveTagsResolverTest.java | 20 +++----- .../resolvers/term/AddTermsResolverTest.java | 12 ++--- .../term/BatchAddTermsResolverTest.java | 25 +++------- .../term/BatchRemoveTermsResolverTest.java | 21 ++------ .../dao/producer/KafkaEventProducer.java | 38 ++++++++++++++ .../com/linkedin/metadata/EventUtils.java | 39 ++++++++++++--- .../src/datahub/cli/cli_utils.py | 4 +- .../metadata/client/JavaEntityClient.java | 14 +++--- .../metadata/entity/DeleteEntityService.java | 4 +- .../metadata/entity/EntityService.java | 50 +++++++++++-------- .../metadata/entity/RetentionService.java | 4 +- .../metadata/event/EventProducer.java | 11 ++++ .../metadata/entity/EntityServiceTest.java | 50 ++++++++++++++++++- .../MetadataChangeProposalsProcessor.java | 2 +- .../token/StatefulTokenService.java | 4 +- .../linkedin/metadata/boot/UpgradeStep.java | 4 +- .../boot/steps/IngestPoliciesStep.java | 4 +- .../metadata/boot/steps/IngestRolesStep.java | 4 +- .../boot/steps/RemoveClientIdAspectStep.java | 2 +- .../boot/steps/RestoreDbtSiblingsIndices.java | 2 +- .../steps/UpgradeDefaultBrowsePathsStep.java | 3 +- .../steps/RestoreGlossaryIndicesTest.java | 9 ++-- .../UpgradeDefaultBrowsePathsStepTest.java | 12 +++-- .../openapi/util/MappingUtil.java | 4 +- .../com.linkedin.entity.aspects.restspec.json | 4 ++ .../com.linkedin.entity.aspects.snapshot.json | 4 ++ .../linkedin/entity/client/EntityClient.java | 26 ++++++++-- .../entity/client/RestliEntityClient.java | 3 +- .../resources/entity/AspectResource.java | 18 +++++-- .../entity/BatchIngestionRunResource.java | 3 +- 59 files changed, 396 insertions(+), 405 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java index ab7f645887..3cc64aed7f 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java @@ -30,7 +30,7 @@ public class MutationUtils { proposal.setAspectName(aspectName); proposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); proposal.setChangeType(ChangeType.UPSERT); - entityService.ingestProposal(proposal, getAuditStamp(actor)); + entityService.ingestProposal(proposal, getAuditStamp(actor), false); } public static MetadataChangeProposal buildMetadataChangeProposal(Urn urn, String aspectName, RecordTemplate aspect, Urn actor, EntityService entityService) { diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java index 47678973a5..86a8415da3 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java @@ -51,7 +51,7 @@ public class UpdateUserSettingResolver implements DataFetcher changes, EntityService entityService, Urn actor) { // TODO: Replace this with a batch ingest proposals endpoint. for (MetadataChangeProposal change : changes) { - entityService.ingestProposal(change, getAuditStamp(actor)); + entityService.ingestProposal(change, getAuditStamp(actor), false); } } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java index 48af0b4010..3a12dd8b6e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java @@ -89,7 +89,7 @@ public class DeprecationUtils { private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { // TODO: Replace this with a batch ingest proposals endpoint. for (MetadataChangeProposal change : changes) { - entityService.ingestProposal(change, getAuditStamp(actor)); + entityService.ingestProposal(change, getAuditStamp(actor), false); } } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java index addd0bbd2b..e0e964b02f 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java @@ -88,7 +88,7 @@ public class DomainUtils { private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { // TODO: Replace this with a batch ingest proposals endpoint. for (MetadataChangeProposal change : changes) { - entityService.ingestProposal(change, getAuditStamp(actor)); + entityService.ingestProposal(change, getAuditStamp(actor), false); } } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java index 62fe5531ff..7f9c44e29e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java @@ -556,7 +556,7 @@ public class LabelUtils { private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { // TODO: Replace this with a batch ingest proposals endpoint. for (MetadataChangeProposal change : changes) { - entityService.ingestProposal(change, getAuditStamp(actor)); + entityService.ingestProposal(change, getAuditStamp(actor), false); } } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java index 4c13367f5d..f3510283cf 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java @@ -215,7 +215,7 @@ public class OwnerUtils { private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { // TODO: Replace this with a batch ingest proposals endpoint. for (MetadataChangeProposal change : changes) { - entityService.ingestProposal(change, getAuditStamp(actor)); + entityService.ingestProposal(change, getAuditStamp(actor), false); } } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/ChartType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/ChartType.java index 87be15ef21..1c041db9f8 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/ChartType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/ChartType.java @@ -203,7 +203,7 @@ public class ChartType implements SearchableEntityType, Browsable proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/DashboardType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/DashboardType.java index 62566043ad..c4ef925e0d 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/DashboardType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/DashboardType.java @@ -192,7 +192,7 @@ public class DashboardType implements SearchableEntityType, B proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java index 57e03f1606..92c3fe9068 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java @@ -179,7 +179,7 @@ public class DataFlowType implements SearchableEntityType, Bro proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java index 8a5d74faf9..1200eecec7 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java @@ -179,7 +179,7 @@ public class DataJobType implements SearchableEntityType, Brows proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java index 87b96f91ae..6bb47da3a4 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataset/DatasetType.java @@ -208,7 +208,7 @@ public class DatasetType implements SearchableEntityType, Brows final List urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList()); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e); } @@ -224,7 +224,7 @@ public class DatasetType implements SearchableEntityType, Brows proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/notebook/NotebookType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/notebook/NotebookType.java index ba715d990e..a841439c3b 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/notebook/NotebookType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/notebook/NotebookType.java @@ -175,7 +175,7 @@ public class NotebookType implements SearchableEntityType, Bro proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java index 41ae275f42..9aace61965 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java @@ -132,7 +132,7 @@ public class TagType implements com.linkedin.datahub.graphql.types.SearchableEnt final Collection proposals = TagUpdateInputMapper.map(input, actor); proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn))); try { - _entityClient.batchIngestProposals(proposals, context.getAuthentication()); + _entityClient.batchIngestProposals(proposals, context.getAuthentication(), false); } catch (RemoteInvocationException e) { throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e); } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java index ef0cc566c5..e93f48336e 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java @@ -3,6 +3,9 @@ package com.linkedin.datahub.graphql; import com.datahub.authentication.Authentication; import com.datahub.authorization.AuthorizationResult; import com.datahub.authorization.Authorizer; +import com.linkedin.common.AuditStamp; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.mxe.MetadataChangeProposal; import org.mockito.Mockito; @@ -36,5 +39,27 @@ public class TestUtils { return mockContext; } + public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) { + Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( + Mockito.eq(proposal), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + } + + public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) { + Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + } + + public static void verifyNoIngestProposal(EntityService mockService) { + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( + Mockito.any(), + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); + } + private TestUtils() { } } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java index 2fe927100d..12cbf21b13 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java @@ -66,10 +66,7 @@ public class BatchUpdateSoftDeletedResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -78,10 +75,7 @@ public class BatchUpdateSoftDeletedResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -124,10 +118,7 @@ public class BatchUpdateSoftDeletedResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -136,10 +127,7 @@ public class BatchUpdateSoftDeletedResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -171,9 +159,7 @@ public class BatchUpdateSoftDeletedResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -191,9 +177,7 @@ public class BatchUpdateSoftDeletedResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -202,7 +186,7 @@ public class BatchUpdateSoftDeletedResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java index 49c2477033..36909eb075 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java @@ -73,10 +73,7 @@ public class BatchUpdateDeprecationResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -85,10 +82,7 @@ public class BatchUpdateDeprecationResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -140,10 +134,7 @@ public class BatchUpdateDeprecationResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -152,10 +143,7 @@ public class BatchUpdateDeprecationResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -188,9 +176,7 @@ public class BatchUpdateDeprecationResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -209,9 +195,7 @@ public class BatchUpdateDeprecationResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -220,7 +204,7 @@ public class BatchUpdateDeprecationResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java index 756e085593..fe3bfb3dec 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java @@ -77,10 +77,7 @@ public class BatchSetDomainResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -89,10 +86,7 @@ public class BatchSetDomainResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_DOMAIN_2_URN)) @@ -147,10 +141,7 @@ public class BatchSetDomainResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -159,10 +150,7 @@ public class BatchSetDomainResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_DOMAIN_2_URN)) @@ -215,10 +203,7 @@ public class BatchSetDomainResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -227,10 +212,7 @@ public class BatchSetDomainResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newDomains)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -258,9 +240,7 @@ public class BatchSetDomainResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -294,9 +274,7 @@ public class BatchSetDomainResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -315,9 +293,7 @@ public class BatchSetDomainResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -326,7 +302,7 @@ public class BatchSetDomainResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchSetDomainResolver resolver = new BatchSetDomainResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/AddRelatedTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/AddRelatedTermsResolverTest.java index 451faf9bc8..6bbf4f4797 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/AddRelatedTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/AddRelatedTermsResolverTest.java @@ -1,7 +1,6 @@ package com.linkedin.datahub.graphql.resolvers.glossary; import com.google.common.collect.ImmutableList; -import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.datahub.graphql.QueryContext; @@ -9,7 +8,6 @@ import com.linkedin.datahub.graphql.generated.RelatedTermsInput; import com.linkedin.datahub.graphql.generated.TermRelationshipType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.ExecutionException; @@ -58,10 +56,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_ENTITY_URN)) ); @@ -93,10 +88,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_ENTITY_URN)) ); @@ -125,10 +117,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -148,10 +137,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -172,10 +158,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -196,10 +179,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -220,10 +200,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -246,10 +223,7 @@ public class AddRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/RemoveRelatedTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/RemoveRelatedTermsResolverTest.java index 6a704c2b61..dd54d7f983 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/RemoveRelatedTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/RemoveRelatedTermsResolverTest.java @@ -1,7 +1,6 @@ package com.linkedin.datahub.graphql.resolvers.glossary; import com.google.common.collect.ImmutableList; -import com.linkedin.common.AuditStamp; import com.linkedin.common.GlossaryTermUrnArray; import com.linkedin.common.urn.GlossaryTermUrn; import com.linkedin.common.urn.Urn; @@ -12,7 +11,6 @@ import com.linkedin.datahub.graphql.generated.TermRelationshipType; import com.linkedin.glossary.GlossaryRelatedTerms; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -20,8 +18,7 @@ import org.testng.annotations.Test; import java.util.Arrays; import java.util.concurrent.ExecutionException; -import static com.linkedin.datahub.graphql.TestUtils.getMockAllowContext; -import static com.linkedin.datahub.graphql.TestUtils.getMockDenyContext; +import static com.linkedin.datahub.graphql.TestUtils.*; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -57,10 +54,7 @@ public class RemoveRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_ENTITY_URN)) ); @@ -92,10 +86,7 @@ public class RemoveRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_ENTITY_URN)) ); @@ -123,10 +114,7 @@ public class RemoveRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); } @Test @@ -155,10 +143,7 @@ public class RemoveRelatedTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(ExecutionException.class, () -> resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyNoIngestProposal(mockService); Mockito.verify(mockService, Mockito.times(0)).exists( Mockito.eq(Urn.createFromString(TEST_ENTITY_URN)) ); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateNameResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateNameResolverTest.java index e3edfe0efe..1c037ea04e 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateNameResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateNameResolverTest.java @@ -21,7 +21,7 @@ import org.testng.annotations.Test; import java.util.concurrent.CompletionException; -import static com.linkedin.datahub.graphql.TestUtils.getMockAllowContext; +import static com.linkedin.datahub.graphql.TestUtils.*; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -71,10 +71,7 @@ public class UpdateNameResolverTest { final MetadataChangeProposal proposal = setupTests(mockEnv, mockService); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any() - ); + verifyIngestProposal(mockService, 1, proposal); } @Test @@ -108,10 +105,7 @@ public class UpdateNameResolverTest { UpdateNameResolver resolver = new UpdateNameResolver(mockService); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any() - ); + verifyIngestProposal(mockService, 1, proposal); } @Test @@ -145,10 +139,7 @@ public class UpdateNameResolverTest { UpdateNameResolver resolver = new UpdateNameResolver(mockService); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any() - ); + verifyIngestProposal(mockService, 1, proposal); } @Test @@ -162,8 +153,6 @@ public class UpdateNameResolverTest { setupTests(mockEnv, mockService); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any()); + verifyNoIngestProposal(mockService); } } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateParentNodeResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateParentNodeResolverTest.java index 1cba0a86b9..b9161996e8 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateParentNodeResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/glossary/UpdateParentNodeResolverTest.java @@ -20,7 +20,7 @@ import org.testng.annotations.Test; import java.net.URISyntaxException; -import static com.linkedin.datahub.graphql.TestUtils.getMockAllowContext; +import static com.linkedin.datahub.graphql.TestUtils.*; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -72,10 +72,7 @@ public class UpdateParentNodeResolverTest { final MetadataChangeProposal proposal = setupTests(mockEnv, mockService); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any() - ); + verifyIngestProposal(mockService, 1, proposal); } @Test @@ -111,10 +108,7 @@ public class UpdateParentNodeResolverTest { UpdateParentNodeResolver resolver = new UpdateParentNodeResolver(mockService); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any() - ); + verifyIngestProposal(mockService, 1, proposal); } @Test @@ -129,9 +123,7 @@ public class UpdateParentNodeResolverTest { setupTests(mockEnv, mockService); assertThrows(IllegalArgumentException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any()); + verifyNoIngestProposal(mockService); } @Test @@ -146,9 +138,7 @@ public class UpdateParentNodeResolverTest { setupTests(mockEnv, mockService); assertThrows(IllegalArgumentException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any()); + verifyNoIngestProposal(mockService); } @Test @@ -163,8 +153,6 @@ public class UpdateParentNodeResolverTest { setupTests(mockEnv, mockService); assertThrows(URISyntaxException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any()); + verifyNoIngestProposal(mockService); } } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java index 04ed772033..61dd6c678e 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolverTest.java @@ -144,7 +144,7 @@ public class MutableTypeBatchResolverTest { List result = resolver.get(mockEnv).join(); ArgumentCaptor> changeProposalCaptor = ArgumentCaptor.forClass((Class) Collection.class); - Mockito.verify(mockClient, Mockito.times(1)).batchIngestProposals(changeProposalCaptor.capture(), Mockito.any()); + Mockito.verify(mockClient, Mockito.times(1)).batchIngestProposals(changeProposalCaptor.capture(), Mockito.any(), Mockito.eq(false)); Mockito.verify(mockClient, Mockito.times(1)).batchGetV2( Mockito.eq(Constants.DATASET_ENTITY_NAME), Mockito.eq(ImmutableSet.of(datasetUrn1, datasetUrn2)), diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolverTest.java index d21e8a8e31..605f1e4142 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolverTest.java @@ -1,6 +1,5 @@ package com.linkedin.datahub.graphql.resolvers.mutate; -import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.generated.UpdateUserSettingInput; @@ -47,9 +46,6 @@ public class UpdateUserSettingResolverTest { proposal.setAspect(GenericRecordUtils.serializeAspect(newSettings)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal); } } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java index 16a8e27b75..d4bec4adb8 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java @@ -12,7 +12,6 @@ import com.linkedin.datahub.graphql.generated.OwnershipType; import com.linkedin.datahub.graphql.resolvers.mutate.AddOwnersResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -56,10 +55,7 @@ public class AddOwnersResolverTest { assertTrue(resolver.get(mockEnv).get()); // Unable to easily validate exact payload due to the injected timestamp - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) @@ -98,10 +94,7 @@ public class AddOwnersResolverTest { assertTrue(resolver.get(mockEnv).get()); // Unable to easily validate exact payload due to the injected timestamp - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) @@ -136,9 +129,7 @@ public class AddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -165,9 +156,7 @@ public class AddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -185,9 +174,7 @@ public class AddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -196,7 +183,7 @@ public class AddOwnersResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); AddOwnersResolver resolver = new AddOwnersResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java index 43121fa592..3a846c8f27 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java @@ -74,10 +74,7 @@ public class BatchAddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(), // Ownership has a dynamically generated timestamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_OWNER_URN_1)) @@ -133,10 +130,7 @@ public class BatchAddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(), // Ownership has a dynamically generated timestamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_OWNER_URN_1)) @@ -180,9 +174,7 @@ public class BatchAddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -224,9 +216,7 @@ public class BatchAddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -253,9 +243,7 @@ public class BatchAddOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -264,7 +252,7 @@ public class BatchAddOwnersResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddOwnersResolver resolver = new BatchAddOwnersResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java index ac4e0a7cdb..6dad703929 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java @@ -14,7 +14,6 @@ import com.linkedin.datahub.graphql.generated.ResourceRefInput; import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveOwnersResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -67,10 +66,7 @@ public class BatchRemoveOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(), // Ownership has a dynamically generated timestamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); } @Test @@ -116,10 +112,7 @@ public class BatchRemoveOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); } @Test @@ -154,9 +147,7 @@ public class BatchRemoveOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -176,9 +167,7 @@ public class BatchRemoveOwnersResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -187,7 +176,7 @@ public class BatchRemoveOwnersResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveOwnersResolver resolver = new BatchRemoveOwnersResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java index 1b1ead8815..e0769668be 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java @@ -70,10 +70,7 @@ public class AddTagsResolverTest { proposal.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_TAG_1_URN)) @@ -127,10 +124,7 @@ public class AddTagsResolverTest { proposal.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_TAG_1_URN)) @@ -166,9 +160,7 @@ public class AddTagsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -196,9 +188,7 @@ public class AddTagsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -217,9 +207,7 @@ public class AddTagsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -228,7 +216,7 @@ public class AddTagsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.eq(false)); AddTagsResolver resolver = new AddTagsResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java index 0eb3611380..4991f1b59a 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java @@ -83,10 +83,7 @@ public class BatchAddTagsResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -95,10 +92,7 @@ public class BatchAddTagsResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_TAG_1_URN)) @@ -162,10 +156,7 @@ public class BatchAddTagsResolverTest { proposal1.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal1.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal1); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2)); @@ -174,10 +165,7 @@ public class BatchAddTagsResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(newTags)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_TAG_1_URN)) @@ -217,7 +205,7 @@ public class BatchAddTagsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -256,7 +244,7 @@ public class BatchAddTagsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -280,7 +268,7 @@ public class BatchAddTagsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -289,7 +277,7 @@ public class BatchAddTagsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddTagsResolver resolver = new BatchAddTagsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java index 124927ff0a..7d3d876361 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java @@ -81,7 +81,7 @@ public class BatchRemoveTagsResolverTest { Mockito.verify(mockService, Mockito.times(1)).ingestProposal( Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), Mockito.eq(false) ); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); @@ -91,10 +91,7 @@ public class BatchRemoveTagsResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(emptyTags)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -154,7 +151,7 @@ public class BatchRemoveTagsResolverTest { Mockito.verify(mockService, Mockito.times(1)).ingestProposal( Mockito.eq(proposal1), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), Mockito.eq(false) ); final MetadataChangeProposal proposal2 = new MetadataChangeProposal(); @@ -164,10 +161,7 @@ public class BatchRemoveTagsResolverTest { proposal2.setAspect(GenericRecordUtils.serializeAspect(emptyTags)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 1, proposal2); } @Test @@ -206,7 +200,7 @@ public class BatchRemoveTagsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -230,7 +224,7 @@ public class BatchRemoveTagsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -239,7 +233,7 @@ public class BatchRemoveTagsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveTagsResolver resolver = new BatchRemoveTagsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java index 2ac8842d95..c9ec92001f 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java @@ -59,7 +59,7 @@ public class AddTermsResolverTest { // Unable to easily validate exact payload due to the injected timestamp Mockito.verify(mockService, Mockito.times(1)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), Mockito.eq(false) ); Mockito.verify(mockService, Mockito.times(1)).exists( @@ -105,7 +105,7 @@ public class AddTermsResolverTest { // Unable to easily validate exact payload due to the injected timestamp Mockito.verify(mockService, Mockito.times(1)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), Mockito.eq(false) ); Mockito.verify(mockService, Mockito.times(1)).exists( @@ -144,7 +144,7 @@ public class AddTermsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -174,7 +174,7 @@ public class AddTermsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -195,7 +195,7 @@ public class AddTermsResolverTest { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @Test @@ -204,7 +204,7 @@ public class AddTermsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); AddTermsResolver resolver = new AddTermsResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java index 78655daf13..dfe1394635 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java @@ -14,7 +14,6 @@ import com.linkedin.datahub.graphql.generated.ResourceRefInput; import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -67,10 +66,7 @@ public class BatchAddTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), // glossary terms contains a dynamically generated audit stamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_GLOSSARY_TERM_1_URN)) @@ -122,10 +118,7 @@ public class BatchAddTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), // glossary terms contains a dynamically generated audit stamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); Mockito.verify(mockService, Mockito.times(1)).exists( Mockito.eq(Urn.createFromString(TEST_GLOSSARY_TERM_1_URN)) @@ -162,9 +155,7 @@ public class BatchAddTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -200,9 +191,7 @@ public class BatchAddTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -223,9 +212,7 @@ public class BatchAddTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -234,7 +221,7 @@ public class BatchAddTermsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddTermsResolver resolver = new BatchAddTermsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java index cc5d825ac5..dcc8659c1b 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java @@ -14,7 +14,6 @@ import com.linkedin.datahub.graphql.generated.ResourceRefInput; import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveTermsResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -67,10 +66,7 @@ public class BatchRemoveTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), // Glossary terms contains dynamically generated audit stamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); } @Test @@ -119,10 +115,7 @@ public class BatchRemoveTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertTrue(resolver.get(mockEnv).get()); - Mockito.verify(mockService, Mockito.times(2)).ingestProposal( - Mockito.any(MetadataChangeProposal.class), // Glossary terms contains dynamically generated audit stamp - Mockito.any(AuditStamp.class) - ); + verifyIngestProposal(mockService, 2); } @Test @@ -159,9 +152,7 @@ public class BatchRemoveTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -183,9 +174,7 @@ public class BatchRemoveTermsResolverTest { Mockito.when(mockEnv.getContext()).thenReturn(mockContext); assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); - Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class)); + verifyNoIngestProposal(mockService); } @Test @@ -194,7 +183,7 @@ public class BatchRemoveTermsResolverTest { Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( Mockito.any(), - Mockito.any(AuditStamp.class)); + Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveTermsResolver resolver = new BatchRemoveTermsResolver(mockService); diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java index d781d2b325..2af6d78b41 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java @@ -10,6 +10,7 @@ import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.mxe.MetadataAuditEvent; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; +import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.PlatformEvent; import com.linkedin.mxe.SystemMetadata; import com.linkedin.mxe.TopicConvention; @@ -155,6 +156,43 @@ public class KafkaEventProducer implements EventProducer { } } + @Override + @WithSpan + public void produceMetadataChangeProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal) { + GenericRecord record; + + Urn urn = metadataChangeProposal.getEntityUrn(); + if (urn == null) { + throw new IllegalArgumentException("Urn for proposal cannot be null."); + } + try { + log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeProposal: %s", + urn, + metadataChangeProposal)); + record = EventUtils.pegasusToAvroMCP(metadataChangeProposal); + } catch (IOException e) { + log.error(String.format("Failed to convert Pegasus MCP to Avro: %s", metadataChangeProposal), e); + throw new ModelConversionException("Failed to convert Pegasus MCP to Avro", e); + } + + String topic = _topicConvention.getMetadataChangeProposalTopicName(); + if (_callback.isPresent()) { + _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get()); + } else { + _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> { + if (e != null) { + log.error(String.format("Failed to emit MCP for entity with urn %s", urn), e); + } else { + log.debug(String.format("Successfully emitted MCP for entity with urn %s at offset %s, partition %s, topic %s", + urn, + metadata.offset(), + metadata.partition(), + metadata.topic())); + } + }); + } + } + @Override public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) { GenericRecord record; diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java index 582c519392..dca9ef3865 100644 --- a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java @@ -171,7 +171,7 @@ public class EventUtils { public static GenericRecord pegasusToAvroMAE(@Nonnull MetadataAuditEvent event) throws IOException { GenericRecord original = DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MAE_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_MAE_AVRO_SCHEMA, RENAMED_MAE_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_MAE_AVRO_SCHEMA); } /** @@ -185,7 +185,21 @@ public class EventUtils { public static GenericRecord pegasusToAvroMCL(@Nonnull MetadataChangeLog event) throws IOException { GenericRecord original = DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCL_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_MCL_AVRO_SCHEMA, RENAMED_MCL_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_MCL_AVRO_SCHEMA); + } + + /** + * Converts a Pegasus MAE into the equivalent Avro model as a {@link GenericRecord}. + * + * @param event the Pegasus {@link MetadataChangeProposal} model + * @return the Avro model with com.linkedin.pegasus2avro.mxe namesapce + * @throws IOException if the conversion fails + */ + @Nonnull + public static GenericRecord pegasusToAvroMCP(@Nonnull MetadataChangeProposal event) throws IOException { + GenericRecord original = + DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCP_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_MCP_AVRO_SCHEMA); } /** @@ -199,7 +213,7 @@ public class EventUtils { public static GenericRecord pegasusToAvroMCE(@Nonnull MetadataChangeEvent event) throws IOException { GenericRecord original = DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCE_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_MCE_AVRO_SCHEMA, RENAMED_MCE_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_MCE_AVRO_SCHEMA); } /** @@ -232,7 +246,7 @@ public class EventUtils { GenericRecord original = DataTranslator.dataMapToGenericRecord(failedMetadataChangeEvent.data(), failedMetadataChangeEvent.schema(), ORIGINAL_FAILED_MCE_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_FAILED_MCE_AVRO_SCHEMA, RENAMED_FAILED_MCE_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_FAILED_MCE_AVRO_SCHEMA); } /** @@ -248,7 +262,7 @@ public class EventUtils { GenericRecord original = DataTranslator.dataMapToGenericRecord(failedMetadataChangeProposal.data(), failedMetadataChangeProposal.schema(), ORIGINAL_FMCL_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_FMCL_AVRO_SCHEMA, RENAMED_FMCP_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_FMCP_AVRO_SCHEMA); } /** @@ -262,13 +276,16 @@ public class EventUtils { public static GenericRecord pegasusToAvroPE(@Nonnull PlatformEvent event) throws IOException { GenericRecord original = DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_PE_AVRO_SCHEMA); - return renameSchemaNamespace(original, ORIGINAL_PE_AVRO_SCHEMA, RENAMED_PE_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_PE_AVRO_SCHEMA); } /** * Converts original MXE into a renamed namespace + * Does a double convert that should not be necessary since we're already converting prior to calling this method + * in most spots */ @Nonnull + @Deprecated private static GenericRecord renameSchemaNamespace(@Nonnull GenericRecord original, @Nonnull Schema originalSchema, @Nonnull Schema newSchema) throws IOException { @@ -279,6 +296,16 @@ public class EventUtils { return changeSchema(record, newSchema, newSchema); } + /** + * Converts original MXE into a renamed namespace + */ + @Nonnull + private static GenericRecord renameSchemaNamespace(@Nonnull GenericRecord original, @Nonnull Schema newSchema) + throws IOException { + + return changeSchema(original, newSchema, newSchema); + } + /** * Changes the schema of a {@link GenericRecord} to a compatible schema * diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index bfb79fae1c..5bd8841e67 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -555,6 +555,7 @@ def post_entity( aspect_name: str, aspect_value: Dict, cached_session_host: Optional[Tuple[Session, str]] = None, + is_async: Optional[str] = "false", ) -> int: session, gms_host = cached_session_host or get_session_and_host() endpoint: str = "/aspects/?action=ingestProposal" @@ -569,7 +570,8 @@ def post_entity( "contentType": "application/json", "value": json.dumps(aspect_value), }, - } + }, + "async": is_async, } payload = json.dumps(proposal) url = gms_host + endpoint diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index ae7740021a..4324b77880 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -8,6 +8,7 @@ import com.linkedin.aspect.GetTimeseriesAspectValuesResponse; import com.linkedin.common.AuditStamp; import com.linkedin.common.VersionedUrn; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.DataMap; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringArray; @@ -437,20 +438,17 @@ public class JavaEntityClient implements EntityClient { } // TODO: Factor out ingest logic into a util that can be accessed by the java client and the resource - @SneakyThrows @Override - public String ingestProposal( - @Nonnull final MetadataChangeProposal metadataChangeProposal, - @Nonnull final Authentication authentication) throws RemoteInvocationException { - + public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, + @Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException { String actorUrnStr = authentication.getActor() != null ? authentication.getActor().toUrnStr() : Constants.UNKNOWN_ACTOR; final AuditStamp auditStamp = - new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr)); + new AuditStamp().setTime(_clock.millis()).setActor(UrnUtils.getUrn(actorUrnStr)); final List additionalChanges = AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService); - Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp).getUrn(); - additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp)); + Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp, async).getUrn(); + additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp, async)); tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata()); return urn.toString(); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java index ceced5dd83..5f61e2ebe5 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java @@ -248,7 +248,7 @@ public class DeleteEntityService { proposal.setAspectName(aspectName); final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp); + final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp, false); if (!ingestProposalResult.isDidUpdate()) { log.error("Failed to ingest aspect with references removed. Before {}, after: null, please check MCP processor" @@ -276,7 +276,7 @@ public class DeleteEntityService { proposal.setAspect(GenericRecordUtils.serializeAspect(newAspect)); final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp); + final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp, false); if (!ingestProposalResult.isDidUpdate()) { log.error("Failed to ingest aspect with references removed. Before {}, after: {}, please check MCP processor" diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index 9f0e955053..0fbbd559e7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -830,10 +830,11 @@ private Map> getCorrespondingAspects(Set> getCorrespondingAspects(Set> getCorrespondingAspects(Set> getCorrespondingAspects(Set> getCorrespondingAspects(Set additionalChanges = AspectUtils.getAdditionalChanges(proposal, _entityService); - _entityService.ingestProposal(proposal, auditStamp); - additionalChanges.forEach(mcp -> _entityService.ingestProposal(mcp, auditStamp)); + _entityService.ingestProposal(proposal, auditStamp, false); + additionalChanges.forEach(mcp -> _entityService.ingestProposal(mcp, auditStamp, false)); return accessToken; } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java index 3cf0565892..28fa05bb48 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java @@ -100,7 +100,7 @@ public abstract class UpgradeStep implements BootstrapStep { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeRequest)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(upgradeProposal, auditStamp); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } private void ingestUpgradeResultAspect() throws URISyntaxException { @@ -115,7 +115,7 @@ public abstract class UpgradeStep implements BootstrapStep { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(upgradeProposal, auditStamp); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } private void cleanUpgradeAfterError(Exception e, String errorMessage) { diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java index ab7716af52..79b4dc0f59 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java @@ -167,7 +167,7 @@ public class IngestPoliciesStep implements BootstrapStep { keyAspectProposal.setEntityUrn(urn); _entityService.ingestProposal(keyAspectProposal, - new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis())); + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); final MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(urn); @@ -177,7 +177,7 @@ public class IngestPoliciesStep implements BootstrapStep { proposal.setChangeType(ChangeType.UPSERT); _entityService.ingestProposal(proposal, - new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis())); + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); } private boolean hasPolicy(Urn policyUrn) { diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java index ff441caf0c..6baa71a512 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java @@ -96,7 +96,7 @@ public class IngestRolesStep implements BootstrapStep { keyAspectProposal.setEntityUrn(roleUrn); _entityService.ingestProposal(keyAspectProposal, - new AuditStamp().setActor(Urn.createFromString(SYSTEM_ACTOR)).setTime(System.currentTimeMillis())); + new AuditStamp().setActor(Urn.createFromString(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); final MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(roleUrn); @@ -106,7 +106,7 @@ public class IngestRolesStep implements BootstrapStep { proposal.setChangeType(ChangeType.UPSERT); _entityService.ingestProposal(proposal, - new AuditStamp().setActor(Urn.createFromString(SYSTEM_ACTOR)).setTime(System.currentTimeMillis())); + new AuditStamp().setActor(Urn.createFromString(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); _entityService.produceMetadataChangeLog(roleUrn, DATAHUB_ROLE_ENTITY_NAME, DATAHUB_ROLE_INFO_ASPECT_NAME, roleInfoAspectSpec, null, dataHubRoleInfo, null, null, auditStamp, ChangeType.RESTATE); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RemoveClientIdAspectStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RemoveClientIdAspectStep.java index b76d935a06..2a334d7bbb 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RemoveClientIdAspectStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RemoveClientIdAspectStep.java @@ -70,7 +70,7 @@ public class RemoveClientIdAspectStep implements BootstrapStep { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(upgradeProposal, auditStamp); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java index 0309d2d4b4..989ee1a39b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java @@ -168,6 +168,6 @@ public class RestoreDbtSiblingsIndices implements BootstrapStep { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(upgradeProposal, auditStamp); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java index 9afc1f8c2f..b990400b38 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java @@ -128,7 +128,8 @@ public class UpgradeDefaultBrowsePathsStep extends UpgradeStep { proposal.setAspect(GenericRecordUtils.serializeAspect(newPaths)); _entityService.ingestProposal( proposal, - auditStamp + auditStamp, + false ); } diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java index ecac6ad05b..64120787e4 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java @@ -104,7 +104,8 @@ public class RestoreGlossaryIndicesTest { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME); Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), + Mockito.eq(false) ); Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( Mockito.eq(glossaryTermUrn), @@ -164,7 +165,8 @@ public class RestoreGlossaryIndicesTest { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME); Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), + Mockito.eq(false) ); Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( Mockito.eq(glossaryTermUrn), @@ -220,7 +222,8 @@ public class RestoreGlossaryIndicesTest { Mockito.verify(mockSearchService, Mockito.times(0)).search(Constants.GLOSSARY_NODE_ENTITY_NAME, "", null, null, 0, 1000); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), + Mockito.anyBoolean() ); Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( Mockito.eq(glossaryTermUrn), diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java index 8891657050..5e4ad6e7fe 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java @@ -89,7 +89,8 @@ public class UpgradeDefaultBrowsePathsStepTest { // Verify that 4 aspects are ingested, 2 for the upgrade request / result, but none for ingesting Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any() + Mockito.any(), + Mockito.eq(false) ); } @@ -156,7 +157,8 @@ public class UpgradeDefaultBrowsePathsStepTest { // Verify that 4 aspects are ingested, 2 for the upgrade request / result and 2 for the browse pahts Mockito.verify(mockService, Mockito.times(4)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any() + Mockito.any(), + Mockito.eq(false) ); } @@ -223,7 +225,8 @@ public class UpgradeDefaultBrowsePathsStepTest { // Verify that 2 aspects are ingested, only those for the upgrade step Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any() + Mockito.any(), + Mockito.eq(false) ); } @@ -248,7 +251,8 @@ public class UpgradeDefaultBrowsePathsStepTest { Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class) + Mockito.any(AuditStamp.class), + Mockito.anyBoolean() ); } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java index c0474d7125..a02853d775 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java @@ -294,9 +294,9 @@ public class MappingUtil { log.info("Proposal: {}", serviceProposal); Throwable exceptionally = null; try { - EntityService.IngestProposalResult proposalResult = entityService.ingestProposal(serviceProposal, auditStamp); + EntityService.IngestProposalResult proposalResult = entityService.ingestProposal(serviceProposal, auditStamp, false); Urn urn = proposalResult.getUrn(); - additionalChanges.forEach(proposal -> entityService.ingestProposal(proposal, auditStamp)); + additionalChanges.forEach(proposal -> entityService.ingestProposal(proposal, auditStamp, false)); return new Pair<>(urn.toString(), proposalResult.isDidUpdate()); } catch (ValidationException ve) { exceptionally = ve; diff --git a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.aspects.restspec.json b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.aspects.restspec.json index cc8d9630f5..0d41df169a 100644 --- a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.aspects.restspec.json +++ b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.aspects.restspec.json @@ -72,6 +72,10 @@ "parameters" : [ { "name" : "proposal", "type" : "com.linkedin.mxe.MetadataChangeProposal" + }, { + "name" : "async", + "type" : "string", + "default" : "unset" } ], "returns" : "string" }, { diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 053986c4c1..31fc619700 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -3647,6 +3647,10 @@ "parameters" : [ { "name" : "proposal", "type" : "com.linkedin.mxe.MetadataChangeProposal" + }, { + "name" : "async", + "type" : "string", + "default" : "unset" } ], "returns" : "string" }, { diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java index e870fcdf73..a37063bd3f 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -276,22 +276,40 @@ public interface EntityClient { @Nonnull Boolean getLatestValue, @Nullable Filter filter, @Nonnull Authentication authentication) throws RemoteInvocationException; - public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, - @Nonnull final Authentication authentication) throws RemoteInvocationException; + @Deprecated + default String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, + @Nonnull final Authentication authentication) throws RemoteInvocationException { + return ingestProposal(metadataChangeProposal, authentication, false); + } + String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, + @Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException; + + @Deprecated default String wrappedIngestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal, @Nonnull final Authentication authentication) { + return wrappedIngestProposal(metadataChangeProposal, authentication, false); + } + + default String wrappedIngestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal, + @Nonnull final Authentication authentication, final boolean async) { try { - return ingestProposal(metadataChangeProposal, authentication); + return ingestProposal(metadataChangeProposal, authentication, async); } catch (RemoteInvocationException e) { throw new RuntimeException(e); } } + @Deprecated default List batchIngestProposals(@Nonnull final Collection metadataChangeProposals, @Nonnull final Authentication authentication) throws RemoteInvocationException { + return batchIngestProposals(metadataChangeProposals, authentication, false); + } + + default List batchIngestProposals(@Nonnull final Collection metadataChangeProposals, + @Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException { return metadataChangeProposals.stream() - .map(proposal -> wrappedIngestProposal(proposal, authentication)) + .map(proposal -> wrappedIngestProposal(proposal, authentication, async)) .collect(Collectors.toList()); } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index ef03ba6d34..9e8dcf5220 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -624,8 +624,9 @@ public class RestliEntityClient extends BaseClient implements EntityClient { * Ingest a MetadataChangeProposal event. * @return */ + @Override public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, - @Nonnull final Authentication authentication) throws RemoteInvocationException { + @Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException { final AspectsDoIngestProposalRequestBuilder requestBuilder = ASPECTS_REQUEST_BUILDERS.actionIngestProposal().proposalParam(metadataChangeProposal); return sendClientRequest(requestBuilder, authentication).getEntity(); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 2199fb1083..4ea4aa70db 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -62,6 +62,10 @@ public class AspectResource extends CollectionResourceTaskTemplate ingestProposal( - @ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal) throws URISyntaxException { + @ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal, + @ActionParam(PARAM_ASYNC) @Optional(UNSET) String async) throws URISyntaxException { log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal); + boolean asyncBool; + if (UNSET.equals(async)) { + asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME)); + } else { + asyncBool = Boolean.parseBoolean(async); + } + Authentication authentication = AuthenticationContext.getAuthentication(); String actorUrnStr = authentication.getActor().toUrnStr(); final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr)); @@ -147,8 +159,8 @@ public class AspectResource extends CollectionResourceTaskTemplate { log.debug("Proposal: {}", metadataChangeProposal); try { - Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp).getUrn(); - additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp)); + Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp, asyncBool).getUrn(); + additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp, asyncBool)); tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata(), _entitySearchService); return urn.toString(); } catch (ValidationException e) { diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java index dd3a95dbae..0a32e5af0f 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java @@ -268,7 +268,8 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate