feat(ingest/s3): ignore depth-mismatched paths (#12326)

This commit is contained in:
Austin SeungJun Park 2025-02-06 04:19:44 +09:00 committed by GitHub
parent 468112b11c
commit 5ed4b5bce9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 105 additions and 18 deletions

View File

@ -866,8 +866,21 @@ class S3Source(StatefulIngestionSourceBase):
Returns:
List[Folder]: A list of Folder objects representing the partitions found.
"""
def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
allowed = path_spec_.allowed(s3_uri)
if not allowed:
logger.debug(f"File {s3_uri} not allowed and skipping")
self.report.report_file_dropped(s3_uri)
return allowed
s3_objects = (
obj
for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}")
)
partitions: List[Folder] = []
s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
grouped_s3_objects_by_dirname = groupby_unsorted(
s3_objects,
key=lambda obj: obj.key.rsplit("/", 1)[0],
@ -878,10 +891,6 @@ class S3Source(StatefulIngestionSourceBase):
modification_time = None
for item in group:
file_path = self.create_s3_path(item.bucket_name, item.key)
if not path_spec.allowed(file_path):
logger.debug(f"File {file_path} not allowed and skipping")
continue
file_size += item.size
if creation_time is None or item.last_modified < creation_time:
creation_time = item.last_modified

View File

@ -0,0 +1,31 @@
import pytest
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
@pytest.mark.parametrize(
    "include, s3_uri, expected",
    [
        (
            "s3://bucket/{table}/{partition0}/*.csv",
            "s3://bucket/table/p1/test.csv",
            True,
        ),
        (
            "s3://bucket/{table}/{partition0}/*.csv",
            "s3://bucket/table/p1/p2/test.csv",
            False,
        ),
    ],
)
def test_allowed_ignores_depth_mismatch(
    include: str, s3_uri: str, expected: bool
) -> None:
    """PathSpec.allowed accepts a URI matching the include pattern's depth
    and rejects one with an extra path segment."""
    spec = PathSpec(include=include, table_name="{table}")
    assert spec.allowed(s3_uri) == expected

View File

@ -1,6 +1,6 @@
from datetime import datetime
from typing import List, Tuple
from unittest.mock import Mock
from unittest.mock import Mock, call
import pytest
@ -12,6 +12,18 @@ from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator
def _get_s3_source(path_spec_: PathSpec) -> S3Source:
    """Build a minimal S3Source whose config mirrors the given path spec."""
    config = {
        "path_spec": {
            "include": path_spec_.include,
            "table_name": path_spec_.table_name,
        },
    }
    return S3Source.create(
        config_dict=config,
        ctx=PipelineContext(run_id="test-s3"),
    )
def test_partition_comparator_numeric_folder_name():
folder1 = "3"
folder2 = "12"
@ -249,18 +261,6 @@ def test_get_folder_info():
"""
Test S3Source.get_folder_info returns the latest file in each folder
"""
def _get_s3_source(path_spec_: PathSpec) -> S3Source:
return S3Source.create(
config_dict={
"path_spec": {
"include": path_spec_.include,
"table_name": path_spec_.table_name,
},
},
ctx=PipelineContext(run_id="test-s3"),
)
# arrange
path_spec = PathSpec(
include="s3://my-bucket/{table}/{partition0}/*.csv",
@ -303,3 +303,50 @@ def test_get_folder_info():
assert len(res) == 2
assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
def test_get_folder_info_ignores_disallowed_path(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """
    Test S3Source.get_folder_info skips disallowed files and logs a message
    """
    # arrange
    # Mock PathSpec whose allowed() always returns False, so every listed
    # object must be filtered out by get_folder_info.
    path_spec = Mock(
        spec=PathSpec,
        include="s3://my-bucket/{table}/{partition0}/*.csv",
        table_name="{table}",
    )
    path_spec.allowed = Mock(return_value=False)
    bucket = Mock()
    # Calling filter() here configures the shared return_value mock: the
    # production call filter(Prefix=...) returns that same mock, so its
    # page_size(...) yields this single fake S3 object.
    bucket.objects.filter().page_size = Mock(
        return_value=[
            Mock(
                bucket_name="my-bucket",
                key="my-folder/ignore/this/path/0001.csv",
                creation_time=datetime(2025, 1, 1, 1),
                last_modified=datetime(2025, 1, 1, 1),
                size=100,
            ),
        ]
    )
    s3_source = _get_s3_source(path_spec)
    # act
    res = s3_source.get_folder_info(path_spec, bucket, prefix="/my-folder")
    # assert
    # The full s3:// URI reconstructed from bucket_name + key above.
    expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"
    assert path_spec.allowed.call_args_list == [call(expected_called_s3_uri)], (
        "File should be checked if it's allowed"
    )
    assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text, (
        "Dropped file should be logged"
    )
    assert s3_source.get_report().filtered == [expected_called_s3_uri], (
        "Dropped file should be in the report.filtered"
    )
    assert res == [], "Dropped file should not be in the result"