diff --git a/ingestion/setup.py b/ingestion/setup.py
index 8b8a04846d5..80ba6d5636b 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -226,8 +226,6 @@ plugins: Dict[str, Set[str]] = {
         *COMMONS["datalake"],
     },
     "datalake-s3": {
-        # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
-        "s3fs[boto3]",
         *COMMONS["datalake"],
     },
     "deltalake": {
diff --git a/ingestion/src/metadata/readers/dataframe/parquet.py b/ingestion/src/metadata/readers/dataframe/parquet.py
index b227fbaf1e5..6e7ac92116f 100644
--- a/ingestion/src/metadata/readers/dataframe/parquet.py
+++ b/ingestion/src/metadata/readers/dataframe/parquet.py
@@ -64,32 +64,30 @@ class ParquetDataFrameReader(DataFrameReader):
 
     @_read_parquet_dispatch.register
     def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
         # pylint: disable=import-outside-toplevel
-        import s3fs
+        from pyarrow.fs import S3FileSystem
         from pyarrow.parquet import ParquetDataset
 
-        client_kwargs = {}
-        if self.config_source.securityConfig.endPointURL:
-            client_kwargs["endpoint_url"] = str(
-                self.config_source.securityConfig.endPointURL
-            )
-
-        if self.config_source.securityConfig.awsRegion:
-            client_kwargs["region_name"] = self.config_source.securityConfig.awsRegion
-
-        s3_fs = s3fs.S3FileSystem(client_kwargs=client_kwargs)
-
-        if (
-            self.config_source.securityConfig.awsAccessKeyId
-            and self.config_source.securityConfig.awsSecretAccessKey
-        ):
-            s3_fs = s3fs.S3FileSystem(
-                key=self.config_source.securityConfig.awsAccessKeyId,
-                secret=self.config_source.securityConfig.awsSecretAccessKey.get_secret_value(),
-                token=self.config_source.securityConfig.awsSessionToken,
-                client_kwargs=client_kwargs,
-            )
+        client_kwargs = {
+            "region": self.config_source.securityConfig.awsRegion,
+            "access_key": self.config_source.securityConfig.awsAccessKeyId,
+            "session_token": self.config_source.securityConfig.awsSessionToken,
+            "role_arn": self.config_source.securityConfig.assumeRoleArn,
+            "session_name": self.config_source.securityConfig.assumeRoleSessionName,
+        }
+        # Only override the endpoint when one is configured: str(None) would
+        # otherwise hand pyarrow the literal string "None" as the endpoint URL.
+        if self.config_source.securityConfig.endPointURL:
+            client_kwargs["endpoint_override"] = str(
+                self.config_source.securityConfig.endPointURL
+            )
+        if self.config_source.securityConfig.awsSecretAccessKey:
+            client_kwargs[
+                "secret_key"
+            ] = self.config_source.securityConfig.awsSecretAccessKey.get_secret_value()
+        s3_fs = S3FileSystem(**client_kwargs)
 
-        bucket_uri = f"s3://{bucket_name}/{key}"
+        # pyarrow filesystems take bucket-relative paths, no "s3://" scheme
+        bucket_uri = f"{bucket_name}/{key}"
         dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
         return dataframe_to_chunks(dataset.read_pandas().to_pandas())