fix(gms): Corrects MCP generation in async mode (#7214)

Co-authored-by: John Joyce <john@acryl.io>
This commit is contained in:
Pedro Silva 2023-02-02 19:45:44 +00:00 committed by GitHub
parent 2cfd82203a
commit 4732694780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 9 additions and 9 deletions

View File

@ -118,13 +118,10 @@ public class KafkaEventProducer implements EventProducer {
@Override
@WithSpan
public void produceMetadataChangeProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal) {
public void produceMetadataChangeProposal(@Nonnull final Urn urn, @Nonnull final MetadataChangeProposal
metadataChangeProposal) {
GenericRecord record;
Urn urn = metadataChangeProposal.getEntityUrn();
if (urn == null) {
throw new IllegalArgumentException("Urn for proposal cannot be null.");
}
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeProposal: %s",
urn,

View File

@ -882,7 +882,7 @@ private Map<Urn, List<EnvelopedAspect>> getCorrespondingAspects(Set<EntityAspect
newSystemMetadata = result != null ? result.getNewSystemMetadata() : null;
} else {
// When async is turned on, we write to proposal log and return without waiting
_producer.produceMetadataChangeProposal(mcp);
_producer.produceMetadataChangeProposal(entityUrn, mcp);
return new IngestProposalResult(mcp.getEntityUrn(), false, true);
}
} else {

View File

@ -63,7 +63,8 @@ public interface EventProducer {
* @param metadataChangeProposal metadata change proposal to push into MCP kafka topic
*/
@WithSpan
void produceMetadataChangeProposal(@Nonnull MetadataChangeProposal metadataChangeProposal);
void produceMetadataChangeProposal(@Nonnull final Urn urn, @Nonnull final MetadataChangeProposal
metadataChangeProposal);
/**
* Produces a generic platform "event".

View File

@ -461,7 +461,8 @@ abstract public class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
_entityService.ingestProposal(gmce, TEST_AUDIT_STAMP, true);
verify(_mockProducer, times(0)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any());
verify(_mockProducer, times(1)).produceMetadataChangeProposal(Mockito.eq(gmce));
verify(_mockProducer, times(1)).produceMetadataChangeProposal(Mockito.eq(entityUrn),
Mockito.eq(gmce));
}
@ -486,7 +487,8 @@ abstract public class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
_entityService.ingestProposal(gmce, TEST_AUDIT_STAMP, true);
verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn),
Mockito.any(), Mockito.any());
verify(_mockProducer, times(0)).produceMetadataChangeProposal(Mockito.eq(gmce));
verify(_mockProducer, times(0)).produceMetadataChangeProposal(Mockito.eq(entityUrn),
Mockito.eq(gmce));
}
@Test