mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-31 12:52:13 +00:00
fix(openapi): fix openapi timeseries async ingestion (#12812)
This commit is contained in:
parent
55fbb71a53
commit
47f59e62dd
@ -1361,7 +1361,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
* @param aspectsBatch timeseries upserts batch
|
||||
* @return returns ingest proposal result, however was never in the MCP topic
|
||||
*/
|
||||
private Stream<IngestResult> ingestTimeseriesProposal(
|
||||
@VisibleForTesting
|
||||
Stream<IngestResult> ingestTimeseriesProposal(
|
||||
@Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) {
|
||||
|
||||
List<? extends BatchItem> unsupported =
|
||||
@ -1381,31 +1382,38 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
return opContext.withSpan(
|
||||
"ingestTimeseriesProposal",
|
||||
() -> {
|
||||
if (!async) {
|
||||
// Handle throttling
|
||||
APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true);
|
||||
// Handle throttling
|
||||
APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true);
|
||||
|
||||
// Create default non-timeseries aspects for timeseries aspects
|
||||
List<MCPItem> timeseriesKeyAspects =
|
||||
aspectsBatch.getMCPItems().stream()
|
||||
.filter(
|
||||
item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries())
|
||||
.map(
|
||||
item ->
|
||||
ChangeItemImpl.builder()
|
||||
.urn(item.getUrn())
|
||||
.aspectName(item.getEntitySpec().getKeyAspectName())
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.entitySpec(item.getEntitySpec())
|
||||
.aspectSpec(item.getEntitySpec().getKeyAspectSpec())
|
||||
.auditStamp(item.getAuditStamp())
|
||||
.systemMetadata(item.getSystemMetadata())
|
||||
.recordTemplate(
|
||||
EntityApiUtils.buildKeyAspect(
|
||||
opContext.getEntityRegistry(), item.getUrn()))
|
||||
.build(opContext.getAspectRetriever()))
|
||||
.collect(Collectors.toList());
|
||||
// Create default non-timeseries aspects for timeseries aspects
|
||||
List<MCPItem> timeseriesKeyAspects =
|
||||
aspectsBatch.getMCPItems().stream()
|
||||
.filter(
|
||||
item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries())
|
||||
.map(
|
||||
item ->
|
||||
ChangeItemImpl.builder()
|
||||
.urn(item.getUrn())
|
||||
.aspectName(item.getEntitySpec().getKeyAspectName())
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.entitySpec(item.getEntitySpec())
|
||||
.aspectSpec(item.getEntitySpec().getKeyAspectSpec())
|
||||
.auditStamp(item.getAuditStamp())
|
||||
.systemMetadata(item.getSystemMetadata())
|
||||
.recordTemplate(
|
||||
EntityApiUtils.buildKeyAspect(
|
||||
opContext.getEntityRegistry(), item.getUrn()))
|
||||
.build(opContext.getAspectRetriever()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (async) {
|
||||
ingestProposalAsync(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(aspectsBatch.getRetrieverContext())
|
||||
.items(timeseriesKeyAspects)
|
||||
.build());
|
||||
} else {
|
||||
ingestProposalSync(
|
||||
opContext,
|
||||
AspectsBatchImpl.builder()
|
||||
@ -1479,8 +1487,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
* @param aspectsBatch non-timeseries ingest aspects
|
||||
* @return produced items to the MCP topic
|
||||
*/
|
||||
private Stream<IngestResult> ingestProposalAsync(
|
||||
OperationContext opContext, AspectsBatch aspectsBatch) {
|
||||
@VisibleForTesting
|
||||
Stream<IngestResult> ingestProposalAsync(OperationContext opContext, AspectsBatch aspectsBatch) {
|
||||
return opContext.withSpan(
|
||||
"ingestProposalAsync",
|
||||
() -> {
|
||||
@ -1524,7 +1532,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
String.valueOf(aspectsBatch.getItems().size()));
|
||||
}
|
||||
|
||||
private Stream<IngestResult> ingestProposalSync(
|
||||
@VisibleForTesting
|
||||
Stream<IngestResult> ingestProposalSync(
|
||||
@Nonnull OperationContext opContext, AspectsBatch aspectsBatch) {
|
||||
|
||||
return opContext.withSpan(
|
||||
|
@ -1,8 +1,11 @@
|
||||
package com.linkedin.metadata.entity;
|
||||
|
||||
import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
@ -13,6 +16,7 @@ import static org.testng.Assert.assertFalse;
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
import static org.testng.Assert.fail;
|
||||
|
||||
import com.datahub.util.RecordUtils;
|
||||
import com.linkedin.common.AuditStamp;
|
||||
@ -21,15 +25,20 @@ import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.DataTemplateUtil;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.dataset.DatasetProfile;
|
||||
import com.linkedin.dataset.UpstreamLineage;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.identity.CorpUserInfo;
|
||||
import com.linkedin.metadata.AspectGenerationUtils;
|
||||
import com.linkedin.metadata.aspect.SystemAspect;
|
||||
import com.linkedin.metadata.aspect.batch.AspectsBatch;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.config.PreProcessHooks;
|
||||
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
|
||||
import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
|
||||
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
|
||||
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
|
||||
import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl;
|
||||
import com.linkedin.metadata.event.EventProducer;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
@ -40,9 +49,11 @@ import com.linkedin.util.Pair;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import io.datahubproject.test.metadata.context.TestOperationContexts;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Stream;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@ -505,4 +516,104 @@ public class EntityServiceImplTest {
|
||||
verify(mockEventProducer, times(1))
|
||||
.produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIngestTimeseriesProposal() {
|
||||
// Create a spy of the EntityServiceImpl to track method calls
|
||||
EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService);
|
||||
|
||||
Urn timeseriesUrn =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesTest,PROD)");
|
||||
DatasetProfile datasetProfile = new DatasetProfile();
|
||||
datasetProfile.setRowCount(1000);
|
||||
datasetProfile.setColumnCount(15);
|
||||
datasetProfile.setTimestampMillis(0L);
|
||||
|
||||
// Create a mock AspectsBatch with timeseries aspects
|
||||
AspectsBatch mockBatch =
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext())
|
||||
.items(
|
||||
List.of(
|
||||
ChangeItemImpl.builder()
|
||||
.urn(timeseriesUrn)
|
||||
.aspectName(DATASET_PROFILE_ASPECT_NAME)
|
||||
.recordTemplate(datasetProfile)
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(opContext.getAspectRetriever()),
|
||||
ChangeItemImpl.builder()
|
||||
.urn(timeseriesUrn)
|
||||
.aspectName(DATASET_PROFILE_ASPECT_NAME)
|
||||
.recordTemplate(datasetProfile)
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(opContext.getAspectRetriever())))
|
||||
.build();
|
||||
|
||||
// Test case 1: async = true path
|
||||
// Arrange
|
||||
doReturn(Stream.empty())
|
||||
.when(entityServiceSpy)
|
||||
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
|
||||
// Act
|
||||
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true);
|
||||
|
||||
// Verify
|
||||
verify(entityServiceSpy, times(1))
|
||||
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
verify(entityServiceSpy, never())
|
||||
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
|
||||
// Test case 2: async = false path
|
||||
// Arrange
|
||||
org.mockito.Mockito.reset(entityServiceSpy);
|
||||
doReturn(Stream.empty())
|
||||
.when(entityServiceSpy)
|
||||
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
|
||||
// Act
|
||||
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, false);
|
||||
|
||||
// Verify
|
||||
verify(entityServiceSpy, never())
|
||||
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
verify(entityServiceSpy, times(1))
|
||||
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIngestTimeseriesProposalUnsupported() {
|
||||
// Create a spy of the EntityServiceImpl to track method calls
|
||||
EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService);
|
||||
|
||||
Urn timeseriesUrn =
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesUnsupportedTest,PROD)");
|
||||
|
||||
// Create a mock AspectsBatch with timeseries aspects
|
||||
AspectsBatch mockBatch =
|
||||
AspectsBatchImpl.builder()
|
||||
.retrieverContext(opContext.getRetrieverContext())
|
||||
.items(
|
||||
List.of(
|
||||
DeleteItemImpl.builder()
|
||||
.urn(timeseriesUrn)
|
||||
.aspectName(DATASET_PROFILE_ASPECT_NAME)
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(opContext.getAspectRetriever()),
|
||||
DeleteItemImpl.builder()
|
||||
.urn(timeseriesUrn)
|
||||
.aspectName(DATASET_PROFILE_ASPECT_NAME)
|
||||
.auditStamp(TEST_AUDIT_STAMP)
|
||||
.build(opContext.getAspectRetriever())))
|
||||
.build();
|
||||
|
||||
try {
|
||||
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true);
|
||||
fail("Should throw UnsupportedOperationException for non-UPSERT change types");
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user