From a63d8be53800d2e6953dcb9c61a43402b3313709 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Wed, 6 Jul 2022 13:57:05 -0700 Subject: [PATCH] feat(bootstrap): create abstract class UpgradeStep to abstract away upgrade logic (#5349) --- .../linkedin/metadata/boot/UpgradeStep.java | 125 +++++++++++++++++ .../factories/BootstrapManagerFactory.java | 13 +- .../boot/steps/RestoreGlossaryIndices.java | 132 +++++------------- 3 files changed, 171 insertions(+), 99 deletions(-) create mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java new file mode 100644 index 0000000000..3cf0565892 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java @@ -0,0 +1,125 @@ +package com.linkedin.metadata.boot; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; +import com.linkedin.entity.EntityResponse; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.key.DataHubUpgradeKey; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.upgrade.DataHubUpgradeRequest; +import com.linkedin.upgrade.DataHubUpgradeResult; +import java.net.URISyntaxException; +import java.util.Collections; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public abstract class UpgradeStep implements BootstrapStep { + private static final Integer SLEEP_SECONDS = 120; + + protected final EntityService _entityService; + private final String _version; + private final String _upgradeId; + private final Urn _upgradeUrn; + + public UpgradeStep(EntityService entityService, String version, String upgradeId) { + this._entityService = entityService; + this._version = version; + this._upgradeId = upgradeId; + this._upgradeUrn = EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(upgradeId), + Constants.DATA_HUB_UPGRADE_ENTITY_NAME); + } + + @Override + public void execute() throws Exception { + String upgradeStepName = name(); + + log.info(String.format("Attempting to run %s Upgrade Step..", upgradeStepName)); + log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS)); + + if (hasUpgradeRan()) { + log.info(String.format("%s has run before for version %s. Skipping..", _upgradeId, _version)); + return; + } + + // Sleep to ensure deployment process finishes. + Thread.sleep(SLEEP_SECONDS * 1000); + + try { + ingestUpgradeRequestAspect(); + upgrade(); + ingestUpgradeResultAspect(); + } catch (Exception e) { + String errorMessage = String.format("Error when running %s for version %s", _upgradeId, _version); + cleanUpgradeAfterError(e, errorMessage); + throw new RuntimeException(errorMessage, e); + } + } + + @Override + public String name() { + return this.getClass().getSimpleName(); + } + + public abstract void upgrade() throws Exception; + + private boolean hasUpgradeRan() { + try { + EntityResponse response = _entityService.getEntityV2(Constants.DATA_HUB_UPGRADE_ENTITY_NAME, _upgradeUrn, + Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)); + + if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) { + DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data(); + DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap); + if (request.hasVersion() && request.getVersion().equals(_version)) { + return true; + } + } + } catch (Exception e) { + log.error("Error when checking to see if datahubUpgrade entity exists. Commencing with upgrade...", e); + return false; + } + return false; + } + + private void ingestUpgradeRequestAspect() throws URISyntaxException { + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + final DataHubUpgradeRequest upgradeRequest = + new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(_version); + + final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal(); + upgradeProposal.setEntityUrn(_upgradeUrn); + upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME); + upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME); + upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeRequest)); + upgradeProposal.setChangeType(ChangeType.UPSERT); + + _entityService.ingestProposal(upgradeProposal, auditStamp); + } + + private void ingestUpgradeResultAspect() throws URISyntaxException { + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis()); + + final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal(); + upgradeProposal.setEntityUrn(_upgradeUrn); + upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME); + upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME); + upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult)); + upgradeProposal.setChangeType(ChangeType.UPSERT); + + _entityService.ingestProposal(upgradeProposal, auditStamp); + } + + private void cleanUpgradeAfterError(Exception e, String errorMessage) { + log.error(errorMessage, e); + _entityService.deleteUrn(_upgradeUrn); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 2537b48af8..d092ce8817 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -67,11 +67,14 @@ public class BootstrapManagerFactory { final IngestDataPlatformsStep ingestDataPlatformsStep = new IngestDataPlatformsStep(_entityService); final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep = new IngestDataPlatformInstancesStep(_entityService, _migrationsDao); - final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry); - final RestoreDbtSiblingsIndices - restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); + final RestoreGlossaryIndices restoreGlossaryIndicesStep = + new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry); + final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices = + new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); - return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep, - ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep, restoreDbtSiblingsIndices)); + return new BootstrapManager( + ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep, + ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, + removeClientIdAspectStep, restoreDbtSiblingsIndices)); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java index b6defa8bc3..b5ae5fb34a 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java @@ -2,55 +2,66 @@ package com.linkedin.metadata.boot.steps; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; -import com.linkedin.data.DataMap; -import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.EnvelopedAspectMap; import com.linkedin.events.metadata.ChangeType; import com.linkedin.glossary.GlossaryNodeInfo; import com.linkedin.glossary.GlossaryTermInfo; import com.linkedin.metadata.Constants; -import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.boot.UpgradeStep; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.key.DataHubUpgradeKey; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchResult; -import com.linkedin.metadata.utils.EntityKeyUtils; -import com.linkedin.metadata.utils.GenericRecordUtils; -import com.linkedin.mxe.MetadataChangeProposal; -import com.linkedin.upgrade.DataHubUpgradeRequest; -import com.linkedin.upgrade.DataHubUpgradeResult; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import javax.annotation.Nonnull; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; @Slf4j -@RequiredArgsConstructor -public class RestoreGlossaryIndices implements BootstrapStep { +public class RestoreGlossaryIndices extends UpgradeStep { private static final String VERSION = "1"; private static final String UPGRADE_ID = "restore-glossary-indices-ui"; - private static final Urn GLOSSARY_UPGRADE_URN = - EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME); private static final Integer BATCH_SIZE = 1000; - private static final Integer SLEEP_SECONDS = 120; - private final EntityService _entityService; private final EntitySearchService _entitySearchService; private final EntityRegistry _entityRegistry; + public RestoreGlossaryIndices(EntityService entityService, EntitySearchService entitySearchService, + EntityRegistry entityRegistry) { + super(entityService, VERSION, UPGRADE_ID); + _entitySearchService = entitySearchService; + _entityRegistry = entityRegistry; + } + @Override - public String name() { - return this.getClass().getSimpleName(); + public void upgrade() throws Exception { + final AspectSpec termAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME) + .getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME); + final AspectSpec nodeAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME) + .getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME); + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + + final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec); + int termsCount = BATCH_SIZE; + while (termsCount < totalTermsCount) { + getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec); + termsCount += BATCH_SIZE; + } + + final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec); + int nodesCount = BATCH_SIZE; + while (nodesCount < totalNodesCount) { + getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec); + nodesCount += BATCH_SIZE; + } } @Nonnull @@ -59,83 +70,16 @@ public class RestoreGlossaryIndices implements BootstrapStep { return ExecutionMode.ASYNC; } - @Override - public void execute() throws Exception { - log.info("Attempting to run RestoreGlossaryIndices upgrade.."); - log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS)); - - // Sleep to ensure deployment process finishes. - Thread.sleep(SLEEP_SECONDS * 1000); - - try { - EntityResponse response = _entityService.getEntityV2( - Constants.DATA_HUB_UPGRADE_ENTITY_NAME, - GLOSSARY_UPGRADE_URN, - Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) - ); - if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) { - DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data(); - DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap); - if (request.hasVersion() && request.getVersion().equals(VERSION)) { - log.info("Glossary Upgrade has run before with this version. Skipping"); - return; - } - } - - final AspectSpec termAspectSpec = - _entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME); - final AspectSpec nodeAspectSpec = - _entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME); - final AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - - final DataHubUpgradeRequest upgradeRequest = new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(VERSION); - ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, upgradeRequest, auditStamp); - - final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec); - int termsCount = BATCH_SIZE; - while (termsCount < totalTermsCount) { - getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec); - termsCount += BATCH_SIZE; - } - - final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec); - int nodesCount = BATCH_SIZE; - while (nodesCount < totalNodesCount) { - getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec); - nodesCount += BATCH_SIZE; - } - - final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis()); - ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp); - - log.info("Successfully restored glossary index"); - } catch (Exception e) { - log.error("Error when running the RestoreGlossaryIndices Bootstrap Step", e); - _entityService.deleteUrn(GLOSSARY_UPGRADE_URN); - throw new RuntimeException("Error when running the RestoreGlossaryIndices Bootstrap Step", e); - } - } - - private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) { - final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal(); - upgradeProposal.setEntityUrn(GLOSSARY_UPGRADE_URN); - upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME); - upgradeProposal.setAspectName(aspectName); - upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); - upgradeProposal.setChangeType(ChangeType.UPSERT); - - _entityService.ingestProposal(upgradeProposal, auditStamp); - } - - private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec) throws Exception { - SearchResult termsResult = _entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE); + private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec) + throws Exception { + SearchResult termsResult = + _entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE); List termUrns = termsResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); if (termUrns.size() == 0) { return 0; } - final Map termInfoResponses = _entityService.getEntitiesV2( - Constants.GLOSSARY_TERM_ENTITY_NAME, - new HashSet<>(termUrns), + final Map termInfoResponses = + _entityService.getEntitiesV2(Constants.GLOSSARY_TERM_ENTITY_NAME, new HashSet<>(termUrns), Collections.singleton(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME) );