test(ingest): use pytest parameterization for dbt integration tests (#5879)

This commit is contained in:
Harshal Sheth 2022-09-10 18:35:58 -07:00 committed by GitHub
parent afa58888e4
commit 3cd8ea8316
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,3 +1,5 @@
import dataclasses
from dataclasses import dataclass
from os import PathLike
from typing import Any, Dict, Optional, Type, Union, cast
from unittest.mock import patch
@ -36,35 +38,29 @@ GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
@dataclass
class DbtTestConfig:
def __init__(
run_id: str
output_file: Union[str, PathLike]
golden_file: Union[str, PathLike]
manifest_file: str = "dbt_manifest.json"
source_config_modifiers: Dict[str, Any] = dataclasses.field(default_factory=dict)
sink_config_modifiers: Dict[str, Any] = dataclasses.field(default_factory=dict)
def set_paths(
self,
run_id: str,
dbt_metadata_uri_prefix: str,
test_resources_dir: Union[str, PathLike],
tmp_path: Union[str, PathLike],
output_file: Union[str, PathLike],
golden_file: Union[str, PathLike],
manifest_file: str = "dbt_manifest.json",
source_config_modifiers: Optional[Dict[str, Any]] = None,
sink_config_modifiers: Optional[Dict[str, Any]] = None,
):
if source_config_modifiers is None:
source_config_modifiers = {}
if sink_config_modifiers is None:
sink_config_modifiers = {}
self.run_id = run_id
self.manifest_path = f"{dbt_metadata_uri_prefix}/{manifest_file}"
dbt_metadata_uri_prefix: PathLike,
test_resources_dir: PathLike,
tmp_path: PathLike,
) -> None:
self.manifest_path = f"{dbt_metadata_uri_prefix}/{self.manifest_file}"
self.catalog_path = f"{dbt_metadata_uri_prefix}/dbt_catalog.json"
self.sources_path = f"{dbt_metadata_uri_prefix}/dbt_sources.json"
self.target_platform = "postgres"
self.output_path = f"{tmp_path}/{output_file}"
self.output_path = f"{tmp_path}/{self.output_file}"
self.golden_path = f"{test_resources_dir}/{golden_file}"
self.golden_path = f"{test_resources_dir}/{self.golden_file}"
self.source_config = dict(
{
"manifest_path": self.manifest_path,
@ -113,20 +109,66 @@ class DbtTestConfig:
}
},
},
**source_config_modifiers,
**self.source_config_modifiers,
)
self.sink_config = dict(
{
"filename": self.output_path,
},
**sink_config_modifiers,
**self.sink_config_modifiers,
)
@pytest.mark.parametrize(
# test manifest, catalog, sources are generated from https://github.com/kevinhu/sample-dbt
"dbt_test_config",
[
DbtTestConfig(
"dbt-test-with-schemas-dbt-enabled",
"dbt_enabled_with_schemas_mces.json",
"dbt_enabled_with_schemas_mces_golden.json",
source_config_modifiers={
"enable_meta_mapping": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
"dbt-test-with-complex-owner-patterns",
"dbt_test_with_complex_owner_patterns_mces.json",
"dbt_test_with_complex_owner_patterns_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"node_name_pattern": {
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"owner_extraction_pattern": "(.*)(?P<owner>(?<=\\().*?(?=\\)))",
"strip_user_ids_from_email": True,
},
),
DbtTestConfig(
"dbt-test-with-data-platform-instance",
"dbt_test_with_data_platform_instance_mces.json",
"dbt_test_with_data_platform_instance_mces_golden.json",
source_config_modifiers={
"platform_instance": "dbt-instance-1",
},
),
DbtTestConfig(
"dbt-test-with-target-platform-instance",
"dbt_test_with_target_platform_instance_mces.json",
"dbt_test_with_target_platform_instance_mces_golden.json",
source_config_modifiers={
"target_platform_instance": "ps-instance-1",
},
),
],
ids=lambda dbt_test_config: dbt_test_config.run_id,
)
@pytest.mark.integration
@requests_mock.Mocker(kw="req_mock")
def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
def test_dbt_ingest(dbt_test_config, pytestconfig, tmp_path, mock_time, **kwargs):
config: DbtTestConfig = dbt_test_config
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
with open(test_resources_dir / "dbt_manifest.json", "r") as f:
@ -144,78 +186,29 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
"http://some-external-repo/dbt_sources.json", text=f.read()
)
config_variants = [
DbtTestConfig(
"dbt-test-with-schemas-dbt-enabled",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_enabled_with_schemas_mces.json",
"dbt_enabled_with_schemas_mces_golden.json",
source_config_modifiers={
"enable_meta_mapping": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
"dbt-test-with-complex-owner-patterns",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_test_with_complex_owner_patterns_mces.json",
"dbt_test_with_complex_owner_patterns_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"node_name_pattern": {
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"owner_extraction_pattern": "(.*)(?P<owner>(?<=\\().*?(?=\\)))",
"strip_user_ids_from_email": True,
},
),
DbtTestConfig(
"dbt-test-with-data-platform-instance",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_test_with_data_platform_instance_mces.json",
"dbt_test_with_data_platform_instance_mces_golden.json",
source_config_modifiers={
"platform_instance": "dbt-instance-1",
},
),
DbtTestConfig(
"dbt-test-with-target-platform-instance",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_test_with_target_platform_instance_mces.json",
"dbt_test_with_target_platform_instance_mces_golden.json",
source_config_modifiers={
"target_platform_instance": "ps-instance-1",
},
),
]
config.set_paths(
dbt_metadata_uri_prefix=test_resources_dir,
test_resources_dir=test_resources_dir,
tmp_path=tmp_path,
)
for config in config_variants:
# test manifest, catalog, sources are generated from https://github.com/kevinhu/sample-dbt
pipeline = Pipeline.create(
{
"run_id": config.run_id,
"source": {"type": "dbt", "config": config.source_config},
"sink": {
"type": "file",
"config": config.sink_config,
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=config.output_path,
golden_path=config.golden_path,
)
pipeline = Pipeline.create(
{
"run_id": config.run_id,
"source": {"type": "dbt", "config": config.source_config},
"sink": {
"type": "file",
"config": config.sink_config,
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=config.output_path,
golden_path=config.golden_path,
)
def get_current_checkpoint_from_pipeline(