mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-31 21:36:08 +00:00
74 lines
2.0 KiB
Python
74 lines
2.0 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
|
|
# NOTE(review): FROZEN_TIME is not referenced in this module; clock freezing
# appears to come from the ``mock_time`` fixture instead — confirm before
# removing.
FROZEN_TIME = "2020-04-14 07:00:00"

# Directory holding one JSON file per test case; each file is the ``source``
# section of an ingestion recipe. The path is relative to the current working
# directory, so pytest must be run from the directory that contains ``tests/``.
SOURCE_FILES_PATH = "./tests/integration/delta_lake/sources/local"

# Sorted so test collection (and the generated parametrize IDs) is
# deterministic: os.listdir returns entries in arbitrary, filesystem-dependent
# order, and pytest-xdist requires every worker to collect the same tests in
# the same order.
source_files = sorted(os.listdir(SOURCE_FILES_PATH))
|
|
|
|
|
|
@pytest.mark.parametrize("source_file", source_files)
def test_delta_lake(pytestconfig, source_file, tmp_path, mock_time):
    """Run one delta-lake recipe into a file sink and compare with its golden file.

    ``source_file`` names a JSON file in ``SOURCE_FILES_PATH`` whose content is
    used verbatim as the pipeline's ``source`` config. The pipeline writes its
    output to ``tmp_path/<source_file>``, which is then checked against
    ``golden_files/local/golden_mces_<source_file>``.

    ``mock_time`` is requested only for its side effect (presumably freezing
    the clock so output timestamps are stable — confirm in conftest).
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/delta_lake"
    output_path = f"{tmp_path}/{source_file}"

    # Context manager so the recipe file handle is closed right away instead
    # of leaking until garbage collection; JSON is UTF-8 by specification.
    with open(os.path.join(SOURCE_FILES_PATH, source_file), encoding="utf-8") as f:
        source = json.load(f)

    config_dict = {
        "source": source,
        "sink": {
            "type": "file",
            "config": {"filename": output_path},
        },
        # Use the recipe file name as the run id so each case is distinguishable.
        "run_id": source_file,
    }

    pipeline = Pipeline.create(config_dict)
    pipeline.run()
    pipeline.raise_from_status()

    logging.debug(
        "tmp path: %s, source file: %s, test resource dir: %s",
        tmp_path,
        source_file,
        test_resources_dir,
    )

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_path,
        golden_path=f"{test_resources_dir}/golden_files/local/golden_mces_{source_file}",
    )
|
|
|
|
|
|
def test_delta_lake_incorrect_config_raises_error(tmp_path, mock_time):
    """Check that a delta-lake source configured with an invalid base path fails.

    Creation, execution and status checking of the pipeline all happen inside
    ``pytest.raises``, so an error raised by any of those steps satisfies the
    test; the test fails only if the pipeline completes without raising.
    """
    config_dict = {
        "sink": {
            "type": "file",
            "config": {"filename": f"{tmp_path}/mces.json"},
        },
    }

    # Case 1: ``base_path`` points at a location that presumably does not exist.
    config_dict["source"] = {
        "type": "delta-lake",
        "config": {"base_path": "invalid/path"},
    }

    # NOTE(review): ``Exception`` is very broad and would also accept unrelated
    # failures; narrowing it to the specific error the delta-lake source raises
    # for a missing path would make this test stricter.
    with pytest.raises(Exception) as e_info:
        pipeline = Pipeline.create(config_dict)
        pipeline.run()
        pipeline.raise_from_status()

    logging.debug(e_info)
|