feat(policies): Make policies searchable by privilege, type, status or editable fields (#9877)

This commit is contained in:
Pedro Silva 2024-02-26 15:57:01 +00:00 committed by GitHub
parent 14648f43dc
commit 21ecb7170f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 345 additions and 12 deletions

View File

@ -5,17 +5,25 @@ import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import com.datahub.authorization.PolicyFetcher;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AndFilterInput;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListPoliciesInput;
import com.linkedin.datahub.graphql.generated.ListPoliciesResult;
import com.linkedin.datahub.graphql.generated.Policy;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.resolvers.policy.mappers.PolicyInfoPolicyMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.filter.Filter;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListPoliciesResult>> {
private static final Integer DEFAULT_START = 0;
@ -40,9 +48,20 @@ public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListP
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
final List<AndFilterInput> filters =
input.getOrFilters() != null ? input.getOrFilters() : new ArrayList<>();
final List<FacetFilterInput> facetFilters =
filters.stream()
.map(AndFilterInput::getAnd)
.flatMap(List::stream)
.collect(Collectors.toList());
log.debug(
"User {} listing policies with filters {}", context.getActorUrn(), filters.toString());
final Filter filter = ResolverUtils.buildFilter(facetFilters, Collections.emptyList());
return _policyFetcher
.fetchPolicies(start, query, count, context.getAuthentication())
.fetchPolicies(start, query, count, filter, context.getAuthentication())
.thenApply(
policyFetchResult -> {
final ListPoliciesResult result = new ListPoliciesResult();

View File

@ -8682,6 +8682,11 @@ input ListPoliciesInput {
Optional search query
"""
query: String
"""
A list of disjunctive criterion for the filter. (or operation to combine filters)
"""
orFilters: [AndFilterInput!]
}
"""

View File

@ -0,0 +1,23 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BackfillPolicyFieldsConfig {

  /**
   * Registers the {@link BackfillPolicyFields} system upgrade, wired from the
   * {@code systemUpdate.policyFields.*} configuration properties.
   */
  @Bean
  public BackfillPolicyFields backfillPolicyFields(
      EntityService<?> entityService,
      SearchService searchService,
      @Value("${systemUpdate.policyFields.enabled}") final boolean enabled,
      @Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled,
      @Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) {
    // All toggles are passed straight through; the upgrade itself decides whether to run.
    final BackfillPolicyFields upgrade =
        new BackfillPolicyFields(entityService, searchService, enabled, reprocessEnabled, batchSize);
    return upgrade;
  }
}

View File

@ -4,6 +4,7 @@ import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
@ -40,7 +41,8 @@ public class SystemUpdateConfig {
final GitVersion gitVersion,
@Qualifier("revision") String revision,
final BackfillBrowsePathsV2 backfillBrowsePathsV2,
final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL) {
final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL,
final BackfillPolicyFields backfillPolicyFields) {
String version = String.format("%s-%s", gitVersion.getVersion(), revision);
return new SystemUpdate(
@ -49,7 +51,8 @@ public class SystemUpdateConfig {
kafkaEventProducer,
version,
backfillBrowsePathsV2,
reindexDataJobViaNodesCLL);
reindexDataJobViaNodesCLL,
backfillPolicyFields);
}
@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")

View File

@ -7,6 +7,7 @@ import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import java.util.List;
@ -26,11 +27,13 @@ public class SystemUpdate implements Upgrade {
final KafkaEventProducer kafkaEventProducer,
final String version,
final BackfillBrowsePathsV2 backfillBrowsePathsV2,
final ReindexDataJobViaNodesCLL upgradeViaNodeCll) {
final ReindexDataJobViaNodesCLL upgradeViaNodeCll,
final BackfillPolicyFields backfillPolicyFields) {
_preStartupUpgrades = List.of(buildIndicesJob);
_steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
_postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll);
_postStartupUpgrades =
List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll, backfillPolicyFields);
}
@Override

View File

@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade.system.entity.steps;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import java.util.List;
public class BackfillPolicyFields implements Upgrade {

  // Steps to execute; empty when the upgrade is disabled via configuration.
  private final List<UpgradeStep> _steps;

  public BackfillPolicyFields(
      EntityService<?> entityService,
      SearchService searchService,
      boolean enabled,
      boolean reprocessEnabled,
      Integer batchSize) {
    _steps =
        enabled
            ? ImmutableList.of(
                new BackfillPolicyFieldsStep(
                    entityService, searchService, reprocessEnabled, batchSize))
            : ImmutableList.of();
  }

  @Override
  public String id() {
    return "BackfillPolicyFields";
  }

  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }
}

View File

@ -0,0 +1,216 @@
package com.linkedin.datahub.upgrade.system.entity.steps;
import static com.linkedin.metadata.Constants.*;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.policy.DataHubPolicyInfo;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
/**
* This bootstrap step is responsible for upgrading DataHub policy documents with new searchable
* fields in ES
*/
@Slf4j
public class BackfillPolicyFieldsStep implements UpgradeStep {
private static final String UPGRADE_ID = "BackfillPolicyFieldsStep";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
private final boolean reprocessEnabled;
private final Integer batchSize;
private final EntityService<?> entityService;
private final SearchService _searchService;
public BackfillPolicyFieldsStep(
EntityService<?> entityService,
SearchService searchService,
boolean reprocessEnabled,
Integer batchSize) {
this.entityService = entityService;
this._searchService = searchService;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
}
@Override
public String id() {
return UPGRADE_ID;
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
final AuditStamp auditStamp =
new AuditStamp()
.setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR))
.setTime(System.currentTimeMillis());
String scrollId = null;
int migratedCount = 0;
do {
log.info("Upgrading batch of policies {}-{}", migratedCount, migratedCount + batchSize);
scrollId = backfillPolicies(auditStamp, scrollId);
migratedCount += batchSize;
} while (scrollId != null);
BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
/**
* Returns whether the upgrade should be skipped. Uses previous run history or the environment
* variables REPROCESS_DEFAULT_POLICY_FIELDS & BACKFILL_BROWSE_PATHS_V2 to determine whether to
* skip.
*/
@Override
public boolean skip(UpgradeContext context) {
if (reprocessEnabled) {
return false;
}
boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
private String backfillPolicies(AuditStamp auditStamp, String scrollId) {
final Filter filter = backfillPolicyFieldFilter();
final ScrollResult scrollResult =
_searchService.scrollAcrossEntities(
ImmutableList.of(Constants.POLICY_ENTITY_NAME),
"*",
filter,
null,
scrollId,
null,
batchSize,
new SearchFlags()
.setFulltext(true)
.setSkipCache(true)
.setSkipHighlighting(true)
.setSkipAggregates(true));
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().isEmpty()) {
return null;
}
for (SearchEntity searchEntity : scrollResult.getEntities()) {
try {
ingestPolicyFields(searchEntity.getEntity(), auditStamp);
} catch (Exception e) {
// don't stop the whole step because of one bad urn or one bad ingestion
log.error(
String.format(
"Error ingesting default browsePathsV2 aspect for urn %s",
searchEntity.getEntity()),
e);
}
}
return scrollResult.getScrollId();
}
private Filter backfillPolicyFieldFilter() {
// Condition: Does not have at least 1 of: `privileges`, `editable`, `state` or `type`
ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(getCriterionForMissingField("privilege"));
conjunctiveCriterionArray.add(getCriterionForMissingField("editable"));
conjunctiveCriterionArray.add(getCriterionForMissingField("state"));
conjunctiveCriterionArray.add(getCriterionForMissingField("type"));
Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}
private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) {
EntityResponse entityResponse = null;
try {
entityResponse =
entityService.getEntityV2(
urn.getEntityType(), urn, Collections.singleton(DATAHUB_POLICY_INFO_ASPECT_NAME));
} catch (URISyntaxException e) {
log.error(
String.format(
"Error getting DataHub Policy Info for entity with urn %s while restating policy information",
urn),
e);
}
if (entityResponse != null
&& entityResponse.getAspects().containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
final DataMap dataMap =
entityResponse.getAspects().get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data();
final DataHubPolicyInfo infoAspect = new DataHubPolicyInfo(dataMap);
log.debug("Restating policy information for urn {} with value {}", urn, infoAspect);
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(DATAHUB_POLICY_INFO_ASPECT_NAME);
proposal.setChangeType(ChangeType.RESTATE);
proposal.setSystemMetadata(
new SystemMetadata()
.setRunId(DEFAULT_RUN_ID)
.setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(infoAspect));
entityService.ingestProposal(proposal, auditStamp, true);
}
}
@NotNull
private static ConjunctiveCriterion getCriterionForMissingField(String field) {
final Criterion missingPrivilegesField = new Criterion();
missingPrivilegesField.setCondition(Condition.IS_NULL);
missingPrivilegesField.setField(field);
final CriterionArray criterionArray = new CriterionArray();
criterionArray.add(missingPrivilegesField);
final ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);
return conjunctiveCriterion;
}
}

View File

@ -27,11 +27,17 @@ record DataHubPolicyInfo {
/**
* The type of policy
*/
@Searchable = {
"fieldType": "KEYWORD"
}
type: string
/**
* The state of policy, ACTIVE or INACTIVE
*/
@Searchable = {
"fieldType": "KEYWORD"
}
state: string
/**
@ -42,6 +48,12 @@ record DataHubPolicyInfo {
/**
* The privileges that the policy grants.
*/
@Searchable = {
"/*": {
"fieldType": "KEYWORD",
"addToFilters": true
}
}
privileges: array[string]
/**
@ -52,6 +64,9 @@ record DataHubPolicyInfo {
/**
* Whether the policy should be editable via the UI
*/
@Searchable = {
"fieldType": "BOOLEAN"
}
editable: boolean = true
/**

View File

@ -278,7 +278,7 @@ public class DataHubAuthorizer implements Authorizer {
while (total == null || scrollId != null) {
try {
final PolicyFetcher.PolicyFetchResult policyFetchResult =
_policyFetcher.fetchPolicies(count, scrollId, _systemAuthentication);
_policyFetcher.fetchPolicies(count, scrollId, null, _systemAuthentication);
addPoliciesToCache(newCache, policyFetchResult.getPolicies());

View File

@ -9,6 +9,7 @@ import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.ScrollResult;
@ -45,7 +46,7 @@ public class PolicyFetcher {
*/
@Deprecated
public CompletableFuture<PolicyFetchResult> fetchPolicies(
int start, String query, int count, Authentication authentication) {
int start, String query, int count, Filter filter, Authentication authentication) {
return CompletableFuture.supplyAsync(
() -> {
try {
@ -55,7 +56,8 @@ public class PolicyFetcher {
while (PolicyFetchResult.EMPTY.equals(result) && scrollId != null) {
PolicyFetchResult tmpResult =
fetchPolicies(query, count, scrollId.isEmpty() ? null : scrollId, authentication);
fetchPolicies(
query, count, scrollId.isEmpty() ? null : scrollId, filter, authentication);
fetchedResults += tmpResult.getPolicies().size();
scrollId = tmpResult.getScrollId();
if (fetchedResults > start) {
@ -71,13 +73,17 @@ public class PolicyFetcher {
}
public PolicyFetchResult fetchPolicies(
int count, @Nullable String scrollId, Authentication authentication)
int count, @Nullable String scrollId, Filter filter, Authentication authentication)
throws RemoteInvocationException, URISyntaxException {
return fetchPolicies("", count, scrollId, authentication);
return fetchPolicies("", count, scrollId, filter, authentication);
}
public PolicyFetchResult fetchPolicies(
String query, int count, @Nullable String scrollId, Authentication authentication)
String query,
int count,
@Nullable String scrollId,
Filter filter,
Authentication authentication)
throws RemoteInvocationException, URISyntaxException {
log.debug(String.format("Batch fetching policies. count: %s, scroll: %s", count, scrollId));
@ -86,7 +92,7 @@ public class PolicyFetcher {
_entityClient.scrollAcrossEntities(
List.of(POLICY_ENTITY_NAME),
query,
null,
filter,
scrollId,
null,
count,

View File

@ -322,6 +322,11 @@ systemUpdate:
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000}
reprocess:
enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false}
policyFields:
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_ENABLED:true}
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_BATCH_SIZE:5000}
reprocess:
enabled: ${REPROCESS_DEFAULT_POLICY_FIELDS:false}
structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings