From f1a8a7886ea0b42500e49c9a5fe3f9bf12d4e1a5 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 23 Dec 2021 11:06:15 -0800 Subject: [PATCH] Link Lifecycle Policy to specific Location (#1720) - Amend ingestion to support Location reference from Policy - Amend PolicyResource and PolicyRepository to accommodate Location reference --- .../catalog/jdbi3/PolicyRepository.java | 103 +++++++++++-- .../catalog/jdbi3/Relationship.java | 17 ++- .../resources/policies/PolicyResource.java | 137 +++++++++++------- .../schema/api/policies/createPolicy.json | 5 + .../json/schema/entity/policies/policy.json | 4 + .../EnumBackwardCompatibilityTest.java | 22 +-- .../policies/PolicyResourceTest.java | 85 ++++++++--- .../common/utils/JsonSchemaTest.java | 32 ++-- .../developer/build-code-run-tests.md | 3 +- .../metadata/ingestion/models/ometa_policy.py | 22 +++ .../metadata/ingestion/sink/metadata_rest.py | 54 ++++--- ingestion/src/metadata/ingestion/source/s3.py | 21 ++- 12 files changed, 365 insertions(+), 140 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/models/ometa_policy.py diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java index 38a10a5b4a5..005b9ef6ae8 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java @@ -21,6 +21,7 @@ import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.data.Location; import org.openmetadata.catalog.entity.policies.Policy; import org.openmetadata.catalog.resources.policies.PolicyResource; import org.openmetadata.catalog.type.ChangeDescription; @@ -34,9 +35,11 @@ import org.openmetadata.catalog.util.JsonUtils; @Slf4j public class PolicyRepository extends EntityRepository { private static final Fields POLICY_UPDATE_FIELDS = - new Fields(PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules"); + new Fields( + PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules,location"); private static final Fields POLICY_PATCH_FIELDS = - new Fields(PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules"); + new Fields( + PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules,location"); private final CollectionDAO dao; public PolicyRepository(CollectionDAO dao) { @@ -57,7 +60,9 @@ public class PolicyRepository extends EntityRepository { @Transaction public void delete(UUID id) { - if (dao.relationshipDAO().findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.POLICY) > 0) { + if (dao.relationshipDAO() + .findToCount(id.toString(), Relationship.CONTAINS.ordinal(), Entity.POLICY) + > 0) { throw new IllegalArgumentException("Policy is not empty"); } dao.policyDAO().delete(id); @@ -69,6 +74,18 @@ public class PolicyRepository extends EntityRepository { return EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner()); } + /** Find the location to which this policy applies to. * */ + @Transaction + private EntityReference getLocationForPolicy(UUID policyId) throws IOException { + List result = + dao.relationshipDAO() + .findTo(policyId.toString(), Relationship.APPLIED_TO.ordinal(), Entity.LOCATION); + // There is at most one location for a policy. + return result.size() == 1 + ? dao.locationDAO().findEntityReferenceById(UUID.fromString(result.get(0))) + : null; + } + @Override public Policy setFields(Policy policy, Fields fields) throws IOException { policy.setDisplayName(fields.contains("displayName") ? policy.getDisplayName() : null); @@ -77,6 +94,7 @@ public class PolicyRepository extends EntityRepository { policy.setPolicyUrl(fields.contains("policyUrl") ? policy.getPolicyUrl() : null); policy.setEnabled(fields.contains("enabled") ? policy.getEnabled() : null); policy.setRules(fields.contains("rules") ? policy.getRules() : null); + policy.setLocation(fields.contains("location") ? getLocationForPolicy(policy.getId()) : null); return policy; } @@ -88,22 +106,43 @@ public class PolicyRepository extends EntityRepository { return new PolicyEntityInterface(entity); } + /** Generate EntityReference for a given Policy's Location. * */ + @Transaction + private EntityReference getLocationReference(Policy policy) throws IOException { + if (policy == null || policy.getLocation() == null || policy.getLocation().getId() == null) { + return null; + } + + Location location = dao.locationDAO().findEntityById(policy.getLocation().getId()); + if (location == null) { + return null; + } + return new EntityReference() + .withDescription(location.getDescription()) + .withDisplayName(location.getDisplayName()) + .withId(location.getId()) + .withHref(location.getHref()) + .withName(location.getName()) + .withType(Entity.LOCATION); + } + @Override public void prepare(Policy policy) throws IOException { policy.setFullyQualifiedName(getFQN(policy)); - + policy.setLocation(getLocationReference(policy)); // Check if owner is valid and set the relationship - EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner()); + policy.setOwner(EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), policy.getOwner())); } @Override public void storeEntity(Policy policy, boolean update) throws IOException { // Relationships and fields such as href are derived and not stored as part of json EntityReference owner = policy.getOwner(); + EntityReference location = policy.getLocation(); URI href = policy.getHref(); - // Don't store owner and href as JSON. Build it on the fly based on relationships - policy.withOwner(null).withHref(null); + // Don't store owner, location and href as JSON. Build it on the fly based on relationships + policy.withOwner(null).withLocation(null).withHref(null); if (update) { dao.policyDAO().update(policy.getId(), JsonUtils.pojoToJson(policy)); @@ -112,13 +151,15 @@ public class PolicyRepository extends EntityRepository { } // Restore the relationships - policy.withOwner(owner).withHref(href); + policy.withOwner(owner).withLocation(location).withHref(href); } @Override public void storeRelationships(Policy policy) { - // Add policy owner relationship + // Add policy owner relationship. setOwner(policy, policy.getOwner()); + // Add location to which policy is assigned to. + setLocation(policy, policy.getLocation()); } @Override @@ -129,14 +170,28 @@ public class PolicyRepository extends EntityRepository { private EntityReference getOwner(Policy policy) throws IOException { return policy == null ? null - : EntityUtil.populateOwner(policy.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO()); + : EntityUtil.populateOwner( + policy.getId(), dao.relationshipDAO(), dao.userDAO(), dao.teamDAO()); } - public void setOwner(Policy policy, EntityReference owner) { + private void setOwner(Policy policy, EntityReference owner) { EntityUtil.setOwner(dao.relationshipDAO(), policy.getId(), Entity.POLICY, owner); policy.setOwner(owner); } + private void setLocation(Policy policy, EntityReference location) { + if (location == null || location.getId() == null) { + return; + } + dao.relationshipDAO() + .insert( + policy.getId().toString(), + policy.getLocation().getId().toString(), + Entity.POLICY, + Entity.LOCATION, + Relationship.APPLIED_TO.ordinal()); + } + public static class PolicyEntityInterface implements EntityInterface { private final Policy entity; @@ -280,9 +335,33 @@ public class PolicyRepository extends EntityRepository { @Override public void entitySpecificUpdate() throws IOException { - recordChange("policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl()); + recordChange( + "policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl()); recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled()); recordChange("rules", original.getEntity().getRules(), updated.getEntity().getRules()); + updateLocation(original.getEntity(), updated.getEntity()); + } + + private void updateLocation(Policy origPolicy, Policy updatedPolicy) throws IOException { + // remove original Policy --> Location relationship if exists. + if (origPolicy.getLocation() != null && origPolicy.getLocation().getId() != null) { + dao.relationshipDAO() + .delete( + origPolicy.getId().toString(), + origPolicy.getLocation().getId().toString(), + Relationship.APPLIED_TO.ordinal()); + } + // insert updated Policy --> Location relationship. + if (updatedPolicy.getLocation() != null && updatedPolicy.getLocation().getId() != null) { + dao.relationshipDAO() + .insert( + updatedPolicy.getId().toString(), + updatedPolicy.getLocation().getId().toString(), + Entity.POLICY, + Entity.LOCATION, + Relationship.APPLIED_TO.ordinal()); + } + recordChange("location", origPolicy.getLocation(), updatedPolicy.getLocation()); } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java index 4f2df2cfdfd..983095a20ee 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java @@ -14,15 +14,15 @@ package org.openmetadata.catalog.jdbi3; /** - * This enum captures all the relationships between Catalog entities Note that the relationship from is a Strong entity - * and to is Weak entity when possible. + * This enum captures all the relationships between Catalog entities Note that the relationship from + * is a Strong entity and to is Weak entity when possible. */ public enum Relationship { /** - * Rules for changing enums since the ordinal position is stored in the database. - Don't remove an enum, since the - * database might have stored the enum ordinal number - When adding a new enum, add it as the last enum to preserve - * the ordinal positions of the existing enums + * Rules for changing enums since the ordinal position is stored in the database. - Don't remove + * an enum, since the database might have stored the enum ordinal number - When adding a new enum, + * add it as the last enum to preserve the ordinal positions of the existing enums */ // Database --- contains --> Table // Organization --- contains --> Team @@ -74,7 +74,12 @@ public enum Relationship { // {Table1} --- upstream ---> {Table2} (Table1 is used for creating Table2} // {Pipeline} --- upstream ---> {Table2} (Pipeline creates Table2) // {Table} --- upstream ---> {Dashboard} (Table was used to create Dashboard) - UPSTREAM("upstream"); // 13 + UPSTREAM("upstream"), // 13 + + // Policy relationship + // {Policy1} -- appliedTo --> {Location1} (Policy1 is applied to Location1) + APPLIED_TO("appliedTo"); // 14 + /** * Add new enums to the end of the list * */ private final String value; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java index dbe64175ebe..557a088d729 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java @@ -63,6 +63,7 @@ import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.SecurityUtil; import org.openmetadata.catalog.type.EntityHistory; +import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.util.EntityUtil.Fields; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.PatchResponse; @@ -80,7 +81,9 @@ public class PolicyResource { private final CatalogAuthorizer authorizer; public static ResultList addHref(UriInfo uriInfo, ResultList policies) { - Optional.ofNullable(policies.getData()).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i)); + Optional.ofNullable(policies.getData()) + .orElse(Collections.emptyList()) + .forEach(i -> addHref(uriInfo, i)); return policies; } @@ -108,8 +111,9 @@ public class PolicyResource { } } - static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules"; - public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "").split(",")); + static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules,location"; + public static final List FIELD_LIST = + Arrays.asList(FIELDS.replaceAll(" ", "").split(",")); @GET @Valid @@ -124,7 +128,10 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "List of policies", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = PolicyList.class))) + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = PolicyList.class))) }) public ResultList list( @Context UriInfo uriInfo, @@ -134,16 +141,21 @@ public class PolicyResource { schema = @Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam, - @Parameter(description = "Limit the number policies returned. (1 to 1000000, " + "default = 10)") + @Parameter( + description = "Limit the number policies returned. (1 to 1000000, " + "default = 10)") @DefaultValue("10") @Min(1) @Max(1000000) @QueryParam("limit") int limitParam, - @Parameter(description = "Returns list of policies before this cursor", schema = @Schema(type = "string")) + @Parameter( + description = "Returns list of policies before this cursor", + schema = @Schema(type = "string")) @QueryParam("before") String before, - @Parameter(description = "Returns list of policies after this cursor", schema = @Schema(type = "string")) + @Parameter( + description = "Returns list of policies after this cursor", + schema = @Schema(type = "string")) @QueryParam("after") String after) throws IOException, GeneralSecurityException, ParseException { @@ -152,7 +164,8 @@ public class PolicyResource { ResultList policies; if (before != null) { // Reverse paging - policies = dao.listBefore(uriInfo, fields, null, limitParam, before); // Ask for one extra entry + policies = + dao.listBefore(uriInfo, fields, null, limitParam, before); // Ask for one extra entry } else { // Forward paging or first page policies = dao.listAfter(uriInfo, fields, null, limitParam, after); } @@ -169,7 +182,10 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "The policy", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))), + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = Policy.class))), @ApiResponse(responseCode = "404", description = "Policy for instance {id} is not found") }) public Policy get( @@ -196,7 +212,10 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "The policy", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))), + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = Policy.class))), @ApiResponse(responseCode = "404", description = "Policy for instance {id} is not found") }) public Policy getByName( @@ -224,12 +243,16 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "List of policy versions", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = EntityHistory.class))) + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = EntityHistory.class))) }) public EntityHistory listVersions( @Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") String id) + @Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") + String id) throws IOException, ParseException { return dao.listVersions(id); } @@ -244,7 +267,10 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "policy", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))), + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = Policy.class))), @ApiResponse( responseCode = "404", description = "Policy for instance {id} and version {version} is" + " " + "not found") @@ -252,7 +278,8 @@ public class PolicyResource { public Policy getVersion( @Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") String id, + @Parameter(description = "policy Id", schema = @Schema(type = "string")) @PathParam("id") + String id, @Parameter( description = "policy version number in the form `major`.`minor`", schema = @Schema(type = "string", example = "0.1 or 1.1")) @@ -271,25 +298,19 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "The policy", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = CreatePolicy.class))), + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = CreatePolicy.class))), @ApiResponse(responseCode = "400", description = "Bad request") }) - public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreatePolicy create) + public Response create( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreatePolicy create) throws IOException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - Policy policy = - new Policy() - .withId(UUID.randomUUID()) - .withName(create.getName()) - .withDisplayName(create.getDisplayName()) - .withDescription(create.getDescription()) - .withOwner(create.getOwner()) - .withPolicyUrl(create.getPolicyUrl()) - .withPolicyType(create.getPolicyType()) - .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()) - .withRules(create.getRules()); - + Policy policy = getPolicy(securityContext, create); policy = addHref(uriInfo, dao.create(uriInfo, policy)); return Response.created(policy.getHref()).entity(policy).build(); } @@ -300,9 +321,12 @@ public class PolicyResource { summary = "Update a policy", tags = "policies", description = "Update an existing policy using JsonPatch.", - externalDocs = @ExternalDocumentation(description = "JsonPatch RFC", url = "https://tools.ietf.org/html/rfc6902")) + externalDocs = + @ExternalDocumentation( + description = "JsonPatch RFC", + url = "https://tools.ietf.org/html/rfc6902")) @Consumes(MediaType.APPLICATION_JSON_PATCH_JSON) - public Response updateDescription( + public Response patch( @Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id, @@ -312,16 +336,19 @@ public class PolicyResource { @Content( mediaType = MediaType.APPLICATION_JSON_PATCH_JSON, examples = { - @ExampleObject("[" + "{op:remove, path:/a}," + "{op:add, path: /b, value: val}" + "]") + @ExampleObject( + "[" + "{op:remove, path:/a}," + "{op:add, path: /b, value: val}" + "]") })) JsonPatch patch) throws IOException, ParseException { Fields fields = new Fields(FIELD_LIST, FIELDS); Policy policy = dao.get(uriInfo, id, fields); - SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, dao.getOwnerReference(policy)); + SecurityUtil.checkAdminRoleOrPermissions( + authorizer, securityContext, dao.getOwnerReference(policy)); PatchResponse response = - dao.patch(uriInfo, UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch); + dao.patch( + uriInfo, UUID.fromString(id), securityContext.getUserPrincipal().getName(), patch); addHref(uriInfo, response.getEntity()); return response.toResponse(); } @@ -335,25 +362,18 @@ public class PolicyResource { @ApiResponse( responseCode = "200", description = "The policy", - content = @Content(mediaType = "application/json", schema = @Schema(implementation = Policy.class))), + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = Policy.class))), @ApiResponse(responseCode = "400", description = "Bad request") }) public Response createOrUpdate( - @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreatePolicy create) + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreatePolicy create) throws IOException, ParseException { - Policy policy = - new Policy() - .withId(UUID.randomUUID()) - .withName(create.getName()) - .withDisplayName(create.getDisplayName()) - .withDescription(create.getDescription()) - .withOwner(create.getOwner()) - .withPolicyUrl(create.getPolicyUrl()) - .withPolicyType(create.getPolicyType()) - .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()) - .withRules(create.getRules()); - + Policy policy = getPolicy(securityContext, create); PutResponse response = dao.createOrUpdate(uriInfo, policy); addHref(uriInfo, response.getEntity()); return response.toResponse(); @@ -373,4 +393,23 @@ public class PolicyResource { dao.delete(UUID.fromString(id)); return Response.ok().build(); } + + private Policy getPolicy(SecurityContext securityContext, CreatePolicy create) { + Policy policy = + new Policy() + .withId(UUID.randomUUID()) + .withName(create.getName()) + .withDisplayName(create.getDisplayName()) + .withDescription(create.getDescription()) + .withOwner(create.getOwner()) + .withPolicyUrl(create.getPolicyUrl()) + .withPolicyType(create.getPolicyType()) + .withUpdatedBy(securityContext.getUserPrincipal().getName()) + .withUpdatedAt(new Date()) + .withRules(create.getRules()); + if (create.getLocation() != null) { + policy = policy.withLocation(new EntityReference().withId(create.getLocation())); + } + return policy; + } } diff --git a/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json b/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json index 8ab9752a503..3da6ad02cfa 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json @@ -31,6 +31,11 @@ }, "rules": { "$ref": "../../entity/policies/policy.json#/definitions/rules" + }, + "location" : { + "description": "UUID of Location where this policy is applied", + "$ref": "../../type/basic.json#/definitions/uuid", + "default" : null } }, "required": [ diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json index 6b32a01ea85..0c6e0b57681 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json @@ -107,6 +107,10 @@ }, "rules": { "$ref": "#/definitions/rules" + }, + "location": { + "$ref": "../../type/entityReference.json", + "default": null } }, "required": [ diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java index fcca691e050..9c50920d404 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java @@ -22,22 +22,25 @@ import org.openmetadata.catalog.type.TagLabel.LabelType; import org.openmetadata.catalog.type.TagLabel.State; /** - * Enum ordinal number is stored in the database. New enums must be added at the end to ensure backward compatibility + * Enum ordinal number is stored in the database. New enums must be added at the end to ensure + * backward compatibility */ public class EnumBackwardCompatibilityTest { /** - * Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal - * number of the last enum. This will help catch new enum inadvertently being added in the middle. + * Any time a new enum is added, this test will fail. Update the test with total number of enums + * and test the ordinal number of the last enum. This will help catch new enum inadvertently being + * added in the middle. */ @Test public void testRelationshipEnumBackwardCompatible() { - assertEquals(14, Relationship.values().length); - assertEquals(13, Relationship.UPSTREAM.ordinal()); + assertEquals(15, Relationship.values().length); + assertEquals(14, Relationship.APPLIED_TO.ordinal()); } /** - * Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal - * number of the last enum. This will help catch new enum inadvertently being added in the middle. + * Any time a new enum is added, this test will fail. Update the test with total number of enums + * and test the ordinal number of the last enum. This will help catch new enum inadvertently being + * added in the middle. */ @Test public void testTagLabelEnumBackwardCompatible() { @@ -46,8 +49,9 @@ public class EnumBackwardCompatibilityTest { } /** - * Any time a new enum is added, this test will fail. Update the test with total number of enums and test the ordinal - * number of the last enum. This will help catch new enum inadvertently being added in the middle. + * Any time a new enum is added, this test will fail. Update the test with total number of enums + * and test the ordinal number of the last enum. This will help catch new enum inadvertently being + * added in the middle. */ @Test public void testTagStateEnumBackwardCompatible() { diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/policies/PolicyResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/policies/PolicyResourceTest.java index 0c230845fa6..1498184c067 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/policies/PolicyResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/policies/PolicyResourceTest.java @@ -55,7 +55,15 @@ import org.openmetadata.catalog.util.TestUtils; public class PolicyResourceTest extends EntityResourceTest { public PolicyResourceTest() { - super(Entity.POLICY, Policy.class, PolicyList.class, "policies", PolicyResource.FIELDS, false, true, false); + super( + Entity.POLICY, + Policy.class, + PolicyList.class, + "policies", + PolicyResource.FIELDS, + false, + true, + false); } @BeforeAll @@ -64,12 +72,14 @@ public class PolicyResourceTest extends EntityResourceTest { } @Override - public Object createRequest(String name, String description, String displayName, EntityReference owner) { + public Object createRequest( + String name, String description, String displayName, EntityReference owner) { return create(name).withDescription(description).withDisplayName(displayName).withOwner(owner); } @Override - public void validateCreatedEntity(Policy policy, Object request, Map authHeaders) { + public void validateCreatedEntity( + Policy policy, Object request, Map authHeaders) { CreatePolicy createRequest = (CreatePolicy) request; validateCommonEntityFields( getEntityInterface(policy), @@ -80,7 +90,8 @@ public class PolicyResourceTest extends EntityResourceTest { } @Override - public void validateUpdatedEntity(Policy updatedEntity, Object request, Map authHeaders) { + public void validateUpdatedEntity( + Policy updatedEntity, Object request, Map authHeaders) { validateCreatedEntity(updatedEntity, request, authHeaders); } @@ -93,7 +104,8 @@ public class PolicyResourceTest extends EntityResourceTest { } @Override - public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException { + public void assertFieldChange(String fieldName, Object expected, Object actual) + throws IOException { if (expected == actual) { return; } @@ -141,7 +153,9 @@ public class PolicyResourceTest extends EntityResourceTest { public void post_Policy_as_non_admin_401(TestInfo test) { CreatePolicy create = create(test); HttpResponseException exception = - assertThrows(HttpResponseException.class, () -> createPolicy(create, authHeaders("test@open-metadata.org"))); + assertThrows( + HttpResponseException.class, + () -> createPolicy(create, authHeaders("test@open-metadata.org"))); assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} is not admin"); } @@ -149,22 +163,33 @@ public class PolicyResourceTest extends EntityResourceTest { public void get_PolicyListWithInvalidLimitOffset_4xx() { // Limit must be >= 1 and <= 1000,000 HttpResponseException exception = - assertThrows(HttpResponseException.class, () -> listPolicies(null, -1, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); - - exception = assertThrows(HttpResponseException.class, () -> listPolicies(null, 0, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); + assertThrows( + HttpResponseException.class, + () -> listPolicies(null, -1, null, null, adminAuthHeaders())); + assertResponse( + exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); exception = - assertThrows(HttpResponseException.class, () -> listPolicies(null, 1000001, null, null, adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]"); + assertThrows( + HttpResponseException.class, + () -> listPolicies(null, 0, null, null, adminAuthHeaders())); + assertResponse( + exception, BAD_REQUEST, "[query param limit must be greater than or equal to 1]"); + + exception = + assertThrows( + HttpResponseException.class, + () -> listPolicies(null, 1000001, null, null, adminAuthHeaders())); + assertResponse( + exception, BAD_REQUEST, "[query param limit must be less than or equal to 1000000]"); } @Test public void get_PolicyListWithInvalidPaginationCursors_4xx() { // Passing both before and after cursors is invalid HttpResponseException exception = - assertThrows(HttpResponseException.class, () -> listPolicies(null, 1, "", "", adminAuthHeaders())); + assertThrows( + HttpResponseException.class, () -> listPolicies(null, 1, "", "", adminAuthHeaders())); assertResponse(exception, BAD_REQUEST, "Only one of before or after query parameter allowed"); } @@ -197,12 +222,14 @@ public class PolicyResourceTest extends EntityResourceTest { before = forwardPage.getPaging().getBefore(); assertEntityPagination(allPolicies.getData(), forwardPage, limit, indexInAllPolicies); - if (pageCount == 0) { // CASE 0 - First page is being returned. Therefore, before cursor is null + if (pageCount + == 0) { // CASE 0 - First page is being returned. Therefore, before cursor is null assertNull(before); } else { // Make sure scrolling back based on before cursor returns the correct result backwardPage = listPolicies(null, limit, before, null, adminAuthHeaders()); - assertEntityPagination(allPolicies.getData(), backwardPage, limit, (indexInAllPolicies - limit)); + assertEntityPagination( + allPolicies.getData(), backwardPage, limit, (indexInAllPolicies - limit)); } indexInAllPolicies += forwardPage.getData().size(); @@ -246,7 +273,9 @@ public class PolicyResourceTest extends EntityResourceTest { policy.setEnabled(false); ChangeDescription change = getChangeDescription(policy.getVersion()); change.getFieldsAdded().add(new FieldChange().withName("policyUrl").withNewValue(uri)); - change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false)); + change + .getFieldsUpdated() + .add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false)); policy = patchEntityAndCheck(policy, origJson, adminAuthHeaders(), MINOR_UPDATE, change); // Remove policyUrl @@ -268,13 +297,18 @@ public class PolicyResourceTest extends EntityResourceTest { // TODO } - public static Policy createPolicy(CreatePolicy create, Map authHeaders) throws HttpResponseException { + public static Policy createPolicy(CreatePolicy create, Map authHeaders) + throws HttpResponseException { return TestUtils.post(getResource("policies"), create, Policy.class, authHeaders); } - /** Validate returned fields GET .../policies/{id}?fields="..." or GET .../policies/name/{fqn}?fields="..." */ + /** + * Validate returned fields GET .../policies/{id}?fields="..." or GET + * .../policies/name/{fqn}?fields="..." + */ @Override - public void validateGetWithDifferentFields(Policy policy, boolean byName) throws HttpResponseException { + public void validateGetWithDifferentFields(Policy policy, boolean byName) + throws HttpResponseException { // .../policies?fields=owner String fields = "owner"; policy = @@ -300,7 +334,8 @@ public class PolicyResourceTest extends EntityResourceTest { assertNotNull(policy.getOwner()); } - public static Policy getPolicy(UUID id, String fields, Map authHeaders) throws HttpResponseException { + public static Policy getPolicy(UUID id, String fields, Map authHeaders) + throws HttpResponseException { WebTarget target = getResource("policies/" + id); target = fields != null ? target.queryParam("fields", fields) : target; return TestUtils.get(target, Policy.class, authHeaders); @@ -314,7 +349,11 @@ public class PolicyResourceTest extends EntityResourceTest { } public static PolicyList listPolicies( - String fields, Integer limitParam, String before, String after, Map authHeaders) + String fields, + Integer limitParam, + String before, + String after, + Map authHeaders) throws HttpResponseException { WebTarget target = getResource("policies"); target = fields != null ? target.queryParam("fields", fields) : target; @@ -336,7 +375,7 @@ public class PolicyResourceTest extends EntityResourceTest { return new CreatePolicy() .withName(name) .withDescription("description") - .withPolicyType(PolicyType.AccessControl) + .withPolicyType(PolicyType.Lifecycle) .withOwner(USER_OWNER1); } } diff --git a/common/src/test/java/org/openmetadata/common/utils/JsonSchemaTest.java b/common/src/test/java/org/openmetadata/common/utils/JsonSchemaTest.java index 6f86347bcc9..adf116b6262 100644 --- a/common/src/test/java/org/openmetadata/common/utils/JsonSchemaTest.java +++ b/common/src/test/java/org/openmetadata/common/utils/JsonSchemaTest.java @@ -34,8 +34,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * This test provides examples of how to use: - JSON schema to validate the JSON payload - Generate JSON schema from - * POJO + * This test provides examples of how to use: - JSON schema to validate the JSON payload - Generate + * JSON schema from POJO */ public class JsonSchemaTest { private final UUID TEST_UUID = UUID.randomUUID(); @@ -47,8 +47,8 @@ public class JsonSchemaTest { urnFactory = JsonSchemaUtil.getUrnFactory(); } - private java.util.Set validate(InputStream in, String jsonPayload) - throws IOException { + private java.util.Set validate( + InputStream in, String jsonPayload) throws IOException { System.out.println("Validating " + jsonPayload); Set errors = JsonSchemaUtil.validate(in, jsonPayload, urnFactory); System.out.println("Errors " + errors); @@ -59,7 +59,8 @@ public class JsonSchemaTest { @Test public void validJson() throws IOException { // Valid jsonPayload - InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); + InputStream in = + JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); Map objectTypeMap = Map.of( "ot1", "ot1", @@ -77,7 +78,8 @@ public class JsonSchemaTest { /** Validate a non-conforming JSON payload that is missing a required field using JSON schema */ @Test public void missingField() throws IOException { - InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); + InputStream in = + JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); // No mandatory field "stringProperty" Map objectTypeMap = Map.of( @@ -95,11 +97,14 @@ public class JsonSchemaTest { assertTrue(errors.iterator().next().getMessage().contains("stringProperty: is missing")); } - /** Validate a non-conforming JSON payload that is missing a required inner field using JSON schema */ + /** + * Validate a non-conforming JSON payload that is missing a required inner field using JSON schema + */ @Test public void missingInnerField() throws IOException { // No mandatory inner field "objectType.ot1" - InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); + InputStream in = + JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); Map objectTypeMap = Map.of( // Missing inner field ot1 @@ -124,7 +129,8 @@ public class JsonSchemaTest { @Test public void invalidJsonData() throws IOException { // Invalid value that is not a URI - InputStream in = JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); + InputStream in = + JsonSchemaTest.class.getClassLoader().getResourceAsStream("json/entity/testEntity.json"); Map objectTypeMap = Map.of("ot1", "ot2", "ot2", "ot2"); Map map = Map.of( @@ -140,7 +146,12 @@ public class JsonSchemaTest { Set errors = validate(in, jsonPayload); assertEquals(1, errors.size()); - assertTrue(errors.iterator().next().getMessage().contains("uriProperty: does not match the uri pattern")); + assertTrue( + errors + .iterator() + .next() + .getMessage() + .contains("uriProperty: does not match the uri pattern")); } /** Test POJO to JSON schema */ @@ -156,6 +167,7 @@ public class JsonSchemaTest { assertEquals("object", jsonMap.get("type")); // Check properties of the object + @SuppressWarnings("unchecked") Map propertiesMap = (Map) jsonMap.get("properties"); assertEquals("{type=string, description=TODO}", propertiesMap.get("name").toString()); } diff --git a/docs/open-source-community/developer/build-code-run-tests.md b/docs/open-source-community/developer/build-code-run-tests.md index 34201f8a6d7..ebb97df8884 100644 --- a/docs/open-source-community/developer/build-code-run-tests.md +++ b/docs/open-source-community/developer/build-code-run-tests.md @@ -8,8 +8,7 @@ * For an easy install of MySQL and ES, just install Docker on your local machine and run the following commands from the top-level directory ``` - cd docker/local-metadata - docker-compose -f docker-compose-dev.yml up + docker-compose -f docker/local-metadata/docker-compose-dev.yml up ``` * Bootstrap MySQL with tables diff --git a/ingestion/src/metadata/ingestion/models/ometa_policy.py b/ingestion/src/metadata/ingestion/models/ometa_policy.py new file mode 100644 index 00000000000..f4d81c1dd9a --- /dev/null +++ b/ingestion/src/metadata/ingestion/models/ometa_policy.py @@ -0,0 +1,22 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from pydantic import BaseModel + +from metadata.generated.schema.entity.policies.policy import Policy +from metadata.generated.schema.entity.data.location import Location + + +class OMetaPolicy(BaseModel): + policy: Policy + # A Lifecycle Policy may be associated with a specific Location such as S3 bucket. + location: Optional[Location] diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index d3eb97cea69..3d558f4542a 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -47,6 +47,7 @@ from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus +from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.client import APIError @@ -55,11 +56,9 @@ from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig logger = logging.getLogger(__name__) - # Allow types from the generated pydantic models T = TypeVar("T", bound=BaseModel) - om_chart_type_dict = { "line": ChartType.Line, "table": ChartType.Table, @@ -119,7 +118,7 @@ class MetadataRestSink(Sink[Entity]): self.write_dashboards(record) elif isinstance(record, Location): self.write_locations(record) - elif isinstance(record, Policy): + elif isinstance(record, OMetaPolicy): self.write_policies(record) elif isinstance(record, Pipeline): self.write_pipelines(record) @@ -293,14 +292,7 @@ class MetadataRestSink(Sink[Entity]): def write_locations(self, location: Location): try: - location_request = CreateLocationEntityRequest( - name=location.name, - description=location.description, - locationType=location.locationType, - owner=location.owner, - service=location.service, - ) - created_location = self.metadata.create_or_update(location_request) + created_location = self._create_location(location) logger.info(f"Successfully ingested Location {created_location.name}") self.status.records_written(f"Location: {created_location.name}") except (APIError, ValidationError) as err: @@ -328,24 +320,44 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Pipeline: {pipeline.name}") - def write_policies(self, policy: Policy): + def write_policies(self, ometa_policy: OMetaPolicy) -> None: try: + created_location = None + if ometa_policy.location is not None: + created_location = self._create_location(ometa_policy.location) + logger.info(f"Successfully ingested Location {created_location.name}") + self.status.records_written(f"Location: {created_location.name}") + policy_request = CreatePolicyEntityRequest( - name=policy.name, - displayName=policy.displayName, - description=policy.description, - owner=policy.owner, - policyUrl=policy.policyUrl, - policyType=policy.policyType, - rules=policy.rules, + name=ometa_policy.policy.name, + displayName=ometa_policy.policy.displayName, + description=ometa_policy.policy.description, + owner=ometa_policy.policy.owner, + policyUrl=ometa_policy.policy.policyUrl, + policyType=ometa_policy.policy.policyType, + rules=ometa_policy.policy.rules, + location=created_location.id if created_location else None, ) created_policy = self.metadata.create_or_update(policy_request) logger.info(f"Successfully ingested Policy {created_policy.name}") self.status.records_written(f"Policy: {created_policy.name}") + except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest Policy {policy.name}") + logger.error(f"Failed to ingest Policy {ometa_policy.policy.name}") logger.error(err) - self.status.failure(f"Policy: {policy.name}") + traceback.print_exc() + self.status.failure(f"Policy: {ometa_policy.policy.name}") + + def _create_location(self, location: Location) -> Location: + location_request = CreateLocationEntityRequest( + name=location.name, + description=location.description, + locationType=location.locationType, + tags=location.tags, + owner=location.owner, + service=location.service, + ) + return self.metadata.create_or_update(location_request) def write_lineage(self, add_lineage: AddLineage): try: diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py index 2d0239063bb..3f716d6af9a 100644 --- a/ingestion/src/metadata/ingestion/source/s3.py +++ b/ingestion/src/metadata/ingestion/source/s3.py @@ -22,6 +22,7 @@ from metadata.generated.schema.api.services.createStorageService import ( from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.generated.schema.entity.data.location import Location, LocationType @@ -73,15 +74,16 @@ class S3Source(Source[Entity]): def prepare(self): pass - def next_record(self) -> Iterable[Entity]: + def next_record(self) -> Iterable[OMetaPolicy]: try: for bucket in self.s3.buckets.all(): self.status.scanned(bucket) - bucket_name = self._get_bucket_name_with_prefix(bucket.name) + location_name = self._get_bucket_name_with_prefix(bucket.name) + location_id = uuid.uuid4() location = Location( - id=uuid.uuid4(), - name=bucket_name, - displayName=bucket_name, + id=location_id, + name=location_name, + displayName=location_name, locationType=LocationType.Bucket, service=EntityReference( id=self.service.id, @@ -89,14 +91,13 @@ class S3Source(Source[Entity]): name=self.service.name, ), ) - yield location - # Retrieve lifecycle policies for the bucket. + # Retrieve lifecycle policy and rules for the bucket. rules: List[LifecycleRule] = [] for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules: rules.append(self._get_rule(rule, location)) policy_name = f"{bucket.name}-lifecycle-policy" - yield Policy( + policy = Policy( id=uuid.uuid4(), name=policy_name, displayName=policy_name, @@ -105,6 +106,10 @@ class S3Source(Source[Entity]): rules=rules, enabled=True, ) + yield OMetaPolicy( + location=location, + policy=policy, + ) except Exception as e: self.status.failure("error", str(e))