mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
Link Lifecycle Policy to specific Location (#1720)
- Amend ingestion to support Location reference from Policy - Amend PolicyResource and PolicyRepository to accommodate Location reference
This commit is contained in:
parent
9fe0726576
commit
f1a8a7886e
@ -21,6 +21,7 @@ import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.entity.data.Location;
|
||||
import org.openmetadata.catalog.entity.policies.Policy;
|
||||
import org.openmetadata.catalog.resources.policies.PolicyResource;
|
||||
import org.openmetadata.catalog.type.ChangeDescription;
|
||||
@ -34,9 +35,11 @@ import org.openmetadata.catalog.util.JsonUtils;
|
||||
@Slf4j
|
||||
public class PolicyRepository extends EntityRepository<Policy> {
|
||||
private static final Fields POLICY_UPDATE_FIELDS =
|
||||
new Fields(PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules");
|
||||
new Fields(
|
||||
PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules,location");
|
||||
private static final Fields POLICY_PATCH_FIELDS =
|
||||
new Fields(PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules");
|
||||
new Fields(
|
||||
PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules,location");
|
||||
private final CollectionDAO dao;
|
||||
|
||||
public PolicyRepository(CollectionDAO dao) {
|
||||
@ -57,7 +60,9 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
|
||||
@Transaction
|
||||
public void delete(UUID id) {
|
||||
if (dao.relationshipDAO().findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.POLICY) > 0) {
|
||||
if (dao.relationshipDAO()
|
||||
.findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.POLICY)
|
||||
> 0) {
|
||||
throw new IllegalArgumentException("Policy is not empty");
|
||||
}
|
||||
dao.policyDAO().delete(id);
|
||||
@ -69,6 +74,18 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
return EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner());
|
||||
}
|
||||
|
||||
/** Find the location to which this policy applies to. * */
|
||||
@Transaction
|
||||
private EntityReference getLocationForPolicy(UUID policyId) throws IOException {
|
||||
List<String> result =
|
||||
dao.relationshipDAO()
|
||||
.findTo(policyId.toString(), Relationship.APPLIED_TO.ordinal(), Entity.LOCATION);
|
||||
// There is at most one location for a policy.
|
||||
return result.size() == 1
|
||||
? dao.locationDAO().findEntityReferenceById(UUID.fromString(result.get(0)))
|
||||
: null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Policy setFields(Policy policy, Fields fields) throws IOException {
|
||||
policy.setDisplayName(fields.contains("displayName") ? policy.getDisplayName() : null);
|
||||
@ -77,6 +94,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
policy.setPolicyUrl(fields.contains("policyUrl") ? policy.getPolicyUrl() : null);
|
||||
policy.setEnabled(fields.contains("enabled") ? policy.getEnabled() : null);
|
||||
policy.setRules(fields.contains("rules") ? policy.getRules() : null);
|
||||
policy.setLocation(fields.contains("location") ? getLocationForPolicy(policy.getId()) : null);
|
||||
return policy;
|
||||
}
|
||||
|
||||
@ -88,22 +106,43 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
return new PolicyEntityInterface(entity);
|
||||
}
|
||||
|
||||
/** Generate EntityReference for a given Policy's Location. * */
|
||||
@Transaction
|
||||
private EntityReference getLocationReference(Policy policy) throws IOException {
|
||||
if (policy == null || policy.getLocation() == null || policy.getLocation().getId() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Location location = dao.locationDAO().findEntityById(policy.getLocation().getId());
|
||||
if (location == null) {
|
||||
return null;
|
||||
}
|
||||
return new EntityReference()
|
||||
.withDescription(location.getDescription())
|
||||
.withDisplayName(location.getDisplayName())
|
||||
.withId(location.getId())
|
||||
.withHref(location.getHref())
|
||||
.withName(location.getName())
|
||||
.withType(Entity.LOCATION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(Policy policy) throws IOException {
|
||||
policy.setFullyQualifiedName(getFQN(policy));
|
||||
|
||||
policy.setLocation(getLocationReference(policy));
|
||||
// Check if owner is valid and set the relationship
|
||||
EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner());
|
||||
policy.setOwner(EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeEntity(Policy policy, boolean update) throws IOException {
|
||||
// Relationships and fields such as href are derived and not stored as part of json
|
||||
EntityReference owner = policy.getOwner();
|
||||
EntityReference location = policy.getLocation();
|
||||
URI href = policy.getHref();
|
||||
|
||||
// Don't store owner and href as JSON. Build it on the fly based on relationships
|
||||
policy.withOwner(null).withHref(null);
|
||||
// Don't store owner, location and href as JSON. Build it on the fly based on relationships
|
||||
policy.withOwner(null).withLocation(null).withHref(null);
|
||||
|
||||
if (update) {
|
||||
dao.policyDAO().update(policy.getId(), JsonUtils.pojoToJson(policy));
|
||||
@ -112,13 +151,15 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
}
|
||||
|
||||
// Restore the relationships
|
||||
policy.withOwner(owner).withHref(href);
|
||||
policy.withOwner(owner).withLocation(location).withHref(href);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeRelationships(Policy policy) {
|
||||
// Add policy owner relationship
|
||||
// Add policy owner relationship.
|
||||
setOwner(policy, policy.getOwner());
|
||||
// Add location to which policy is assigned to.
|
||||
setLocation(policy, policy.getLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -129,14 +170,28 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
private EntityReference getOwner(Policy policy) throws IOException {
|
||||
return policy == null
|
||||
? null
|
||||
: EntityUtil.populateOwner(policy.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO());
|
||||
: EntityUtil.populateOwner(
|
||||
policy.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO());
|
||||
}
|
||||
|
||||
public void setOwner(Policy policy, EntityReference owner) {
|
||||
private void setOwner(Policy policy, EntityReference owner) {
|
||||
EntityUtil.setOwner(dao.relationshipDAO(), policy.getId(), Entity.POLICY, owner);
|
||||
policy.setOwner(owner);
|
||||
}
|
||||
|
||||
private void setLocation(Policy policy, EntityReference location) {
|
||||
if (location == null || location.getId() == null) {
|
||||
return;
|
||||
}
|
||||
dao.relationshipDAO()
|
||||
.insert(
|
||||
policy.getId().toString(),
|
||||
policy.getLocation().getId().toString(),
|
||||
Entity.POLICY,
|
||||
Entity.LOCATION,
|
||||
Relationship.APPLIED_TO.ordinal());
|
||||
}
|
||||
|
||||
public static class PolicyEntityInterface implements EntityInterface<Policy> {
|
||||
private final Policy entity;
|
||||
|
||||
@ -280,9 +335,33 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
|
||||
@Override
|
||||
public void entitySpecificUpdate() throws IOException {
|
||||
recordChange("policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl());
|
||||
recordChange(
|
||||
"policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl());
|
||||
recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled());
|
||||
recordChange("rules", original.getEntity().getRules(), updated.getEntity().getRules());
|
||||
updateLocation(original.getEntity(), updated.getEntity());
|
||||
}
|
||||
|
||||
private void updateLocation(Policy origPolicy, Policy updatedPolicy) throws IOException {
|
||||
// remove original Policy --> Location relationship if exists.
|
||||
if (origPolicy.getLocation() != null && origPolicy.getLocation().getId() != null) {
|
||||
dao.relationshipDAO()
|
||||
.delete(
|
||||
origPolicy.getId().toString(),
|
||||
origPolicy.getLocation().getId().toString(),
|
||||
Relationship.APPLIED_TO.ordinal());
|
||||
}
|
||||
// insert updated Policy --> Location relationship.
|
||||
if (updatedPolicy.getLocation() != null && updatedPolicy.getLocation().getId() != null) {
|
||||
dao.relationshipDAO()
|
||||
.insert(
|
||||
updatedPolicy.getId().toString(),
|
||||
updatedPolicy.getLocation().getId().toString(),
|
||||
Entity.POLICY,
|
||||
Entity.LOCATION,
|
||||
Relationship.APPLIED_TO.ordinal());
|
||||
}
|
||||
recordChange("location", origPolicy.getLocation(), updatedPolicy.getLocation());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,15 +14,15 @@
|
||||
package org.openmetadata.catalog.jdbi3;
|
||||
|
||||
/**
|
||||
* This enum captures all the relationships between Catalog entities Note that the relationship from is a Strong entity
|
||||
* and to is Weak entity when possible.
|
||||
* This enum captures all the relationships between Catalog entities Note that the relationship from
|
||||
* is a Strong entity and to is Weak entity when possible.
|
||||
*/
|
||||
public enum Relationship {
|
||||
|
||||
/**
|
||||
* Rules for changing enums since the ordinal position is stored in the database. - Don't remove an enum, since the
|
||||
* database might have stored the enum ordinal number - When adding a new enum, add it as the last enum to preserve
|
||||
* the ordinal positions of the existing enums
|
||||
* Rules for changing enums since the ordinal position is stored in the database. - Don't remove
|
||||
* an enum, since the database might have stored the enum ordinal number - When adding a new enum,
|
||||
* add it as the last enum to preserve the ordinal positions of the existing enums
|
||||
*/
|
||||
// Database --- contains --> Table
|
||||
// Organization --- contains --> Team
|
||||
@ -74,7 +74,12 @@ public enum Relationship {
|
||||
// {Table1} --- upstream ---> {Table2} (Table1 is used for creating Table2}
|
||||
// {Pipeline} --- upstream ---> {Table2} (Pipeline creates Table2)
|
||||
// {Table} --- upstream ---> {Dashboard} (Table was used to create Dashboard)
|
||||
UPSTREAM("upstream"); // 13
|
||||
UPSTREAM("upstream"), // 13
|
||||
|
||||
// Policy relationship
|
||||
// {Policy1} -- appliedTo --> {Location1} (Policy1 is applied to Location1)
|
||||
APPLIED_TO("appliedTo"); // 14
|
||||
|
||||
/** * Add new enums to the end of the list * */
|
||||
private final String value;
|
||||
|
||||
|
||||
@ -63,6 +63,7 @@ import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.EntityHistory;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.RestUtil.PatchResponse;
|
||||
@ -80,7 +81,9 @@ public class PolicyResource {
|
||||
private final CatalogAuthorizer authorizer;
|
||||
|
||||
public static ResultList<Policy> addHref(UriInfo uriInfo, ResultList<Policy> policies) {
|
||||
Optional.ofNullable(policies.getData()).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i));
|
||||
Optional.ofNullable(policies.getData())
|
||||
.orElse(Collections.emptyList())
|
||||
.forEach(i -> addHref(uriInfo, i));
|
||||
return policies;
|
||||
}
|
||||
|
||||
@ -108,8 +111,9 @@ public class PolicyResource {
|
||||
}
|
||||
}
|
||||
|
||||
static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules";
|
||||
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "").split(","));
|
||||
static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules,location";
|
||||
public static final List<String> FIELD_LIST =
|
||||
Arrays.asList(FIELDS.replaceAll(" ", "").split(","));
|
||||
|
||||
@GET
|
||||
@Valid
|
||||
@ -124,7 +128,10 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "List of policies",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = PolicyList.class)))
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = PolicyList.class)))
|
||||
})
|
||||
public ResultList<Policy> list(
|
||||
@Context UriInfo uriInfo,
|
||||
@ -134,16 +141,21 @@ public class PolicyResource {
|
||||
schema = @Schema(type = "string", example = FIELDS))
|
||||
@QueryParam("fields")
|
||||
String fieldsParam,
|
||||
@Parameter(description = "Limit the number policies returned. (1 to 1000000, " + "default = 10)")
|
||||
@Parameter(
|
||||
description = "Limit the number policies returned. (1 to 1000000, " + "default = 10)")
|
||||
@DefaultValue("10")
|
||||
@Min(1)
|
||||
@Max(1000000)
|
||||
@QueryParam("limit")
|
||||
int limitParam,
|
||||
@Parameter(description = "Returns list of policies before this cursor", schema = @Schema(type = "string"))
|
||||
@Parameter(
|
||||
description = "Returns list of policies before this cursor",
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("before")
|
||||
String before,
|
||||
@Parameter(description = "Returns list of policies after this cursor", schema = @Schema(type = "string"))
|
||||
@Parameter(
|
||||
description = "Returns list of policies after this cursor",
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("after")
|
||||
String after)
|
||||
throws IOException, GeneralSecurityException, ParseException {
|
||||
@ -152,7 +164,8 @@ public class PolicyResource {
|
||||
|
||||
ResultList<Policy> policies;
|
||||
if (before != null) { // Reverse paging
|
||||
policies = dao.listBefore(uriInfo, fields, null, limitParam, before); // Ask for one extra entry
|
||||
policies =
|
||||
dao.listBefore(uriInfo, fields, null, limitParam, before); // Ask for one extra entry
|
||||
} else { // Forward paging or first page
|
||||
policies = dao.listAfter(uriInfo, fields, null, limitParam, after);
|
||||
}
|
||||
@ -169,7 +182,10 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "The policy",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))),
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = Policy.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Policy for instance {id} is not found")
|
||||
})
|
||||
public Policy get(
|
||||
@ -196,7 +212,10 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "The policy",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))),
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = Policy.class))),
|
||||
@ApiResponse(responseCode = "404", description = "Policy for instance {id} is not found")
|
||||
})
|
||||
public Policy getByName(
|
||||
@ -224,12 +243,16 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "List of policy versions",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = EntityHistory.class)))
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = EntityHistory.class)))
|
||||
})
|
||||
public EntityHistory listVersions(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") String id)
|
||||
@Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id")
|
||||
String id)
|
||||
throws IOException, ParseException {
|
||||
return dao.listVersions(id);
|
||||
}
|
||||
@ -244,7 +267,10 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "policy",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))),
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = Policy.class))),
|
||||
@ApiResponse(
|
||||
responseCode = "404",
|
||||
description = "Policy for instance {id} and version {version} is" + " " + "not found")
|
||||
@ -252,7 +278,8 @@ public class PolicyResource {
|
||||
public Policy getVersion(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") String id,
|
||||
@Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id")
|
||||
String id,
|
||||
@Parameter(
|
||||
description = "policy version number in the form `major`.`minor`",
|
||||
schema = @Schema(type = "string", example = "0.1 or 1.1"))
|
||||
@ -271,25 +298,19 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "The policy",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = CreatePolicy.class))),
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = CreatePolicy.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||
})
|
||||
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreatePolicy create)
|
||||
public Response create(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid CreatePolicy create)
|
||||
throws IOException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
Policy policy =
|
||||
new Policy()
|
||||
.withId(UUID.randomUUID())
|
||||
.withName(create.getName())
|
||||
.withDisplayName(create.getDisplayName())
|
||||
.withDescription(create.getDescription())
|
||||
.withOwner(create.getOwner())
|
||||
.withPolicyUrl(create.getPolicyUrl())
|
||||
.withPolicyType(create.getPolicyType())
|
||||
.withUpdatedBy(securityContext.getUserPrincipal().getName())
|
||||
.withUpdatedAt(new Date())
|
||||
.withRules(create.getRules());
|
||||
|
||||
Policy policy = getPolicy(securityContext, create);
|
||||
policy = addHref(uriInfo, dao.create(uriInfo, policy));
|
||||
return Response.created(policy.getHref()).entity(policy).build();
|
||||
}
|
||||
@ -300,9 +321,12 @@ public class PolicyResource {
|
||||
summary = "Update a policy",
|
||||
tags = "policies",
|
||||
description = "Update an existing policy using JsonPatch.",
|
||||
externalDocs = @ExternalDocumentation(description = "JsonPatch RFC", url = "https://tools.ietf.org/html/rfc6902"))
|
||||
externalDocs =
|
||||
@ExternalDocumentation(
|
||||
description = "JsonPatch RFC",
|
||||
url = "https://tools.ietf.org/html/rfc6902"))
|
||||
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
|
||||
public Response updateDescription(
|
||||
public Response patch(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@PathParam("id") String id,
|
||||
@ -312,16 +336,19 @@ public class PolicyResource {
|
||||
@Content(
|
||||
mediaType = MediaType.APPLICATION_JSON_PATCH_JSON,
|
||||
examples = {
|
||||
@ExampleObject("[" + "{op:remove, path:/a}," + "{op:add, path: /b, value: val}" + "]")
|
||||
@ExampleObject(
|
||||
"[" + "{op:remove, path:/a}," + "{op:add, path: /b, value: val}" + "]")
|
||||
}))
|
||||
JsonPatch patch)
|
||||
throws IOException, ParseException {
|
||||
Fields fields = new Fields(FIELD_LIST, FIELDS);
|
||||
Policy policy = dao.get(uriInfo, id, fields);
|
||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, dao.getOwnerReference(policy));
|
||||
SecurityUtil.checkAdminRoleOrPermissions(
|
||||
authorizer, securityContext, dao.getOwnerReference(policy));
|
||||
|
||||
PatchResponse<Policy> response =
|
||||
dao.patch(uriInfo, UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch);
|
||||
dao.patch(
|
||||
uriInfo, UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch);
|
||||
addHref(uriInfo, response.getEntity());
|
||||
return response.toResponse();
|
||||
}
|
||||
@ -335,25 +362,18 @@ public class PolicyResource {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "The policy",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))),
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = Policy.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request")
|
||||
})
|
||||
public Response createOrUpdate(
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreatePolicy create)
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid CreatePolicy create)
|
||||
throws IOException, ParseException {
|
||||
Policy policy =
|
||||
new Policy()
|
||||
.withId(UUID.randomUUID())
|
||||
.withName(create.getName())
|
||||
.withDisplayName(create.getDisplayName())
|
||||
.withDescription(create.getDescription())
|
||||
.withOwner(create.getOwner())
|
||||
.withPolicyUrl(create.getPolicyUrl())
|
||||
.withPolicyType(create.getPolicyType())
|
||||
.withUpdatedBy(securityContext.getUserPrincipal().getName())
|
||||
.withUpdatedAt(new Date())
|
||||
.withRules(create.getRules());
|
||||
|
||||
Policy policy = getPolicy(securityContext, create);
|
||||
PutResponse<Policy> response = dao.createOrUpdate(uriInfo, policy);
|
||||
addHref(uriInfo, response.getEntity());
|
||||
return response.toResponse();
|
||||
@ -373,4 +393,23 @@ public class PolicyResource {
|
||||
dao.delete(UUID.fromString(id));
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
||||
private Policy getPolicy(SecurityContext securityContext, CreatePolicy create) {
|
||||
Policy policy =
|
||||
new Policy()
|
||||
.withId(UUID.randomUUID())
|
||||
.withName(create.getName())
|
||||
.withDisplayName(create.getDisplayName())
|
||||
.withDescription(create.getDescription())
|
||||
.withOwner(create.getOwner())
|
||||
.withPolicyUrl(create.getPolicyUrl())
|
||||
.withPolicyType(create.getPolicyType())
|
||||
.withUpdatedBy(securityContext.getUserPrincipal().getName())
|
||||
.withUpdatedAt(new Date())
|
||||
.withRules(create.getRules());
|
||||
if (create.getLocation() != null) {
|
||||
policy = policy.withLocation(new EntityReference().withId(create.getLocation()));
|
||||
}
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,6 +31,11 @@
|
||||
},
|
||||
"rules": {
|
||||
"$ref": "../../entity/policies/policy.json#/definitions/rules"
|
||||
},
|
||||
"location" : {
|
||||
"description": "UUID of Location where this policy is applied",
|
||||
"$ref": "../../type/basic.json#/definitions/uuid",
|
||||
"default" : null
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@ -107,6 +107,10 @@
|
||||
},
|
||||
"rules": {
|
||||
"$ref": "#/definitions/rules"
|
||||
},
|
||||
"location": {
|
||||
"$ref": "../../type/entityReference.json",
|
||||
"default": null
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@ -22,22 +22,25 @@ import org.openmetadata.catalog.type.TagLabel.LabelType;
|
||||
import org.openmetadata.catalog.type.TagLabel.State;
|
||||
|
||||
/**
|
||||
* Enum ordinal number is stored in the database. New enums must be added at the end to ensure backward compatibility
|
||||
* Enum ordinal number is stored in the database. New enums must be added at the end to ensure
|
||||
* backward compatibility
|
||||
*/
|
||||
public class EnumBackwardCompatibilityTest {
|
||||
/**
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal
|
||||
* number of the last enum. This will help catch new enum inadvertently being added in the middle.
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums
|
||||
* and test the ordinal number of the last enum. This will help catch new enum inadvertently being
|
||||
* added in the middle.
|
||||
*/
|
||||
@Test
|
||||
public void testRelationshipEnumBackwardCompatible() {
|
||||
assertEquals(14, Relationship.values().length);
|
||||
assertEquals(13, Relationship.UPSTREAM.ordinal());
|
||||
assertEquals(15, Relationship.values().length);
|
||||
assertEquals(14, Relationship.APPLIED_TO.ordinal());
|
||||
}
|
||||
|
||||
/**
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal
|
||||
* number of the last enum. This will help catch new enum inadvertently being added in the middle.
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums
|
||||
* and test the ordinal number of the last enum. This will help catch new enum inadvertently being
|
||||
* added in the middle.
|
||||
*/
|
||||
@Test
|
||||
public void testTagLabelEnumBackwardCompatible() {
|
||||
@ -46,8 +49,9 @@ public class EnumBackwardCompatibilityTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal
|
||||
* number of the last enum. This will help catch new enum inadvertently being added in the middle.
|
||||
* Any time a new enum is added, this test will fail. Update the test with total number of enums
|
||||
* and test the ordinal number of the last enum. This will help catch new enum inadvertently being
|
||||
* added in the middle.
|
||||
*/
|
||||
@Test
|
||||
public void testTagStateEnumBackwardCompatible() {
|
||||
|
||||
@ -55,7 +55,15 @@ import org.openmetadata.catalog.util.TestUtils;
|
||||
public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
|
||||
public PolicyResourceTest() {
|
||||
super(Entity.POLICY, Policy.class, PolicyList.class, "policies", PolicyResource.FIELDS, false, true, false);
|
||||
super(
|
||||
Entity.POLICY,
|
||||
Policy.class,
|
||||
PolicyList.class,
|
||||
"policies",
|
||||
PolicyResource.FIELDS,
|
||||
false,
|
||||
true,
|
||||
false);
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
@ -64,12 +72,14 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object createRequest(String name, String description, String displayName, EntityReference owner) {
|
||||
public Object createRequest(
|
||||
String name, String description, String displayName, EntityReference owner) {
|
||||
return create(name).withDescription(description).withDisplayName(displayName).withOwner(owner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateCreatedEntity(Policy policy, Object request, Map<String, String> authHeaders) {
|
||||
public void validateCreatedEntity(
|
||||
Policy policy, Object request, Map<String, String> authHeaders) {
|
||||
CreatePolicy createRequest = (CreatePolicy) request;
|
||||
validateCommonEntityFields(
|
||||
getEntityInterface(policy),
|
||||
@ -80,7 +90,8 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateUpdatedEntity(Policy updatedEntity, Object request, Map<String, String> authHeaders) {
|
||||
public void validateUpdatedEntity(
|
||||
Policy updatedEntity, Object request, Map<String, String> authHeaders) {
|
||||
validateCreatedEntity(updatedEntity, request, authHeaders);
|
||||
}
|
||||
|
||||
@ -93,7 +104,8 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {
|
||||
public void assertFieldChange(String fieldName, Object expected, Object actual)
|
||||
throws IOException {
|
||||
if (expected == actual) {
|
||||
return;
|
||||
}
|
||||
@ -141,7 +153,9 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
public void post_Policy_as_non_admin_401(TestInfo test) {
|
||||
CreatePolicy create = create(test);
|
||||
HttpResponseException exception =
|
||||
assertThrows(HttpResponseException.class, () -> createPolicy(create, authHeaders("test@open-metadata.org")));
|
||||
assertThrows(
|
||||
HttpResponseException.class,
|
||||
() -> createPolicy(create, authHeaders("test@open-metadata.org")));
|
||||
assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} is not admin");
|
||||
}
|
||||
|
||||
@ -149,22 +163,33 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
public void get_PolicyListWithInvalidLimitOffset_4xx() {
|
||||
// Limit must be >= 1 and <= 1000,000
|
||||
HttpResponseException exception =
|
||||
assertThrows(HttpResponseException.class, () -> listPolicies(null, -1, null, null, adminAuthHeaders()));
|
||||
assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");
|
||||
|
||||
exception = assertThrows(HttpResponseException.class, () -> listPolicies(null, 0, null, null, adminAuthHeaders()));
|
||||
assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");
|
||||
assertThrows(
|
||||
HttpResponseException.class,
|
||||
() -> listPolicies(null, -1, null, null, adminAuthHeaders()));
|
||||
assertResponse(
|
||||
exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");
|
||||
|
||||
exception =
|
||||
assertThrows(HttpResponseException.class, () -> listPolicies(null, 1000001, null, null, adminAuthHeaders()));
|
||||
assertResponse(exception, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]");
|
||||
assertThrows(
|
||||
HttpResponseException.class,
|
||||
() -> listPolicies(null, 0, null, null, adminAuthHeaders()));
|
||||
assertResponse(
|
||||
exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]");
|
||||
|
||||
exception =
|
||||
assertThrows(
|
||||
HttpResponseException.class,
|
||||
() -> listPolicies(null, 1000001, null, null, adminAuthHeaders()));
|
||||
assertResponse(
|
||||
exception, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void get_PolicyListWithInvalidPaginationCursors_4xx() {
|
||||
// Passing both before and after cursors is invalid
|
||||
HttpResponseException exception =
|
||||
assertThrows(HttpResponseException.class, () -> listPolicies(null, 1, "", "", adminAuthHeaders()));
|
||||
assertThrows(
|
||||
HttpResponseException.class, () -> listPolicies(null, 1, "", "", adminAuthHeaders()));
|
||||
assertResponse(exception, BAD_REQUEST, "Only one of before or after query parameter allowed");
|
||||
}
|
||||
|
||||
@ -197,12 +222,14 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
before = forwardPage.getPaging().getBefore();
|
||||
assertEntityPagination(allPolicies.getData(), forwardPage, limit, indexInAllPolicies);
|
||||
|
||||
if (pageCount == 0) { // CASE 0 - First page is being returned. Therefore, before cursor is null
|
||||
if (pageCount
|
||||
== 0) { // CASE 0 - First page is being returned. Therefore, before cursor is null
|
||||
assertNull(before);
|
||||
} else {
|
||||
// Make sure scrolling back based on before cursor returns the correct result
|
||||
backwardPage = listPolicies(null, limit, before, null, adminAuthHeaders());
|
||||
assertEntityPagination(allPolicies.getData(), backwardPage, limit, (indexInAllPolicies - limit));
|
||||
assertEntityPagination(
|
||||
allPolicies.getData(), backwardPage, limit, (indexInAllPolicies - limit));
|
||||
}
|
||||
|
||||
indexInAllPolicies += forwardPage.getData().size();
|
||||
@ -246,7 +273,9 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
policy.setEnabled(false);
|
||||
ChangeDescription change = getChangeDescription(policy.getVersion());
|
||||
change.getFieldsAdded().add(new FieldChange().withName("policyUrl").withNewValue(uri));
|
||||
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false));
|
||||
change
|
||||
.getFieldsUpdated()
|
||||
.add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false));
|
||||
policy = patchEntityAndCheck(policy, origJson, adminAuthHeaders(), MINOR_UPDATE, change);
|
||||
|
||||
// Remove policyUrl
|
||||
@ -268,13 +297,18 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
// TODO
|
||||
}
|
||||
|
||||
public static Policy createPolicy(CreatePolicy create, Map<String, String> authHeaders) throws HttpResponseException {
|
||||
public static Policy createPolicy(CreatePolicy create, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
return TestUtils.post(getResource("policies"), create, Policy.class, authHeaders);
|
||||
}
|
||||
|
||||
/** Validate returned fields GET .../policies/{id}?fields="..." or GET .../policies/name/{fqn}?fields="..." */
|
||||
/**
|
||||
* Validate returned fields GET .../policies/{id}?fields="..." or GET
|
||||
* .../policies/name/{fqn}?fields="..."
|
||||
*/
|
||||
@Override
|
||||
public void validateGetWithDifferentFields(Policy policy, boolean byName) throws HttpResponseException {
|
||||
public void validateGetWithDifferentFields(Policy policy, boolean byName)
|
||||
throws HttpResponseException {
|
||||
// .../policies?fields=owner
|
||||
String fields = "owner";
|
||||
policy =
|
||||
@ -300,7 +334,8 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
assertNotNull(policy.getOwner());
|
||||
}
|
||||
|
||||
public static Policy getPolicy(UUID id, String fields, Map<String, String> authHeaders) throws HttpResponseException {
|
||||
public static Policy getPolicy(UUID id, String fields, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("policies/" + id);
|
||||
target = fields != null ? target.queryParam("fields", fields) : target;
|
||||
return TestUtils.get(target, Policy.class, authHeaders);
|
||||
@ -314,7 +349,11 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
}
|
||||
|
||||
public static PolicyList listPolicies(
|
||||
String fields, Integer limitParam, String before, String after, Map<String, String> authHeaders)
|
||||
String fields,
|
||||
Integer limitParam,
|
||||
String before,
|
||||
String after,
|
||||
Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("policies");
|
||||
target = fields != null ? target.queryParam("fields", fields) : target;
|
||||
@ -336,7 +375,7 @@ public class PolicyResourceTest extends EntityResourceTest<Policy> {
|
||||
return new CreatePolicy()
|
||||
.withName(name)
|
||||
.withDescription("description")
|
||||
.withPolicyType(PolicyType.AccessControl)
|
||||
.withPolicyType(PolicyType.Lifecycle)
|
||||
.withOwner(USER_OWNER1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,8 +34,8 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* This test provides examples of how to use: - JSON schema to validate the JSON payload - Generate JSON schema from
|
||||
* POJO
|
||||
* This test provides examples of how to use: - JSON schema to validate the JSON payload - Generate
|
||||
* JSON schema from POJO
|
||||
*/
|
||||
public class JsonSchemaTest {
|
||||
private final UUID TEST_UUID = UUID.randomUUID();
|
||||
@ -47,8 +47,8 @@ public class JsonSchemaTest {
|
||||
urnFactory = JsonSchemaUtil.getUrnFactory();
|
||||
}
|
||||
|
||||
private java.util.Set<com.networknt.schema.ValidationMessage> validate(InputStream in, String jsonPayload)
|
||||
throws IOException {
|
||||
private java.util.Set<com.networknt.schema.ValidationMessage> validate(
|
||||
InputStream in, String jsonPayload) throws IOException {
|
||||
System.out.println("Validating " + jsonPayload);
|
||||
Set<ValidationMessage> errors = JsonSchemaUtil.validate(in, jsonPayload, urnFactory);
|
||||
System.out.println("Errors " + errors);
|
||||
@ -59,7 +59,8 @@ public class JsonSchemaTest {
|
||||
@Test
|
||||
public void validJson() throws IOException {
|
||||
// Valid jsonPayload
|
||||
InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
InputStream in =
|
||||
JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
Map<String, Object> objectTypeMap =
|
||||
Map.of(
|
||||
"ot1", "ot1",
|
||||
@ -77,7 +78,8 @@ public class JsonSchemaTest {
|
||||
/** Validate a non-conforming JSON payload that is missing a required field using JSON schema */
|
||||
@Test
|
||||
public void missingField() throws IOException {
|
||||
InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
InputStream in =
|
||||
JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
// No mandatory field "stringProperty"
|
||||
Map<String, Object> objectTypeMap =
|
||||
Map.of(
|
||||
@ -95,11 +97,14 @@ public class JsonSchemaTest {
|
||||
assertTrue(errors.iterator().next().getMessage().contains("stringProperty: is missing"));
|
||||
}
|
||||
|
||||
/** Validate a non-conforming JSON payload that is missing a required inner field using JSON schema */
|
||||
/**
|
||||
* Validate a non-conforming JSON payload that is missing a required inner field using JSON schema
|
||||
*/
|
||||
@Test
|
||||
public void missingInnerField() throws IOException {
|
||||
// No mandatory inner field "objectType.ot1"
|
||||
InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
InputStream in =
|
||||
JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
Map<String, Object> objectTypeMap =
|
||||
Map.of(
|
||||
// Missing inner field ot1
|
||||
@ -124,7 +129,8 @@ public class JsonSchemaTest {
|
||||
@Test
|
||||
public void invalidJsonData() throws IOException {
|
||||
// Invalid value that is not a URI
|
||||
InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
InputStream in =
|
||||
JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json");
|
||||
Map<String, Object> objectTypeMap = Map.of("ot1", "ot2", "ot2", "ot2");
|
||||
Map<String, Object> map =
|
||||
Map.of(
|
||||
@ -140,7 +146,12 @@ public class JsonSchemaTest {
|
||||
|
||||
Set<ValidationMessage> errors = validate(in, jsonPayload);
|
||||
assertEquals(1, errors.size());
|
||||
assertTrue(errors.iterator().next().getMessage().contains("uriProperty: does not match the uri pattern"));
|
||||
assertTrue(
|
||||
errors
|
||||
.iterator()
|
||||
.next()
|
||||
.getMessage()
|
||||
.contains("uriProperty: does not match the uri pattern"));
|
||||
}
|
||||
|
||||
/** Test POJO to JSON schema */
|
||||
@ -156,6 +167,7 @@ public class JsonSchemaTest {
|
||||
assertEquals("object", jsonMap.get("type"));
|
||||
|
||||
// Check properties of the object
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> propertiesMap = (Map<String, Object>) jsonMap.get("properties");
|
||||
assertEquals("{type=string, description=TODO}", propertiesMap.get("name").toString());
|
||||
}
|
||||
|
||||
@ -8,8 +8,7 @@
|
||||
* For an easy install of MySQL and ES, just install Docker on your local machine and run the following commands from the top-level directory
|
||||
|
||||
```
|
||||
cd docker/local-metadata
|
||||
docker-compose -f docker-compose-dev.yml up
|
||||
docker-compose -f docker/local-metadata/docker-compose-dev.yml up
|
||||
```
|
||||
* Bootstrap MySQL with tables
|
||||
|
||||
|
||||
22
ingestion/src/metadata/ingestion/models/ometa_policy.py
Normal file
22
ingestion/src/metadata/ingestion/models/ometa_policy.py
Normal file
@ -0,0 +1,22 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.entity.policies.policy import Policy
|
||||
from metadata.generated.schema.entity.data.location import Location
|
||||
|
||||
|
||||
class OMetaPolicy(BaseModel):
|
||||
policy: Policy
|
||||
# A Lifecycle Policy may be associated with a specific Location such as S3 bucket.
|
||||
location: Optional[Location]
|
||||
@ -47,6 +47,7 @@ from metadata.generated.schema.entity.teams.user import User
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import Entity, WorkflowContext
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
from metadata.ingestion.models.ometa_policy import OMetaPolicy
|
||||
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
|
||||
from metadata.ingestion.models.table_metadata import Chart, Dashboard
|
||||
from metadata.ingestion.ometa.client import APIError
|
||||
@ -55,11 +56,9 @@ from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Allow types from the generated pydantic models
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
om_chart_type_dict = {
|
||||
"line": ChartType.Line,
|
||||
"table": ChartType.Table,
|
||||
@ -119,7 +118,7 @@ class MetadataRestSink(Sink[Entity]):
|
||||
self.write_dashboards(record)
|
||||
elif isinstance(record, Location):
|
||||
self.write_locations(record)
|
||||
elif isinstance(record, Policy):
|
||||
elif isinstance(record, OMetaPolicy):
|
||||
self.write_policies(record)
|
||||
elif isinstance(record, Pipeline):
|
||||
self.write_pipelines(record)
|
||||
@ -293,14 +292,7 @@ class MetadataRestSink(Sink[Entity]):
|
||||
|
||||
def write_locations(self, location: Location):
|
||||
try:
|
||||
location_request = CreateLocationEntityRequest(
|
||||
name=location.name,
|
||||
description=location.description,
|
||||
locationType=location.locationType,
|
||||
owner=location.owner,
|
||||
service=location.service,
|
||||
)
|
||||
created_location = self.metadata.create_or_update(location_request)
|
||||
created_location = self._create_location(location)
|
||||
logger.info(f"Successfully ingested Location {created_location.name}")
|
||||
self.status.records_written(f"Location: {created_location.name}")
|
||||
except (APIError, ValidationError) as err:
|
||||
@ -328,24 +320,44 @@ class MetadataRestSink(Sink[Entity]):
|
||||
logger.error(err)
|
||||
self.status.failure(f"Pipeline: {pipeline.name}")
|
||||
|
||||
def write_policies(self, policy: Policy):
|
||||
def write_policies(self, ometa_policy: OMetaPolicy) -> None:
|
||||
try:
|
||||
created_location = None
|
||||
if ometa_policy.location is not None:
|
||||
created_location = self._create_location(ometa_policy.location)
|
||||
logger.info(f"Successfully ingested Location {created_location.name}")
|
||||
self.status.records_written(f"Location: {created_location.name}")
|
||||
|
||||
policy_request = CreatePolicyEntityRequest(
|
||||
name=policy.name,
|
||||
displayName=policy.displayName,
|
||||
description=policy.description,
|
||||
owner=policy.owner,
|
||||
policyUrl=policy.policyUrl,
|
||||
policyType=policy.policyType,
|
||||
rules=policy.rules,
|
||||
name=ometa_policy.policy.name,
|
||||
displayName=ometa_policy.policy.displayName,
|
||||
description=ometa_policy.policy.description,
|
||||
owner=ometa_policy.policy.owner,
|
||||
policyUrl=ometa_policy.policy.policyUrl,
|
||||
policyType=ometa_policy.policy.policyType,
|
||||
rules=ometa_policy.policy.rules,
|
||||
location=created_location.id if created_location else None,
|
||||
)
|
||||
created_policy = self.metadata.create_or_update(policy_request)
|
||||
logger.info(f"Successfully ingested Policy {created_policy.name}")
|
||||
self.status.records_written(f"Policy: {created_policy.name}")
|
||||
|
||||
except (APIError, ValidationError) as err:
|
||||
logger.error(f"Failed to ingest Policy {policy.name}")
|
||||
logger.error(f"Failed to ingest Policy {ometa_policy.policy.name}")
|
||||
logger.error(err)
|
||||
self.status.failure(f"Policy: {policy.name}")
|
||||
traceback.print_exc()
|
||||
self.status.failure(f"Policy: {ometa_policy.policy.name}")
|
||||
|
||||
def _create_location(self, location: Location) -> Location:
|
||||
location_request = CreateLocationEntityRequest(
|
||||
name=location.name,
|
||||
description=location.description,
|
||||
locationType=location.locationType,
|
||||
tags=location.tags,
|
||||
owner=location.owner,
|
||||
service=location.service,
|
||||
)
|
||||
return self.metadata.create_or_update(location_request)
|
||||
|
||||
def write_lineage(self, add_lineage: AddLineage):
|
||||
try:
|
||||
|
||||
@ -22,6 +22,7 @@ from metadata.generated.schema.api.services.createStorageService import (
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.models.ometa_policy import OMetaPolicy
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
from metadata.generated.schema.entity.data.location import Location, LocationType
|
||||
@ -73,15 +74,16 @@ class S3Source(Source[Entity]):
|
||||
def prepare(self):
|
||||
pass
|
||||
|
||||
def next_record(self) -> Iterable[Entity]:
|
||||
def next_record(self) -> Iterable[OMetaPolicy]:
|
||||
try:
|
||||
for bucket in self.s3.buckets.all():
|
||||
self.status.scanned(bucket)
|
||||
bucket_name = self._get_bucket_name_with_prefix(bucket.name)
|
||||
location_name = self._get_bucket_name_with_prefix(bucket.name)
|
||||
location_id = uuid.uuid4()
|
||||
location = Location(
|
||||
id=uuid.uuid4(),
|
||||
name=bucket_name,
|
||||
displayName=bucket_name,
|
||||
id=location_id,
|
||||
name=location_name,
|
||||
displayName=location_name,
|
||||
locationType=LocationType.Bucket,
|
||||
service=EntityReference(
|
||||
id=self.service.id,
|
||||
@ -89,14 +91,13 @@ class S3Source(Source[Entity]):
|
||||
name=self.service.name,
|
||||
),
|
||||
)
|
||||
yield location
|
||||
|
||||
# Retrieve lifecycle policies for the bucket.
|
||||
# Retrieve lifecycle policy and rules for the bucket.
|
||||
rules: List[LifecycleRule] = []
|
||||
for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules:
|
||||
rules.append(self._get_rule(rule, location))
|
||||
policy_name = f"{bucket.name}-lifecycle-policy"
|
||||
yield Policy(
|
||||
policy = Policy(
|
||||
id=uuid.uuid4(),
|
||||
name=policy_name,
|
||||
displayName=policy_name,
|
||||
@ -105,6 +106,10 @@ class S3Source(Source[Entity]):
|
||||
rules=rules,
|
||||
enabled=True,
|
||||
)
|
||||
yield OMetaPolicy(
|
||||
location=location,
|
||||
policy=policy,
|
||||
)
|
||||
except Exception as e:
|
||||
self.status.failure("error", str(e))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user