import json
import logging
import os
from datetime import datetime
from unittest.mock import Mock, call, patch
import moto.s3
import pytest
from boto3.session import Session
from moto import mock_s3
from pydantic import ValidationError
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.aws.s3_boto_utils import (
list_folders_path,
list_objects_recursive_path,
)
from datahub.ingestion.source.s3.source import S3Source
from datahub.testing import mce_helpers
logging.getLogger("boto3").setLevel(logging.INFO)
logging.getLogger("botocore").setLevel(logging.INFO)
logging.getLogger("s3transfer").setLevel(logging.INFO)
FROZEN_TIME = "2020-04-14 07:00:00"
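# Expected relative paths of every file under tests/integration/s3/test_data/local_system/.
# The s3_populate fixture uploads these to each mocked bucket and asserts that the files
# found on disk match this list exactly.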
FILE_LIST_FOR_VALIDATION = [
"folder_a/folder_aa/folder_aaa/NPS.7.1.package_data_NPS.6.1_ARCN_Lakes_ChemistryData_v1_csv.csv",
"folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro",
"folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv",
"folder_a/folder_aa/folder_aaa/countries_json.json",
"folder_a/folder_aa/folder_aaa/food_parquet.parquet",
"folder_a/folder_aa/folder_aaa/small.csv",
"folder_a/folder_aa/folder_aaa/wa_fn_usec_hr_employee_attrition_csv.csv",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=jan/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=jan/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=april/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=april/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/food_csv/part1.csv",
"folder_a/folder_aa/folder_aaa/food_csv/part2.csv",
"folder_a/folder_aa/folder_aaa/food_csv/part3.csv",
"folder_a/folder_aa/folder_aaa/food_parquet/part1.parquet",
"folder_a/folder_aa/folder_aaa/food_parquet/part2.parquet",
"folder_a/folder_aa/folder_aaa/no_extension/small",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=jan/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=jan/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=april/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=april/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/part3.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/_temporary/dummy.json",
]
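# Module-scoped, autouse fixtures: every test in this module runs against a moto-mocked
# S3 environment with two pre-populated buckets.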
@pytest.fixture(scope="module", autouse=True)
def bucket_names():
return ["my-test-bucket", "my-test-bucket-2"]
@pytest.fixture(scope="module", autouse=True)
def s3():
with mock_s3():
conn = Session(
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1",
)
yield conn
@pytest.fixture(scope="module", autouse=True)
def s3_resource(s3):
with mock_s3():
conn = s3.resource("s3")
yield conn
@pytest.fixture(scope="module", autouse=True)
def s3_client(s3):
with mock_s3():
conn = s3.client("s3")
yield conn
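# Build readable pytest ids of the form "<recipe dir>_<recipe name>" from the
# (source_dir, source_file) tuples used to parametrize the ingest tests.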
def get_descriptive_id(source_tuple):
source_dir, source_file = source_tuple
dir_name = os.path.basename(source_dir)
base_name = source_file.replace(".json", "")
return f"{dir_name}_{base_name}"
@pytest.fixture(scope="module", autouse=True)
def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names):
for bucket_name in bucket_names:
logging.info(f"Populating s3 bucket: {bucket_name}")
s3_resource.create_bucket(Bucket=bucket_name)
bkt = s3_resource.Bucket(bucket_name)
bkt.Tagging().put(Tagging={"TagSet": [{"Key": "foo", "Value": "bar"}]})
test_resources_dir = (
pytestconfig.rootpath / "tests/integration/s3/test_data/local_system/"
)
current_time_sec = datetime.strptime(
FROZEN_TIME, "%Y-%m-%d %H:%M:%S"
).timestamp()
file_list = []
for root, _dirs, files in os.walk(test_resources_dir):
_dirs.sort()
for file in sorted(files):
full_path = os.path.join(root, file)
rel_path = os.path.relpath(full_path, test_resources_dir)
file_list.append(rel_path)
bkt.upload_file(
full_path,
                    rel_path,
                    # Set the content type for the `no_extension/small` file to text/csv
                    ExtraArgs=(
{"ContentType": "text/csv"} if "." not in rel_path else {}
),
)
s3_client.put_object_tagging(
Bucket=bucket_name,
Key=rel_path,
Tagging={"TagSet": [{"Key": "baz", "Value": "bob"}]},
)
key = (
moto.s3.models.s3_backends["123456789012"]["global"]
.buckets[bucket_name]
.keys[rel_path]
)
current_time_sec += 10
key.last_modified = datetime.fromtimestamp(current_time_sec)
        # Sanity check: the files uploaded from disk must match the expected list above.
assert file_list == FILE_LIST_FOR_VALIDATION
yield
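# Give the local copies of the test data deterministic modification times, mirroring the
# last_modified timestamps assigned to the mocked S3 objects.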
@pytest.fixture(scope="module", autouse=True)
def touch_local_files(pytestconfig):
test_resources_dir = (
pytestconfig.rootpath / "tests/integration/s3/test_data/local_system/"
)
current_time_sec = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S").timestamp()
for root, _dirs, files in os.walk(test_resources_dir):
_dirs.sort()
for file in sorted(files):
current_time_sec += 10
full_path = os.path.join(root, file)
os.utime(full_path, times=(current_time_sec, current_time_sec))
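# Ingestion recipes used to parametrize the tests below. Shared recipes run against S3,
# GCS, and the local file source; recipes under sources/s3 run against S3 and GCS only.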
SHARED_SOURCE_FILES_PATH = "./tests/integration/s3/sources/shared"
shared_source_files = [
(SHARED_SOURCE_FILES_PATH, p) for p in os.listdir(SHARED_SOURCE_FILES_PATH)
]
S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3"
s3_source_files = [(S3_SOURCE_FILES_PATH, p) for p in os.listdir(S3_SOURCE_FILES_PATH)]
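# Run each recipe through a full ingestion pipeline against the mocked buckets and
# compare the emitted metadata against the corresponding golden file.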
@pytest.mark.integration
@pytest.mark.parametrize(
"source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
)
def test_data_lake_s3_ingest(
pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
):
source_dir, source_file = source_file_tuple
test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    with open(os.path.join(source_dir, source_file)) as f:
        source = json.load(f)
config_dict = {}
config_dict["source"] = source
config_dict["sink"] = {
"type": "file",
"config": {
"filename": f"{tmp_path}/{source_file}",
},
}
config_dict["run_id"] = source_file
pipeline = Pipeline.create(config_dict)
pipeline.run()
pipeline.raise_from_status()
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{source_file}",
golden_path=f"{test_resources_dir}/golden-files/s3/golden_mces_{source_file}",
ignore_paths=[
r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
],
)
@pytest.mark.integration
@pytest.mark.parametrize(
"source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
)
def test_data_lake_gcs_ingest(
pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
):
source_dir, source_file = source_file_tuple
test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    with open(os.path.join(source_dir, source_file)) as f:
        source = json.load(f)
config_dict = {}
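    # Rewrite the S3 recipe into an equivalent GCS recipe: swap the source type, map the
    # AWS keys onto HMAC credentials, rewrite s3:// URIs to gs://, and drop options that
    # only apply to the S3 source.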
source["type"] = "gcs"
source["config"]["credential"] = {
"hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
"hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
}
for path_spec in source["config"]["path_specs"]:
path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
source["config"].pop("aws_config")
source["config"].pop("profiling", None)
source["config"].pop("sort_schema_fields", None)
source["config"].pop("use_s3_bucket_tags", None)
source["config"].pop("use_s3_content_type", None)
source["config"].pop("use_s3_object_tags", None)
config_dict["source"] = source
config_dict["sink"] = {
"type": "file",
"config": {
"filename": f"{tmp_path}/{source_file}",
},
}
config_dict["run_id"] = source_file
with patch("datahub.ingestion.source.gcs.gcs_source.GCS_ENDPOINT_URL", None):
pipeline = Pipeline.create(config_dict)
pipeline.run()
pipeline.raise_from_status()
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{source_file}",
golden_path=f"{test_resources_dir}/golden-files/gcs/golden_mces_{source_file}",
ignore_paths=[
r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
],
)
@pytest.mark.integration
@pytest.mark.parametrize(
"source_file_tuple", shared_source_files, ids=get_descriptive_id
)
def test_data_lake_local_ingest(
pytestconfig, touch_local_files, source_file_tuple, tmp_path, mock_time
):
source_dir, source_file = source_file_tuple
test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    with open(os.path.join(source_dir, source_file)) as f:
        source = json.load(f)
config_dict = {}
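    # Point the path specs at the local copy of the test data instead of the mocked
    # buckets, enable profiling, and drop options that only apply to the S3 source.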
for path_spec in source["config"]["path_specs"]:
path_spec["include"] = (
path_spec["include"]
.replace(
"s3://my-test-bucket/", "tests/integration/s3/test_data/local_system/"
)
.replace(
"s3://my-test-bucket-2/", "tests/integration/s3/test_data/local_system/"
)
)
source["config"]["profiling"]["enabled"] = True
source["config"].pop("aws_config")
source["config"].pop("use_s3_bucket_tags", None)
source["config"].pop("use_s3_object_tags", None)
config_dict["source"] = source
config_dict["sink"] = {
"type": "file",
"config": {
"filename": f"{tmp_path}/{source_file}",
},
}
config_dict["run_id"] = source_file
pipeline = Pipeline.create(config_dict)
pipeline.run()
pipeline.raise_from_status()
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{source_file}",
golden_path=f"{test_resources_dir}/golden-files/local/golden_mces_{source_file}",
ignore_paths=[
r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\[\d+\]\['value'\]\['time'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
# root[41]['aspect']['json']['fieldProfiles'][0]['sampleValues'][0]
r"root\[\d+\]\['aspect'\]\['json'\]\['fieldProfiles'\]\[\d+\]\['sampleValues'\]",
# "root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][2]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['fields'][4]"
r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]",
# "root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.DatasetProperties']['customProperties']['size_in_bytes']"
r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['size_in_bytes'\]",
],
)
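# Config validation: each malformed path_spec below should be rejected with a ValidationError.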
def test_data_lake_incorrect_config_raises_error(tmp_path, mock_time):
ctx = PipelineContext(run_id="test-s3")
# Baseline: valid config
source: dict = {
"path_spec": {"include": "a/b/c/d/{table}.*", "table_name": "{table}"}
}
s3 = S3Source.create(source, ctx)
assert s3.source_config.platform == "file"
    # Case 1: named variable in table_name is not present in include
source = {"path_spec": {"include": "a/b/c/d/{table}.*", "table_name": "{table1}"}}
with pytest.raises(ValidationError, match="table_name"):
S3Source.create(source, ctx)
    # Case 2: named variable in exclude is not allowed
source = {
"path_spec": {
"include": "a/b/c/d/{table}/*.*",
"exclude": ["a/b/c/d/a-{exclude}/**"],
},
}
with pytest.raises(ValidationError, match=r"exclude.*named variable"):
S3Source.create(source, ctx)
    # Case 3: unsupported file type is not allowed
source = {
"path_spec": {
"include": "a/b/c/d/{table}/*.hd5",
}
}
with pytest.raises(ValidationError, match="file type"):
S3Source.create(source, ctx)
    # Case 4: ** in include is not allowed
source = {
"path_spec": {
"include": "a/b/c/d/**/*.*",
},
}
with pytest.raises(ValidationError, match=r"\*\*"):
S3Source.create(source, ctx)
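# Each tuple below is (test id, path_spec, expected calls): the path_spec is fed to the
# S3 source and the expected calls describe exactly which folder listings and recursive
# object listings it should issue against the s3_boto_utils helpers.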
@pytest.mark.parametrize(
"calls_test_tuple",
[
(
"partitions_and_filename_with_prefix",
{
"include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/year={year}/month={month}/part*.json",
"tables_filter_pattern": {"allow": ["^pokemon_abilities_json$"]},
},
[
call.list_folders_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/"
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/",
startswith="year=",
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/",
startswith="month=",
),
call.list_objects_recursive_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/",
startswith="part",
),
],
),
(
"filter_specific_partition",
{
"include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/year=2022/month={month}/*.json",
"tables_filter_pattern": {"allow": ["^pokemon_abilities_json$"]},
},
[
call.list_folders_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/"
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022",
startswith="month=",
),
call.list_objects_recursive_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/",
startswith="",
),
],
),
(
"partition_autodetection",
{
"include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/",
"tables_filter_pattern": {"allow": ["^pokemon_abilities_json$"]},
},
[
call.list_folders_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/"
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/",
startswith="",
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/",
startswith="",
),
call.list_folders_path(
s3_uri="s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/",
startswith="",
),
call.list_objects_recursive_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/",
startswith="",
),
],
),
(
"partitions_traversal_all",
{
"include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/year={year}/month={month}/*.json",
"tables_filter_pattern": {"allow": ["^pokemon_abilities_json$"]},
"traversal_method": "ALL",
},
[
call.list_folders_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/"
),
call.list_objects_recursive_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/",
startswith="year=",
),
],
),
(
"filter_specific_partition_traversal_all",
{
"include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/year=2022/month={month}/part*.json",
"tables_filter_pattern": {"allow": ["^pokemon_abilities_json$"]},
"traversal_method": "ALL",
},
[
call.list_folders_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/"
),
call.list_objects_recursive_path(
"s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022",
startswith="month=",
),
],
),
],
ids=lambda calls_test_tuple: calls_test_tuple[0],
)
def test_data_lake_s3_calls(s3_populate, calls_test_tuple):
_, path_spec, expected_calls = calls_test_tuple
ctx = PipelineContext(run_id="test-s3")
config = {
"path_specs": [path_spec],
"aws_config": {
"aws_region": "us-east-1",
"aws_access_key_id": "testing",
"aws_secret_access_key": "testing",
},
}
source = S3Source.create(config, ctx)
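    # Wrap the listing helpers in a Mock that records every call (and its order) while
    # delegating to the real implementations, then patch them into the source module.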
m = Mock()
m.list_folders_path.side_effect = list_folders_path
m.list_objects_recursive_path.side_effect = list_objects_recursive_path
with (
patch(
"datahub.ingestion.source.s3.source.list_folders_path", m.list_folders_path
),
patch(
"datahub.ingestion.source.s3.source.list_objects_recursive_path",
m.list_objects_recursive_path,
),
):
for _ in source.get_workunits_internal():
pass
    # Verify the S3 calls. The source should make the minimum necessary calls, using
    # prefixes where possible, to keep the number of S3 API requests down.
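    # Normalize the recorded calls: strip the aws_config argument (whether passed as a
    # keyword or as the third positional argument) so they compare equal to the expected
    # calls above.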
calls = []
for c in m.mock_calls:
if isinstance(c.kwargs, dict): # type assertion
c.kwargs.pop("aws_config", None)
if len(c.args) == 3 and isinstance(c.args[2], AwsConnectionConfig):
c = getattr(call, c[0])(*(c.args[:2]), **c.kwargs)
calls.append(c)
assert calls == expected_calls