feat(browseV2): add browseV2 logic to system update (#8506)

Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
This commit is contained in:
RyanHolstien 2023-08-01 12:48:02 -05:00 committed by GitHub
parent 2e2a6748ac
commit 9f791a3b6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 227 additions and 5 deletions

View File

@ -0,0 +1,17 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes the {@link BackfillBrowsePathsV2} upgrade as a bean.
 */
@Configuration
public class BackfillBrowsePathsV2Config {

  /**
   * Creates the upgrade from the two services it is constructed with.
   *
   * @param entityService entity service handed to the upgrade
   * @param searchService search service handed to the upgrade
   * @return a new {@link BackfillBrowsePathsV2} instance
   */
  @Bean
  public BackfillBrowsePathsV2 backfillBrowsePathsV2(final EntityService entityService,
      final SearchService searchService) {
    final BackfillBrowsePathsV2 upgrade = new BackfillBrowsePathsV2(entityService, searchService);
    return upgrade;
  }
}

View File

@ -3,6 +3,7 @@ package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@ -30,10 +31,11 @@ public class SystemUpdateConfig {
@Bean(name = "systemUpdate")
public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices,
@Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
final GitVersion gitVersion, @Qualifier("revision") String revision) {
final GitVersion gitVersion, @Qualifier("revision") String revision,
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
String version = String.format("%s-%s", gitVersion.getVersion(), revision);
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version);
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version, backfillBrowsePathsV2);
}
@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")

View File

@ -6,6 +6,7 @@ import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import lombok.extern.slf4j.Slf4j;
@ -21,11 +22,12 @@ public class SystemUpdate implements Upgrade {
private final List<UpgradeStep> _steps;
public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob,
final KafkaEventProducer kafkaEventProducer, final String version) {
final KafkaEventProducer kafkaEventProducer, final String version,
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
_preStartupUpgrades = List.of(buildIndicesJob);
_steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
_postStartupUpgrades = List.of(cleanIndicesJob);
_postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2);
}
@Override

View File

@ -0,0 +1,28 @@
package com.linkedin.datahub.upgrade.system.entity.steps;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import java.util.List;
/**
 * Upgrade whose only step is a {@link BackfillBrowsePathsV2Step}.
 */
public class BackfillBrowsePathsV2 implements Upgrade {

  /** Steps of this upgrade; fixed at construction time. */
  private final List<UpgradeStep> _steps;

  /**
   * @param entityService entity service passed through to the backfill step
   * @param searchService search service passed through to the backfill step
   */
  public BackfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
    final UpgradeStep backfillStep = new BackfillBrowsePathsV2Step(entityService, searchService);
    _steps = ImmutableList.of(backfillStep);
  }

  /** @return the identifier of this upgrade */
  @Override
  public String id() {
    return "BackfillBrowsePathsV2";
  }

  /** @return the immutable list holding the single backfill step */
  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }
}

View File

@ -0,0 +1,167 @@
package com.linkedin.datahub.upgrade.system.entity.steps;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import java.util.Set;
import static com.linkedin.metadata.Constants.*;
/**
 * Upgrade step that ingests a default {@code browsePathsV2} aspect for every entity the search
 * service returns for the filter "has {@code browsePaths} AND is missing {@code browsePathV2}".
 *
 * <p>Entities are processed one entity type at a time, in scroll batches of {@link #BATCH_SIZE}.
 * A failure on a single URN is logged and does not abort the step. The step is skipped unless the
 * {@code BACKFILL_BROWSE_PATHS_V2} environment variable parses to {@code true}.
 */
@Slf4j
public class BackfillBrowsePathsV2Step implements UpgradeStep {

  /** Name of the environment variable that enables this step (see {@link #skip(UpgradeContext)}). */
  public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";

  /** Entity types that are scrolled and backfilled by this step. */
  private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
      Constants.DATASET_ENTITY_NAME,
      Constants.DASHBOARD_ENTITY_NAME,
      Constants.CHART_ENTITY_NAME,
      Constants.DATA_JOB_ENTITY_NAME,
      Constants.DATA_FLOW_ENTITY_NAME,
      Constants.ML_MODEL_ENTITY_NAME,
      Constants.ML_MODEL_GROUP_ENTITY_NAME,
      Constants.ML_FEATURE_TABLE_ENTITY_NAME,
      Constants.ML_FEATURE_ENTITY_NAME
  );

  /** Number of entities requested from the search service per scroll call. */
  private static final int BATCH_SIZE = 5000;

  private final EntityService _entityService;
  private final SearchService _searchService;

  /**
   * @param entityService used to build and ingest the default browsePathsV2 aspect
   * @param searchService used to scroll over the entities that still need the aspect
   */
  public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) {
    _searchService = searchService;
    _entityService = entityService;
  }

  /**
   * Returns the function that performs the backfill.
   *
   * <p>For each entity type it scrolls until the batch method returns a {@code null} scroll id, then
   * reports {@link UpgradeStepResult.Result#SUCCEEDED}. Exceptions thrown by the search call itself
   * are not caught here and propagate to the caller.
   */
  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {
      final AuditStamp auditStamp =
          new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

      for (String entityType : ENTITY_TYPES_TO_MIGRATE) {
        // The scroll cursor belongs to a single entity type, so it is scoped to this iteration.
        String scrollId = null;
        // Counts requested batch windows, not the number of entities actually ingested.
        int migratedCount = 0;
        do {
          log.info("Upgrading batch {}-{} of browse paths for entity type {}", migratedCount,
              migratedCount + BATCH_SIZE, entityType);
          scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
          migratedCount += BATCH_SIZE;
        } while (scrollId != null);
      }
      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
    };
  }

  /**
   * Builds the search filter: {@code browsePathV2} IS_NULL AND {@code browsePaths} EXISTS.
   *
   * <p>A fresh instance is returned on every call, so each scroll request gets its own filter object.
   */
  private static Filter buildMissingBrowsePathsV2Filter() {
    // Condition: has `browsePaths` AND does NOT have `browsePathV2`
    Criterion missingBrowsePathV2 = new Criterion();
    missingBrowsePathV2.setCondition(Condition.IS_NULL);
    missingBrowsePathV2.setField("browsePathV2");

    // Excludes entities without browsePaths
    Criterion hasBrowsePathV1 = new Criterion();
    hasBrowsePathV1.setCondition(Condition.EXISTS);
    hasBrowsePathV1.setField("browsePaths");

    CriterionArray criterionArray = new CriterionArray();
    criterionArray.add(missingBrowsePathV2);
    criterionArray.add(hasBrowsePathV1);

    ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
    conjunctiveCriterion.setAnd(criterionArray);

    // A single conjunction wrapped in the filter's OR list.
    ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
    conjunctiveCriterionArray.add(conjunctiveCriterion);

    Filter filter = new Filter();
    filter.setOr(conjunctiveCriterionArray);
    return filter;
  }

  /**
   * Fetches one scroll batch for {@code entityType} and ingests a default browsePathsV2 aspect for
   * each returned entity.
   *
   * @param entityType entity type to scroll over
   * @param auditStamp audit stamp attached to every ingested proposal
   * @param scrollId scroll id from the previous batch, or {@code null} for the first batch
   * @return the scroll id reported by the search service, or {@code null} when the batch was empty
   */
  private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {
    final Filter filter = buildMissingBrowsePathsV2Filter();

    // NOTE(review): "5m" is presumably the scroll keep-alive and the two nulls the sort criterion
    // and search flags -- confirm against SearchService#scrollAcrossEntities.
    final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
        ImmutableList.of(entityType),
        "*",
        filter,
        null,
        scrollId,
        "5m",
        BATCH_SIZE,
        null
    );
    if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
      return null;
    }

    for (SearchEntity searchEntity : scrollResult.getEntities()) {
      try {
        ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
      } catch (Exception e) {
        // don't stop the whole step because of one bad urn or one bad ingestion
        log.error("Error ingesting default browsePathsV2 aspect for urn {}", searchEntity.getEntity(), e);
      }
    }

    return scrollResult.getScrollId();
  }

  /**
   * Builds the default browsePathsV2 aspect for {@code urn} and ingests it as an UPSERT proposal.
   *
   * @param urn entity to backfill
   * @param auditStamp audit stamp passed to the ingestion call
   * @throws Exception declared so any failure from building or ingesting the aspect reaches the caller
   */
  private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
    // NOTE(review): meaning of the boolean argument is not visible here -- confirm against EntityService.
    BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true);
    // Placeholder form avoids rendering the aspect for every urn when debug logging is disabled.
    log.debug("Adding browse path v2 for urn {} with value {}", urn, browsePathsV2);
    MetadataChangeProposal proposal = new MetadataChangeProposal();
    proposal.setEntityUrn(urn);
    proposal.setEntityType(urn.getEntityType());
    proposal.setAspectName(Constants.BROWSE_PATHS_V2_ASPECT_NAME);
    proposal.setChangeType(ChangeType.UPSERT);
    proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
    proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
    // NOTE(review): the trailing `false` is presumably the async flag -- confirm against EntityService#ingestProposal.
    _entityService.ingestProposal(
        proposal,
        auditStamp,
        false
    );
  }

  /** @return the identifier of this step */
  @Override
  public String id() {
    return "BackfillBrowsePathsV2Step";
  }

  /**
   * Returns whether the upgrade should proceed if the step fails after exceeding the maximum retries.
   */
  @Override
  public boolean isOptional() {
    return true;
  }

  /**
   * Skips the step unless the {@code BACKFILL_BROWSE_PATHS_V2} environment variable parses to
   * {@code true}; an unset variable therefore means "skip".
   */
  @Override
  public boolean skip(UpgradeContext context) {
    return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
  }
}

View File

@ -20,6 +20,7 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=

View File

@ -24,6 +24,7 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=

View File

@ -146,6 +146,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -139,6 +139,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -139,6 +139,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -146,6 +146,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -270,7 +270,7 @@ bootstrap:
upgradeDefaultBrowsePaths:
enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones
backfillBrowsePathsV2:
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag.
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables the browsePathsV2 backfill upgrade step. It is gated behind a flag because of concerns about the load this step generates. Deprecated in favor of running the backfill through SystemUpdate.
systemUpdate:
initialBackOffMs: ${BOOTSTRAP_SYSTEM_UPDATE_INITIAL_BACK_OFF_MILLIS:5000}