From 397dd0512ff6494a69a5f2902fde75174f687612 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Katarzyna=20Ka=C5=82ek?= <145936017+trina242@users.noreply.github.com>
Date: Fri, 14 Mar 2025 06:56:23 +0100
Subject: [PATCH] Fixes #19619 (#19620)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fixed s3 access bug for parquet files

* fixed formatting

* parsed endpoint_override to str in s3 parquet ingestion

---------

Co-authored-by: Katarzyna Kałek
---
 ingestion/setup.py                            |  2 -
 .../src/metadata/readers/dataframe/parquet.py | 38 ++++++++-----------
 2 files changed, 15 insertions(+), 25 deletions(-)

diff --git a/ingestion/setup.py b/ingestion/setup.py
index 8b8a04846d5..80ba6d5636b 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -226,8 +226,6 @@ plugins: Dict[str, Set[str]] = {
         *COMMONS["datalake"],
     },
     "datalake-s3": {
-        # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
-        "s3fs[boto3]",
         *COMMONS["datalake"],
     },
     "deltalake": {
diff --git a/ingestion/src/metadata/readers/dataframe/parquet.py b/ingestion/src/metadata/readers/dataframe/parquet.py
index b227fbaf1e5..6e7ac92116f 100644
--- a/ingestion/src/metadata/readers/dataframe/parquet.py
+++ b/ingestion/src/metadata/readers/dataframe/parquet.py
@@ -64,32 +64,24 @@ class ParquetDataFrameReader(DataFrameReader):
     @_read_parquet_dispatch.register
     def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
         # pylint: disable=import-outside-toplevel
-        import s3fs
+        from pyarrow.fs import S3FileSystem
         from pyarrow.parquet import ParquetDataset
 
-        client_kwargs = {}
-        if self.config_source.securityConfig.endPointURL:
-            client_kwargs["endpoint_url"] = str(
-                self.config_source.securityConfig.endPointURL
-            )
+        client_kwargs = {
+            "endpoint_override": str(self.config_source.securityConfig.endPointURL),
+            "region": self.config_source.securityConfig.awsRegion,
+            "access_key": self.config_source.securityConfig.awsAccessKeyId,
+            "session_token": self.config_source.securityConfig.awsSessionToken,
+            "role_arn": self.config_source.securityConfig.assumeRoleArn,
+            "session_name": self.config_source.securityConfig.assumeRoleSessionName,
+        }
+        if self.config_source.securityConfig.awsSecretAccessKey:
+            client_kwargs[
+                "secret_key"
+            ] = self.config_source.securityConfig.awsSecretAccessKey.get_secret_value()
+        s3_fs = S3FileSystem(**client_kwargs)
 
-        if self.config_source.securityConfig.awsRegion:
-            client_kwargs["region_name"] = self.config_source.securityConfig.awsRegion
-
-        s3_fs = s3fs.S3FileSystem(client_kwargs=client_kwargs)
-
-        if (
-            self.config_source.securityConfig.awsAccessKeyId
-            and self.config_source.securityConfig.awsSecretAccessKey
-        ):
-            s3_fs = s3fs.S3FileSystem(
-                key=self.config_source.securityConfig.awsAccessKeyId,
-                secret=self.config_source.securityConfig.awsSecretAccessKey.get_secret_value(),
-                token=self.config_source.securityConfig.awsSessionToken,
-                client_kwargs=client_kwargs,
-            )
-
-        bucket_uri = f"s3://{bucket_name}/{key}"
+        bucket_uri = f"{bucket_name}/{key}"
         dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
         return dataframe_to_chunks(dataset.read_pandas().to_pandas())