271 lines
9.2 KiB
Python
Raw Permalink Normal View History

import json
import pathlib
from typing import Sequence
from unittest.mock import patch
import pytest
from freezegun import freeze_time
from requests.models import HTTPError
from datahub.configuration.common import PipelineExecutionError
from datahub.ingestion.api.source import StructuredLogEntry
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2021-12-07 07:00:00"
JSON_RESPONSE_MAP = {
"https://app.mode.com/api/verify": "verify.json",
"https://app.mode.com/api/account": "user.json",
"https://app.mode.com/api/acryl/spaces?filter=all&per_page=30&page=1": "spaces.json",
"https://app.mode.com/api/acryl/spaces?filter=all&per_page=30&page=2": "spaces_empty.json",
"https://app.mode.com/api/acryl/spaces/157933cc1168/reports": "reports_157933cc1168.json",
"https://app.mode.com/api/acryl/spaces/75737b70402e/reports": "reports_75737b70402e.json",
"https://app.mode.com/api/modeuser": "user.json",
"https://app.mode.com/api/acryl/reports/9d2da37fa91e/queries": "queries.json",
"https://app.mode.com/api/acryl/reports/9d2da37fa91e/queries/6e26a9f3d4e2/charts": "charts.json",
"https://app.mode.com/api/acryl/data_sources": "data_sources.json",
"https://app.mode.com/api/acryl/definitions": "definitions.json",
"https://app.mode.com/api/acryl/spaces/157933cc1168/datasets": "datasets_157933cc1168.json",
"https://app.mode.com/api/acryl/spaces/75737b70402e/datasets": "datasets_75737b70402e.json",
"https://app.mode.com/api/acryl/reports/24f66e1701b6": "dataset_24f66e1701b6.json",
"https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json",
}
ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports"
test_resources_dir = pathlib.Path(__file__).parent
class MockResponse:
def __init__(self, error_list, status_code):
self.json_data = None
self.error_list = error_list
self.status_code = status_code
self.auth = None
self.headers = {}
self.url = None
def json(self):
return self.json_data
def mount(self, prefix, adaptor):
return self
def get(self, url, timeout=40):
if self.error_list is not None and self.url in self.error_list:
http_error_msg = "{} Client Error: {} for url: {}".format(
400,
"Simulate error",
self.url,
)
raise HTTPError(http_error_msg, response=self)
self.url = url
self.timeout = timeout
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
with open(response_json_path) as file:
data = json.loads(file.read())
self.json_data = data
return self
@property
def text(self) -> str:
return json.dumps(self.json_data)
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise HTTPError(
f"MockResponse for {self.url} has status code {self.status_code}",
response=self,
)
class MockResponseJson(MockResponse):
def __init__(
self,
status_code: int = 200,
*,
json_empty_list: Sequence[str] = (),
json_error_list: Sequence[str] = (),
):
super().__init__(None, status_code)
self.json_empty_list = json_empty_list
self.json_error_list = json_error_list
def json(self):
if self.url in self.json_empty_list:
return json.loads("") # Shouldn't be called
if self.url in self.json_error_list:
return json.loads("{")
return super().json()
def get(self, url, timeout=40):
response = super().get(url, timeout)
if self.url in self.json_empty_list:
response.status_code = 204
return response
def mocked_requests_success(*args, **kwargs):
return MockResponse(None, 200)
def mocked_requests_failure(*args, **kwargs):
return MockResponse([ERROR_URL], 200)
@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_success,
):
pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/mode_mces.json",
golden_path=test_resources_dir / "mode_mces_golden.json",
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)
@freeze_time(FROZEN_TIME)
def test_mode_ingest_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_failure,
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status()
assert exec_error.value.args[0] == "Source reported errors"
assert len(exec_error.value.args[1]) == 1
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1][0]
error = next(iter(error_dict.context))
assert "Simulate error" in error
assert ERROR_URL in error
@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_empty(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_empty_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=True)
@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_error_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=False)
with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status(raise_warnings=True)
assert len(exec_error.value.args[1]) > 0
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1][0]
error = next(iter(error_dict.context))
assert "Expecting property name enclosed in double quotes" in error