
import json
import logging
import os
from datetime import datetime

import moto.s3
import pytest
from boto3.session import Session
from moto import mock_s3
from pydantic import ValidationError

from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.s3.source import S3Source
from datahub.testing import mce_helpers
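
# Pin "now" so uploaded objects get deterministic, steadily increasing
# last-modified timestamps and the golden files stay reproducible.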
FROZEN_TIME = "2020-04-14 07:00:00"
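
# Expected contents of tests/integration/s3/test_data/local_system/, used to
# assert that the data uploaded to the mock buckets matches the checked-in files.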
FILE_LIST_FOR_VALIDATION = [
"folder_a/folder_aa/folder_aaa/NPS.7.1.package_data_NPS.6.1_ARCN_Lakes_ChemistryData_v1_csv.csv",
"folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro",
"folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv",
"folder_a/folder_aa/folder_aaa/countries_json.json",
"folder_a/folder_aa/folder_aaa/food_parquet.parquet",
"folder_a/folder_aa/folder_aaa/small.csv",
"folder_a/folder_aa/folder_aaa/wa_fn_usec_hr_employee_attrition_csv.csv",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=jan/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2019/month=jan/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2020/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=april/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=april/part2.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/folder_aaaa/pokemon_abilities_yearwise_2021/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/food_csv/part1.csv",
"folder_a/folder_aa/folder_aaa/food_csv/part2.csv",
"folder_a/folder_aa/folder_aaa/food_csv/part3.csv",
"folder_a/folder_aa/folder_aaa/food_parquet/part1.parquet",
"folder_a/folder_aa/folder_aaa/food_parquet/part2.parquet",
"folder_a/folder_aa/folder_aaa/no_extension/small",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=jan/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2019/month=jan/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=feb/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=feb/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2020/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=april/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=april/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=march/part1.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2021/month=march/part2.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/part3.json",
"folder_a/folder_aa/folder_aaa/pokemon_abilities_json/year=2022/month=jan/_temporary/dummy.json",
]
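
# Module-scoped, autouse fixtures: every test in this module runs against the
# same pair of pre-populated mock buckets.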
@pytest.fixture(scope="module", autouse=True)
def bucket_names():
    return ["my-test-bucket", "my-test-bucket-2"]
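
# A single mocked boto3 session (via moto) is shared by the resource and
# client fixtures below; the credentials are dummies, since moto intercepts
# all S3 calls.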
@pytest.fixture(scope="module", autouse=True)
def s3():
    with mock_s3():
        conn = Session(
            aws_access_key_id="test",
            aws_secret_access_key="test",
            region_name="us-east-1",
        )
        yield conn
@pytest.fixture(scope="module", autouse=True)
def s3_resource(s3):
    with mock_s3():
        conn = s3.resource("s3")
        yield conn
@pytest.fixture(scope="module", autouse=True)
def s3_client(s3):
    with mock_s3():
        conn = s3.client("s3")
        yield conn
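
# Mirror the local test data into both mock buckets. Bucket and object tags
# are set so tag-extraction recipes have something to pick up, and each object
# gets a distinct last-modified time derived from FROZEN_TIME.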
@pytest.fixture(scope="module", autouse=True)
def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names):
    for bucket_name in bucket_names:
        logging.info(f"Populating s3 bucket: {bucket_name}")
        s3_resource.create_bucket(Bucket=bucket_name)
        bkt = s3_resource.Bucket(bucket_name)
        bkt.Tagging().put(Tagging={"TagSet": [{"Key": "foo", "Value": "bar"}]})

        test_resources_dir = (
            pytestconfig.rootpath / "tests/integration/s3/test_data/local_system/"
        )
        current_time_sec = datetime.strptime(
            FROZEN_TIME, "%Y-%m-%d %H:%M:%S"
        ).timestamp()
        file_list = []
        for root, _dirs, files in os.walk(test_resources_dir):
            _dirs.sort()
            for file in sorted(files):
                full_path = os.path.join(root, file)
                rel_path = os.path.relpath(full_path, test_resources_dir)
                file_list.append(rel_path)
                bkt.upload_file(
                    full_path,
                    rel_path,
                    # Set content type for the `no_extension/small` file to text/csv.
                    ExtraArgs=(
                        {"ContentType": "text/csv"} if "." not in rel_path else {}
                    ),
                )
                s3_client.put_object_tagging(
                    Bucket=bucket_name,
                    Key=rel_path,
                    Tagging={"TagSet": [{"Key": "baz", "Value": "bob"}]},
                )

                # Reach into moto's backend (default account id 123456789012) to
                # overwrite the object's last-modified time deterministically.
                key = (
                    moto.s3.models.s3_backends["123456789012"]["global"]
                    .buckets[bucket_name]
                    .keys[rel_path]
                )
                current_time_sec += 10
                key.last_modified = datetime.fromtimestamp(current_time_sec)

    # This is used to make sure the list of files is the same in the test as locally.
    assert file_list == FILE_LIST_FOR_VALIDATION
    yield
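
# Give the on-disk copies of the test files the same deterministic mtimes, so
# local (non-S3) ingestion produces stable golden files too.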
@pytest.fixture(scope="module", autouse=True)
def touch_local_files(pytestconfig):
    test_resources_dir = (
        pytestconfig.rootpath / "tests/integration/s3/test_data/local_system/"
    )
    current_time_sec = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S").timestamp()
    for root, _dirs, files in os.walk(test_resources_dir):
        _dirs.sort()
        for file in sorted(files):
            current_time_sec += 10
            full_path = os.path.join(root, file)
            os.utime(full_path, times=(current_time_sec, current_time_sec))
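
# Ingestion recipe files: the "shared" recipes run against both the local
# filesystem and the mock S3 buckets, while the "s3" recipes are S3-only.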
SHARED_SOURCE_FILES_PATH = "./tests/integration/s3/sources/shared"
shared_source_files = [
    (SHARED_SOURCE_FILES_PATH, p) for p in os.listdir(SHARED_SOURCE_FILES_PATH)
]

S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3"
s3_source_files = [(S3_SOURCE_FILES_PATH, p) for p in os.listdir(S3_SOURCE_FILES_PATH)]
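
# Run every recipe against the mock S3 buckets, write the emitted events to a
# file sink, and diff the output against the corresponding golden file.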
@pytest.mark.integration
@pytest.mark.parametrize("source_file_tuple", shared_source_files + s3_source_files)
def test_data_lake_s3_ingest(
    pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
):
    source_dir, source_file = source_file_tuple
    test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"

    with open(os.path.join(source_dir, source_file)) as f:
        source = json.load(f)

    config_dict = {}
    config_dict["source"] = source
    config_dict["sink"] = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/{source_file}",
        },
    }
    config_dict["run_id"] = source_file

    pipeline = Pipeline.create(config_dict)
    pipeline.run()
    pipeline.raise_from_status()

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=f"{tmp_path}/{source_file}",
        golden_path=f"{test_resources_dir}/golden-files/s3/golden_mces_{source_file}",
        ignore_paths=[
            r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
        ],
    )
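
# Same recipes, but rewritten to ingest from the local filesystem: the s3://
# URIs in each path spec are redirected at the checked-in test data, profiling
# is switched on, and the AWS-only options are dropped.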
@pytest.mark.integration
@pytest.mark.parametrize("source_file_tuple", shared_source_files)
def test_data_lake_local_ingest(
    pytestconfig, touch_local_files, source_file_tuple, tmp_path, mock_time
):
    source_dir, source_file = source_file_tuple
    test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"

    with open(os.path.join(source_dir, source_file)) as f:
        source = json.load(f)

    config_dict = {}
    for path_spec in source["config"]["path_specs"]:
        path_spec["include"] = (
            path_spec["include"]
            .replace(
                "s3://my-test-bucket/", "tests/integration/s3/test_data/local_system/"
            )
            .replace(
                "s3://my-test-bucket-2/", "tests/integration/s3/test_data/local_system/"
            )
        )

    source["config"]["profiling"]["enabled"] = True
    source["config"].pop("aws_config")
    source["config"].pop("use_s3_bucket_tags", None)
    source["config"].pop("use_s3_object_tags", None)

    config_dict["source"] = source
    config_dict["sink"] = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/{source_file}",
        },
    }
    config_dict["run_id"] = source_file

    pipeline = Pipeline.create(config_dict)
    pipeline.run()
    pipeline.raise_from_status()

    # Verify the output. The ignored paths vary from run to run, so they are
    # excluded from the golden-file diff.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=f"{tmp_path}/{source_file}",
        golden_path=f"{test_resources_dir}/golden-files/local/golden_mces_{source_file}",
        ignore_paths=[
            r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\[\d+\]\['value'\]\['time'\]",
            r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
            # root[41]['aspect']['json']['fieldProfiles'][0]['sampleValues'][0]
            r"root\[\d+\]\['aspect'\]\['json'\]\['fieldProfiles'\]\[\d+\]\['sampleValues'\]",
            # "root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][2]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['fields'][4]"
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]",
            # "root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.DatasetProperties']['customProperties']['size_in_bytes']"
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['size_in_bytes'\]",
        ],
    )
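
# Config validation: a valid path spec should build an S3Source, while each of
# the malformed variants below should be rejected with a ValidationError.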
def test_data_lake_incorrect_config_raises_error(tmp_path, mock_time):
    ctx = PipelineContext(run_id="test-s3")

    # Baseline: valid config
    source: dict = {
        "path_spec": {"include": "a/b/c/d/{table}.*", "table_name": "{table}"}
    }
    s3 = S3Source.create(source, ctx)
    assert s3.source_config.platform == "file"

    # Case 1: named variable in table name is not present in include
    source = {"path_spec": {"include": "a/b/c/d/{table}.*", "table_name": "{table1}"}}
    with pytest.raises(ValidationError, match="table_name"):
        S3Source.create(source, ctx)

    # Case 2: named variable in exclude is not allowed
    source = {
        "path_spec": {
            "include": "a/b/c/d/{table}/*.*",
            "exclude": ["a/b/c/d/a-{exclude}/**"],
        },
    }
    with pytest.raises(ValidationError, match=r"exclude.*named variable"):
        S3Source.create(source, ctx)

    # Case 3: unsupported file type not allowed
    source = {
        "path_spec": {
            "include": "a/b/c/d/{table}/*.hd5",
        }
    }
    with pytest.raises(ValidationError, match="file type"):
        S3Source.create(source, ctx)

    # Case 4: ** in include not allowed
    source = {
        "path_spec": {
            "include": "a/b/c/d/**/*.*",
        },
    }
    with pytest.raises(ValidationError, match=r"\*\*"):
        S3Source.create(source, ctx)