2021-07-21 12:56:31 -07:00
|
|
|
from os import PathLike
|
|
|
|
from typing import Any, Dict, Optional, Union
|
|
|
|
|
2021-03-22 23:11:29 -07:00
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
2021-04-13 17:30:24 -07:00
|
|
|
from tests.test_helpers import mce_helpers
|
2021-03-22 23:11:29 -07:00
|
|
|
|
|
|
|
|
2021-07-21 12:56:31 -07:00
|
|
|
class DbtTestConfig:
    """Paths and pipeline config dicts for one dbt ingestion test variant.

    Attributes set by the constructor:
        run_id: the run identifier, stored as given.
        manifest_path / catalog_path / sources_path: the fixed file names
            ``dbt_manifest.json``, ``dbt_catalog.json`` and
            ``dbt_sources.json`` under ``test_resources_dir``.
        target_platform: always the string ``"dbt"``.
        output_path: ``output_file`` under ``tmp_path``.
        golden_path: ``golden_file`` under ``test_resources_dir``.
        source_config: dict with the three input paths and the target
            platform, overlaid with ``source_config_modifiers``.
        sink_config: dict with ``"filename"`` set to ``output_path``,
            overlaid with ``sink_config_modifiers``.
    """

    def __init__(
        self,
        run_id: str,
        test_resources_dir: Union[str, PathLike],
        tmp_path: Union[str, PathLike],
        output_file: Union[str, PathLike],
        golden_file: Union[str, PathLike],
        source_config_modifiers: Optional[Dict[str, Any]] = None,
        sink_config_modifiers: Optional[Dict[str, Any]] = None,
    ):
        """Build all derived paths and config dicts.

        Args:
            run_id: identifier stored on the instance as ``run_id``.
            test_resources_dir: directory holding the dbt input files and
                the golden file.
            tmp_path: directory the output file path is rooted in.
            output_file: output file name, joined onto ``tmp_path``.
            golden_file: golden file name, joined onto
                ``test_resources_dir``.
            source_config_modifiers: extra or overriding keys for
                ``source_config``; ``None`` means no modifiers.
            sink_config_modifiers: extra or overriding keys for
                ``sink_config``; ``None`` means no modifiers.
        """
        # Imported locally so this class does not depend on a module-level
        # import beyond the ones it already uses.
        from os import fspath

        if source_config_modifiers is None:
            source_config_modifiers = {}
        if sink_config_modifiers is None:
            sink_config_modifiers = {}

        self.run_id = run_id

        # Resolve path-like arguments through the __fspath__ protocol rather
        # than str(): a PathLike is only guaranteed to implement __fspath__,
        # so f-string interpolation of the raw object could embed its repr.
        resources_dir = fspath(test_resources_dir)

        self.manifest_path = f"{resources_dir}/dbt_manifest.json"
        self.catalog_path = f"{resources_dir}/dbt_catalog.json"
        self.sources_path = f"{resources_dir}/dbt_sources.json"
        self.target_platform = "dbt"

        self.output_path = f"{fspath(tmp_path)}/{fspath(output_file)}"
        self.golden_path = f"{resources_dir}/{fspath(golden_file)}"

        # Modifiers are unpacked last so they win over the defaults.
        self.source_config = {
            "manifest_path": self.manifest_path,
            "catalog_path": self.catalog_path,
            "sources_path": self.sources_path,
            "target_platform": self.target_platform,
            **source_config_modifiers,
        }

        self.sink_config = {
            "filename": self.output_path,
            **sink_config_modifiers,
        }
|
|
|
|
|
|
|
|
|
2021-03-23 20:15:44 -07:00
|
|
|
def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
    """Run the dbt source into a file sink and compare against golden files.

    Two variants are exercised, one with ``load_schemas`` set to True and
    one with it set to False. Each writes its own output file under
    ``tmp_path`` and is checked against its own golden file.

    ``mock_time`` is requested as a fixture but never referenced in the
    body; presumably it pins timestamps for stable output (confirm against
    the fixture definition).
    """
    resources = pytestconfig.rootpath / "tests/integration/dbt"

    with_schemas = DbtTestConfig(
        run_id="dbt-test-with-schemas",
        test_resources_dir=resources,
        tmp_path=tmp_path,
        output_file="dbt_with_schemas_mces.json",
        golden_file="dbt_with_schemas_mces_golden.json",
        source_config_modifiers={"load_schemas": True},
    )
    without_schemas = DbtTestConfig(
        run_id="dbt-test-without-schemas",
        test_resources_dir=resources,
        tmp_path=tmp_path,
        output_file="dbt_without_schemas_mces.json",
        golden_file="dbt_without_schemas_mces_golden.json",
        source_config_modifiers={"load_schemas": False},
    )

    for variant in (with_schemas, without_schemas):
        # test manifest, catalog, sources are generated from https://github.com/kevinhu/sample-dbt
        recipe = {
            "run_id": variant.run_id,
            "source": {"type": "dbt", "config": variant.source_config},
            "sink": {"type": "file", "config": variant.sink_config},
        }
        ingestion = Pipeline.create(recipe)
        ingestion.run()
        # Fail the test if the pipeline reported a failure status.
        ingestion.raise_from_status()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=variant.output_path,
            golden_path=variant.golden_path,
        )
|