fix(system-update): fix no wait flag (#7927)

This commit is contained in:
david-leifker 2023-04-30 08:12:11 -05:00 committed by GitHub
parent b1413c3e8c
commit 9ef281acf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 18 deletions

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.boot.factories;
import com.google.common.collect.ImmutableList;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entity.EntityServiceFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
@ -28,8 +27,10 @@ import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -99,10 +100,10 @@ public class BootstrapManagerFactory {
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
final RestoreColumnLineageIndices restoreColumnLineageIndices = new RestoreColumnLineageIndices(_entityService, _entityRegistry);
final IngestDefaultGlobalSettingsStep ingestSettingsStep = new IngestDefaultGlobalSettingsStep(_entityService);
final WaitForSystemUpdateStep waitForSystemUpdateStep = new WaitForSystemUpdateStep(_dataHubUpgradeKafkaListener,
_configurationProvider);
final WaitForSystemUpdateStep waitForSystemUpdateStep = _configurationProvider.getSystemUpdate().isWaitForSystemUpdate()
? new WaitForSystemUpdateStep(_dataHubUpgradeKafkaListener, _configurationProvider) : null;
final List<BootstrapStep> finalSteps = new ArrayList<>(ImmutableList.of(
final List<BootstrapStep> finalSteps = Stream.of(
waitForSystemUpdateStep,
ingestRootUserStep,
ingestPoliciesStep,
@ -115,7 +116,9 @@ public class BootstrapManagerFactory {
removeClientIdAspectStep,
restoreDbtSiblingsIndices,
indexDataPlatformsStep,
restoreColumnLineageIndices));
restoreColumnLineageIndices)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (_upgradeDefaultBrowsePathsEnabled) {
finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService));

View File

@ -84,6 +84,9 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
log.info("Latest system update version: {}", event.getVersion());
if (expectedVersion.equals(event.getVersion())) {
IS_UPDATED.getAndSet(true);
} else if (!_configurationProvider.getSystemUpdate().isWaitForSystemUpdate()) {
log.warn("Wait for system update is disabled. Proceeding with startup.");
IS_UPDATED.getAndSet(true);
} else {
log.warn("System version is not up to date: {}. Waiting for datahub-upgrade to complete...", expectedVersion);
}