Add support for DeleteAction in S3 Lifecycle Policy (#1916)

- Support Delete Action for S3
- Add Example s3.json
- Create AWSClient util
- Use AWSClient util in S3 ingestion source
- Remove ambiguity in policy filters by removing array with different types
This commit is contained in:
Matt 2021-12-25 16:29:10 -08:00 committed by GitHub
parent 401f764b46
commit 4588121520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 136 additions and 52 deletions

View File

@ -10,8 +10,14 @@
"description": "Name that identifies this Rule.",
"type": "string"
},
"filters": {
"$ref": "../filters.json#/definitions/filters"
"prefixFilter": {
"$ref": "../filters.json#/definitions/prefix"
},
"regexFilter": {
"$ref": "../filters.json#/definitions/regex"
},
"tagsFilter": {
"$ref": "../filters.json#/definitions/tags"
},
"actions": {
"description": "A set of access control enforcements to take on the entities.",

View File

@ -11,23 +11,11 @@
"description": "Regex that matches the entity.",
"type": "string"
},
"filters": {
"description": "The set of filters that are used to match on entities. A logical AND operation is applied across all filters.",
"tags": {
"description": "Set of tags to match on (OR among all tags).",
"type": "array",
"minItems": 1,
"items": {
"anyOf": [
{
"$ref": "#/definitions/prefix"
},
{
"$ref": "#/definitions/regex"
},
{
"description": "Entity tags to match on.",
"$ref": "../../type/tagLabel.json"
}
]
"$ref": "../tags/tagCategory.json#/definitions/tagName"
}
}
}

View File

@ -17,17 +17,5 @@
"minimum": 1
}
},
"oneOf": [
{
"required": [
"daysAfterCreation"
]
},
{
"required": [
"daysAfterModification"
]
}
],
"additionalProperties": false
}

View File

@ -10,8 +10,14 @@
"description": "Name that identifies this Rule.",
"type": "string"
},
"filters": {
"$ref": "../filters.json#/definitions/filters"
"prefixFilter": {
"$ref": "../filters.json#/definitions/prefix"
},
"regexFilter": {
"$ref": "../filters.json#/definitions/regex"
},
"tagsFilter": {
"$ref": "../filters.json#/definitions/tags"
},
"actions": {
"description": "A set of actions to take on the entities.",

View File

@ -0,0 +1,20 @@
{
"source": {
"type": "s3",
"config": {
"service_name": "aws-s3"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}

View File

@ -10,39 +10,38 @@
# limitations under the License.
import logging
import os
import uuid
from typing import Iterable, List
import boto3
from typing import Iterable, List, Union
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceEntityRequest,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.policies.filters import Filters1, Prefix
from metadata.generated.schema.entity.policies.filters import Prefix
from metadata.generated.schema.entity.policies.policy import Policy, PolicyType
from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule
from metadata.generated.schema.entity.policies.lifecycle.deleteAction import (
LifecycleDeleteAction,
)
from metadata.generated.schema.entity.policies.lifecycle.moveAction import (
Destination,
LifecycleMoveAction,
)
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.type.storage import StorageServiceType, S3StorageClass
from metadata.utils.aws_client import AWSClientConfigModel, AWSClient
logger: logging.Logger = logging.getLogger(__name__)
class S3SourceConfig(ConfigModel):
class S3SourceConfig(AWSClientConfigModel):
service_name: str
aws_access_key_id: str
aws_secret_access_key: str
class S3Source(Source[Entity]):
@ -55,13 +54,11 @@ class S3Source(Source[Entity]):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
os.environ["AWS_ACCESS_KEY_ID"] = self.config.aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = self.config.aws_secret_access_key
self.status = SourceStatus()
self.service = get_storage_service_or_create(
config.service_name, metadata_config
)
self.s3 = boto3.resource("s3")
self.s3 = AWSClient(self.config).get_client("s3")
@classmethod
def create(
@ -76,9 +73,13 @@ class S3Source(Source[Entity]):
def next_record(self) -> Iterable[OMetaPolicy]:
try:
for bucket in self.s3.buckets.all():
self.status.scanned(bucket)
location_name = self._get_bucket_name_with_prefix(bucket.name)
buckets_response = self.s3.list_buckets()
if not "Buckets" in buckets_response or not buckets_response["Buckets"]:
return
for bucket in buckets_response["Buckets"]:
bucket_name = bucket["Name"]
self.status.scanned(bucket_name)
location_name = self._get_bucket_name_with_prefix(bucket_name)
location_id = uuid.uuid4()
location = Location(
id=location_id,
@ -94,9 +95,11 @@ class S3Source(Source[Entity]):
# Retrieve lifecycle policy and rules for the bucket.
rules: List[LifecycleRule] = []
for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules:
for rule in self.s3.get_bucket_lifecycle_configuration(
Bucket=bucket_name
)["Rules"]:
rules.append(self._get_rule(rule, location))
policy_name = f"{bucket.name}-lifecycle-policy"
policy_name = f"{bucket_name}-lifecycle-policy"
policy = Policy(
id=uuid.uuid4(),
name=policy_name,
@ -128,7 +131,7 @@ class S3Source(Source[Entity]):
pass
def _get_rule(self, rule: dict, location: Location) -> LifecycleRule:
actions = []
actions: List[Union[LifecycleDeleteAction, LifecycleMoveAction]] = []
if "Transitions" in rule:
for transition in rule["Transitions"]:
if "StorageClass" in transition and "Days" in transition:
@ -144,19 +147,23 @@ class S3Source(Source[Entity]):
),
)
)
if "Expiration" in rule and "Days" in rule["Expiration"]:
actions.append(
LifecycleDeleteAction(daysAfterCreation=rule["Expiration"]["Days"])
)
enabled = rule["Status"] == "Enabled" if "Status" in rule else False
filters = []
prefix_filter = None
if "Filter" in rule and "Prefix" in rule["Filter"]:
filters.append(Prefix.parse_obj(rule["Filter"]["Prefix"]))
prefix_filter = Prefix.parse_obj(rule["Filter"]["Prefix"])
name = rule["ID"] if "ID" in rule else None
return LifecycleRule(
actions=actions,
enabled=enabled,
filters=Filters1.parse_obj(filters),
prefixFilter=prefix_filter,
name=name,
)

View File

@ -0,0 +1,69 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Any
from boto3 import Session
from metadata.config.common import ConfigModel
class AWSClientConfigModel(ConfigModel):
"""
AWSClientConfigModel holds all config parameters required to instantiate an AWSClient.
"""
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
aws_session_token: Optional[str]
endpoint_url: Optional[str]
region_name: Optional[str]
class AWSClient:
"""
AWSClient creates a boto3 Session client based on AWSClientConfigModel.
"""
config: AWSClientConfigModel
def __init__(self, config: AWSClientConfigModel):
self.config = config
def _get_session(self) -> Session:
if (
self.config.aws_access_key_id
and self.config.aws_secret_access_key
and self.config.aws_session_token
):
return Session(
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
aws_session_token=self.config.aws_session_token,
region_name=self.config.region_name,
)
if self.config.aws_access_key_id and self.config.aws_secret_access_key:
return Session(
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
region_name=self.config.region_name,
)
if self.config.region_name:
return Session(region_name=self.config.region_name)
return Session()
def get_client(self, service_name: str) -> Any:
session = self._get_session()
if self.config.endpoint_url is not None:
return session.client(
service_name=service_name, endpoint_url=self.config.endpoint_url
)
return session.client(service_name=service_name)