feat(mcl-upgrade): implement resume & urn pagination (#11889)

david-leifker 2024-11-19 14:33:47 -06:00 committed by GitHub
parent d97885749e
commit 2d155ccaa9
4 changed files with 247 additions and 45 deletions

View File: AbstractMCLStep.java

@@ -1,13 +1,12 @@
package com.linkedin.datahub.upgrade.system;
import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
@@ -16,10 +15,13 @@ import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.upgrade.DataHubUpgradeResult;
import com.linkedin.upgrade.DataHubUpgradeState;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
@@ -33,6 +35,8 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public abstract class AbstractMCLStep implements UpgradeStep {
public static final String LAST_URN_KEY = "lastUrn";
private final OperationContext opContext;
private final EntityService<?> entityService;
private final AspectDao aspectDao;
@@ -70,10 +74,30 @@ public abstract class AbstractMCLStep implements UpgradeStep {
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
// Resume state
Optional<DataHubUpgradeResult> prevResult =
context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
String resumeUrn =
prevResult
.filter(
result ->
DataHubUpgradeState.IN_PROGRESS.equals(result.getState())
&& result.getResult() != null
&& result.getResult().containsKey(LAST_URN_KEY))
.map(result -> result.getResult().get(LAST_URN_KEY))
.orElse(null);
if (resumeUrn != null) {
log.info("{}: Resuming from URN: {}", getUpgradeIdUrn(), resumeUrn);
}
// Re-use RestoreIndicesArgs to configure the SQL scan
RestoreIndicesArgs args =
new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);
new RestoreIndicesArgs()
.aspectName(getAspectName())
.batchSize(batchSize)
.lastUrn(resumeUrn)
.urnBasedPagination(resumeUrn != null)
.limit(limit);
if (getUrnLike() != null) {
args = args.urnLike(getUrnLike());
@@ -86,40 +110,62 @@ public abstract class AbstractMCLStep implements UpgradeStep {
batch -> {
log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);
List<Pair<Future<?>, Boolean>> futures;
List<Pair<Future<?>, SystemAspect>> futures;
futures =
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext().get(),
batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
opContext,
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
getAspectName(),
systemAspect.getAspectSpec(),
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(id())
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());
systemAspect -> {
Pair<Future<?>, Boolean> future =
entityService.alwaysProduceMCLAsync(
opContext,
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
getAspectName(),
systemAspect.getAspectSpec(),
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(id())
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT);
return Pair.<Future<?>, SystemAspect>of(
future.getFirst(), systemAspect);
})
.toList();
futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
SystemAspect lastAspect =
futures.stream()
.map(
f -> {
try {
f.getFirst().get();
return f.getSecond();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.reduce((a, b) -> b)
.orElse(null);
// record progress
if (lastAspect != null) {
log.info(
"{}: Saving state. Last urn:{}", getUpgradeIdUrn(), lastAspect.getUrn());
context
.upgrade()
.setUpgradeResult(
opContext,
getUpgradeIdUrn(),
entityService,
DataHubUpgradeState.IN_PROGRESS,
Map.of(LAST_URN_KEY, lastAspect.getUrn().toString()));
}
if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
@@ -142,12 +188,23 @@ public abstract class AbstractMCLStep implements UpgradeStep {
/** Returns whether the upgrade should be skipped. */
@Override
public boolean skip(UpgradeContext context) {
boolean previouslyRun =
entityService.exists(
opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
Optional<DataHubUpgradeResult> prevResult =
context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
boolean previousRunFinal =
prevResult
.filter(
result ->
DataHubUpgradeState.SUCCEEDED.equals(result.getState())
|| DataHubUpgradeState.ABORTED.equals(result.getState()))
.isPresent();
if (previousRunFinal) {
log.info(
"{} was already run. State: {} Skipping.",
id(),
prevResult.map(DataHubUpgradeResult::getState));
}
return previouslyRun;
return previousRunFinal;
}
}
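
The two hunks above amount to a small state machine for each step: a previous result in SUCCEEDED or ABORTED is terminal and the step is skipped; an IN_PROGRESS result carrying LAST_URN_KEY switches the scan to URN-based pagination starting after that URN; anything else starts a fresh scan. Below is a minimal sketch of that decision logic, relying only on the accessors exercised in this diff (getState(), getResult(), LAST_URN_KEY) and the imports already shown in the file; the isTerminal and resumeUrn helpers are hypothetical illustrations, not members of AbstractMCLStep.

// Hypothetical helpers sketching the resume/skip decision; not part of AbstractMCLStep.
static boolean isTerminal(Optional<DataHubUpgradeResult> prevResult) {
  // SUCCEEDED or ABORTED is final, so skip() returns true and the step does not run again.
  return prevResult
      .map(DataHubUpgradeResult::getState)
      .filter(
          state ->
              DataHubUpgradeState.SUCCEEDED.equals(state)
                  || DataHubUpgradeState.ABORTED.equals(state))
      .isPresent();
}

static String resumeUrn(Optional<DataHubUpgradeResult> prevResult) {
  // An IN_PROGRESS result with a recorded last URN resumes URN-based pagination from that URN;
  // anything else yields null, so urnBasedPagination stays false and the scan starts over.
  return prevResult
      .filter(result -> DataHubUpgradeState.IN_PROGRESS.equals(result.getState()))
      .map(DataHubUpgradeResult::getResult)
      .filter(progress -> progress != null && progress.containsKey(AbstractMCLStep.LAST_URN_KEY))
      .map(progress -> progress.get(AbstractMCLStep.LAST_URN_KEY))
      .orElse(null);
}

Because each completed batch writes the last processed URN back via setUpgradeResult with IN_PROGRESS state, a restart after a crash re-enters executable() with exactly the shape resumeUrn expects and continues from the last recorded URN instead of rescanning the whole table.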

View File: GenerateSchemaFieldsFromSchemaMetadataStep.java

@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.system.schemafield;
import static com.linkedin.datahub.upgrade.system.AbstractMCLStep.LAST_URN_KEY;
import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
@@ -61,7 +62,6 @@ import org.jetbrains.annotations.Nullable;
*/
@Slf4j
public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep {
private static final String LAST_URN_KEY = "lastUrn";
private static final List<String> REQUIRED_ASPECTS =
List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME);

View File: DatahubUpgradeNonBlockingTest.java

@@ -1,14 +1,18 @@
package com.linkedin.datahub.upgrade;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
@@ -20,17 +24,30 @@ import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.mxe.Topics;
import com.linkedin.upgrade.DataHubUpgradeResult;
import com.linkedin.upgrade.DataHubUpgradeState;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Named;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@ActiveProfiles("test")
@@ -63,7 +80,12 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {
@Autowired private EntityServiceImpl entityService;
@Autowired private OperationContext opContext;
private OperationContext opContext;
@BeforeClass
public void init() {
opContext = TestOperationContexts.systemContextNoValidate();
}
@Test
public void testSystemUpdateNonBlockingInit() {
@@ -81,10 +103,13 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {
}
@Test
public void testReindexDataJobViaNodesCLLPaging() {
public void testReindexDataJobViaNodesCLLPagingArgs() {
EntityService<?> mockService = mock(EntityService.class);
AspectDao mockAspectDao = mock(AspectDao.class);
PartitionedStream<EbeanAspectV2> mockStream = mock(PartitionedStream.class);
when(mockStream.partition(anyInt())).thenReturn(Stream.empty());
when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))).thenReturn(mockStream);
ReindexDataJobViaNodesCLL cllUpgrade =
new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 10, 0, 0);
@@ -102,9 +127,79 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {
.batchSize(10)
.limit(0)
.aspectName("dataJobInputOutput")
.urnBasedPagination(false)
.lastUrn(null)
.urnLike("urn:li:dataJob:%")));
}
@Test
public void testReindexDataJobViaNodesCLLResumePaging() throws Exception {
// Mock services
EntityService<?> mockService = mock(EntityService.class);
AspectDao mockAspectDao = mock(AspectDao.class);
// Create test data
EbeanAspectV2 aspect1 = createMockEbeanAspect("urn:li:dataJob:job1", "dataJobInputOutput");
EbeanAspectV2 aspect2 = createMockEbeanAspect("urn:li:dataJob:job2", "dataJobInputOutput");
EbeanAspectV2 aspect3 = createMockEbeanAspect("urn:li:dataJob:job3", "dataJobInputOutput");
List<EbeanAspectV2> initialBatch = Arrays.asList(aspect1, aspect2);
List<EbeanAspectV2> resumeBatch = Arrays.asList(aspect3);
// Mock the stream for the first batch
PartitionedStream<EbeanAspectV2> initialStream = mock(PartitionedStream.class);
when(initialStream.partition(anyInt())).thenReturn(Stream.of(initialBatch.stream()));
// Mock the stream for the resumed (second) batch
PartitionedStream<EbeanAspectV2> resumeStream = mock(PartitionedStream.class);
when(resumeStream.partition(anyInt())).thenReturn(Stream.of(resumeBatch.stream()));
// Set up the AspectDao with an Answer so a null lastUrn is handled safely
when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class)))
.thenAnswer(
invocation -> {
RestoreIndicesArgs args = invocation.getArgument(0);
if (args.lastUrn() == null) {
return initialStream;
} else if ("urn:li:dataJob:job2".equals(args.lastUrn())) {
return resumeStream;
}
return mock(PartitionedStream.class);
});
// Mock successful MCL production
when(mockService.alwaysProduceMCLAsync(
any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
.thenReturn(Pair.of(CompletableFuture.completedFuture(null), true));
// Create the upgrade
ReindexDataJobViaNodesCLL cllUpgrade =
new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 2, 0, 0);
// Initial Run
cllUpgrade.steps().get(0).executable().apply(createMockInitialUpgrade());
// Resumed
cllUpgrade.steps().get(0).executable().apply(createMockResumeUpgrade());
// Use ArgumentCaptor to verify the calls
ArgumentCaptor<RestoreIndicesArgs> argsCaptor =
ArgumentCaptor.forClass(RestoreIndicesArgs.class);
verify(mockAspectDao, times(2)).streamAspectBatches(argsCaptor.capture());
List<RestoreIndicesArgs> capturedArgs = argsCaptor.getAllValues();
// Verify both the initial and resume calls were made with correct arguments
assertEquals(capturedArgs.get(0).lastUrn(), null);
assertEquals(capturedArgs.get(0).urnBasedPagination(), false);
assertEquals(capturedArgs.get(1).lastUrn(), "urn:li:dataJob:job2");
assertEquals(capturedArgs.get(1).urnBasedPagination(), true);
// Verify MCL production was called for each aspect
verify(mockService, times(3))
.alwaysProduceMCLAsync(
any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any());
}
@Test
public void testNonBlockingBootstrapMCP() {
List<BootstrapMCPStep> mcpTemplate =
@@ -123,4 +218,54 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {
.map(update -> update.getMcpTemplate().getName())
.collect(Collectors.toSet())));
}
private UpgradeContext createMockInitialUpgrade() {
// Mock the Upgrade instance
Upgrade mockUpgrade = mock(Upgrade.class);
// Configure the mock upgrade to return no previous result
when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.empty());
UpgradeContext mockInitialContext = mock(UpgradeContext.class);
when(mockInitialContext.opContext()).thenReturn(opContext);
when(mockInitialContext.upgrade()).thenReturn(mockUpgrade);
when(mockInitialContext.report()).thenReturn(mock(UpgradeReport.class));
return mockInitialContext;
}
private UpgradeContext createMockResumeUpgrade() {
// Mock the Upgrade instance
Upgrade mockUpgrade = mock(Upgrade.class);
DataHubUpgradeResult mockPrevResult = mock(DataHubUpgradeResult.class);
// Configure the mock previous result
when(mockPrevResult.getState()).thenReturn(DataHubUpgradeState.IN_PROGRESS);
when(mockPrevResult.getResult())
.thenReturn(new StringMap(Map.of("lastUrn", "urn:li:dataJob:job2")));
// Configure the mock upgrade to return our previous result
when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.of(mockPrevResult));
UpgradeContext mockResumeContext = mock(UpgradeContext.class);
when(mockResumeContext.opContext()).thenReturn(opContext);
when(mockResumeContext.upgrade()).thenReturn(mockUpgrade);
when(mockResumeContext.report()).thenReturn(mock(UpgradeReport.class));
return mockResumeContext;
}
private static EbeanAspectV2 createMockEbeanAspect(String urn, String aspectName) {
Timestamp now = new Timestamp(System.currentTimeMillis());
return new EbeanAspectV2(
urn,
aspectName,
0L,
"{}", // metadata
now, // createdOn
"urn:li:corpuser:testUser", // createdBy
null, // createdFor
null // systemMetadata
);
}
}

View File: UpgradeCliApplicationTestConfiguration.java

@@ -19,17 +19,17 @@ import org.springframework.context.annotation.Import;
@Import(value = {SystemAuthenticationFactory.class})
public class UpgradeCliApplicationTestConfiguration {
@MockBean private UpgradeCli upgradeCli;
@MockBean public UpgradeCli upgradeCli;
@MockBean private Database ebeanServer;
@MockBean public Database ebeanServer;
@MockBean private SearchService searchService;
@MockBean public SearchService searchService;
@MockBean private GraphService graphService;
@MockBean public GraphService graphService;
@MockBean private EntityRegistry entityRegistry;
@MockBean public EntityRegistry entityRegistry;
@MockBean ConfigEntityRegistry configEntityRegistry;
@MockBean public ConfigEntityRegistry configEntityRegistry;
@MockBean public EntityIndexBuilders entityIndexBuilders;