"""Unit tests for the DataHub S3 ingestion source.

Covers:
- ``partitioned_folder_comparator`` ordering of partition-style folder names,
- ``PathSpec`` file/dir matching and partition detection,
- container-hierarchy generation via ``ContainerWUCreator``,
- ``S3Source.get_folder_info`` folder aggregation and filtering.
"""

from datetime import datetime
from typing import List, Tuple
from unittest.mock import Mock, call

import pytest

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.data_lake_common.data_lake_utils import (
    ContainerWUCreator,
)
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.s3.source import (
    Folder,
    S3Source,
    partitioned_folder_comparator,
)


def _get_s3_source(path_spec_: PathSpec) -> S3Source:
    """Build a minimal ``S3Source`` whose config mirrors *path_spec_*."""
    return S3Source.create(
        config_dict={
            "path_spec": {
                "include": path_spec_.include,
                "table_name": path_spec_.table_name,
            },
        },
        ctx=PipelineContext(run_id="test-s3"),
    )


def test_partition_comparator_numeric_folder_name():
    # Numeric folder names compare numerically, not lexicographically.
    folder1 = "3"
    folder2 = "12"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_partition_multi_level_key():
    folder1 = "backup/metadata_aspect_v2/year=2023/month=01"
    folder2 = "backup/metadata_aspect_v2/year=2023/month=2"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_partition_comparator_numeric_folder_name2():
    folder1 = "12"
    folder2 = "3"
    assert partitioned_folder_comparator(folder1, folder2) == 1


def test_partition_comparator_string_folder():
    # Non-numeric names fall back to case-sensitive string comparison.
    folder1 = "myfolderB"
    folder2 = "myFolderA"
    assert partitioned_folder_comparator(folder1, folder2) == 1


def test_partition_comparator_string_same_folder():
    folder1 = "myFolderA"
    folder2 = "myFolderA"
    assert partitioned_folder_comparator(folder1, folder2) == 0


def test_partition_comparator_with_numeric_partition():
    folder1 = "year=3"
    folder2 = "year=12"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_partition_comparator_with_padded_numeric_partition():
    # Zero-padded values still compare numerically (03 < 12).
    folder1 = "year=03"
    folder2 = "year=12"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_partition_comparator_with_equal_sign_in_name():
    # Differing partition keys compare by the key name ("month" < "year").
    folder1 = "month=12"
    folder2 = "year=0"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_partition_comparator_with_string_partition():
    folder1 = "year=year2020"
    folder2 = "year=year2021"
    assert partitioned_folder_comparator(folder1, folder2) == -1


def test_path_spec():
    path_spec = PathSpec(
        include="s3://my-bucket/my-folder/year=*/month=*/day=*/*.csv",
        default_extension="csv",
    )
    path = "s3://my-bucket/my-folder/year=2022/month=10/day=11/my_csv.csv"
    assert path_spec.allowed(path)


def test_path_spec_with_double_star_ending():
    path_spec = PathSpec(
        include="s3://my-bucket/{table}/**",
        default_extension="csv",
        allow_double_stars=True,
    )
    path = "s3://my-bucket/my-folder/year=2022/month=10/day=11/my_csv.csv"
    assert path_spec.allowed(path)
    # NOTE: renamed from `vars` to avoid shadowing the builtin.
    named_vars = path_spec.get_named_vars(path)
    assert named_vars
    assert named_vars["table"] == "my-folder"


@pytest.mark.parametrize(
    "path_spec,path, expected",
    [
        pytest.param(
            "s3://my-bucket/{table}/**",
            "s3://my-bucket/my-folder/year=2022/month=10/day=11/my_csv",
            [("year", "2022"), ("month", "10"), ("day", "11")],
            id="autodetect_partitions",
        ),
        pytest.param(
            "s3://my-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/{partition_key[2]}={partition_value[2]}/*.csv",
            "s3://my-bucket/my-folder/year=2022/month=10/day=11/my_csv.csv",
            [("year", "2022"), ("month", "10"), ("day", "11")],
            id="partition_key and value set",
        ),
        pytest.param(
            "s3://my-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/{partition_key[2]}={partition[2]}/*.csv",
            "s3://my-bucket/my-folder/year=2022/month=10/day=11/my_csv.csv",
            [("year", "2022"), ("month", "10"), ("day", "11")],
            id="partition_key and partition set",
        ),
        pytest.param(
            "s3://my-bucket/{table}/{year}/{month}/{day}/*.csv",
            "s3://my-bucket/my-folder/2022/10/11/my_csv.csv",
            [("year", "2022"), ("month", "10"), ("day", "11")],
            id="named partition keys",
        ),
        pytest.param(
            "s3://my-bucket/{table}/{part[0]}/{part[1]}/{part[2]}/*.csv",
            "s3://my-bucket/my-folder/2022/10/11/my_csv.csv",
            [("part_0", "2022"), ("part_1", "10"), ("part_2", "11")],
            id="indexed partition keys",
        ),
        pytest.param(
            "s3://my-bucket/{table}/**",
            "s3://my-bucket/my-folder/2022/10/11/my_csv.csv",
            [("partition_0", "2022"), ("partition_1", "10"), ("partition_2", "11")],
            id="partition autodetect with partition values only",
        ),
        pytest.param(
            "s3://my-bucket/{table}/**",
            "s3://my-bucket/my-folder/my_csv.csv",
            None,
            id="partition autodetect with non partitioned path",
        ),
    ],
)
def test_path_spec_partition_detection(
    path_spec: str, path: str, expected: List[Tuple[str, str]]
) -> None:
    ps = PathSpec(include=path_spec, default_extension="csv", allow_double_stars=True)
    assert ps.allowed(path)
    partitions = ps.get_partition_from_path(path)
    assert partitions == expected


def test_path_spec_dir_allowed():
    path_spec = PathSpec(
        include="s3://my-bucket/my-folder/year=*/month=*/day=*/*.csv",
        exclude=[
            "s3://my-bucket/my-folder/year=2022/month=12/day=11",
            "s3://my-bucket/my-folder/year=2022/month=10/**",
        ],
        default_extension="csv",
    )

    path = "s3://my-bucket/my-folder/year=2022/"
    assert path_spec.dir_allowed(path) is True, f"{path} should be allowed"

    path = "s3://my-bucket/my-folder/year=2022/month=12/"
    assert path_spec.dir_allowed(path) is True, f"{path} should be allowed"

    path = "s3://my-bucket/my-folder/year=2022/month=12/day=11/my_csv.csv"
    assert path_spec.dir_allowed(path) is False, f"{path} should be denied"

    path = "s3://my-bucket/my-folder/year=2022/month=12/day=10/"
    assert path_spec.dir_allowed(path) is True, f"{path} should be allowed"

    path = "s3://my-bucket/my-folder/year=2022/month=12/day=10/_temporary/"
    assert path_spec.dir_allowed(path) is False, f"{path} should be denied"

    path = "s3://my-bucket/my-folder/year=2022/month=10/day=10/"
    assert path_spec.dir_allowed(path) is False, f"{path} should be denied"


def test_container_generation_without_folders():
    cwu = ContainerWUCreator("s3", None, "PROD")
    mcps = cwu.create_container_hierarchy(
        "s3://my-bucket/my-file.json.gz", "urn:li:dataset:123"
    )

    def container_properties_filter(x: MetadataWorkUnit) -> bool:
        assert isinstance(x.metadata, MetadataChangeProposalWrapper)
        return x.metadata.aspectName == "containerProperties"

    container_properties: List = list(filter(container_properties_filter, mcps))
    assert len(container_properties) == 1
    assert container_properties[0].metadata.aspect.customProperties == {
        "bucket_name": "my-bucket",
        "env": "PROD",
        "platform": "s3",
    }


def test_container_generation_with_folder():
    cwu = ContainerWUCreator("s3", None, "PROD")
    mcps = cwu.create_container_hierarchy(
        "s3://my-bucket/my-dir/my-file.json.gz", "urn:li:dataset:123"
    )

    def container_properties_filter(x: MetadataWorkUnit) -> bool:
        assert isinstance(x.metadata, MetadataChangeProposalWrapper)
        return x.metadata.aspectName == "containerProperties"

    container_properties: List = list(filter(container_properties_filter, mcps))
    assert len(container_properties) == 2
    assert container_properties[0].metadata.aspect.customProperties == {
        "bucket_name": "my-bucket",
        "env": "PROD",
        "platform": "s3",
    }
    assert container_properties[1].metadata.aspect.customProperties == {
        "env": "PROD",
        "folder_abs_path": "my-bucket/my-dir",
        "platform": "s3",
    }


def test_container_generation_with_multiple_folders():
    cwu = ContainerWUCreator("s3", None, "PROD")
    mcps = cwu.create_container_hierarchy(
        "s3://my-bucket/my-dir/my-dir2/my-file.json.gz", "urn:li:dataset:123"
    )

    def container_properties_filter(x: MetadataWorkUnit) -> bool:
        assert isinstance(x.metadata, MetadataChangeProposalWrapper)
        return x.metadata.aspectName == "containerProperties"

    container_properties: List = list(filter(container_properties_filter, mcps))
    assert len(container_properties) == 3
    assert container_properties[0].metadata.aspect.customProperties == {
        "bucket_name": "my-bucket",
        "env": "PROD",
        "platform": "s3",
    }
    assert container_properties[1].metadata.aspect.customProperties == {
        "env": "PROD",
        "folder_abs_path": "my-bucket/my-dir",
        "platform": "s3",
    }
    assert container_properties[2].metadata.aspect.customProperties == {
        "env": "PROD",
        "folder_abs_path": "my-bucket/my-dir/my-dir2",
        "platform": "s3",
    }


def test_get_folder_info_returns_latest_file_in_each_folder() -> None:
    """
    Test S3Source.get_folder_info returns the latest file in each folder
    """
    # arrange
    path_spec = PathSpec(
        include="s3://my-bucket/{table}/{partition0}/*.csv",
        table_name="{table}",
    )

    bucket = Mock()
    bucket.objects.filter().page_size = Mock(
        return_value=[
            Mock(
                bucket_name="my-bucket",
                key="my-folder/dir1/0001.csv",
                creation_time=datetime(2025, 1, 1, 1),
                last_modified=datetime(2025, 1, 1, 1),
                size=100,
            ),
            Mock(
                bucket_name="my-bucket",
                key="my-folder/dir2/0001.csv",
                creation_time=datetime(2025, 1, 1, 2),
                last_modified=datetime(2025, 1, 1, 2),
                size=100,
            ),
            Mock(
                bucket_name="my-bucket",
                key="my-folder/dir1/0002.csv",
                creation_time=datetime(2025, 1, 1, 2),
                last_modified=datetime(2025, 1, 1, 2),
                size=100,
            ),
        ]
    )

    # act
    res = _get_s3_source(path_spec).get_folder_info(
        path_spec, bucket, prefix="/my-folder"
    )
    res = list(res)

    # assert
    assert len(res) == 2
    assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
    assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"


def test_get_folder_info_ignores_disallowed_path(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """
    Test S3Source.get_folder_info skips disallowed files and logs a message
    """
    # arrange
    path_spec = Mock(
        spec=PathSpec,
        include="s3://my-bucket/{table}/{partition0}/*.csv",
        table_name="{table}",
    )
    path_spec.allowed = Mock(return_value=False)

    bucket = Mock()
    bucket.objects.filter().page_size = Mock(
        return_value=[
            Mock(
                bucket_name="my-bucket",
                key="my-folder/ignore/this/path/0001.csv",
                creation_time=datetime(2025, 1, 1, 1),
                last_modified=datetime(2025, 1, 1, 1),
                size=100,
            ),
        ]
    )

    s3_source = _get_s3_source(path_spec)

    # act
    res = s3_source.get_folder_info(path_spec, bucket, prefix="/my-folder")
    res = list(res)

    # assert
    expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"

    assert path_spec.allowed.call_args_list == [call(expected_called_s3_uri)], (
        "File should be checked if it's allowed"
    )
    assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text, (
        "Dropped file should be logged"
    )
    assert s3_source.get_report().filtered == [expected_called_s3_uri], (
        "Dropped file should be in the report.filtered"
    )
    assert res == [], "Dropped file should not be in the result"


def test_get_folder_info_returns_expected_folder() -> None:
    # arrange
    path_spec = PathSpec(
        include="s3://my-bucket/{table}/{partition0}/*.csv",
        table_name="{table}",
    )

    bucket = Mock()
    bucket.objects.filter().page_size = Mock(
        return_value=[
            Mock(
                bucket_name="my-bucket",
                key="my-folder/dir1/0001.csv",
                creation_time=datetime(2025, 1, 1, 1),
                last_modified=datetime(2025, 1, 1, 1),
                size=100,
            ),
            Mock(
                bucket_name="my-bucket",
                key="my-folder/dir1/0002.csv",
                creation_time=datetime(2025, 1, 1, 2),
                last_modified=datetime(2025, 1, 1, 2),
                size=50,
            ),
        ]
    )

    # act
    res = _get_s3_source(path_spec).get_folder_info(
        path_spec, bucket, prefix="/my-folder"
    )
    res = list(res)

    # assert
    assert len(res) == 1
    assert res[0] == Folder(
        partition_id=[("partition0", "dir1")],
        is_partition=True,
        creation_time=datetime(2025, 1, 1, 1),
        modification_time=datetime(2025, 1, 1, 2),
        size=150,
        sample_file="s3://my-bucket/my-folder/dir1/0002.csv",
    )