Implement S3 bucket policy ingestion (#1570)

- Fix broken JSON schema policies (use minItems for arrays instead of minLength)
- Amend S3 ingestion to create policies
- Amend PolicyResource and PolicyRepository to support Policy Rules
This commit is contained in:
Matt 2021-12-10 07:19:43 -08:00 committed by GitHub
parent 7b637d4628
commit 3831bb9e4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 167 additions and 44 deletions

View File

@ -36,9 +36,9 @@ import java.util.UUID;
@Slf4j
public class PolicyRepository extends EntityRepository<Policy> {
private static final Fields POLICY_UPDATE_FIELDS = new Fields(PolicyResource.FIELD_LIST,
"displayName,description,owner,policyUrl,enabled");
"displayName,description,owner,policyUrl,enabled,rules");
private static final Fields POLICY_PATCH_FIELDS = new Fields(PolicyResource.FIELD_LIST,
"displayName,description,owner,policyUrl,enabled");
"displayName,description,owner,policyUrl,enabled,rules");
private final CollectionDAO dao;
public PolicyRepository(CollectionDAO dao) {
@ -72,6 +72,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
policy.setOwner(fields.contains("owner") ? getOwner(policy) : null);
policy.setPolicyUrl(fields.contains("policyUrl") ? policy.getPolicyUrl() : null);
policy.setEnabled(fields.contains("enabled") ? policy.getEnabled() : null);
policy.setRules(fields.contains("rules") ? policy.getRules() : null);
return policy;
}
@ -171,6 +172,10 @@ public class PolicyRepository extends EntityRepository<Policy> {
return null;
}
/**
 * Returns the rules held by the wrapped policy entity.
 *
 * @return the result of {@code entity.getRules()}, passed through unchanged
 */
public List<Object> getRules() {
return entity.getRules();
}
@Override
public Double getVersion() {
return entity.getVersion();
@ -245,6 +250,10 @@ public class PolicyRepository extends EntityRepository<Policy> {
entity.setOwner(owner);
}
/**
 * Replaces the rules on the wrapped policy entity.
 *
 * @param rules the rule list forwarded as-is to {@code entity.setRules(rules)}
 */
public void setRules(List<Object> rules) {
entity.setRules(rules);
}
/**
 * Delegates to {@code entity.withHref(href)} and returns its result.
 *
 * @param href the URI forwarded to the wrapped entity
 * @return whatever {@code entity.withHref(href)} returns
 */
@Override
public Policy withHref(URI href) { return entity.withHref(href); }
@ -266,6 +275,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
/**
 * Passes the policy-specific fields ({@code policyUrl}, {@code enabled} and {@code rules}) to
 * {@code recordChange}, each with the field's value from the original and the updated entity.
 *
 * @throws IOException declared by the signature; presumably propagated from {@code recordChange}
 *     (TODO confirm against its definition)
 */
public void entitySpecificUpdate() throws IOException {
// One recordChange call per field: field name, value before the update, value after the update.
recordChange("policyUrl", original.getEntity().getPolicyUrl(), updated.getEntity().getPolicyUrl());
recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled());
recordChange("rules", original.getEntity().getRules(), updated.getEntity().getRules());
}
}
}

View File

@ -109,7 +109,7 @@ public class PolicyResource {
}
}
static final String FIELDS = "displayName,description,owner,policyUrl,enabled";
static final String FIELDS = "displayName,description,owner,policyUrl,enabled,rules";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));
@ -254,7 +254,9 @@ public class PolicyResource {
.withPolicyUrl(create.getPolicyUrl())
.withPolicyType(create.getPolicyType())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(new Date())
.withRules(create.getRules());
policy = addHref(uriInfo, dao.create(uriInfo, policy));
return Response.created(policy.getHref()).entity(policy).build();
}
@ -306,7 +308,8 @@ public class PolicyResource {
.withPolicyUrl(create.getPolicyUrl())
.withPolicyType(create.getPolicyType())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
.withUpdatedAt(new Date())
.withRules(create.getRules());
PutResponse<Policy> response = dao.createOrUpdate(uriInfo, policy);
addHref(uriInfo, response.getEntity());

View File

@ -28,6 +28,9 @@
},
"policyType": {
"$ref": "../../entity/policies/policy.json#/definitions/policyType"
},
"rules": {
"$ref": "../../entity/policies/policy.json#/definitions/rules"
}
},
"required": [

View File

@ -6,13 +6,17 @@
"type": "object",
"javaType": "org.openmetadata.catalog.entity.policies.accessControl.Rule",
"properties": {
"name": {
"description": "Name that identifies this Rule.",
"type": "string"
},
"filters": {
"$ref": "../filters.json#/definitions/filters"
},
"actions": {
"description": "A set of access control enforcements to take on the entities.",
"type": "array",
"minLength": 1,
"minItems": 1,
"items": {
"anyOf": [
{
@ -20,6 +24,11 @@
}
]
}
},
"enabled": {
"description": "Is the rule enabled.",
"type": "boolean",
"default": true
}
},
"required": [

View File

@ -9,7 +9,7 @@
"tags": {
"description": "Tags that are associated with the entities.",
"type": "array",
"minLength": 1,
"minItems": 1,
"items": [
{
"$ref": "../../../type/tagLabel.json"
@ -19,7 +19,7 @@
"allow": {
"description": "Teams and Users who are able to access the tagged entities.",
"type": "array",
"minLength": 1,
"minItems": 1,
"items": {
"anyOf": [
{

View File

@ -3,15 +3,25 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Filters",
"definitions": {
"prefix": {
"description": "Prefix path of the entity.",
"type": "string"
},
"regex": {
"description": "Regex that matches the entity.",
"type": "string"
},
"filters": {
"description": "The set of filters that are used to match on entities. A logical AND operation is applied across all filters.",
"type": "array",
"minLength": 1,
"minItems": 1,
"items": {
"anyOf": [
{
"description": "Regex that matches the entity FQN.",
"type": "string"
"$ref": "#/definitions/prefix"
},
{
"$ref": "#/definitions/regex"
},
{
"description": "Entity tags to match on.",

View File

@ -35,19 +35,5 @@
}
}
},
"oneOf": [
{
"required": [
"daysAfterCreation",
"destination"
]
},
{
"required": [
"daysAfterModification",
"destination"
]
}
],
"additionalProperties": false
}

View File

@ -6,13 +6,17 @@
"type": "object",
"javaType": "org.openmetadata.catalog.entity.policies.lifecycle.Rule",
"properties": {
"name": {
"description": "Name that identifies this Rule.",
"type": "string"
},
"filters": {
"$ref": "../filters.json#/definitions/filters"
},
"actions": {
"description": "A set of actions to take on the entities.",
"type": "array",
"minLength": 1,
"minItems": 1,
"items": {
"anyOf": [
{
@ -23,6 +27,11 @@
}
]
}
},
"enabled": {
"description": "Is the rule enabled.",
"type": "boolean",
"default": true
}
},
"required": [

View File

@ -27,6 +27,20 @@
"name": "Lifecycle"
}
]
},
"rules": {
"description": "A set of rules associated with the Policy.",
"type": "array",
"items": {
"anyOf": [
{
"$ref": "accessControl/rule.json"
},
{
"$ref": "lifecycle/rule.json"
}
]
}
}
},
"properties": {
@ -92,19 +106,7 @@
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"rules": {
"description": "A set of rules associated with this Policy.",
"type": "array",
"minLength": 1,
"items": {
"anyOf": [
{
"$ref": "accessControl/rule.json"
},
{
"$ref": "lifecycle/rule.json"
}
]
}
"$ref": "#/definitions/rules"
}
},
"required": [

View File

@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.report import Report
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
@ -90,6 +91,7 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
entity_path = "entity"
api_path = "api"
data_path = "data"
policies_path = "policies"
services_path = "services"
teams_path = "teams"
@ -160,6 +162,11 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
):
return "/locations"
if issubclass(
entity, get_args(Union[Policy, self.get_create_entity_type(Policy)])
):
return "/policies"
if issubclass(
entity, get_args(Union[Table, self.get_create_entity_type(Table)])
):
@ -251,6 +258,9 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
it is found inside generated
"""
if "policy" in entity.__name__.lower():
return self.policies_path
if "service" in entity.__name__.lower():
return self.services_path

View File

@ -32,6 +32,9 @@ from metadata.generated.schema.api.data.createPipeline import (
)
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest
from metadata.generated.schema.api.policies.createPolicy import (
CreatePolicyEntityRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest
from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
@ -39,6 +42,7 @@ from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity, WorkflowContext
@ -115,6 +119,8 @@ class MetadataRestSink(Sink[Entity]):
self.write_dashboards(record)
elif isinstance(record, Location):
self.write_locations(record)
elif isinstance(record, Policy):
self.write_policies(record)
elif isinstance(record, Pipeline):
self.write_pipelines(record)
elif isinstance(record, AddLineage):
@ -309,6 +315,25 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Pipeline: {pipeline.name}")
def write_policies(self, policy: Policy):
    """Send a Policy to the metadata server as a create-or-update request.

    A CreatePolicyEntityRequest is built from the policy's name, displayName,
    description, owner, policyUrl, policyType and rules, then handed to
    ``self.metadata.create_or_update``. The outcome is logged and recorded on
    ``self.status``.

    Args:
        policy: the Policy record whose fields are copied into the request.

    APIError and ValidationError are caught here: they are logged and the
    policy is reported as a failure instead of being re-raised.
    """
    try:
        # Collect the request fields first; they are read inside the try so a
        # ValidationError raised while building the request is handled below.
        request_fields = dict(
            name=policy.name,
            displayName=policy.displayName,
            description=policy.description,
            owner=policy.owner,
            policyUrl=policy.policyUrl,
            policyType=policy.policyType,
            rules=policy.rules,
        )
        created_policy = self.metadata.create_or_update(
            CreatePolicyEntityRequest(**request_fields)
        )
    except (APIError, ValidationError) as err:
        logger.error(f"Failed to ingest Policy {policy.name}")
        logger.error(err)
        self.status.failure(f"Policy: {policy.name}")
    else:
        logger.info(f"Successfully ingested Policy {created_policy.name}")
        self.status.records_written(f"Policy: {created_policy.name}")
def write_lineage(self, add_lineage: AddLineage):
try:
logger.info(add_lineage)

View File

@ -12,21 +12,28 @@
import logging
import os
import uuid
from typing import Iterable
from typing import Iterable, List
import boto3
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceEntityRequest,
)
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.storage import StorageServiceType
from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.policies.filters import Filters1, Prefix
from metadata.generated.schema.entity.policies.policy import Policy, PolicyType
from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule
from metadata.generated.schema.entity.policies.lifecycle.moveAction import (
Destination,
LifecycleMoveAction,
)
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.type.storage import StorageServiceType, S3StorageClass
logger: logging.Logger = logging.getLogger(__name__)
@ -71,7 +78,7 @@ class S3Source(Source[Entity]):
for bucket in self.s3.buckets.all():
self.status.scanned(bucket)
bucket_name = self._get_bucket_name_with_prefix(bucket.name)
yield Location(
location = Location(
id=uuid.uuid4(),
name=bucket_name,
displayName=bucket_name,
@ -82,6 +89,22 @@ class S3Source(Source[Entity]):
name=self.service.name,
),
)
yield location
# Retrieve lifecycle policies for the bucket.
rules: List[LifecycleRule] = []
for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules:
rules.append(self._get_rule(rule, location))
policy_name = f"{bucket.name}-lifecycle-policy"
yield Policy(
id=uuid.uuid4(),
name=policy_name,
displayName=policy_name,
description=policy_name,
policyType=PolicyType.Lifecycle,
rules=rules,
enabled=True,
)
except Exception as e:
self.status.failure("error", str(e))
@ -99,6 +122,39 @@ class S3Source(Source[Entity]):
def close(self):
pass
def _get_rule(self, rule: dict, location: Location) -> LifecycleRule:
    """Convert one lifecycle rule dict into a LifecycleRule.

    Keys read from ``rule``: "Transitions", "Status", "Filter" and "ID".
    All of them are optional.

    Args:
        rule: dict describing a single lifecycle rule.
        location: the Location used as the destination location of every
            move action built from this rule.

    Returns:
        A LifecycleRule with one LifecycleMoveAction per transition that has
        both "StorageClass" and "Days", ``enabled`` set to whether "Status"
        equals "Enabled", a prefix filter when ``rule["Filter"]`` contains
        "Prefix", and ``name`` taken from "ID" (None when absent).
    """
    # Transitions lacking either a storage class or a day count are skipped.
    move_actions = [
        LifecycleMoveAction(
            daysAfterCreation=transition["Days"],
            destination=Destination(
                storageServiceType=self.service,
                storageClassType=S3StorageClass(transition["StorageClass"]),
                location=location,
            ),
        )
        for transition in rule.get("Transitions", [])
        if "StorageClass" in transition and "Days" in transition
    ]

    # A missing "Status" counts as disabled.
    is_enabled = rule.get("Status") == "Enabled"

    prefix_filters = []
    if "Prefix" in rule.get("Filter", {}):
        prefix_filters.append(Prefix.parse_obj(rule["Filter"]["Prefix"]))

    return LifecycleRule(
        actions=move_actions,
        enabled=is_enabled,
        filters=Filters1.parse_obj(prefix_filters),
        name=rule.get("ID"),
    )
def get_storage_service_or_create(
service_name: str, metadata_config: MetadataServerConfig