mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-19 14:44:37 +00:00
This reverts commit 8185ba441cf9266c3c811f2d0b13cdd38b7df6ba.
This commit is contained in:
parent
3353451561
commit
3584d64f1c
@ -1,20 +1,26 @@
|
|||||||
package com.linkedin.datahub.graphql.resolvers.policy;
|
package com.linkedin.datahub.graphql.resolvers.policy;
|
||||||
|
|
||||||
import com.datahub.authorization.PolicyFetcher;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.datahub.graphql.QueryContext;
|
import com.linkedin.datahub.graphql.QueryContext;
|
||||||
import com.linkedin.datahub.graphql.exception.AuthorizationException;
|
import com.linkedin.datahub.graphql.exception.AuthorizationException;
|
||||||
import com.linkedin.datahub.graphql.generated.ListPoliciesInput;
|
import com.linkedin.datahub.graphql.generated.ListPoliciesInput;
|
||||||
import com.linkedin.datahub.graphql.generated.ListPoliciesResult;
|
import com.linkedin.datahub.graphql.generated.ListPoliciesResult;
|
||||||
import com.linkedin.datahub.graphql.generated.Policy;
|
import com.linkedin.datahub.graphql.generated.Policy;
|
||||||
import com.linkedin.datahub.graphql.resolvers.policy.mappers.PolicyInfoPolicyMapper;
|
import com.linkedin.datahub.graphql.types.policy.mappers.PolicyMapper;
|
||||||
|
import com.linkedin.entity.EntityResponse;
|
||||||
import com.linkedin.entity.client.EntityClient;
|
import com.linkedin.entity.client.EntityClient;
|
||||||
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
import graphql.schema.DataFetcher;
|
import graphql.schema.DataFetcher;
|
||||||
import graphql.schema.DataFetchingEnvironment;
|
import graphql.schema.DataFetchingEnvironment;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
|
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
|
||||||
|
import static com.linkedin.metadata.Constants.*;
|
||||||
|
|
||||||
|
|
||||||
public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListPoliciesResult>> {
|
public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListPoliciesResult>> {
|
||||||
@ -22,10 +28,10 @@ public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListP
|
|||||||
private static final Integer DEFAULT_START = 0;
|
private static final Integer DEFAULT_START = 0;
|
||||||
private static final Integer DEFAULT_COUNT = 20;
|
private static final Integer DEFAULT_COUNT = 20;
|
||||||
|
|
||||||
private final PolicyFetcher _policyFetcher;
|
private final EntityClient _entityClient;
|
||||||
|
|
||||||
public ListPoliciesResolver(final EntityClient entityClient) {
|
public ListPoliciesResolver(final EntityClient entityClient) {
|
||||||
_policyFetcher = new PolicyFetcher(entityClient);
|
_entityClient = entityClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -41,16 +47,20 @@ public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListP
|
|||||||
return CompletableFuture.supplyAsync(() -> {
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
// First, get all policy Urns.
|
// First, get all policy Urns.
|
||||||
final PolicyFetcher.PolicyFetchResult policyFetchResult =
|
final ListUrnsResult gmsResult = _entityClient.listUrns(POLICY_ENTITY_NAME, start, count, context.getAuthentication());
|
||||||
_policyFetcher.fetchPolicies(start, count, context.getAuthentication());
|
|
||||||
|
// Then, get all policies. TODO: Migrate batchGet to return GenericAspects, to avoid requiring a snapshot.
|
||||||
|
final Map<Urn, EntityResponse> entities = _entityClient.batchGetV2(POLICY_ENTITY_NAME,
|
||||||
|
new HashSet<>(gmsResult.getEntities()), null, context.getAuthentication());
|
||||||
|
|
||||||
// Now that we have entities we can bind this to a result.
|
// Now that we have entities we can bind this to a result.
|
||||||
final ListPoliciesResult result = new ListPoliciesResult();
|
final ListPoliciesResult result = new ListPoliciesResult();
|
||||||
result.setStart(start);
|
result.setStart(gmsResult.getStart());
|
||||||
result.setCount(count);
|
result.setCount(gmsResult.getCount());
|
||||||
result.setTotal(policyFetchResult.getTotal());
|
result.setTotal(gmsResult.getTotal());
|
||||||
result.setPolicies(mapEntities(policyFetchResult.getPolicies()));
|
result.setPolicies(mapEntities(entities.values()));
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException("Failed to list policies", e);
|
throw new RuntimeException("Failed to list policies", e);
|
||||||
}
|
}
|
||||||
@ -59,11 +69,9 @@ public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListP
|
|||||||
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
|
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Policy> mapEntities(final List<PolicyFetcher.Policy> policies) {
|
private List<Policy> mapEntities(final Collection<EntityResponse> entities) {
|
||||||
return policies.stream().map(policy -> {
|
return entities.stream()
|
||||||
Policy mappedPolicy = PolicyInfoPolicyMapper.map(policy.getPolicyInfo());
|
.map(PolicyMapper::map)
|
||||||
mappedPolicy.setUrn(policy.getUrn().toString());
|
.collect(Collectors.toList());
|
||||||
return mappedPolicy;
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,8 +60,6 @@ public class UpsertPolicyResolver implements DataFetcher<CompletableFuture<Strin
|
|||||||
|
|
||||||
// Create the policy info.
|
// Create the policy info.
|
||||||
final DataHubPolicyInfo info = PolicyUpdateInputInfoMapper.map(input);
|
final DataHubPolicyInfo info = PolicyUpdateInputInfoMapper.map(input);
|
||||||
info.setLastUpdatedTimestamp(System.currentTimeMillis());
|
|
||||||
|
|
||||||
proposal.setEntityType(POLICY_ENTITY_NAME);
|
proposal.setEntityType(POLICY_ENTITY_NAME);
|
||||||
proposal.setAspectName(POLICY_INFO_ASPECT_NAME);
|
proposal.setAspectName(POLICY_INFO_ASPECT_NAME);
|
||||||
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
|
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
|
||||||
|
@ -44,7 +44,7 @@ public class SearchResolver implements DataFetcher<CompletableFuture<SearchResul
|
|||||||
log.debug("Executing search. entity type {}, query {}, filters: {}, start: {}, count: {}", input.getType(),
|
log.debug("Executing search. entity type {}, query {}, filters: {}, start: {}, count: {}", input.getType(),
|
||||||
input.getQuery(), input.getFilters(), start, count);
|
input.getQuery(), input.getFilters(), start, count);
|
||||||
return UrnSearchResultsMapper.map(
|
return UrnSearchResultsMapper.map(
|
||||||
_entityClient.search(entityName, sanitizedQuery, ResolverUtils.buildFilter(input.getFilters()), null, start,
|
_entityClient.search(entityName, sanitizedQuery, ResolverUtils.buildFilter(input.getFilters()), start,
|
||||||
count, ResolverUtils.getAuthentication(environment)));
|
count, ResolverUtils.getAuthentication(environment)));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to execute search: entity type {}, query {}, filters: {}, start: {}, count: {}",
|
log.error("Failed to execute search: entity type {}, query {}, filters: {}, start: {}, count: {}",
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.linkedin.datahub.graphql.types.policy.mappers;
|
||||||
|
|
||||||
|
import com.linkedin.data.DataMap;
|
||||||
|
import com.linkedin.datahub.graphql.generated.Policy;
|
||||||
|
import com.linkedin.datahub.graphql.resolvers.policy.mappers.PolicyInfoPolicyMapper;
|
||||||
|
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
|
||||||
|
import com.linkedin.entity.EntityResponse;
|
||||||
|
import com.linkedin.entity.EnvelopedAspectMap;
|
||||||
|
import com.linkedin.policy.DataHubPolicyInfo;
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
|
|
||||||
|
import static com.linkedin.metadata.Constants.*;
|
||||||
|
|
||||||
|
|
||||||
|
public class PolicyMapper implements ModelMapper<EntityResponse, Policy> {
|
||||||
|
|
||||||
|
public static final PolicyMapper INSTANCE = new PolicyMapper();
|
||||||
|
|
||||||
|
public static Policy map(@Nonnull final EntityResponse entityResponse) {
|
||||||
|
return INSTANCE.apply(entityResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Policy apply(EntityResponse entityResponse) {
|
||||||
|
EnvelopedAspectMap aspectMap = entityResponse.getAspects();
|
||||||
|
if (!aspectMap.containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
|
||||||
|
// If the policy exists, it should always have DataHubPolicyInfo.
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("Failed to find DataHubPolicyInfo aspect in DataHubPolicy data %s. Invalid state.",
|
||||||
|
entityResponse));
|
||||||
|
}
|
||||||
|
DataMap dataMap = aspectMap.get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data();
|
||||||
|
Policy result = PolicyInfoPolicyMapper.map(new DataHubPolicyInfo(dataMap));
|
||||||
|
result.setUrn(entityResponse.getUrn().toString());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
@ -163,18 +163,25 @@ export const PoliciesPage = () => {
|
|||||||
loading: policiesLoading,
|
loading: policiesLoading,
|
||||||
error: policiesError,
|
error: policiesError,
|
||||||
data: policiesData,
|
data: policiesData,
|
||||||
refetch: policiesRefetch,
|
|
||||||
} = useListPoliciesQuery({
|
} = useListPoliciesQuery({
|
||||||
fetchPolicy: 'no-cache',
|
fetchPolicy: 'no-cache',
|
||||||
variables: { input: { start, count: pageSize } },
|
variables: { input: { start, count: pageSize } },
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const listPoliciesQuery = 'listPolicies';
|
||||||
|
|
||||||
// Any time a policy is removed, edited, or created, refetch the list.
|
// Any time a policy is removed, edited, or created, refetch the list.
|
||||||
const [createPolicy, { error: createPolicyError }] = useCreatePolicyMutation();
|
const [createPolicy, { error: createPolicyError }] = useCreatePolicyMutation({
|
||||||
|
refetchQueries: () => [listPoliciesQuery],
|
||||||
|
});
|
||||||
|
|
||||||
const [updatePolicy, { error: updatePolicyError }] = useUpdatePolicyMutation();
|
const [updatePolicy, { error: updatePolicyError }] = useUpdatePolicyMutation({
|
||||||
|
refetchQueries: () => [listPoliciesQuery],
|
||||||
|
});
|
||||||
|
|
||||||
const [deletePolicy, { error: deletePolicyError }] = useDeletePolicyMutation();
|
const [deletePolicy, { error: deletePolicyError }] = useDeletePolicyMutation({
|
||||||
|
refetchQueries: () => [listPoliciesQuery],
|
||||||
|
});
|
||||||
|
|
||||||
const updateError = createPolicyError || updatePolicyError || deletePolicyError;
|
const updateError = createPolicyError || updatePolicyError || deletePolicyError;
|
||||||
|
|
||||||
@ -237,9 +244,6 @@ export const PoliciesPage = () => {
|
|||||||
content: `Are you sure you want to remove policy?`,
|
content: `Are you sure you want to remove policy?`,
|
||||||
onOk() {
|
onOk() {
|
||||||
deletePolicy({ variables: { urn: policy?.urn as string } }); // There must be a focus policy urn.
|
deletePolicy({ variables: { urn: policy?.urn as string } }); // There must be a focus policy urn.
|
||||||
setTimeout(function () {
|
|
||||||
policiesRefetch();
|
|
||||||
}, 2000);
|
|
||||||
onCancelViewPolicy();
|
onCancelViewPolicy();
|
||||||
},
|
},
|
||||||
onCancel() {},
|
onCancel() {},
|
||||||
@ -278,9 +282,6 @@ export const PoliciesPage = () => {
|
|||||||
createPolicy({ variables: { input: toPolicyInput(savePolicy) } });
|
createPolicy({ variables: { input: toPolicyInput(savePolicy) } });
|
||||||
}
|
}
|
||||||
message.success('Successfully saved policy.');
|
message.success('Successfully saved policy.');
|
||||||
setTimeout(function () {
|
|
||||||
policiesRefetch();
|
|
||||||
}, 2000);
|
|
||||||
onClosePolicyBuilder();
|
onClosePolicyBuilder();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -11,9 +11,6 @@ record DataHubPolicyInfo {
|
|||||||
/**
|
/**
|
||||||
* Display name of the Policy
|
* Display name of the Policy
|
||||||
*/
|
*/
|
||||||
@Searchable = {
|
|
||||||
"fieldType": "KEYWORD"
|
|
||||||
}
|
|
||||||
displayName: string
|
displayName: string
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -51,11 +48,4 @@ record DataHubPolicyInfo {
|
|||||||
*/
|
*/
|
||||||
editable: boolean = true
|
editable: boolean = true
|
||||||
|
|
||||||
/**
|
|
||||||
* Timestamp when the policy was last updated
|
|
||||||
*/
|
|
||||||
@Searchable = {
|
|
||||||
"fieldType": "DATETIME"
|
|
||||||
}
|
|
||||||
lastUpdatedTimestamp: optional long
|
|
||||||
}
|
}
|
@ -6,11 +6,17 @@ import com.linkedin.common.Owner;
|
|||||||
import com.linkedin.common.Ownership;
|
import com.linkedin.common.Ownership;
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.common.urn.UrnUtils;
|
import com.linkedin.common.urn.UrnUtils;
|
||||||
|
import com.linkedin.entity.EntityResponse;
|
||||||
|
import com.linkedin.entity.EnvelopedAspectMap;
|
||||||
import com.linkedin.entity.client.EntityClient;
|
import com.linkedin.entity.client.EntityClient;
|
||||||
import com.linkedin.metadata.authorization.PoliciesConfig;
|
import com.linkedin.metadata.authorization.PoliciesConfig;
|
||||||
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
import com.linkedin.policy.DataHubPolicyInfo;
|
import com.linkedin.policy.DataHubPolicyInfo;
|
||||||
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -19,11 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import static com.linkedin.metadata.Constants.CORP_GROUP_ENTITY_NAME;
|
import static com.linkedin.metadata.Constants.CORP_GROUP_ENTITY_NAME;
|
||||||
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
|
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
|
||||||
|
import static com.linkedin.metadata.Constants.DATAHUB_POLICY_INFO_ASPECT_NAME;
|
||||||
|
import static com.linkedin.metadata.Constants.POLICY_ENTITY_NAME;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -73,7 +80,7 @@ public class DataHubAuthorizer implements Authorizer {
|
|||||||
final int refreshIntervalSeconds,
|
final int refreshIntervalSeconds,
|
||||||
final AuthorizationMode mode) {
|
final AuthorizationMode mode) {
|
||||||
_systemAuthentication = systemAuthentication;
|
_systemAuthentication = systemAuthentication;
|
||||||
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache);
|
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, entityClient, _policyCache);
|
||||||
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
|
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
|
||||||
_mode = mode;
|
_mode = mode;
|
||||||
_resourceSpecResolver = new ResourceSpecResolver(systemAuthentication, entityClient);
|
_resourceSpecResolver = new ResourceSpecResolver(systemAuthentication, entityClient);
|
||||||
@ -206,13 +213,21 @@ public class DataHubAuthorizer implements Authorizer {
|
|||||||
* entire cache using Policies stored in the backend.
|
* entire cache using Policies stored in the backend.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@RequiredArgsConstructor
|
|
||||||
static class PolicyRefreshRunnable implements Runnable {
|
static class PolicyRefreshRunnable implements Runnable {
|
||||||
|
|
||||||
private final Authentication _systemAuthentication;
|
private final Authentication _systemAuthentication;
|
||||||
private final PolicyFetcher _policyFetcher;
|
private final EntityClient _entityClient;
|
||||||
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
|
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
|
||||||
|
|
||||||
|
public PolicyRefreshRunnable(
|
||||||
|
final Authentication systemAuthentication,
|
||||||
|
final EntityClient entityClient,
|
||||||
|
final Map<String, List<DataHubPolicyInfo>> policyCache) {
|
||||||
|
_systemAuthentication = systemAuthentication;
|
||||||
|
_entityClient = entityClient;
|
||||||
|
_policyCache = policyCache;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
@ -225,16 +240,18 @@ public class DataHubAuthorizer implements Authorizer {
|
|||||||
|
|
||||||
while (start < total) {
|
while (start < total) {
|
||||||
try {
|
try {
|
||||||
final PolicyFetcher.PolicyFetchResult
|
log.debug(String.format("Batch fetching policies. start: %s, count: %s ", start, count));
|
||||||
policyFetchResult = _policyFetcher.fetchPolicies(start, count, _systemAuthentication);
|
final ListUrnsResult policyUrns = _entityClient.listUrns(POLICY_ENTITY_NAME, start, count, _systemAuthentication);
|
||||||
|
final Map<Urn, EntityResponse> policyEntities = _entityClient.batchGetV2(POLICY_ENTITY_NAME,
|
||||||
|
new HashSet<>(policyUrns.getEntities()), null, _systemAuthentication);
|
||||||
|
|
||||||
addPoliciesToCache(newCache, policyFetchResult.getPolicies());
|
addPoliciesToCache(newCache, policyEntities.values());
|
||||||
|
|
||||||
total = policyFetchResult.getTotal();
|
total = policyUrns.getTotal();
|
||||||
start = start + count;
|
start = start + count;
|
||||||
} catch (Exception e) {
|
} catch (RemoteInvocationException e) {
|
||||||
log.error(
|
log.error(String.format(
|
||||||
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e);
|
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: %s, count: %s", start, count), e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
synchronized (_policyCache) {
|
synchronized (_policyCache) {
|
||||||
@ -249,8 +266,19 @@ public class DataHubAuthorizer implements Authorizer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void addPoliciesToCache(final Map<String, List<DataHubPolicyInfo>> cache,
|
private void addPoliciesToCache(final Map<String, List<DataHubPolicyInfo>> cache,
|
||||||
final List<PolicyFetcher.Policy> policies) {
|
final Collection<EntityResponse> entityResponses) {
|
||||||
policies.forEach(policy -> addPolicyToCache(cache, policy.getPolicyInfo()));
|
for (final EntityResponse entityResponse : entityResponses) {
|
||||||
|
addPolicyToCache(cache, entityResponse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addPolicyToCache(final Map<String, List<DataHubPolicyInfo>> cache, final EntityResponse entityResponse) {
|
||||||
|
EnvelopedAspectMap aspectMap = entityResponse.getAspects();
|
||||||
|
if (!aspectMap.containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("Failed to find DataHubPolicyInfo aspect in DataHubPolicy data %s. Invalid state.", aspectMap));
|
||||||
|
}
|
||||||
|
addPolicyToCache(cache, new DataHubPolicyInfo(aspectMap.get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addPolicyToCache(final Map<String, List<DataHubPolicyInfo>> cache, final DataHubPolicyInfo policy) {
|
private void addPolicyToCache(final Map<String, List<DataHubPolicyInfo>> cache, final DataHubPolicyInfo policy) {
|
||||||
|
@ -1,85 +0,0 @@
|
|||||||
package com.datahub.authorization;
|
|
||||||
|
|
||||||
import com.datahub.authentication.Authentication;
|
|
||||||
import com.linkedin.common.urn.Urn;
|
|
||||||
import com.linkedin.entity.EntityResponse;
|
|
||||||
import com.linkedin.entity.EnvelopedAspectMap;
|
|
||||||
import com.linkedin.entity.client.EntityClient;
|
|
||||||
import com.linkedin.metadata.query.filter.SortCriterion;
|
|
||||||
import com.linkedin.metadata.query.filter.SortOrder;
|
|
||||||
import com.linkedin.metadata.search.SearchEntity;
|
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
|
||||||
import com.linkedin.policy.DataHubPolicyInfo;
|
|
||||||
import com.linkedin.r2.RemoteInvocationException;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.Value;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
|
|
||||||
import static com.linkedin.metadata.Constants.DATAHUB_POLICY_INFO_ASPECT_NAME;
|
|
||||||
import static com.linkedin.metadata.Constants.POLICY_ENTITY_NAME;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapper around entity client to fetch policies in a paged manner
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class PolicyFetcher {
|
|
||||||
private final EntityClient _entityClient;
|
|
||||||
|
|
||||||
private static final SortCriterion POLICY_SORT_CRITERION =
|
|
||||||
new SortCriterion().setField("lastUpdatedTimestamp").setOrder(SortOrder.DESCENDING);
|
|
||||||
|
|
||||||
public PolicyFetchResult fetchPolicies(int start, int count, Authentication authentication)
|
|
||||||
throws RemoteInvocationException, URISyntaxException {
|
|
||||||
log.debug(String.format("Batch fetching policies. start: %s, count: %s ", start, count));
|
|
||||||
// First fetch all policy urns from start - start + count
|
|
||||||
SearchResult result =
|
|
||||||
_entityClient.search(POLICY_ENTITY_NAME, "*", null, POLICY_SORT_CRITERION, start, count, authentication);
|
|
||||||
List<Urn> policyUrns = result.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList());
|
|
||||||
|
|
||||||
if (policyUrns.isEmpty()) {
|
|
||||||
return new PolicyFetchResult(Collections.emptyList(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch DataHubPolicyInfo aspects for each urn
|
|
||||||
final Map<Urn, EntityResponse> policyEntities =
|
|
||||||
_entityClient.batchGetV2(POLICY_ENTITY_NAME, new HashSet<>(policyUrns), null, authentication);
|
|
||||||
return new PolicyFetchResult(policyUrns.stream()
|
|
||||||
.map(policyEntities::get)
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.map(this::extractPolicy)
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.collect(Collectors.toList()), result.getNumEntities());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Policy extractPolicy(EntityResponse entityResponse) {
|
|
||||||
EnvelopedAspectMap aspectMap = entityResponse.getAspects();
|
|
||||||
if (!aspectMap.containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
|
|
||||||
// Right after deleting the policy, there could be a small time frame where search and local db is not consistent.
|
|
||||||
// Simply return null in that case
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return new Policy(entityResponse.getUrn(),
|
|
||||||
new DataHubPolicyInfo(aspectMap.get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Value
|
|
||||||
public static class PolicyFetchResult {
|
|
||||||
List<Policy> policies;
|
|
||||||
int total;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Value
|
|
||||||
public static class Policy {
|
|
||||||
Urn urn;
|
|
||||||
DataHubPolicyInfo policyInfo;
|
|
||||||
}
|
|
||||||
}
|
|
@ -19,9 +19,7 @@ import com.linkedin.entity.EntityResponse;
|
|||||||
import com.linkedin.entity.EnvelopedAspect;
|
import com.linkedin.entity.EnvelopedAspect;
|
||||||
import com.linkedin.entity.EnvelopedAspectMap;
|
import com.linkedin.entity.EnvelopedAspectMap;
|
||||||
import com.linkedin.entity.client.EntityClient;
|
import com.linkedin.entity.client.EntityClient;
|
||||||
import com.linkedin.metadata.search.SearchEntity;
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
import com.linkedin.metadata.search.SearchEntityArray;
|
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
|
||||||
import com.linkedin.policy.DataHubActorFilter;
|
import com.linkedin.policy.DataHubActorFilter;
|
||||||
import com.linkedin.policy.DataHubPolicyInfo;
|
import com.linkedin.policy.DataHubPolicyInfo;
|
||||||
import com.linkedin.policy.DataHubResourceFilter;
|
import com.linkedin.policy.DataHubResourceFilter;
|
||||||
@ -38,7 +36,6 @@ import static com.linkedin.metadata.Constants.POLICY_ENTITY_NAME;
|
|||||||
import static com.linkedin.metadata.authorization.PoliciesConfig.ACTIVE_POLICY_STATE;
|
import static com.linkedin.metadata.authorization.PoliciesConfig.ACTIVE_POLICY_STATE;
|
||||||
import static com.linkedin.metadata.authorization.PoliciesConfig.INACTIVE_POLICY_STATE;
|
import static com.linkedin.metadata.authorization.PoliciesConfig.INACTIVE_POLICY_STATE;
|
||||||
import static com.linkedin.metadata.authorization.PoliciesConfig.METADATA_POLICY_TYPE;
|
import static com.linkedin.metadata.authorization.PoliciesConfig.METADATA_POLICY_TYPE;
|
||||||
import static org.mockito.ArgumentMatchers.isNull;
|
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.anyInt;
|
import static org.mockito.Mockito.anyInt;
|
||||||
import static org.mockito.Mockito.eq;
|
import static org.mockito.Mockito.eq;
|
||||||
@ -70,15 +67,18 @@ public class DataHubAuthorizerTest {
|
|||||||
final EnvelopedAspectMap inactiveAspectMap = new EnvelopedAspectMap();
|
final EnvelopedAspectMap inactiveAspectMap = new EnvelopedAspectMap();
|
||||||
inactiveAspectMap.put(DATAHUB_POLICY_INFO_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(inactivePolicy.data())));
|
inactiveAspectMap.put(DATAHUB_POLICY_INFO_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(inactivePolicy.data())));
|
||||||
|
|
||||||
final SearchResult policySearchResult = new SearchResult();
|
final ListUrnsResult listUrnsResult = new ListUrnsResult();
|
||||||
policySearchResult.setNumEntities(2);
|
listUrnsResult.setStart(0);
|
||||||
policySearchResult.setEntities(new SearchEntityArray(ImmutableList.of(new SearchEntity().setEntity(activePolicyUrn),
|
listUrnsResult.setTotal(2);
|
||||||
new SearchEntity().setEntity(inactivePolicyUrn))));
|
listUrnsResult.setCount(2);
|
||||||
|
UrnArray policyUrns = new UrnArray(ImmutableList.of(
|
||||||
|
activePolicyUrn,
|
||||||
|
inactivePolicyUrn));
|
||||||
|
listUrnsResult.setEntities(policyUrns);
|
||||||
|
|
||||||
when(_entityClient.search(eq("dataHubPolicy"), eq("*"), isNull(), any(), anyInt(), anyInt(), any())).thenReturn(
|
when(_entityClient.listUrns(eq("dataHubPolicy"), eq(0), anyInt(), any())).thenReturn(listUrnsResult);
|
||||||
policySearchResult);
|
|
||||||
when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME),
|
when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME),
|
||||||
eq(ImmutableSet.of(activePolicyUrn, inactivePolicyUrn)), eq(null), any())).thenReturn(
|
eq(new HashSet<>(listUrnsResult.getEntities())), eq(null), any())).thenReturn(
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
activePolicyUrn, new EntityResponse().setUrn(activePolicyUrn).setAspects(activeAspectMap),
|
activePolicyUrn, new EntityResponse().setUrn(activePolicyUrn).setAspects(activeAspectMap),
|
||||||
inactivePolicyUrn, new EntityResponse().setUrn(inactivePolicyUrn).setAspects(inactiveAspectMap)
|
inactivePolicyUrn, new EntityResponse().setUrn(inactivePolicyUrn).setAspects(inactiveAspectMap)
|
||||||
@ -188,14 +188,17 @@ public class DataHubAuthorizerTest {
|
|||||||
assertEquals(_dataHubAuthorizer.authorize(request).getType(), AuthorizationResult.Type.ALLOW);
|
assertEquals(_dataHubAuthorizer.authorize(request).getType(), AuthorizationResult.Type.ALLOW);
|
||||||
|
|
||||||
// Now init the mocks to return 0 policies.
|
// Now init the mocks to return 0 policies.
|
||||||
final SearchResult emptyResult = new SearchResult();
|
final ListUrnsResult emptyUrnsResult = new ListUrnsResult();
|
||||||
emptyResult.setNumEntities(0);
|
emptyUrnsResult.setStart(0);
|
||||||
emptyResult.setEntities(new SearchEntityArray());
|
emptyUrnsResult.setTotal(0);
|
||||||
|
emptyUrnsResult.setCount(0);
|
||||||
|
emptyUrnsResult.setEntities(new UrnArray(Collections.emptyList()));
|
||||||
|
|
||||||
when(_entityClient.search(eq("dataHubPolicy"), eq("*"), isNull(), any(), anyInt(), anyInt(), any())).thenReturn(
|
when(_entityClient.listUrns(eq("dataHubPolicy"), eq(0), anyInt(), any())).thenReturn(emptyUrnsResult);
|
||||||
emptyResult);
|
when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME), eq(new HashSet<>(emptyUrnsResult.getEntities())),
|
||||||
when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME), eq(Collections.emptySet()), eq(null), any())).thenReturn(
|
eq(null), any())).thenReturn(
|
||||||
Collections.emptyMap());
|
Collections.emptyMap()
|
||||||
|
);
|
||||||
|
|
||||||
// Invalidate Cache.
|
// Invalidate Cache.
|
||||||
_dataHubAuthorizer.invalidateCache();
|
_dataHubAuthorizer.invalidateCache();
|
||||||
|
@ -2,8 +2,6 @@ package com.linkedin.metadata.boot.factories;
|
|||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.linkedin.gms.factory.entity.EntityServiceFactory;
|
import com.linkedin.gms.factory.entity.EntityServiceFactory;
|
||||||
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
|
|
||||||
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
|
|
||||||
import com.linkedin.metadata.boot.BootstrapManager;
|
import com.linkedin.metadata.boot.BootstrapManager;
|
||||||
import com.linkedin.metadata.boot.steps.IngestDataPlatformInstancesStep;
|
import com.linkedin.metadata.boot.steps.IngestDataPlatformInstancesStep;
|
||||||
import com.linkedin.metadata.boot.steps.IngestDataPlatformsStep;
|
import com.linkedin.metadata.boot.steps.IngestDataPlatformsStep;
|
||||||
@ -11,8 +9,6 @@ import com.linkedin.metadata.boot.steps.IngestPoliciesStep;
|
|||||||
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
|
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
|
||||||
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
|
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
|
||||||
import com.linkedin.metadata.entity.EntityService;
|
import com.linkedin.metadata.entity.EntityService;
|
||||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
|
||||||
import com.linkedin.metadata.search.EntitySearchService;
|
|
||||||
import io.ebean.EbeanServer;
|
import io.ebean.EbeanServer;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -24,21 +20,13 @@ import org.springframework.context.annotation.Scope;
|
|||||||
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@Import({EntityServiceFactory.class, EntityRegistryFactory.class, EntitySearchServiceFactory.class})
|
@Import({EntityServiceFactory.class})
|
||||||
public class BootstrapManagerFactory {
|
public class BootstrapManagerFactory {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("entityService")
|
@Qualifier("entityService")
|
||||||
private EntityService _entityService;
|
private EntityService _entityService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
@Qualifier("entityRegistry")
|
|
||||||
private EntityRegistry _entityRegistry;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
@Qualifier("entitySearchService")
|
|
||||||
private EntitySearchService _entitySearchService;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("ebeanServer")
|
@Qualifier("ebeanServer")
|
||||||
private EbeanServer _server;
|
private EbeanServer _server;
|
||||||
@ -52,8 +40,7 @@ public class BootstrapManagerFactory {
|
|||||||
@Nonnull
|
@Nonnull
|
||||||
protected BootstrapManager createInstance() {
|
protected BootstrapManager createInstance() {
|
||||||
final IngestRootUserStep ingestRootUserStep = new IngestRootUserStep(_entityService);
|
final IngestRootUserStep ingestRootUserStep = new IngestRootUserStep(_entityService);
|
||||||
final IngestPoliciesStep ingestPoliciesStep =
|
final IngestPoliciesStep ingestPoliciesStep = new IngestPoliciesStep(_entityService);
|
||||||
new IngestPoliciesStep(_entityRegistry, _entityService, _entitySearchService);
|
|
||||||
final IngestDataPlatformsStep ingestDataPlatformsStep = new IngestDataPlatformsStep(_entityService);
|
final IngestDataPlatformsStep ingestDataPlatformsStep = new IngestDataPlatformsStep(_entityService);
|
||||||
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
|
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
|
||||||
new IngestDataPlatformInstancesStep(_entityService, _server);
|
new IngestDataPlatformInstancesStep(_entityService, _server);
|
||||||
|
@ -1,21 +1,16 @@
|
|||||||
package com.linkedin.metadata.boot.steps;
|
package com.linkedin.metadata.boot.steps;
|
||||||
|
|
||||||
import com.datahub.util.RecordUtils;
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.linkedin.common.AuditStamp;
|
import com.linkedin.common.AuditStamp;
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.data.template.RecordTemplate;
|
import com.linkedin.data.template.RecordTemplate;
|
||||||
import com.linkedin.entity.EntityResponse;
|
|
||||||
import com.linkedin.entity.EnvelopedAspect;
|
|
||||||
import com.linkedin.events.metadata.ChangeType;
|
|
||||||
import com.linkedin.metadata.Constants;
|
import com.linkedin.metadata.Constants;
|
||||||
import com.linkedin.metadata.boot.BootstrapStep;
|
import com.linkedin.metadata.boot.BootstrapStep;
|
||||||
|
import com.linkedin.events.metadata.ChangeType;
|
||||||
|
import com.datahub.util.RecordUtils;
|
||||||
import com.linkedin.metadata.entity.EntityService;
|
import com.linkedin.metadata.entity.EntityService;
|
||||||
import com.linkedin.metadata.models.AspectSpec;
|
import com.linkedin.metadata.models.AspectSpec;
|
||||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
|
||||||
import com.linkedin.metadata.query.ListUrnsResult;
|
|
||||||
import com.linkedin.metadata.search.EntitySearchService;
|
|
||||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||||
import com.linkedin.mxe.GenericAspect;
|
import com.linkedin.mxe.GenericAspect;
|
||||||
@ -23,24 +18,22 @@ import com.linkedin.mxe.MetadataChangeProposal;
|
|||||||
import com.linkedin.policy.DataHubPolicyInfo;
|
import com.linkedin.policy.DataHubPolicyInfo;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
import java.util.Iterator;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class IngestPoliciesStep implements BootstrapStep {
|
public class IngestPoliciesStep implements BootstrapStep {
|
||||||
|
|
||||||
private static final String POLICY_ENTITY_NAME = "dataHubPolicy";
|
private static final String POLICY_ENTITY_NAME = "dataHubPolicy";
|
||||||
private static final String POLICY_INFO_ASPECT_NAME = "dataHubPolicyInfo";
|
private static final String POLICY_INFO_ASPECT_NAME = "dataHubPolicyInfo";
|
||||||
|
|
||||||
private final EntityRegistry _entityRegistry;
|
|
||||||
private final EntityService _entityService;
|
private final EntityService _entityService;
|
||||||
private final EntitySearchService _entitySearchService;
|
|
||||||
|
public IngestPoliciesStep(final EntityService entityService) {
|
||||||
|
_entityService = entityService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String name() {
|
public String name() {
|
||||||
@ -55,21 +48,19 @@ public class IngestPoliciesStep implements BootstrapStep {
|
|||||||
// 0. Execute preflight check to see whether we need to ingest policies
|
// 0. Execute preflight check to see whether we need to ingest policies
|
||||||
log.info("Ingesting default access policies...");
|
log.info("Ingesting default access policies...");
|
||||||
|
|
||||||
|
// Whether we are at clean boot or not.
|
||||||
|
final boolean hasDefaultPolicies = hasDefaultPolicies();
|
||||||
|
|
||||||
// 1. Read from the file into JSON.
|
// 1. Read from the file into JSON.
|
||||||
final JsonNode policiesObj = mapper.readTree(new ClassPathResource("./boot/policies.json").getFile());
|
final JsonNode policiesObj = mapper.readTree(new ClassPathResource("./boot/policies.json").getFile());
|
||||||
|
|
||||||
if (!policiesObj.isArray()) {
|
if (!policiesObj.isArray()) {
|
||||||
throw new RuntimeException(
|
throw new RuntimeException(String.format("Found malformed policies file, expected an Array but found %s", policiesObj.getNodeType()));
|
||||||
String.format("Found malformed policies file, expected an Array but found %s", policiesObj.getNodeType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// If search index for policies is empty, send MCLs for all policies to ingest policies into the search index
|
|
||||||
if (_entitySearchService.docCount(Constants.POLICY_ENTITY_NAME) == 0) {
|
|
||||||
sendMCL();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. For each JSON object, cast into a DataHub Policy Info object.
|
// 2. For each JSON object, cast into a DataHub Policy Info object.
|
||||||
for (final JsonNode policyObj : policiesObj) {
|
for (Iterator<JsonNode> it = policiesObj.iterator(); it.hasNext(); ) {
|
||||||
|
final JsonNode policyObj = it.next();
|
||||||
final Urn urn = Urn.createFromString(policyObj.get("urn").asText());
|
final Urn urn = Urn.createFromString(policyObj.get("urn").asText());
|
||||||
|
|
||||||
// If the info is not there, it means that the policy was there before, but must now be removed
|
// If the info is not there, it means that the policy was there before, but must now be removed
|
||||||
@ -78,8 +69,7 @@ public class IngestPoliciesStep implements BootstrapStep {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final DataHubPolicyInfo info =
|
final DataHubPolicyInfo info = RecordUtils.toRecordTemplate(DataHubPolicyInfo.class, policyObj.get("info").toString());
|
||||||
RecordUtils.toRecordTemplate(DataHubPolicyInfo.class, policyObj.get("info").toString());
|
|
||||||
|
|
||||||
if (!info.isEditable()) {
|
if (!info.isEditable()) {
|
||||||
// If the Policy is not editable, always re-ingest.
|
// If the Policy is not editable, always re-ingest.
|
||||||
@ -98,45 +88,11 @@ public class IngestPoliciesStep implements BootstrapStep {
|
|||||||
log.info("Successfully ingested default access policies.");
|
log.info("Successfully ingested default access policies.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Send MCLs for each policy to refill the policy search index
|
|
||||||
*/
|
|
||||||
private void sendMCL() throws URISyntaxException {
|
|
||||||
log.info("Pushing MCLs for all policies");
|
|
||||||
AspectSpec policyInfoAspectSpec = _entityRegistry.getEntitySpec(Constants.POLICY_ENTITY_NAME)
|
|
||||||
.getAspectSpec(Constants.DATAHUB_POLICY_INFO_ASPECT_NAME);
|
|
||||||
int start = 0;
|
|
||||||
int count = 30;
|
|
||||||
int total = 100;
|
|
||||||
while (start < total) {
|
|
||||||
ListUrnsResult listUrnsResult = _entityService.listUrns(Constants.POLICY_ENTITY_NAME, start, count);
|
|
||||||
total = listUrnsResult.getTotal();
|
|
||||||
start = start + count;
|
|
||||||
|
|
||||||
final Map<Urn, EntityResponse> policyEntities =
|
|
||||||
_entityService.getEntitiesV2(POLICY_ENTITY_NAME, new HashSet<>(listUrnsResult.getEntities()),
|
|
||||||
Collections.singleton(Constants.DATAHUB_POLICY_INFO_ASPECT_NAME));
|
|
||||||
policyEntities.values().forEach(entityResponse -> sendMCL(entityResponse, policyInfoAspectSpec));
|
|
||||||
}
|
|
||||||
log.info("Successfully pushed MCLs for all policies");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMCL(EntityResponse entityResponse, AspectSpec aspectSpec) {
|
|
||||||
EnvelopedAspect aspect = entityResponse.getAspects().get(Constants.DATAHUB_POLICY_INFO_ASPECT_NAME);
|
|
||||||
if (aspect == null) {
|
|
||||||
throw new RuntimeException(String.format("Missing policy info aspect for urn %s", entityResponse.getUrn()));
|
|
||||||
}
|
|
||||||
_entityService.produceMetadataChangeLog(entityResponse.getUrn(), Constants.POLICY_ENTITY_NAME,
|
|
||||||
Constants.DATAHUB_POLICY_INFO_ASPECT_NAME, aspectSpec, null, new DataHubPolicyInfo(aspect.getValue().data()),
|
|
||||||
null, null, aspect.getCreated(), ChangeType.UPSERT);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ingestPolicy(final Urn urn, final DataHubPolicyInfo info) throws URISyntaxException {
|
private void ingestPolicy(final Urn urn, final DataHubPolicyInfo info) throws URISyntaxException {
|
||||||
// 3. Write key & aspect
|
// 3. Write key & aspect
|
||||||
final MetadataChangeProposal keyAspectProposal = new MetadataChangeProposal();
|
final MetadataChangeProposal keyAspectProposal = new MetadataChangeProposal();
|
||||||
final AspectSpec keyAspectSpec = _entityService.getKeyAspectSpec(urn);
|
final AspectSpec keyAspectSpec = _entityService.getKeyAspectSpec(urn);
|
||||||
GenericAspect aspect =
|
GenericAspect aspect = GenericRecordUtils.serializeAspect(EntityKeyUtils.convertUrnToEntityKey(urn, keyAspectSpec.getPegasusSchema()));
|
||||||
GenericRecordUtils.serializeAspect(EntityKeyUtils.convertUrnToEntityKey(urn, keyAspectSpec.getPegasusSchema()));
|
|
||||||
keyAspectProposal.setAspect(aspect);
|
keyAspectProposal.setAspect(aspect);
|
||||||
keyAspectProposal.setAspectName(keyAspectSpec.getName());
|
keyAspectProposal.setAspectName(keyAspectSpec.getName());
|
||||||
keyAspectProposal.setEntityType(POLICY_ENTITY_NAME);
|
keyAspectProposal.setEntityType(POLICY_ENTITY_NAME);
|
||||||
@ -157,6 +113,14 @@ public class IngestPoliciesStep implements BootstrapStep {
|
|||||||
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()));
|
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean hasDefaultPolicies() throws URISyntaxException {
|
||||||
|
// If there are already default policies, denoted by presence of policy 0, don't ingest bootstrap policies.
|
||||||
|
// This will retain any changes made to policies after initial bootstrap.
|
||||||
|
final Urn defaultPolicyUrn = Urn.createFromString("urn:li:dataHubPolicy:0");
|
||||||
|
RecordTemplate aspect = _entityService.getAspect(defaultPolicyUrn, POLICY_INFO_ASPECT_NAME, 0);
|
||||||
|
return aspect != null;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean hasPolicy(Urn policyUrn) {
|
private boolean hasPolicy(Urn policyUrn) {
|
||||||
// Check if policy exists
|
// Check if policy exists
|
||||||
RecordTemplate aspect = _entityService.getAspect(policyUrn, POLICY_INFO_ASPECT_NAME, 0);
|
RecordTemplate aspect = _entityService.getAspect(policyUrn, POLICY_INFO_ASPECT_NAME, 0);
|
||||||
|
@ -2766,7 +2766,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3190,14 +3189,7 @@
|
|||||||
"items" : "com.linkedin.common.MLFeatureUrn"
|
"items" : "com.linkedin.common.MLFeatureUrn"
|
||||||
},
|
},
|
||||||
"doc" : "List of features used for MLModel training",
|
"doc" : "List of features used for MLModel training",
|
||||||
"optional" : true,
|
"optional" : true
|
||||||
"Relationship" : {
|
|
||||||
"/*" : {
|
|
||||||
"entityTypes" : [ "mlFeature" ],
|
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "Consumes"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, {
|
}, {
|
||||||
"name" : "tags",
|
"name" : "tags",
|
||||||
"type" : {
|
"type" : {
|
||||||
@ -3438,7 +3430,6 @@
|
|||||||
}, {
|
}, {
|
||||||
"name" : "aspect",
|
"name" : "aspect",
|
||||||
"type" : "GenericAspect",
|
"type" : "GenericAspect",
|
||||||
"doc" : "The value of the new aspect.",
|
|
||||||
"optional" : true
|
"optional" : true
|
||||||
}, {
|
}, {
|
||||||
"name" : "systemMetadata",
|
"name" : "systemMetadata",
|
||||||
|
@ -3359,14 +3359,7 @@
|
|||||||
"items" : "com.linkedin.common.MLFeatureUrn"
|
"items" : "com.linkedin.common.MLFeatureUrn"
|
||||||
},
|
},
|
||||||
"doc" : "List of features used for MLModel training",
|
"doc" : "List of features used for MLModel training",
|
||||||
"optional" : true,
|
"optional" : true
|
||||||
"Relationship" : {
|
|
||||||
"/*" : {
|
|
||||||
"entityTypes" : [ "mlFeature" ],
|
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "Consumes"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, {
|
}, {
|
||||||
"name" : "tags",
|
"name" : "tags",
|
||||||
"type" : {
|
"type" : {
|
||||||
@ -3844,7 +3837,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3933,7 +3925,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4014,6 +4005,7 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "mlFeature" ],
|
"entityTypes" : [ "mlFeature" ],
|
||||||
|
"isLineage" : true,
|
||||||
"name" : "Contains"
|
"name" : "Contains"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -4712,14 +4704,6 @@
|
|||||||
"type" : "boolean",
|
"type" : "boolean",
|
||||||
"doc" : "Whether the policy should be editable via the UI",
|
"doc" : "Whether the policy should be editable via the UI",
|
||||||
"default" : true
|
"default" : true
|
||||||
}, {
|
|
||||||
"name" : "lastUpdatedTimestamp",
|
|
||||||
"type" : "long",
|
|
||||||
"doc" : "Timestamp when the policy was last updated",
|
|
||||||
"optional" : true,
|
|
||||||
"Searchable" : {
|
|
||||||
"fieldType" : "DATETIME"
|
|
||||||
}
|
|
||||||
} ],
|
} ],
|
||||||
"Aspect" : {
|
"Aspect" : {
|
||||||
"name" : "dataHubPolicyInfo"
|
"name" : "dataHubPolicyInfo"
|
||||||
|
@ -2513,7 +2513,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2937,14 +2936,7 @@
|
|||||||
"items" : "com.linkedin.common.MLFeatureUrn"
|
"items" : "com.linkedin.common.MLFeatureUrn"
|
||||||
},
|
},
|
||||||
"doc" : "List of features used for MLModel training",
|
"doc" : "List of features used for MLModel training",
|
||||||
"optional" : true,
|
"optional" : true
|
||||||
"Relationship" : {
|
|
||||||
"/*" : {
|
|
||||||
"entityTypes" : [ "mlFeature" ],
|
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "Consumes"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, {
|
}, {
|
||||||
"name" : "tags",
|
"name" : "tags",
|
||||||
"type" : {
|
"type" : {
|
||||||
|
@ -3359,14 +3359,7 @@
|
|||||||
"items" : "com.linkedin.common.MLFeatureUrn"
|
"items" : "com.linkedin.common.MLFeatureUrn"
|
||||||
},
|
},
|
||||||
"doc" : "List of features used for MLModel training",
|
"doc" : "List of features used for MLModel training",
|
||||||
"optional" : true,
|
"optional" : true
|
||||||
"Relationship" : {
|
|
||||||
"/*" : {
|
|
||||||
"entityTypes" : [ "mlFeature" ],
|
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "Consumes"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, {
|
}, {
|
||||||
"name" : "tags",
|
"name" : "tags",
|
||||||
"type" : {
|
"type" : {
|
||||||
@ -3844,7 +3837,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3933,7 +3925,6 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "dataset" ],
|
"entityTypes" : [ "dataset" ],
|
||||||
"isLineage" : true,
|
|
||||||
"name" : "DerivedFrom"
|
"name" : "DerivedFrom"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4014,6 +4005,7 @@
|
|||||||
"Relationship" : {
|
"Relationship" : {
|
||||||
"/*" : {
|
"/*" : {
|
||||||
"entityTypes" : [ "mlFeature" ],
|
"entityTypes" : [ "mlFeature" ],
|
||||||
|
"isLineage" : true,
|
||||||
"name" : "Contains"
|
"name" : "Contains"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -4712,14 +4704,6 @@
|
|||||||
"type" : "boolean",
|
"type" : "boolean",
|
||||||
"doc" : "Whether the policy should be editable via the UI",
|
"doc" : "Whether the policy should be editable via the UI",
|
||||||
"default" : true
|
"default" : true
|
||||||
}, {
|
|
||||||
"name" : "lastUpdatedTimestamp",
|
|
||||||
"type" : "long",
|
|
||||||
"doc" : "Timestamp when the policy was last updated",
|
|
||||||
"optional" : true,
|
|
||||||
"Searchable" : {
|
|
||||||
"fieldType" : "DATETIME"
|
|
||||||
}
|
|
||||||
} ],
|
} ],
|
||||||
"Aspect" : {
|
"Aspect" : {
|
||||||
"name" : "dataHubPolicyInfo"
|
"name" : "dataHubPolicyInfo"
|
||||||
@ -4903,7 +4887,7 @@
|
|||||||
}, {
|
}, {
|
||||||
"name" : "name",
|
"name" : "name",
|
||||||
"type" : "string",
|
"type" : "string",
|
||||||
"doc" : "The name of the event, e.g. the type of event. For example, 'notificationRequestEvent', 'entityChangeEvent'"
|
"doc" : "The name of the event, e.g. the type of event. For example, 'notificationRequestEvent'."
|
||||||
}, {
|
}, {
|
||||||
"name" : "payload",
|
"name" : "payload",
|
||||||
"type" : "GenericPayload",
|
"type" : "GenericPayload",
|
||||||
|
@ -146,15 +146,14 @@ public interface EntityClient {
|
|||||||
*
|
*
|
||||||
* @param input search query
|
* @param input search query
|
||||||
* @param filter search filters
|
* @param filter search filters
|
||||||
* @param sortCriterion sort criterion
|
|
||||||
* @param start start offset for search results
|
* @param start start offset for search results
|
||||||
* @param count max number of search results requested
|
* @param count max number of search results requested
|
||||||
* @return Snapshot key
|
* @return Snapshot key
|
||||||
* @throws RemoteInvocationException
|
* @throws RemoteInvocationException
|
||||||
*/
|
*/
|
||||||
@Nonnull
|
@Nonnull
|
||||||
public SearchResult search(@Nonnull String entity, @Nonnull String input, @Nullable Filter filter,
|
public SearchResult search(@Nonnull String entity, @Nonnull String input, @Nullable Filter filter, int start,
|
||||||
SortCriterion sortCriterion, int start, int count, @Nonnull Authentication authentication) throws RemoteInvocationException;
|
int count, @Nonnull Authentication authentication) throws RemoteInvocationException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Searches for entities matching to a given query and filters across multiple entity types
|
* Searches for entities matching to a given query and filters across multiple entity types
|
||||||
|
@ -248,7 +248,6 @@ public class JavaEntityClient implements EntityClient {
|
|||||||
*
|
*
|
||||||
* @param input search query
|
* @param input search query
|
||||||
* @param filter search filters
|
* @param filter search filters
|
||||||
* @param sortCriterion sort criterion
|
|
||||||
* @param start start offset for search results
|
* @param start start offset for search results
|
||||||
* @param count max number of search results requested
|
* @param count max number of search results requested
|
||||||
* @return Snapshot key
|
* @return Snapshot key
|
||||||
@ -259,12 +258,11 @@ public class JavaEntityClient implements EntityClient {
|
|||||||
@Nonnull String entity,
|
@Nonnull String entity,
|
||||||
@Nonnull String input,
|
@Nonnull String input,
|
||||||
@Nullable Filter filter,
|
@Nullable Filter filter,
|
||||||
@Nullable SortCriterion sortCriterion,
|
|
||||||
int start,
|
int start,
|
||||||
int count,
|
int count,
|
||||||
@Nonnull final Authentication authentication)
|
@Nonnull final Authentication authentication)
|
||||||
throws RemoteInvocationException {
|
throws RemoteInvocationException {
|
||||||
return _entitySearchService.search(entity, input, filter, sortCriterion, start, count);
|
return _entitySearchService.search(entity, input, filter, null, start, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -319,16 +319,14 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
|||||||
*
|
*
|
||||||
* @param input search query
|
* @param input search query
|
||||||
* @param filter search filters
|
* @param filter search filters
|
||||||
* @param sortCriterion sort criterion
|
|
||||||
* @param start start offset for search results
|
* @param start start offset for search results
|
||||||
* @param count max number of search results requested
|
* @param count max number of search results requested
|
||||||
* @return Snapshot key
|
* @return Snapshot key
|
||||||
* @throws RemoteInvocationException
|
* @throws RemoteInvocationException
|
||||||
*/
|
*/
|
||||||
@Nonnull
|
@Nonnull
|
||||||
public SearchResult search(@Nonnull String entity, @Nonnull String input, @Nullable Filter filter,
|
public SearchResult search(@Nonnull String entity, @Nonnull String input, @Nullable Filter filter, int start,
|
||||||
SortCriterion sortCriterion, int start, int count, @Nonnull final Authentication authentication)
|
int count, @Nonnull final Authentication authentication) throws RemoteInvocationException {
|
||||||
throws RemoteInvocationException {
|
|
||||||
|
|
||||||
final EntitiesDoSearchRequestBuilder requestBuilder = ENTITIES_REQUEST_BUILDERS.actionSearch()
|
final EntitiesDoSearchRequestBuilder requestBuilder = ENTITIES_REQUEST_BUILDERS.actionSearch()
|
||||||
.entityParam(entity)
|
.entityParam(entity)
|
||||||
@ -340,10 +338,6 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
|||||||
requestBuilder.filterParam(filter);
|
requestBuilder.filterParam(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sortCriterion != null) {
|
|
||||||
requestBuilder.sortParam(sortCriterion);
|
|
||||||
}
|
|
||||||
|
|
||||||
return sendClientRequest(requestBuilder, authentication).getEntity();
|
return sendClientRequest(requestBuilder, authentication).getEntity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -764,7 +764,7 @@ def test_frontend_create_policy(frontend_session):
|
|||||||
new_urn = res_data["data"]["createPolicy"]
|
new_urn = res_data["data"]["createPolicy"]
|
||||||
|
|
||||||
# Sleep for eventual consistency
|
# Sleep for eventual consistency
|
||||||
time.sleep(3)
|
time.sleep(1)
|
||||||
|
|
||||||
# Now verify the policy has been added.
|
# Now verify the policy has been added.
|
||||||
json = {
|
json = {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user