diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 20273550c8..ad6d3cb69e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1361,7 +1361,8 @@ public class EntityServiceImpl implements EntityService { * @param aspectsBatch timeseries upserts batch * @return returns ingest proposal result, however was never in the MCP topic */ - private Stream ingestTimeseriesProposal( + @VisibleForTesting + Stream ingestTimeseriesProposal( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) { List unsupported = @@ -1381,31 +1382,38 @@ public class EntityServiceImpl implements EntityService { return opContext.withSpan( "ingestTimeseriesProposal", () -> { - if (!async) { - // Handle throttling - APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true); + // Handle throttling + APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true); - // Create default non-timeseries aspects for timeseries aspects - List timeseriesKeyAspects = - aspectsBatch.getMCPItems().stream() - .filter( - item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries()) - .map( - item -> - ChangeItemImpl.builder() - .urn(item.getUrn()) - .aspectName(item.getEntitySpec().getKeyAspectName()) - .changeType(ChangeType.UPSERT) - .entitySpec(item.getEntitySpec()) - .aspectSpec(item.getEntitySpec().getKeyAspectSpec()) - .auditStamp(item.getAuditStamp()) - .systemMetadata(item.getSystemMetadata()) - .recordTemplate( - EntityApiUtils.buildKeyAspect( - opContext.getEntityRegistry(), item.getUrn())) - .build(opContext.getAspectRetriever())) - .collect(Collectors.toList()); + // Create default non-timeseries aspects for timeseries aspects + List timeseriesKeyAspects = + aspectsBatch.getMCPItems().stream() + .filter( + item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries()) + .map( + item -> + ChangeItemImpl.builder() + .urn(item.getUrn()) + .aspectName(item.getEntitySpec().getKeyAspectName()) + .changeType(ChangeType.UPSERT) + .entitySpec(item.getEntitySpec()) + .aspectSpec(item.getEntitySpec().getKeyAspectSpec()) + .auditStamp(item.getAuditStamp()) + .systemMetadata(item.getSystemMetadata()) + .recordTemplate( + EntityApiUtils.buildKeyAspect( + opContext.getEntityRegistry(), item.getUrn())) + .build(opContext.getAspectRetriever())) + .collect(Collectors.toList()); + if (async) { + ingestProposalAsync( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(aspectsBatch.getRetrieverContext()) + .items(timeseriesKeyAspects) + .build()); + } else { ingestProposalSync( opContext, AspectsBatchImpl.builder() @@ -1479,8 +1487,8 @@ public class EntityServiceImpl implements EntityService { * @param aspectsBatch non-timeseries ingest aspects * @return produced items to the MCP topic */ - private Stream ingestProposalAsync( - OperationContext opContext, AspectsBatch aspectsBatch) { + @VisibleForTesting + Stream ingestProposalAsync(OperationContext opContext, AspectsBatch aspectsBatch) { return opContext.withSpan( "ingestProposalAsync", () -> { @@ -1524,7 +1532,8 @@ public class EntityServiceImpl implements EntityService { String.valueOf(aspectsBatch.getItems().size())); } - private Stream ingestProposalSync( + @VisibleForTesting + Stream ingestProposalSync( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch) { return opContext.withSpan( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceImplTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceImplTest.java index cf75a01ef7..aee9433ab4 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceImplTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceImplTest.java @@ -1,8 +1,11 @@ package com.linkedin.metadata.entity; +import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME; +import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -13,6 +16,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.datahub.util.RecordUtils; import com.linkedin.common.AuditStamp; @@ -21,15 +25,20 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.DatasetProfile; import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.AspectGenerationUtils; import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.ebean.EbeanSystemAspect; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -40,9 +49,11 @@ import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import java.sql.Timestamp; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.stream.Stream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -505,4 +516,104 @@ public class EntityServiceImplTest { verify(mockEventProducer, times(1)) .produceMetadataChangeLog(any(OperationContext.class), any(), any(), any()); } + + @Test + public void testIngestTimeseriesProposal() { + // Create a spy of the EntityServiceImpl to track method calls + EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); + + Urn timeseriesUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesTest,PROD)"); + DatasetProfile datasetProfile = new DatasetProfile(); + datasetProfile.setRowCount(1000); + datasetProfile.setColumnCount(15); + datasetProfile.setTimestampMillis(0L); + + // Create a mock AspectsBatch with timeseries aspects + AspectsBatch mockBatch = + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext()) + .items( + List.of( + ChangeItemImpl.builder() + .urn(timeseriesUrn) + .aspectName(DATASET_PROFILE_ASPECT_NAME) + .recordTemplate(datasetProfile) + .changeType(ChangeType.UPSERT) + .auditStamp(TEST_AUDIT_STAMP) + .build(opContext.getAspectRetriever()), + ChangeItemImpl.builder() + .urn(timeseriesUrn) + .aspectName(DATASET_PROFILE_ASPECT_NAME) + .recordTemplate(datasetProfile) + .changeType(ChangeType.UPSERT) + .auditStamp(TEST_AUDIT_STAMP) + .build(opContext.getAspectRetriever()))) + .build(); + + // Test case 1: async = true path + // Arrange + doReturn(Stream.empty()) + .when(entityServiceSpy) + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); + + // Act + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true); + + // Verify + verify(entityServiceSpy, times(1)) + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); + verify(entityServiceSpy, never()) + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); + + // Test case 2: async = false path + // Arrange + org.mockito.Mockito.reset(entityServiceSpy); + doReturn(Stream.empty()) + .when(entityServiceSpy) + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); + + // Act + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, false); + + // Verify + verify(entityServiceSpy, never()) + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); + verify(entityServiceSpy, times(1)) + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); + } + + @Test + public void testIngestTimeseriesProposalUnsupported() { + // Create a spy of the EntityServiceImpl to track method calls + EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); + + Urn timeseriesUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesUnsupportedTest,PROD)"); + + // Create a mock AspectsBatch with timeseries aspects + AspectsBatch mockBatch = + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext()) + .items( + List.of( + DeleteItemImpl.builder() + .urn(timeseriesUrn) + .aspectName(DATASET_PROFILE_ASPECT_NAME) + .auditStamp(TEST_AUDIT_STAMP) + .build(opContext.getAspectRetriever()), + DeleteItemImpl.builder() + .urn(timeseriesUrn) + .aspectName(DATASET_PROFILE_ASPECT_NAME) + .auditStamp(TEST_AUDIT_STAMP) + .build(opContext.getAspectRetriever()))) + .build(); + + try { + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true); + fail("Should throw UnsupportedOperationException for non-UPSERT change types"); + } catch (UnsupportedOperationException e) { + // Expected + } + } }