mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00
197 lines
5.7 KiB
Python
197 lines
5.7 KiB
Python
import datetime
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional, Union
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
from freezegun import freeze_time
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
|
|
pytestmark = pytest.mark.integration_batch_2
|
|
FROZEN_TIME = "2022-02-03 07:00:00"
|
|
|
|
|
|
def mock_msal_cca(*args, **kwargs):
|
|
class MsalClient:
|
|
def __init__(self):
|
|
self.call_num = 0
|
|
self.token: Dict[str, Any] = {
|
|
"access_token": "dummy",
|
|
}
|
|
|
|
def acquire_token_for_client(self, *args, **kwargs):
|
|
self.call_num += 1
|
|
return self.token
|
|
|
|
def reset(self):
|
|
self.call_num = 0
|
|
|
|
return MsalClient()
|
|
|
|
|
|
def scan_init_response(request, context):
|
|
workspace_id_list = request.text.replace("&", "").split("workspaces=")
|
|
|
|
workspace_id = "||".join(workspace_id_list[1:])
|
|
|
|
w_id_vs_response: Dict[str, Any] = {
|
|
"64ed5cad-7c10-4684-8180-826122881108": {
|
|
"id": "4674efd1-603c-4129-8d82-03cf2be05aff"
|
|
}
|
|
}
|
|
|
|
return w_id_vs_response[workspace_id]
|
|
|
|
|
|
def read_mock_data(path: Union[Path, str]) -> dict:
|
|
with open(path) as p:
|
|
return json.load(p)
|
|
|
|
|
|
def register_mock_api(
|
|
pytestconfig: pytest.Config, request_mock: Any, override_data: Optional[dict] = None
|
|
) -> None:
|
|
default_mock_data_path = (
|
|
pytestconfig.rootpath
|
|
/ "tests/integration/powerbi/mock_data/mysql_mock_response.json"
|
|
)
|
|
|
|
api_vs_response = {
|
|
"https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo": {
|
|
"method": "POST",
|
|
"status_code": 200,
|
|
"json": scan_init_response,
|
|
},
|
|
}
|
|
|
|
api_vs_response.update(read_mock_data(default_mock_data_path))
|
|
|
|
api_vs_response.update(override_data or {})
|
|
|
|
for url in api_vs_response:
|
|
request_mock.register_uri(
|
|
api_vs_response[url]["method"],
|
|
url,
|
|
json=api_vs_response[url].get("json"),
|
|
text=api_vs_response[url].get("text"),
|
|
status_code=api_vs_response[url]["status_code"],
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
|
|
@pytest.mark.integration
|
|
def test_mysql_ingest(
|
|
mock_msal: MagicMock,
|
|
pytestconfig: pytest.Config,
|
|
tmp_path: str,
|
|
mock_time: datetime.datetime,
|
|
requests_mock: Any,
|
|
) -> None:
|
|
test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi"
|
|
|
|
register_mock_api(
|
|
request_mock=requests_mock,
|
|
pytestconfig=pytestconfig,
|
|
override_data=read_mock_data(
|
|
pytestconfig.rootpath
|
|
/ "tests/integration/powerbi/mock_data/mysql_mock_response.json"
|
|
),
|
|
)
|
|
|
|
pipeline = Pipeline.create(
|
|
{
|
|
"run_id": "powerbi-test",
|
|
"source": {
|
|
"type": "powerbi",
|
|
"config": {
|
|
"tenant_id": "0b0c960b-fcdf-4d0f-8c45-2e03bb59ddeb",
|
|
"client_id": "a8d655a6-f521-477e-8c22-255018583bf4",
|
|
"client_secret": "ababa~cdcdcdcdcdcdcdcdcdcd-abcd.defghijk",
|
|
"extract_ownership": True,
|
|
"extract_endorsements_to_tags": True,
|
|
"extract_app": True,
|
|
"extract_column_level_lineage": True,
|
|
"workspace_name_pattern": {"allow": ["^Employees$"]},
|
|
},
|
|
},
|
|
"sink": {
|
|
"type": "file",
|
|
"config": {
|
|
"filename": f"{tmp_path}/powerbi_mysql_mces.json",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
pipeline.run()
|
|
pipeline.raise_from_status()
|
|
golden_file = "golden_test_mysql.json"
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=f"{tmp_path}/powerbi_mysql_mces.json",
|
|
golden_path=f"{test_resources_dir}/{golden_file}",
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
|
|
@pytest.mark.integration
|
|
def test_mysql_odbc_ingest(
|
|
mock_msal: MagicMock,
|
|
pytestconfig: pytest.Config,
|
|
tmp_path: str,
|
|
mock_time: datetime.datetime,
|
|
requests_mock: Any,
|
|
) -> None:
|
|
test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi"
|
|
|
|
register_mock_api(
|
|
request_mock=requests_mock,
|
|
pytestconfig=pytestconfig,
|
|
override_data=read_mock_data(
|
|
pytestconfig.rootpath
|
|
/ "tests/integration/powerbi/mock_data/mysql_odbc_mock_response.json"
|
|
),
|
|
)
|
|
|
|
pipeline = Pipeline.create(
|
|
{
|
|
"run_id": "powerbi-test",
|
|
"source": {
|
|
"type": "powerbi",
|
|
"config": {
|
|
"tenant_id": "0b0c960b-fcdf-4d0f-8c45-2e03bb59ddeb",
|
|
"client_id": "a8d655a6-f521-477e-8c22-255018583bf4",
|
|
"client_secret": "ababa~cdcdcdcdcdcdcdcdcdcd-abcd.defghijk",
|
|
"extract_ownership": True,
|
|
"extract_endorsements_to_tags": True,
|
|
"extract_app": True,
|
|
"extract_column_level_lineage": True,
|
|
"workspace_name_pattern": {"allow": ["^Employees$"]},
|
|
},
|
|
},
|
|
"sink": {
|
|
"type": "file",
|
|
"config": {
|
|
"filename": f"{tmp_path}/powerbi_mysql_odbc_mces.json",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
pipeline.run()
|
|
pipeline.raise_from_status()
|
|
golden_file = "golden_test_mysql_odbc.json"
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=f"{tmp_path}/powerbi_mysql_odbc_mces.json",
|
|
golden_path=f"{test_resources_dir}/{golden_file}",
|
|
)
|