diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json index 7d1eb12f386..0ab4f93421e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/accessControl/rule.json @@ -10,8 +10,14 @@ "description": "Name that identifies this Rule.", "type": "string" }, - "filters": { - "$ref": "../filters.json#/definitions/filters" + "prefixFilter": { + "$ref": "../filters.json#/definitions/prefix" + }, + "regexFilter": { + "$ref": "../filters.json#/definitions/regex" + }, + "tagsFilter": { + "$ref": "../filters.json#/definitions/tags" }, "actions": { "description": "A set of access control enforcements to take on the entities.", diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json index 257cd5930bc..3ba16cfc615 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/filters.json @@ -11,23 +11,11 @@ "description": "Regex that matches the entity.", "type": "string" }, - "filters": { - "description": "The set of filters that are used to match on entities. A logical AND operation is applied across all filters.", + "tags": { + "description": "Set of tags to match on (OR among all tags).", "type": "array", - "minItems": 1, "items": { - "anyOf": [ - { - "$ref": "#/definitions/prefix" - }, - { - "$ref": "#/definitions/regex" - }, - { - "description": "Entity tags to match on.", - "$ref": "../../type/tagLabel.json" - } - ] + "$ref": "../tags/tagCategory.json#/definitions/tagName" } } } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/deleteAction.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/deleteAction.json index 39986b8df75..6bdca50089b 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/deleteAction.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/deleteAction.json @@ -17,17 +17,5 @@ "minimum": 1 } }, - "oneOf": [ - { - "required": [ - "daysAfterCreation" - ] - }, - { - "required": [ - "daysAfterModification" - ] - } - ], "additionalProperties": false } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json index 25fddedb63c..8cb2ec31847 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/policies/lifecycle/rule.json @@ -10,8 +10,14 @@ "description": "Name that identifies this Rule.", "type": "string" }, - "filters": { - "$ref": "../filters.json#/definitions/filters" + "prefixFilter": { + "$ref": "../filters.json#/definitions/prefix" + }, + "regexFilter": { + "$ref": "../filters.json#/definitions/regex" + }, + "tagsFilter": { + "$ref": "../filters.json#/definitions/tags" }, "actions": { "description": "A set of actions to take on the entities.", diff --git a/ingestion/examples/workflows/s3.json b/ingestion/examples/workflows/s3.json new file mode 100644 index 00000000000..840acef83a1 --- /dev/null +++ b/ingestion/examples/workflows/s3.json @@ -0,0 +1,20 @@ +{ + "source": { + "type": "s3", + "config": { + "service_name": "aws-s3" + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + } +} + \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py index 3f716d6af9a..5d10a97ea4d 100644 --- a/ingestion/src/metadata/ingestion/source/s3.py +++ b/ingestion/src/metadata/ingestion/source/s3.py @@ -10,39 +10,38 @@ # limitations under the License. import logging -import os import uuid -from typing import Iterable, List - -import boto3 +from typing import Iterable, List, Union from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceEntityRequest, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext +from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.policies.filters import Filters1, Prefix +from metadata.generated.schema.entity.policies.filters import Prefix from metadata.generated.schema.entity.policies.policy import Policy, PolicyType from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule +from metadata.generated.schema.entity.policies.lifecycle.deleteAction import ( + LifecycleDeleteAction, +) from metadata.generated.schema.entity.policies.lifecycle.moveAction import ( Destination, LifecycleMoveAction, ) from metadata.generated.schema.entity.services.storageService import StorageService from metadata.generated.schema.type.storage import StorageServiceType, S3StorageClass +from metadata.utils.aws_client import AWSClientConfigModel, AWSClient logger: logging.Logger = logging.getLogger(__name__) -class S3SourceConfig(ConfigModel): +class S3SourceConfig(AWSClientConfigModel): service_name: str - aws_access_key_id: str - aws_secret_access_key: str class S3Source(Source[Entity]): @@ -55,13 +54,11 @@ class S3Source(Source[Entity]): super().__init__(ctx) self.config = config self.metadata_config = metadata_config - os.environ["AWS_ACCESS_KEY_ID"] = self.config.aws_access_key_id - os.environ["AWS_SECRET_ACCESS_KEY"] = self.config.aws_secret_access_key self.status = SourceStatus() self.service = get_storage_service_or_create( config.service_name, metadata_config ) - self.s3 = boto3.resource("s3") + self.s3 = AWSClient(self.config).get_client("s3") @classmethod def create( @@ -76,9 +73,13 @@ class S3Source(Source[Entity]): def next_record(self) -> Iterable[OMetaPolicy]: try: - for bucket in self.s3.buckets.all(): - self.status.scanned(bucket) - location_name = self._get_bucket_name_with_prefix(bucket.name) + buckets_response = self.s3.list_buckets() + if not "Buckets" in buckets_response or not buckets_response["Buckets"]: + return + for bucket in buckets_response["Buckets"]: + bucket_name = bucket["Name"] + self.status.scanned(bucket_name) + location_name = self._get_bucket_name_with_prefix(bucket_name) location_id = uuid.uuid4() location = Location( id=location_id, @@ -94,9 +95,11 @@ class S3Source(Source[Entity]): # Retrieve lifecycle policy and rules for the bucket. rules: List[LifecycleRule] = [] - for rule in self.s3.BucketLifecycleConfiguration(bucket.name).rules: + for rule in self.s3.get_bucket_lifecycle_configuration( + Bucket=bucket_name + )["Rules"]: rules.append(self._get_rule(rule, location)) - policy_name = f"{bucket.name}-lifecycle-policy" + policy_name = f"{bucket_name}-lifecycle-policy" policy = Policy( id=uuid.uuid4(), name=policy_name, @@ -128,7 +131,7 @@ class S3Source(Source[Entity]): pass def _get_rule(self, rule: dict, location: Location) -> LifecycleRule: - actions = [] + actions: List[Union[LifecycleDeleteAction, LifecycleMoveAction]] = [] if "Transitions" in rule: for transition in rule["Transitions"]: if "StorageClass" in transition and "Days" in transition: @@ -144,19 +147,23 @@ class S3Source(Source[Entity]): ), ) ) + if "Expiration" in rule and "Days" in rule["Expiration"]: + actions.append( + LifecycleDeleteAction(daysAfterCreation=rule["Expiration"]["Days"]) + ) enabled = rule["Status"] == "Enabled" if "Status" in rule else False - filters = [] + prefix_filter = None if "Filter" in rule and "Prefix" in rule["Filter"]: - filters.append(Prefix.parse_obj(rule["Filter"]["Prefix"])) + prefix_filter = Prefix.parse_obj(rule["Filter"]["Prefix"]) name = rule["ID"] if "ID" in rule else None return LifecycleRule( actions=actions, enabled=enabled, - filters=Filters1.parse_obj(filters), + prefixFilter=prefix_filter, name=name, ) diff --git a/ingestion/src/metadata/utils/aws_client.py b/ingestion/src/metadata/utils/aws_client.py new file mode 100644 index 00000000000..b99bc7a625d --- /dev/null +++ b/ingestion/src/metadata/utils/aws_client.py @@ -0,0 +1,69 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Any + +from boto3 import Session + +from metadata.config.common import ConfigModel + + +class AWSClientConfigModel(ConfigModel): + """ + AWSClientConfigModel holds all config parameters required to instantiate an AWSClient. + """ + + aws_access_key_id: Optional[str] + aws_secret_access_key: Optional[str] + aws_session_token: Optional[str] + endpoint_url: Optional[str] + region_name: Optional[str] + + +class AWSClient: + """ + AWSClient creates a boto3 Session client based on AWSClientConfigModel. + """ + + config: AWSClientConfigModel + + def __init__(self, config: AWSClientConfigModel): + self.config = config + + def _get_session(self) -> Session: + if ( + self.config.aws_access_key_id + and self.config.aws_secret_access_key + and self.config.aws_session_token + ): + return Session( + aws_access_key_id=self.config.aws_access_key_id, + aws_secret_access_key=self.config.aws_secret_access_key, + aws_session_token=self.config.aws_session_token, + region_name=self.config.region_name, + ) + if self.config.aws_access_key_id and self.config.aws_secret_access_key: + return Session( + aws_access_key_id=self.config.aws_access_key_id, + aws_secret_access_key=self.config.aws_secret_access_key, + region_name=self.config.region_name, + ) + if self.config.region_name: + return Session(region_name=self.config.region_name) + return Session() + + def get_client(self, service_name: str) -> Any: + session = self._get_session() + if self.config.endpoint_url is not None: + return session.client( + service_name=service_name, endpoint_url=self.config.endpoint_url + ) + return session.client(service_name=service_name)