#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
Module containing AWS Client
"""
from typing import Any

import boto3
from boto3 import Session

from metadata.clients.connection_clients import (
    DynamoClient,
    GlueDBClient,
    GluePipelineClient,
    KinesisClient,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.utils.logger import utils_logger

logger = utils_logger()


class AWSClient:
    """
    AWSClient creates a boto3 Session client based on AWSCredentials.

    The session is built from whichever of the access key, secret key,
    session token and region are set on the configuration; a custom
    endpoint URL, when configured, is forwarded to the client/resource.
    """

    config: AWSCredentials

    def __init__(self, config: AWSCredentials):
        """Store the AWS credentials configuration used to build sessions."""
        self.config = config

    def _get_session(self) -> Session:
        """
        Build a boto3 Session from the stored configuration.

        Explicit credentials are used only when both the access key id and
        the secret access key are set; the session token is added on top of
        them when present. Otherwise the session is created with just the
        region (if set), or with no arguments at all.
        """
        config = self.config

        if config.awsAccessKeyId and config.awsSecretAccessKey:
            session_args = {
                "aws_access_key_id": config.awsAccessKeyId,
                "aws_secret_access_key": config.awsSecretAccessKey.get_secret_value(),
                # Passed through as-is (even if unset), like the explicit
                # keyword in the credential branches always was.
                "region_name": config.awsRegion,
            }
            if config.awsSessionToken:
                session_args["aws_session_token"] = config.awsSessionToken
            return Session(**session_args)

        if config.awsRegion:
            return Session(region_name=config.awsRegion)

        return Session()

    def get_client(self, service_name: str) -> Any:
        """
        Return a boto3 client for ``service_name``.

        When a configuration is available the client is created from the
        configured session (honouring ``endPointURL`` if set); otherwise
        ``boto3.client`` is used with boto3's default session.
        """
        # initialize the client depending on the AWSCredentials passed
        if self.config is not None:
            logger.info(f"Getting AWS client for service [{service_name}]")
            session = self._get_session()
            if self.config.endPointURL is not None:
                return session.client(
                    service_name=service_name, endpoint_url=self.config.endPointURL
                )
            return session.client(service_name=service_name)

        logger.info(f"Getting AWS default client for service [{service_name}]")
        # initialized with the credentials loaded from running machine
        return boto3.client(service_name=service_name)

    def get_resource(self, service_name: str) -> Any:
        """
        Return a boto3 resource for ``service_name``.

        Mirrors ``get_client``: with a configuration the resource comes from
        the configured session (honouring ``endPointURL`` if set); without
        one it falls back to ``boto3.resource`` instead of failing on the
        missing configuration.
        """
        if self.config is not None:
            session = self._get_session()
            if self.config.endPointURL is not None:
                return session.resource(
                    service_name=service_name, endpoint_url=self.config.endPointURL
                )
            return session.resource(service_name=service_name)

        logger.info(f"Getting AWS default resource for service [{service_name}]")
        # initialized with the credentials loaded from running machine
        return boto3.resource(service_name=service_name)

    # NOTE: "dynomo" is a misspelling of "dynamo"; the name is kept unchanged
    # because it is part of the public interface callers rely on.
    def get_dynomo_client(self) -> DynamoClient:
        """Return a DynamoClient wrapping the "dynamodb" boto3 resource."""
        return DynamoClient(self.get_resource("dynamodb"))

    def get_glue_db_client(self) -> GlueDBClient:
        """Return a GlueDBClient wrapping the "glue" boto3 client."""
        return GlueDBClient(self.get_client("glue"))

    def get_glue_pipeline_client(self) -> GluePipelineClient:
        """Return a GluePipelineClient wrapping the "glue" boto3 client."""
        return GluePipelineClient(self.get_client("glue"))

    def get_kinesis_client(self) -> KinesisClient:
        """Return a KinesisClient wrapping the "kinesis" boto3 client."""
        return KinesisClient(self.get_client("kinesis"))