mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-25 10:30:04 +00:00
136 lines
4.6 KiB
Python
136 lines
4.6 KiB
Python
import logging
|
|
from datetime import datetime
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
|
|
# Base timestamp, in "%Y-%m-%d %H:%M:%S" form; the mocked blobs'
# last_modified values are derived from it.
FROZEN_TIME = "2025-01-01 01:00:00"

# Name of the (mocked) Azure Blob container the Excel files live in.
CONTAINER_NAME = "test-abs"

# Prefix prepended to each file name to form its blob path.
ABS_PREFIX = "excel/test/"

# Storage account name; used to build the blob URLs and the source config.
ACCOUNT_NAME = "teststorageaccount"

# Placeholder SAS token with a fake signature; passed to the source config.
SAS_TOKEN = "?sv=2020-08-04&ss=b&srt=co&sp=rwlacx&sig=fakeSignature123456789"
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_azure_blob_setup():
    """Build the chain of Azure Blob mocks shared by this module's tests.

    The service client hands out one container client, which in turn hands
    out one blob client, regardless of the arguments they are called with.

    Returns:
        A dict with the three mocks (``mock_blob_service_client``,
        ``mock_container_client``, ``mock_blob_client``) and an initially
        empty ``blob_list`` that other fixtures fill with blob properties.
    """
    # Wire the mocks bottom-up: blob client <- container client <- service.
    blob_client = MagicMock()

    container_client = MagicMock()
    container_client.get_blob_client.return_value = blob_client

    service_client = MagicMock()
    service_client.get_container_client.return_value = container_client

    return dict(
        mock_blob_service_client=service_client,
        mock_container_client=container_client,
        mock_blob_client=blob_client,
        blob_list=[],
    )
|
|
|
|
|
|
@pytest.fixture(scope="module")
def abs_populate(pytestconfig, mock_azure_blob_setup):
    """Register every ``*.xlsx`` test file as a fake blob in the mocked container.

    For each file under ``tests/integration/excel/data`` a ``MagicMock`` with
    ``name``, ``last_modified`` and ``size`` attributes is appended to the
    shared ``blob_list``. ``last_modified`` starts at ``FROZEN_TIME`` and
    advances by 10 seconds per file. Finally ``list_blobs`` on the mocked
    container client is replaced by a function that filters ``blob_list`` by
    the ``name_starts_with`` prefix.

    Args:
        pytestconfig: pytest's config fixture; its ``rootpath`` locates the data.
        mock_azure_blob_setup: dict of shared mocks and the ``blob_list``.
    """
    # Imported locally so this fixture does not depend on a change to the
    # module's import block.
    from datetime import timedelta

    mock_container_client = mock_azure_blob_setup["mock_container_client"]
    mock_blob_client = mock_azure_blob_setup["mock_blob_client"]
    blob_list = mock_azure_blob_setup["blob_list"]

    data_dir = pytestconfig.rootpath / "tests/integration/excel/data"
    logging.info(
        f"Loading Excel files from {data_dir} to Azure Blob container: {CONTAINER_NAME}"
    )

    current_time = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S")

    for file_path in data_dir.glob("*.xlsx"):
        blob_path = f"{ABS_PREFIX}{file_path.name}"

        with open(file_path, "rb") as f:
            file_content = f.read()

        download_mock = MagicMock()
        download_mock.readall.return_value = file_content

        # NOTE(review): there is a single shared blob client, so each iteration
        # overwrites the previous return value; after the loop, download_blob()
        # yields the content of the last file processed. Confirm this is
        # intended if more than one .xlsx file is present.
        mock_blob_client.download_blob.return_value = download_mock

        blob_props = MagicMock()
        blob_props.name = blob_path
        blob_props.last_modified = current_time
        blob_props.size = len(file_content)

        blob_list.append(blob_props)

        # Use timedelta arithmetic: datetime.replace(second=...) raises
        # ValueError as soon as the seconds field would reach 60, i.e. from
        # the sixth file onwards.
        current_time += timedelta(seconds=10)
        logging.info(
            f"Uploaded {file_path.name} to https://{ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{blob_path}"
        )

    def mock_list_blobs_filter(name_starts_with=None, **kwargs):
        """Return the registered blobs, optionally filtered by name prefix."""
        logging.info(f"list_blobs called with name_starts_with={name_starts_with}")
        if name_starts_with:
            return [
                blob for blob in blob_list if blob.name.startswith(name_starts_with)
            ]
        return blob_list

    mock_container_client.list_blobs.side_effect = mock_list_blobs_filter
|
|
|
|
|
|
@pytest.mark.integration
def test_excel_abs(
    pytestconfig, abs_populate, tmp_path, mock_time, mock_azure_blob_setup
):
    """Run the ``excel`` source against the mocked blob container and diff the output.

    The pipeline reads ``*.xlsx`` blobs under ``ABS_PREFIX`` with profiling
    enabled, writes to a JSON file in ``tmp_path``, and the result is compared
    with ``excel_abs_test_golden.json`` (profile ``sampleValues`` are ignored).

    ``abs_populate`` and ``mock_time`` are requested only for their setup;
    neither is referenced in the body. ``mock_time`` is defined outside this
    file and presumably freezes the clock (confirm).
    """
    service_client = mock_azure_blob_setup["mock_blob_service_client"]
    resources_dir = pytestconfig.rootpath / "tests/integration/excel"

    source_config = {
        "path_list": [
            f"https://{ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{ABS_PREFIX}*.xlsx",
        ],
        "azure_config": {
            "account_name": ACCOUNT_NAME,
            "sas_token": SAS_TOKEN,
            "container_name": CONTAINER_NAME,
        },
        "profiling": {
            "enabled": True,
        },
    }
    recipe = {
        "run_id": "excel-abs-test",
        "source": {
            "type": "excel",
            "config": source_config,
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/excel_abs_test.json",
            },
        },
    }

    # The patches are entered in the order listed in the `with` statement:
    # constructor, class, then the config's client factory.
    init_patch = patch(
        "azure.storage.blob.BlobServiceClient.__init__", return_value=None
    )
    class_patch = patch(
        "azure.storage.blob.BlobServiceClient", return_value=service_client
    )
    factory_patch = patch(
        "datahub.ingestion.source.azure.azure_common.AzureConnectionConfig.get_blob_service_client",
        return_value=service_client,
    )

    with init_patch, class_patch, factory_patch:
        pipeline = Pipeline.create(recipe)
        pipeline.run()
        pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "excel_abs_test.json",
        golden_path=resources_dir / "excel_abs_test_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['aspect'\]\['json'\]\['fieldProfiles'\]\[\d+\]\['sampleValues'\]",
        ],
    )
|