feat(bootstrap): create abstract class UpgradeStep to abstract away upgrade logic (#5349)

This commit is contained in:
Aditya Radhakrishnan 2022-07-06 13:57:05 -07:00 committed by GitHub
parent 4b515e022b
commit a63d8be538
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 99 deletions

View File

@ -0,0 +1,125 @@
package com.linkedin.metadata.boot;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeRequest;
import com.linkedin.upgrade.DataHubUpgradeResult;
import java.net.URISyntaxException;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class UpgradeStep implements BootstrapStep {
private static final Integer SLEEP_SECONDS = 120;
protected final EntityService _entityService;
private final String _version;
private final String _upgradeId;
private final Urn _upgradeUrn;
public UpgradeStep(EntityService entityService, String version, String upgradeId) {
this._entityService = entityService;
this._version = version;
this._upgradeId = upgradeId;
this._upgradeUrn = EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(upgradeId),
Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
}
@Override
public void execute() throws Exception {
String upgradeStepName = name();
log.info(String.format("Attempting to run %s Upgrade Step..", upgradeStepName));
log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS));
if (hasUpgradeRan()) {
log.info(String.format("%s has run before for version %s. Skipping..", _upgradeId, _version));
return;
}
// Sleep to ensure deployment process finishes.
Thread.sleep(SLEEP_SECONDS * 1000);
try {
ingestUpgradeRequestAspect();
upgrade();
ingestUpgradeResultAspect();
} catch (Exception e) {
String errorMessage = String.format("Error when running %s for version %s", _upgradeId, _version);
cleanUpgradeAfterError(e, errorMessage);
throw new RuntimeException(errorMessage, e);
}
}
@Override
public String name() {
return this.getClass().getSimpleName();
}
public abstract void upgrade() throws Exception;
private boolean hasUpgradeRan() {
try {
EntityResponse response = _entityService.getEntityV2(Constants.DATA_HUB_UPGRADE_ENTITY_NAME, _upgradeUrn,
Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME));
if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) {
DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data();
DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap);
if (request.hasVersion() && request.getVersion().equals(_version)) {
return true;
}
}
} catch (Exception e) {
log.error("Error when checking to see if datahubUpgrade entity exists. Commencing with upgrade...", e);
return false;
}
return false;
}
private void ingestUpgradeRequestAspect() throws URISyntaxException {
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeRequest upgradeRequest =
new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(_version);
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(_upgradeUrn);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeRequest));
upgradeProposal.setChangeType(ChangeType.UPSERT);
_entityService.ingestProposal(upgradeProposal, auditStamp);
}
private void ingestUpgradeResultAspect() throws URISyntaxException {
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(_upgradeUrn);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult));
upgradeProposal.setChangeType(ChangeType.UPSERT);
_entityService.ingestProposal(upgradeProposal, auditStamp);
}
private void cleanUpgradeAfterError(Exception e, String errorMessage) {
log.error(errorMessage, e);
_entityService.deleteUrn(_upgradeUrn);
}
}

View File

@ -67,11 +67,14 @@ public class BootstrapManagerFactory {
final IngestDataPlatformsStep ingestDataPlatformsStep = new IngestDataPlatformsStep(_entityService);
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
new IngestDataPlatformInstancesStep(_entityService, _migrationsDao);
final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RestoreDbtSiblingsIndices
restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RestoreGlossaryIndices restoreGlossaryIndicesStep =
new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices =
new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep, restoreDbtSiblingsIndices));
return new BootstrapManager(
ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep,
removeClientIdAspectStep, restoreDbtSiblingsIndices));
}
}

View File

@ -2,55 +2,66 @@ package com.linkedin.metadata.boot.steps;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.glossary.GlossaryNodeInfo;
import com.linkedin.glossary.GlossaryTermInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.boot.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeRequest;
import com.linkedin.upgrade.DataHubUpgradeResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class RestoreGlossaryIndices implements BootstrapStep {
public class RestoreGlossaryIndices extends UpgradeStep {
private static final String VERSION = "1";
private static final String UPGRADE_ID = "restore-glossary-indices-ui";
private static final Urn GLOSSARY_UPGRADE_URN =
EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
private static final Integer BATCH_SIZE = 1000;
private static final Integer SLEEP_SECONDS = 120;
private final EntityService _entityService;
private final EntitySearchService _entitySearchService;
private final EntityRegistry _entityRegistry;
public RestoreGlossaryIndices(EntityService entityService, EntitySearchService entitySearchService,
EntityRegistry entityRegistry) {
super(entityService, VERSION, UPGRADE_ID);
_entitySearchService = entitySearchService;
_entityRegistry = entityRegistry;
}
@Override
public String name() {
return this.getClass().getSimpleName();
public void upgrade() throws Exception {
final AspectSpec termAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME)
.getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME);
final AspectSpec nodeAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME)
.getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME);
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec);
int termsCount = BATCH_SIZE;
while (termsCount < totalTermsCount) {
getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec);
termsCount += BATCH_SIZE;
}
final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec);
int nodesCount = BATCH_SIZE;
while (nodesCount < totalNodesCount) {
getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec);
nodesCount += BATCH_SIZE;
}
}
@Nonnull
@ -59,83 +70,16 @@ public class RestoreGlossaryIndices implements BootstrapStep {
return ExecutionMode.ASYNC;
}
@Override
public void execute() throws Exception {
log.info("Attempting to run RestoreGlossaryIndices upgrade..");
log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS));
// Sleep to ensure deployment process finishes.
Thread.sleep(SLEEP_SECONDS * 1000);
try {
EntityResponse response = _entityService.getEntityV2(
Constants.DATA_HUB_UPGRADE_ENTITY_NAME,
GLOSSARY_UPGRADE_URN,
Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)
);
if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) {
DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data();
DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap);
if (request.hasVersion() && request.getVersion().equals(VERSION)) {
log.info("Glossary Upgrade has run before with this version. Skipping");
return;
}
}
final AspectSpec termAspectSpec =
_entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME);
final AspectSpec nodeAspectSpec =
_entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME);
final AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeRequest upgradeRequest = new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(VERSION);
ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, upgradeRequest, auditStamp);
final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec);
int termsCount = BATCH_SIZE;
while (termsCount < totalTermsCount) {
getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec);
termsCount += BATCH_SIZE;
}
final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec);
int nodesCount = BATCH_SIZE;
while (nodesCount < totalNodesCount) {
getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec);
nodesCount += BATCH_SIZE;
}
final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp);
log.info("Successfully restored glossary index");
} catch (Exception e) {
log.error("Error when running the RestoreGlossaryIndices Bootstrap Step", e);
_entityService.deleteUrn(GLOSSARY_UPGRADE_URN);
throw new RuntimeException("Error when running the RestoreGlossaryIndices Bootstrap Step", e);
}
}
private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) {
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(GLOSSARY_UPGRADE_URN);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(aspectName);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect));
upgradeProposal.setChangeType(ChangeType.UPSERT);
_entityService.ingestProposal(upgradeProposal, auditStamp);
}
private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec) throws Exception {
SearchResult termsResult = _entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE);
private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec)
throws Exception {
SearchResult termsResult =
_entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE);
List<Urn> termUrns = termsResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList());
if (termUrns.size() == 0) {
return 0;
}
final Map<Urn, EntityResponse> termInfoResponses = _entityService.getEntitiesV2(
Constants.GLOSSARY_TERM_ENTITY_NAME,
new HashSet<>(termUrns),
final Map<Urn, EntityResponse> termInfoResponses =
_entityService.getEntitiesV2(Constants.GLOSSARY_TERM_ENTITY_NAME, new HashSet<>(termUrns),
Collections.singleton(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME)
);