ISSUE-16094: fix s3 storage parquet structureFormat ingestion (#18660)

This aims at fixing the s3 ingestion for parquet files. The current behaviour is that
the pipeline will break if it encounters a file that is not valid parquet in the
container; this is not great, as containers might contain non-parquet files
on purpose, for example _SUCCESS files created by spark.

To address that, do not fail the whole pipeline when a single container fails; instead
count it as a failure and move on with the remainder of the containers. This is
already an improvement, but ideally the ingestion should try a couple more files
under the given prefix before giving up; additionally, we could allow users to specify
file patterns to be ignored.

Co-authored-by: Abdallah Serghine <abdallah.serghine@olx.pl>
Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Abdallah Serghine 2024-12-14 11:40:23 +01:00 committed by GitHub
parent 9e6078f654
commit ac967dfe50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -293,15 +293,26 @@ class S3Source(StorageServiceSource):
)
# if we have a sample file to fetch a schema from
if sample_key:
columns = self._get_columns(
container_name=bucket_name,
sample_key=sample_key,
metadata_entry=metadata_entry,
config_source=S3Config(
securityConfig=self.service_connection.awsConfig
),
client=self.s3_client,
)
try:
columns = self._get_columns(
container_name=bucket_name,
sample_key=sample_key,
metadata_entry=metadata_entry,
config_source=S3Config(
securityConfig=self.service_connection.awsConfig
),
client=self.s3_client,
)
except Exception as err:
logger.warning(f"Error extracting columns from [{bucket_name}/{sample_key}]: {err}")
self.status.failed(
error=StackTraceError(
name=f"{bucket_name}/{sample_key}",
error=f"Error extracting columns from [{bucket_name}/{sample_key}] due to: [{err}]",
stackTrace=traceback.format_exc(),
)
)
return None
if columns:
prefix = (
f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
@ -413,7 +424,7 @@ class S3Source(StorageServiceSource):
candidate_keys = [
entry["Key"]
for entry in response[S3_CLIENT_ROOT_RESPONSE]
if entry and entry.get("Key")
if entry and entry.get("Key") and not entry.get("Key").endswith("/")
]
for key in candidate_keys:
if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
@ -622,7 +633,7 @@ class S3Source(StorageServiceSource):
candidate_keys = [
entry["Key"]
for entry in response[S3_CLIENT_ROOT_RESPONSE]
if entry and entry.get("Key")
if entry and entry.get("Key") and not entry.get("Key").endswith("/")
]
# pick a random key out of the candidates if any were returned
if candidate_keys: