feat(browsepathv2): Allow system-update to reprocess browse paths v2 (#9200)

This commit is contained in:
david-leifker 2023-11-07 18:22:18 -06:00 committed by GitHub
parent 23c98ecf7a
commit 353584c10a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 94 additions and 36 deletions

View File

@ -6,6 +6,7 @@ import com.linkedin.common.AuditStamp;
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
@ -13,6 +14,7 @@ import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
@ -37,6 +39,8 @@ import static com.linkedin.metadata.Constants.*;
public class BackfillBrowsePathsV2Step implements UpgradeStep {
public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 = "REPROCESS_DEFAULT_BROWSE_PATHS_V2";
public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default";
private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
Constants.DATASET_ENTITY_NAME,
@ -81,6 +85,42 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {
final Filter filter;
if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2)
&& Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) {
filter = backfillDefaultBrowsePathsV2Filter();
} else {
filter = backfillBrowsePathsV2Filter();
}
final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
null,
BATCH_SIZE,
new SearchFlags().setFulltext(true).setSkipCache(true).setSkipHighlighting(true).setSkipAggregates(true)
);
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}
for (SearchEntity searchEntity : scrollResult.getEntities()) {
try {
ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
} catch (Exception e) {
// don't stop the whole step because of one bad urn or one bad ingestion
log.error(String.format("Error ingesting default browsePathsV2 aspect for urn %s", searchEntity.getEntity()), e);
}
}
return scrollResult.getScrollId();
}
private Filter backfillBrowsePathsV2Filter() {
// Condition: has `browsePaths` AND does NOT have `browsePathV2`
Criterion missingBrowsePathV2 = new Criterion();
missingBrowsePathV2.setCondition(Condition.IS_NULL);
@ -102,31 +142,31 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}
final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
"5m",
BATCH_SIZE,
null
);
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}
private Filter backfillDefaultBrowsePathsV2Filter() {
// Condition: has default `browsePathV2`
Criterion hasDefaultBrowsePathV2 = new Criterion();
hasDefaultBrowsePathV2.setCondition(Condition.EQUAL);
hasDefaultBrowsePathV2.setField("browsePathV2");
StringArray values = new StringArray();
values.add(DEFAULT_BROWSE_PATH_V2);
hasDefaultBrowsePathV2.setValues(values);
hasDefaultBrowsePathV2.setValue(DEFAULT_BROWSE_PATH_V2); // not used, but required field?
for (SearchEntity searchEntity : scrollResult.getEntities()) {
try {
ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
} catch (Exception e) {
// don't stop the whole step because of one bad urn or one bad ingestion
log.error(String.format("Error ingesting default browsePathsV2 aspect for urn %s", searchEntity.getEntity()), e);
}
}
CriterionArray criterionArray = new CriterionArray();
criterionArray.add(hasDefaultBrowsePathV2);
return scrollResult.getScrollId();
ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);
ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(conjunctiveCriterion);
Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}
private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
@ -142,7 +182,7 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
_entityService.ingestProposal(
proposal,
auditStamp,
false
true
);
}

View File

@ -21,6 +21,7 @@ DATAHUB_GMS_PORT=8080
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}
# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=

View File

@ -25,6 +25,7 @@ DATAHUB_GMS_PORT=8080
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}
# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=

View File

@ -54,6 +54,8 @@ services:
- ${HOME}/.datahub/plugins:/etc/datahub/plugins
datahub-upgrade:
image: acryldata/datahub-upgrade:debug
ports:
- ${DATAHUB_MAPPED_UPGRADE_DEBUG_PORT:-5003}:5003
build:
context: datahub-upgrade
dockerfile: Dockerfile
@ -63,6 +65,8 @@ services:
- SKIP_ELASTICSEARCH_CHECK=false
- DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-dev}
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true}
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}
- JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5003
volumes:
- ../datahub-upgrade/build/libs/:/datahub/datahub-upgrade/bin/
- ../metadata-models/src/main/resources/:/datahub/datahub-gms/resources

View File

@ -151,6 +151,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -144,6 +144,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -144,6 +144,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -151,6 +151,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:

View File

@ -16,7 +16,7 @@ import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import org.javatuples.Quintet;
import org.javatuples.Septet;
import org.javatuples.Sextet;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@ -154,8 +154,9 @@ public class CachingEntitySearchService {
batchSize,
querySize -> getRawSearchResults(entityNames, query, filters, sortCriterion, querySize.getFrom(),
querySize.getSize(), flags, facets),
querySize -> Sextet.with(entityNames, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, facets, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Septet.with(entityNames, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, flags != null ? toJsonString(flags) : null,
facets, querySize), flags, enableCache).getSearchResults(from, size);
}
@ -175,7 +176,8 @@ public class CachingEntitySearchService {
if (enableCache(flags)) {
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
Object cacheKey = Sextet.with(entityName, input, field, filters != null ? toJsonString(filters) : null,
flags != null ? toJsonString(flags) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
@ -210,7 +212,8 @@ public class CachingEntitySearchService {
if (enableCache(flags)) {
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
Object cacheKey = Sextet.with(entityName, path, filters != null ? toJsonString(filters) : null,
flags != null ? toJsonString(flags) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
@ -247,9 +250,10 @@ public class CachingEntitySearchService {
ScrollResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "scroll_cache_access").time();
Object cacheKey = Sextet.with(entities, query,
Object cacheKey = Septet.with(entities, query,
filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null,
flags != null ? toJsonString(flags) : null,
scrollId, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(ScrollResult.class, json) : null;

View File

@ -157,7 +157,7 @@ public class ESSearchDAO {
@Nonnull
@WithSpan
private ScrollResult executeAndExtract(@Nonnull List<EntitySpec> entitySpecs, @Nonnull SearchRequest searchRequest, @Nullable Filter filter,
@Nullable String scrollId, @Nonnull String keepAlive, int size) {
@Nullable String scrollId, @Nullable String keepAlive, int size) {
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "executeAndExtract_scroll").time()) {
final SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// extract results, validated against document model as well
@ -166,7 +166,7 @@ public class ESSearchDAO {
.extractScrollResult(searchResponse,
filter, scrollId, keepAlive, size, supportsPointInTime()));
} catch (Exception e) {
log.error("Search query failed", e);
log.error("Search query failed: {}", searchRequest, e);
throw new ESQueryException("Search query failed:", e);
}
}

View File

@ -241,7 +241,9 @@ public class SearchRequestHandler {
BoolQueryBuilder filterQuery = getFilterQuery(filter);
searchSourceBuilder.query(QueryBuilders.boolQuery().must(getQuery(input, finalSearchFlags.isFulltext())).filter(filterQuery));
_aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation);
if (!finalSearchFlags.isSkipAggregates()) {
_aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation);
}
if (!finalSearchFlags.isSkipHighlighting()) {
searchSourceBuilder.highlighter(_highlights);
}
@ -366,7 +368,7 @@ public class SearchRequestHandler {
@WithSpan
public ScrollResult extractScrollResult(@Nonnull SearchResponse searchResponse, Filter filter, @Nullable String scrollId,
@Nonnull String keepAlive, int size, boolean supportsPointInTime) {
@Nullable String keepAlive, int size, boolean supportsPointInTime) {
int totalCount = (int) searchResponse.getHits().getTotalHits().value;
List<SearchEntity> resultList = getResults(searchResponse);
SearchResultMetadata searchResultMetadata = extractSearchResultMetadata(searchResponse, filter);
@ -376,7 +378,7 @@ public class SearchRequestHandler {
if (searchHits.length == size) {
Object[] sort = searchHits[searchHits.length - 1].getSortValues();
long expirationTimeMs = 0L;
if (supportsPointInTime) {
if (keepAlive != null && supportsPointInTime) {
expirationTimeMs = TimeValue.parseTimeValue(keepAlive, "expirationTime").getMillis() + System.currentTimeMillis();
}
nextScrollId = new SearchAfterWrapper(sort, searchResponse.pointInTimeId(), expirationTimeMs).toScrollId();

View File

@ -285,6 +285,8 @@ bootstrap:
enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones
backfillBrowsePathsV2:
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. Deprecating in favor of running through SystemUpdate
reprocessDefaultBrowsePathsV2:
enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} # reprocess V2 browse paths which were set to the default: {"path":[{"id":"Default"}]}
policies:
file: ${BOOTSTRAP_POLICIES_FILE:classpath:boot/policies.json}
# eg for local file

View File

@ -193,7 +193,7 @@ public interface EntitySearchService {
*/
@Nonnull
ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags);
/**
* Gets a list of documents that match given search request. The results are aggregated and filters are applied to the
@ -210,7 +210,7 @@ public interface EntitySearchService {
*/
@Nonnull
ScrollResult structuredScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags);
/**
* Max result size returned by the underlying search backend