* fixed s3 access bug for parquet files

* fixed formatting

* parsed endpoint_override to str in s3 parquet ingestion

---------

Co-authored-by: Katarzyna Kałek <kkalek@olx.pl>
This commit is contained in:
Katarzyna Kałek 2025-03-14 06:56:23 +01:00 committed by GitHub
parent 168e29423a
commit 397dd0512f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 25 deletions

View File

@ -226,8 +226,6 @@ plugins: Dict[str, Set[str]] = {
*COMMONS["datalake"],
},
"datalake-s3": {
# vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
"s3fs[boto3]",
*COMMONS["datalake"],
},
"deltalake": {

View File

@ -64,32 +64,24 @@ class ParquetDataFrameReader(DataFrameReader):
@_read_parquet_dispatch.register
def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
# pylint: disable=import-outside-toplevel
import s3fs
from pyarrow.fs import S3FileSystem
from pyarrow.parquet import ParquetDataset
client_kwargs = {}
if self.config_source.securityConfig.endPointURL:
client_kwargs["endpoint_url"] = str(
self.config_source.securityConfig.endPointURL
)
client_kwargs = {
"endpoint_override": str(self.config_source.securityConfig.endPointURL),
"region": self.config_source.securityConfig.awsRegion,
"access_key": self.config_source.securityConfig.awsAccessKeyId,
"session_token": self.config_source.securityConfig.awsSessionToken,
"role_arn": self.config_source.securityConfig.assumeRoleArn,
"session_name": self.config_source.securityConfig.assumeRoleSessionName,
}
if self.config_source.securityConfig.awsSecretAccessKey:
client_kwargs[
"secret_key"
] = self.config_source.securityConfig.awsSecretAccessKey.get_secret_value()
s3_fs = S3FileSystem(**client_kwargs)
if self.config_source.securityConfig.awsRegion:
client_kwargs["region_name"] = self.config_source.securityConfig.awsRegion
s3_fs = s3fs.S3FileSystem(client_kwargs=client_kwargs)
if (
self.config_source.securityConfig.awsAccessKeyId
and self.config_source.securityConfig.awsSecretAccessKey
):
s3_fs = s3fs.S3FileSystem(
key=self.config_source.securityConfig.awsAccessKeyId,
secret=self.config_source.securityConfig.awsSecretAccessKey.get_secret_value(),
token=self.config_source.securityConfig.awsSessionToken,
client_kwargs=client_kwargs,
)
bucket_uri = f"s3://{bucket_name}/{key}"
bucket_uri = f"{bucket_name}/{key}"
dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
return dataframe_to_chunks(dataset.read_pandas().to_pandas())