diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java index 87832b8c3a..c1e4bb7f83 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java @@ -5,17 +5,25 @@ import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument; import com.datahub.authorization.PolicyFetcher; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.AuthorizationException; +import com.linkedin.datahub.graphql.generated.AndFilterInput; +import com.linkedin.datahub.graphql.generated.FacetFilterInput; import com.linkedin.datahub.graphql.generated.ListPoliciesInput; import com.linkedin.datahub.graphql.generated.ListPoliciesResult; import com.linkedin.datahub.graphql.generated.Policy; +import com.linkedin.datahub.graphql.resolvers.ResolverUtils; import com.linkedin.datahub.graphql.resolvers.policy.mappers.PolicyInfoPolicyMapper; import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.query.filter.Filter; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class ListPoliciesResolver implements DataFetcher> { private static final Integer DEFAULT_START = 0; @@ -40,9 +48,20 @@ public class ListPoliciesResolver implements DataFetcher filters = + input.getOrFilters() != null ? 
input.getOrFilters() : new ArrayList<>(); + final List facetFilters = + filters.stream() + .map(AndFilterInput::getAnd) + .flatMap(List::stream) + .collect(Collectors.toList()); + log.debug( + "User {} listing policies with filters {}", context.getActorUrn(), filters.toString()); + + final Filter filter = ResolverUtils.buildFilter(facetFilters, Collections.emptyList()); return _policyFetcher - .fetchPolicies(start, query, count, context.getAuthentication()) + .fetchPolicies(start, query, count, filter, context.getAuthentication()) .thenApply( policyFetchResult -> { final ListPoliciesResult result = new ListPoliciesResult(); diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index b5fd4a9a52..d8dedf20e3 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -8682,6 +8682,11 @@ input ListPoliciesInput { Optional search query """ query: String + + """ + A list of disjunctive criterion for the filter. (or operation to combine filters) + """ + orFilters: [AndFilterInput!] 
} """ diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java new file mode 100644 index 0000000000..6da85a5c16 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java @@ -0,0 +1,23 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.SearchService; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BackfillPolicyFieldsConfig { + + @Bean + public BackfillPolicyFields backfillPolicyFields( + EntityService entityService, + SearchService searchService, + @Value("${systemUpdate.policyFields.enabled}") final boolean enabled, + @Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled, + @Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) { + return new BackfillPolicyFields( + entityService, searchService, enabled, reprocessEnabled, batchSize); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index cde3a29248..17ad56ec80 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -4,6 +4,7 @@ import com.linkedin.datahub.upgrade.system.SystemUpdate; import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices; import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices; import 
com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2; +import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields; import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL; import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.gms.factory.config.ConfigurationProvider; @@ -40,7 +41,8 @@ public class SystemUpdateConfig { final GitVersion gitVersion, @Qualifier("revision") String revision, final BackfillBrowsePathsV2 backfillBrowsePathsV2, - final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL) { + final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL, + final BackfillPolicyFields backfillPolicyFields) { String version = String.format("%s-%s", gitVersion.getVersion(), revision); return new SystemUpdate( @@ -49,7 +51,8 @@ public class SystemUpdateConfig { kafkaEventProducer, version, backfillBrowsePathsV2, - reindexDataJobViaNodesCLL); + reindexDataJobViaNodesCLL, + backfillPolicyFields); } @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}") diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java index ed9c8ddda4..f02c820066 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java @@ -7,6 +7,7 @@ import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices; import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices; import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep; import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2; +import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields; import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL; import com.linkedin.metadata.dao.producer.KafkaEventProducer; import 
java.util.List; @@ -26,11 +27,13 @@ public class SystemUpdate implements Upgrade { final KafkaEventProducer kafkaEventProducer, final String version, final BackfillBrowsePathsV2 backfillBrowsePathsV2, - final ReindexDataJobViaNodesCLL upgradeViaNodeCll) { + final ReindexDataJobViaNodesCLL upgradeViaNodeCll, + final BackfillPolicyFields backfillPolicyFields) { _preStartupUpgrades = List.of(buildIndicesJob); _steps = List.of(new DataHubStartupStep(kafkaEventProducer, version)); - _postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll); + _postStartupUpgrades = + List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll, backfillPolicyFields); } @Override diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFields.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFields.java new file mode 100644 index 0000000000..3e1d385b87 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFields.java @@ -0,0 +1,38 @@ +package com.linkedin.datahub.upgrade.system.entity.steps; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.Upgrade; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.SearchService; +import java.util.List; + +public class BackfillPolicyFields implements Upgrade { + private final List _steps; + + public BackfillPolicyFields( + EntityService entityService, + SearchService searchService, + boolean enabled, + boolean reprocessEnabled, + Integer batchSize) { + if (enabled) { + _steps = + ImmutableList.of( + new BackfillPolicyFieldsStep( + entityService, searchService, reprocessEnabled, batchSize)); + } else { + _steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return "BackfillPolicyFields"; + } + + @Override + public 
List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFieldsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFieldsStep.java new file mode 100644 index 0000000000..27d48aa5e0 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillPolicyFieldsStep.java @@ -0,0 +1,216 @@ +package com.linkedin.datahub.upgrade.system.entity.steps; + +import static com.linkedin.metadata.Constants.*; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.DataMap; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.entity.EntityResponse; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.search.ScrollResult; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.policy.DataHubPolicyInfo; 
+import java.net.URISyntaxException; +import java.util.Collections; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +/** + * This bootstrap step is responsible for upgrading DataHub policy documents with new searchable + * fields in ES + */ +@Slf4j +public class BackfillPolicyFieldsStep implements UpgradeStep { + private static final String UPGRADE_ID = "BackfillPolicyFieldsStep"; + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); + private final boolean reprocessEnabled; + private final Integer batchSize; + private final EntityService entityService; + private final SearchService _searchService; + + public BackfillPolicyFieldsStep( + EntityService entityService, + SearchService searchService, + boolean reprocessEnabled, + Integer batchSize) { + this.entityService = entityService; + this._searchService = searchService; + this.reprocessEnabled = reprocessEnabled; + this.batchSize = batchSize; + } + + @Override + public String id() { + return UPGRADE_ID; + } + + @Override + public Function executable() { + return (context) -> { + final AuditStamp auditStamp = + new AuditStamp() + .setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)) + .setTime(System.currentTimeMillis()); + + String scrollId = null; + int migratedCount = 0; + do { + log.info("Upgrading batch of policies {}-{}", migratedCount, migratedCount + batchSize); + scrollId = backfillPolicies(auditStamp, scrollId); + migratedCount += batchSize; + } while (scrollId != null); + + BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService); + + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } + + /** + * Returns whether the upgrade should proceed if the step fails after exceeding the maximum + * retries. + */ + @Override + public boolean isOptional() { + return true; + } + + /** + * Returns whether the upgrade should be skipped. 
Uses previous run history or the environment + * variables REPROCESS_DEFAULT_POLICY_FIELDS & BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_ENABLED to determine whether to + * skip. + */ + @Override + public boolean skip(UpgradeContext context) { + + if (reprocessEnabled) { + return false; + } + + boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true); + if (previouslyRun) { + log.info("{} was already run. Skipping.", id()); + } + return previouslyRun; + } + + private String backfillPolicies(AuditStamp auditStamp, String scrollId) { + + final Filter filter = backfillPolicyFieldFilter(); + final ScrollResult scrollResult = + _searchService.scrollAcrossEntities( + ImmutableList.of(Constants.POLICY_ENTITY_NAME), + "*", + filter, + null, + scrollId, + null, + batchSize, + new SearchFlags() + .setFulltext(true) + .setSkipCache(true) + .setSkipHighlighting(true) + .setSkipAggregates(true)); + + if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().isEmpty()) { + return null; + } + + for (SearchEntity searchEntity : scrollResult.getEntities()) { + try { + ingestPolicyFields(searchEntity.getEntity(), auditStamp); + } catch (Exception e) { + // don't stop the whole step because of one bad urn or one bad ingestion + log.error( + String.format( + "Error ingesting policy fields aspect for urn %s", + searchEntity.getEntity()), + e); + } + } + + return scrollResult.getScrollId(); + } + + private Filter backfillPolicyFieldFilter() { + // Condition: Does not have at least 1 of: `privileges`, `editable`, `state` or `type` + ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray(); + + conjunctiveCriterionArray.add(getCriterionForMissingField("privileges")); + conjunctiveCriterionArray.add(getCriterionForMissingField("editable")); + conjunctiveCriterionArray.add(getCriterionForMissingField("state")); + conjunctiveCriterionArray.add(getCriterionForMissingField("type")); + + Filter filter = new Filter(); + filter.setOr(conjunctiveCriterionArray); + 
return filter; + } + + private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) { + EntityResponse entityResponse = null; + try { + entityResponse = + entityService.getEntityV2( + urn.getEntityType(), urn, Collections.singleton(DATAHUB_POLICY_INFO_ASPECT_NAME)); + } catch (URISyntaxException e) { + log.error( + String.format( + "Error getting DataHub Policy Info for entity with urn %s while restating policy information", + urn), + e); + } + + if (entityResponse != null + && entityResponse.getAspects().containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) { + final DataMap dataMap = + entityResponse.getAspects().get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data(); + final DataHubPolicyInfo infoAspect = new DataHubPolicyInfo(dataMap); + log.debug("Restating policy information for urn {} with value {}", urn, infoAspect); + MetadataChangeProposal proposal = new MetadataChangeProposal(); + proposal.setEntityUrn(urn); + proposal.setEntityType(urn.getEntityType()); + proposal.setAspectName(DATAHUB_POLICY_INFO_ASPECT_NAME); + proposal.setChangeType(ChangeType.RESTATE); + proposal.setSystemMetadata( + new SystemMetadata() + .setRunId(DEFAULT_RUN_ID) + .setLastObserved(System.currentTimeMillis())); + proposal.setAspect(GenericRecordUtils.serializeAspect(infoAspect)); + entityService.ingestProposal(proposal, auditStamp, true); + } + } + + @NotNull + private static ConjunctiveCriterion getCriterionForMissingField(String field) { + final Criterion missingPrivilegesField = new Criterion(); + missingPrivilegesField.setCondition(Condition.IS_NULL); + missingPrivilegesField.setField(field); + + final CriterionArray criterionArray = new CriterionArray(); + criterionArray.add(missingPrivilegesField); + final ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion(); + conjunctiveCriterion.setAnd(criterionArray); + return conjunctiveCriterion; + } +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/policy/DataHubPolicyInfo.pdl 
b/metadata-models/src/main/pegasus/com/linkedin/policy/DataHubPolicyInfo.pdl index 8e43e531eb..503cadd5d7 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/policy/DataHubPolicyInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/policy/DataHubPolicyInfo.pdl @@ -27,11 +27,17 @@ record DataHubPolicyInfo { /** * The type of policy */ + @Searchable = { + "fieldType": "KEYWORD" + } type: string /** * The state of policy, ACTIVE or INACTIVE */ + @Searchable = { + "fieldType": "KEYWORD" + } state: string /** @@ -42,6 +48,12 @@ record DataHubPolicyInfo { /** * The privileges that the policy grants. */ + @Searchable = { + "/*": { + "fieldType": "KEYWORD", + "addToFilters": true + } + } privileges: array[string] /** @@ -52,6 +64,9 @@ record DataHubPolicyInfo { /** * Whether the policy should be editable via the UI */ + @Searchable = { + "fieldType": "BOOLEAN" + } editable: boolean = true /** diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java index 9ae95bd4e9..350d57aae3 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java @@ -278,7 +278,7 @@ public class DataHubAuthorizer implements Authorizer { while (total == null || scrollId != null) { try { final PolicyFetcher.PolicyFetchResult policyFetchResult = - _policyFetcher.fetchPolicies(count, scrollId, _systemAuthentication); + _policyFetcher.fetchPolicies(count, scrollId, null, _systemAuthentication); addPoliciesToCache(newCache, policyFetchResult.getPolicies()); diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java index 9c5950985e..0485e3000a 100644 --- 
a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java @@ -9,6 +9,7 @@ import com.linkedin.entity.EntityResponse; import com.linkedin.entity.EnvelopedAspectMap; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.search.ScrollResult; @@ -45,7 +46,7 @@ public class PolicyFetcher { */ @Deprecated public CompletableFuture fetchPolicies( - int start, String query, int count, Authentication authentication) { + int start, String query, int count, Filter filter, Authentication authentication) { return CompletableFuture.supplyAsync( () -> { try { @@ -55,7 +56,8 @@ public class PolicyFetcher { while (PolicyFetchResult.EMPTY.equals(result) && scrollId != null) { PolicyFetchResult tmpResult = - fetchPolicies(query, count, scrollId.isEmpty() ? null : scrollId, authentication); + fetchPolicies( + query, count, scrollId.isEmpty() ? 
null : scrollId, filter, authentication); fetchedResults += tmpResult.getPolicies().size(); scrollId = tmpResult.getScrollId(); if (fetchedResults > start) { @@ -71,13 +73,17 @@ public class PolicyFetcher { } public PolicyFetchResult fetchPolicies( - int count, @Nullable String scrollId, Authentication authentication) + int count, @Nullable String scrollId, Filter filter, Authentication authentication) throws RemoteInvocationException, URISyntaxException { - return fetchPolicies("", count, scrollId, authentication); + return fetchPolicies("", count, scrollId, filter, authentication); } public PolicyFetchResult fetchPolicies( - String query, int count, @Nullable String scrollId, Authentication authentication) + String query, + int count, + @Nullable String scrollId, + Filter filter, + Authentication authentication) throws RemoteInvocationException, URISyntaxException { log.debug(String.format("Batch fetching policies. count: %s, scroll: %s", count, scrollId)); @@ -86,7 +92,7 @@ public class PolicyFetcher { _entityClient.scrollAcrossEntities( List.of(POLICY_ENTITY_NAME), query, - null, + filter, scrollId, null, count, diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 29dd32a375..494c18b75f 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -322,6 +322,11 @@ systemUpdate: batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000} reprocess: enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} + policyFields: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_ENABLED:true} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_BATCH_SIZE:5000} + reprocess: + enabled: ${REPROCESS_DEFAULT_POLICY_FIELDS:false} structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings