From 3831bb9e4dad801e676b5fc5747d43c84ceaf93c Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 10 Dec 2021 07:19:43 -0800 Subject: [PATCH] Implement s3 bucket policy ingestion (#1570) - Fix broken json schema policies (minItems in array instead of minLength) - Amend s3 ingestion to create policies - Amend PolicyResource and PolicyRepository to support Policy Rules --- .../catalog/jdbi3/PolicyRepository.java | 14 +++- .../resources/policies/PolicyResource.java | 9 ++- .../schema/api/policies/createPolicy.json | 3 + .../entity/policies/accessControl/rule.json | 11 +++- .../policies/accessControl/tagBased.json | 4 +- .../json/schema/entity/policies/filters.json | 16 ++++- .../entity/policies/lifecycle/moveAction.json | 14 ---- .../entity/policies/lifecycle/rule.json | 11 +++- .../json/schema/entity/policies/policy.json | 28 ++++---- .../src/metadata/ingestion/ometa/ometa_api.py | 10 +++ .../metadata/ingestion/sink/metadata_rest.py | 25 +++++++ ingestion/src/metadata/ingestion/source/s3.py | 66 +++++++++++++++++-- 12 files changed, 167 insertions(+), 44 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java index 4fca7fba762..f1306214d5d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PolicyRepository.java @@ -36,9 +36,9 @@ import java.util.UUID; @Slf4j public class PolicyRepository extends EntityRepository { private static final Fields POLICY_UPDATE_FIELDS = new Fields(PolicyResource.FIELD_LIST, - "displayName,description,owner,policyUrl,enabled"); + "displayName,description,owner,policyUrl,enabled,rules"); private static final Fields POLICY_PATCH_FIELDS = new Fields(PolicyResource.FIELD_LIST, - "displayName,description,owner,policyUrl,enabled"); + "displayName,description,owner,policyUrl,enabled,rules"); private final CollectionDAO dao; public PolicyRepository(CollectionDAO dao) { @@ -72,6 +72,7 @@ public class PolicyRepository extends EntityRepository { policy.setOwner(fields.contains("owner") ? getOwner(policy) : null); policy.setPolicyUrl(fields.contains("policyUrl") ? policy.getPolicyUrl() : null); policy.setEnabled(fields.contains("enabled") ? policy.getEnabled() : null); + policy.setRules(fields.contains("rules") ? policy.getRules() : null); return policy; } @@ -171,6 +172,10 @@ public class PolicyRepository extends EntityRepository { return null; } + public List getRules() { + return entity.getRules(); + } + @Override public Double getVersion() { return entity.getVersion(); @@ -245,6 +250,10 @@ public class PolicyRepository extends EntityRepository { entity.setOwner(owner); } + public void setRules(List rules) { + entity.setRules(rules); + } + @Override public Policy withHref(URI href) { return entity.withHref(href); } @@ -266,6 +275,7 @@ public class PolicyRepository extends EntityRepository { public void entitySpecificUpdate() throws IOException { recordChange("policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl()); recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled()); + recordChange("rules", original.getEntity().getRules(), updated.getEntity().getRules()); } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java index df2d19e262c..fa87f211586 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/policies/PolicyResource.java @@ -109,7 +109,7 @@ public class PolicyResource { } } - static final String FIELDS = "displayName,description,owner,policyUrl,enabled"; + static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules"; public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") .split(",")); @@ -254,7 +254,9 @@ public class PolicyResource { .withPolicyUrl(create.getPolicyUrl()) .withPolicyType(create.getPolicyType()) .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()); + .withUpdatedAt(new Date()) + .withRules(create.getRules()); + policy = addHref(uriInfo, dao.create(uriInfo, policy)); return Response.created(policy.getHref()).entity(policy).build(); } @@ -306,7 +308,8 @@ public class PolicyResource { .withPolicyUrl(create.getPolicyUrl()) .withPolicyType(create.getPolicyType()) .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()); + .withUpdatedAt(new Date()) + .withRules(create.getRules()); PutResponse response = dao.createOrUpdate(uriInfo, policy); addHref(uriInfo, response.getEntity()); diff --git a/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json b/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json index 8395a6532b2..8ab9752a503 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/policies/createPolicy.json @@ -28,6 +28,9 @@ }, "policyType": { "$ref": "../../entity/policies/policy.json#/definitions/policyType" + }, + "rules": { + "$ref": "../../entity/policies/policy.json#/definitions/rules" } }, "required": [ diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json index fdde18247bb..9f89f933434 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json @@ -6,13 +6,17 @@ "type": "object", "javaType": "org.openmetadata.catalog.entity.policies.accessControl.Rule", "properties": { + "name": { + "description": "Name that identifies this Rule.", + "type": "string" + }, "filters": { "$ref": "../filters.json#/definitions/filters" }, "actions": { "description": "A set of access control enforcements to take on the entities.", "type": "array", - "minLength": 1, + "minItems": 1, "items": { "anyOf": [ { @@ -20,6 +24,11 @@ } ] } + }, + "enabled": { + "description": "Is the rule enabled.", + "type": "boolean", + "default": true } }, "required": [ diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/tagBased.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/tagBased.json index fa1fe8636e2..fd6900233cb 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/tagBased.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/tagBased.json @@ -9,7 +9,7 @@ "tags": { "description": "Tags that are associated with the entities.", "type": "array", - "minLength": 1, + "minItems": 1, "items": [ { "$ref": "../../../type/tagLabel.json" @@ -19,7 +19,7 @@ "allow": { "description": "Teams and Users who are able to access the tagged entities.", "type": "array", - "minLength": 1, + "minItems": 1, "items": { "anyOf": [ { diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json index 52f52d3a06a..257cd5930bc 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json @@ -3,15 +3,25 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Filters", "definitions": { + "prefix": { + "description": "Prefix path of the entity.", + "type": "string" + }, + "regex": { + "description": "Regex that matches the entity.", + "type": "string" + }, "filters": { "description": "The set of filters that are used to match on entities. A logical AND operation is applied across all filters.", "type": "array", - "minLength": 1, + "minItems": 1, "items": { "anyOf": [ { - "description": "Regex that matches the entity FQN.", - "type": "string" + "$ref": "#/definitions/prefix" + }, + { + "$ref": "#/definitions/regex" }, { "description": "Entity tags to match on.", diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/moveAction.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/moveAction.json index 52be89470fa..3255259548d 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/moveAction.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/moveAction.json @@ -35,19 +35,5 @@ } } }, - "oneOf": [ - { - "required": [ - "daysAfterCreation", - "destination" - ] - }, - { - "required": [ - "daysAfterModification", - "destination" - ] - } - ], "additionalProperties": false } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json index dc4be92dbe1..24283f49871 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json @@ -6,13 +6,17 @@ "type": "object", "javaType": "org.openmetadata.catalog.entity.policies.lifecycle.Rule", "properties": { + "name": { + "description": "Name that identifies this Rule.", + "type": "string" + }, "filters": { "$ref": "../filters.json#/definitions/filters" }, "actions": { "description": "A set of actions to take on the entities.", "type": "array", - "minLength": 1, + "minItems": 1, "items": { "anyOf": [ { @@ -23,6 +27,11 @@ } ] } + }, + "enabled": { + "description": "Is the rule enabled.", + "type": "boolean", + "default": true } }, "required": [ diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json index 9b6818e1a13..6b32a01ea85 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/policy.json @@ -27,6 +27,20 @@ "name": "Lifecycle" } ] + }, + "rules": { + "description": "A set of rules associated with the Policy.", + "type": "array", + "items": { + "anyOf": [ + { + "$ref": "accessControl/rule.json" + }, + { + "$ref": "lifecycle/rule.json" + } + ] + } } }, "properties": { @@ -92,19 +106,7 @@ "$ref": "../../type/entityHistory.json#/definitions/changeDescription" }, "rules": { - "description": "A set of rules associated with this Policy.", - "type": "array", - "minLength": 1, - "items": { - "anyOf": [ - { - "$ref": "accessControl/rule.json" - }, - { - "$ref": "lifecycle/rule.json" - } - ] - } + "$ref": "#/definitions/rules" } }, "required": [ diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index d88f77ea05d..85caeb10df0 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.report import Report from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.policies.policy import Policy from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService @@ -90,6 +91,7 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]): entity_path = "entity" api_path = "api" data_path = "data" + policies_path = "policies" services_path = "services" teams_path = "teams" @@ -160,6 +162,11 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]): ): return "/locations" + if issubclass( + entity, get_args(Union[Policy, self.get_create_entity_type(Policy)]) + ): + return "/policies" + if issubclass( entity, get_args(Union[Table, self.get_create_entity_type(Table)]) ): @@ -251,6 +258,9 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]): it is found inside generated """ + if "policy" in entity.__name__.lower(): + return self.policies_path + if "service" in entity.__name__.lower(): return self.services_path diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index c1f645e6a3e..edddec7a98c 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -32,6 +32,9 @@ from metadata.generated.schema.api.data.createPipeline import ( ) from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest +from metadata.generated.schema.api.policies.createPolicy import ( + CreatePolicyEntityRequest, +) from metadata.generated.schema.api.lineage.addLineage import AddLineage from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest @@ -39,6 +42,7 @@ from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.policies.policy import Policy from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity, WorkflowContext @@ -115,6 +119,8 @@ class MetadataRestSink(Sink[Entity]): self.write_dashboards(record) elif isinstance(record, Location): self.write_locations(record) + elif isinstance(record, Policy): + self.write_policies(record) elif isinstance(record, Pipeline): self.write_pipelines(record) elif isinstance(record, AddLineage): @@ -309,6 +315,25 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Pipeline: {pipeline.name}") + def write_policies(self, policy: Policy): + try: + policy_request = CreatePolicyEntityRequest( + name=policy.name, + displayName=policy.displayName, + description=policy.description, + owner=policy.owner, + policyUrl=policy.policyUrl, + policyType=policy.policyType, + rules=policy.rules, + ) + created_policy = self.metadata.create_or_update(policy_request) + logger.info(f"Successfully ingested Policy {created_policy.name}") + self.status.records_written(f"Policy: {created_policy.name}") + except (APIError, ValidationError) as err: + logger.error(f"Failed to ingest Policy {policy.name}") + logger.error(err) + self.status.failure(f"Policy: {policy.name}") + def write_lineage(self, add_lineage: AddLineage): try: logger.info(add_lineage) diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py index 1684f73af9c..2d0239063bb 100644 --- a/ingestion/src/metadata/ingestion/source/s3.py +++ b/ingestion/src/metadata/ingestion/source/s3.py @@ -12,21 +12,28 @@ import logging import os import uuid -from typing import Iterable +from typing import Iterable, List import boto3 from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceEntityRequest, ) -from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.services.storageService import StorageService from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.storage import StorageServiceType from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.generated.schema.entity.data.location import Location, LocationType +from metadata.generated.schema.entity.policies.filters import Filters1, Prefix +from metadata.generated.schema.entity.policies.policy import Policy, PolicyType +from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule +from metadata.generated.schema.entity.policies.lifecycle.moveAction import ( + Destination, + LifecycleMoveAction, +) +from metadata.generated.schema.entity.services.storageService import StorageService +from metadata.generated.schema.type.storage import StorageServiceType, S3StorageClass logger: logging.Logger = logging.getLogger(__name__) @@ -71,7 +78,7 @@ class S3Source(Source[Entity]): for bucket in self.s3.buckets.all(): self.status.scanned(bucket) bucket_name = self._get_bucket_name_with_prefix(bucket.name) - yield Location( + location = Location( id=uuid.uuid4(), name=bucket_name, displayName=bucket_name, @@ -82,6 +89,22 @@ class S3Source(Source[Entity]): name=self.service.name, ), ) + yield location + + # Retrieve lifecycle policies for the bucket. + rules: List[LifecycleRule] = [] + for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules: + rules.append(self._get_rule(rule, location)) + policy_name = f"{bucket.name}-lifecycle-policy" + yield Policy( + id=uuid.uuid4(), + name=policy_name, + displayName=policy_name, + description=policy_name, + policyType=PolicyType.Lifecycle, + rules=rules, + enabled=True, + ) except Exception as e: self.status.failure("error", str(e)) @@ -99,6 +122,39 @@ class S3Source(Source[Entity]): def close(self): pass + def _get_rule(self, rule: dict, location: Location) -> LifecycleRule: + actions = [] + if "Transitions" in rule: + for transition in rule["Transitions"]: + if "StorageClass" in transition and "Days" in transition: + actions.append( + LifecycleMoveAction( + daysAfterCreation=transition["Days"], + destination=Destination( + storageServiceType=self.service, + storageClassType=S3StorageClass( + transition["StorageClass"] + ), + location=location, + ), + ) + ) + + enabled = rule["Status"] == "Enabled" if "Status" in rule else False + + filters = [] + if "Filter" in rule and "Prefix" in rule["Filter"]: + filters.append(Prefix.parse_obj(rule["Filter"]["Prefix"])) + + name = rule["ID"] if "ID" in rule else None + + return LifecycleRule( + actions=actions, + enabled=enabled, + filters=Filters1.parse_obj(filters), + name=name, + ) + def get_storage_service_or_create( service_name: str, metadata_config: MetadataServerConfig