diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py index d61975694f..0fb211a5d7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py @@ -1,8 +1,8 @@ -from typing import TYPE_CHECKING, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import boto3 from boto3.session import Session -from botocore.config import Config +from botocore.config import DEFAULT_TIMEOUT, Config from botocore.utils import fix_s3_host from pydantic.fields import Field @@ -104,6 +104,16 @@ class AwsConnectionConfig(ConfigModel): description="A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.", ) + read_timeout: float = Field( + default=DEFAULT_TIMEOUT, + description="The timeout for reading from the connection (in seconds).", + ) + + aws_advanced_config: Dict[str, Any] = Field( + default_factory=dict, + description="Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).", + ) + def _normalized_aws_roles(self) -> List[AwsAssumeRoleConfig]: if not self.aws_role: return [] @@ -167,13 +177,20 @@ class AwsConnectionConfig(ConfigModel): } return {} + def _aws_config(self) -> Config: + return Config( + proxies=self.aws_proxy, + read_timeout=self.read_timeout, + **self.aws_advanced_config, + ) + def get_s3_client( self, verify_ssl: Optional[Union[bool, str]] = None ) -> "S3Client": return self.get_session().client( "s3", endpoint_url=self.aws_endpoint_url, - config=Config(proxies=self.aws_proxy), + config=self._aws_config(), verify=verify_ssl, ) @@ -183,7 +200,7 @@ class AwsConnectionConfig(ConfigModel): resource = self.get_session().resource( "s3", endpoint_url=self.aws_endpoint_url, - config=Config(proxies=self.aws_proxy), + config=self._aws_config(), verify=verify_ssl, ) # according to: https://stackoverflow.com/questions/32618216/override-s3-endpoint-using-boto3-configuration-file @@ -195,10 +212,10 @@ class AwsConnectionConfig(ConfigModel): return resource def get_glue_client(self) -> "GlueClient": - return self.get_session().client("glue") + return self.get_session().client("glue", config=self._aws_config()) def get_sagemaker_client(self) -> "SageMakerClient": - return self.get_session().client("sagemaker") + return self.get_session().client("sagemaker", config=self._aws_config()) class AwsSourceConfig(EnvConfigMixin, AwsConnectionConfig):