2025-05-21 11:27:34 -05:00

136 lines
4.6 KiB
Python

import logging
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

import pytest

from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
# Base timestamp; abs_populate parses it and uses it as the starting
# `last_modified` value for the mocked blobs.
# NOTE(review): presumably also the instant the `mock_time` fixture freezes the
# clock at -- confirm against conftest.
FROZEN_TIME = "2025-01-01 01:00:00"
# Name of the (mocked) Azure Blob container referenced by the recipe URL and
# the source's `azure_config`.
CONTAINER_NAME = "test-abs"
# Prefix prepended to every Excel file name to form its blob path.
ABS_PREFIX = "excel/test/"
# Storage account name used to build the https://<account>.blob.core.windows.net URL.
ACCOUNT_NAME = "teststorageaccount"
# Placeholder SAS token passed to the source config; the signature is fake.
SAS_TOKEN = "?sv=2020-08-04&ss=b&srt=co&sp=rwlacx&sig=fakeSignature123456789"
@pytest.fixture(scope="module")
def mock_azure_blob_setup():
    """Build the chain of mocked Azure Blob clients shared by this module.

    The service client hands out a single container client, which in turn
    hands out a single blob client. An initially empty ``blob_list`` is
    included so that other fixtures can register blob properties in it.

    Returns:
        dict: the three mocks plus the shared list, keyed by
        ``mock_blob_service_client``, ``mock_container_client``,
        ``mock_blob_client`` and ``blob_list``.
    """
    # Wire the mocks bottom-up: blob client <- container client <- service client.
    blob_client = MagicMock()

    container_client = MagicMock()
    container_client.get_blob_client.return_value = blob_client

    service_client = MagicMock()
    service_client.get_container_client.return_value = container_client

    registered_blobs = []  # type: ignore

    setup = {}
    setup["mock_blob_service_client"] = service_client
    setup["mock_container_client"] = container_client
    setup["mock_blob_client"] = blob_client
    setup["blob_list"] = registered_blobs
    return setup
@pytest.fixture(scope="module")
def abs_populate(pytestconfig, mock_azure_blob_setup):
    """Register the sample Excel files as blobs in the mocked container.

    Every ``*.xlsx`` file under ``tests/integration/excel/data`` is read from
    disk and described by a mock blob-properties object (``name``,
    ``last_modified``, ``size``) appended to the shared ``blob_list``. The
    container client's ``list_blobs`` is then replaced by a filter that honours
    the ``name_starts_with`` argument.

    Args:
        pytestconfig: pytest config object; its ``rootpath`` locates the data
            directory.
        mock_azure_blob_setup: dict providing ``mock_container_client``,
            ``mock_blob_client`` and ``blob_list``.
    """
    mock_container_client = mock_azure_blob_setup["mock_container_client"]
    mock_blob_client = mock_azure_blob_setup["mock_blob_client"]
    blob_list = mock_azure_blob_setup["blob_list"]

    data_dir = pytestconfig.rootpath / "tests/integration/excel/data"
    logging.info(
        f"Loading Excel files from {data_dir} to Azure Blob container: {CONTAINER_NAME}"
    )

    current_time = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S")
    for file_path in data_dir.glob("*.xlsx"):
        blob_path = f"{ABS_PREFIX}{file_path.name}"
        file_content = file_path.read_bytes()

        # NOTE(review): download_blob.return_value is reassigned on every
        # iteration of this loop, so once the loop finishes the single shared
        # mock_blob_client serves the bytes of the last file processed.
        download_mock = MagicMock()
        download_mock.readall.return_value = file_content
        mock_blob_client.download_blob.return_value = download_mock

        blob_props = MagicMock()
        blob_props.name = blob_path
        blob_props.last_modified = current_time
        blob_props.size = len(file_content)
        blob_list.append(blob_props)

        # Space the timestamps 10 seconds apart. timedelta arithmetic rolls
        # over into minutes, whereas datetime.replace(second=...) raises
        # ValueError as soon as the seconds field would reach 60.
        current_time += timedelta(seconds=10)

        logging.info(
            f"Uploaded {file_path.name} to https://{ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{blob_path}"
        )

    def mock_list_blobs_filter(name_starts_with=None, **kwargs):
        """Return the registered blobs, optionally filtered by name prefix."""
        logging.info(f"list_blobs called with name_starts_with={name_starts_with}")
        if name_starts_with:
            return [
                blob for blob in blob_list if blob.name.startswith(name_starts_with)
            ]
        return blob_list

    mock_container_client.list_blobs.side_effect = mock_list_blobs_filter
@pytest.mark.integration
def test_excel_abs(
    pytestconfig, abs_populate, tmp_path, mock_time, mock_azure_blob_setup
):
    """Run the Excel source against the mocked Azure container and diff the output.

    The pipeline reads ``*.xlsx`` blobs under the test prefix with profiling
    enabled, writes its output through the file sink into ``tmp_path``, and the
    result is compared with the checked-in golden file. Profile
    ``sampleValues`` are excluded from the comparison.
    """
    resources_dir = pytestconfig.rootpath / "tests/integration/excel"
    service_client = mock_azure_blob_setup["mock_blob_service_client"]

    excel_source = {
        "type": "excel",
        "config": {
            "path_list": [
                f"https://{ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{ABS_PREFIX}*.xlsx",
            ],
            "azure_config": {
                "account_name": ACCOUNT_NAME,
                "sas_token": SAS_TOKEN,
                "container_name": CONTAINER_NAME,
            },
            "profiling": {
                "enabled": True,
            },
        },
    }
    file_sink = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/excel_abs_test.json",
        },
    }
    recipe = {
        "run_id": "excel-abs-test",
        "source": excel_source,
        "sink": file_sink,
    }

    # Every route to a BlobServiceClient is redirected to the shared mock.
    init_patch = patch(
        "azure.storage.blob.BlobServiceClient.__init__", return_value=None
    )
    class_patch = patch(
        "azure.storage.blob.BlobServiceClient", return_value=service_client
    )
    getter_patch = patch(
        "datahub.ingestion.source.azure.azure_common.AzureConnectionConfig.get_blob_service_client",
        return_value=service_client,
    )

    with init_patch, class_patch, getter_patch:
        pipeline = Pipeline.create(recipe)
        pipeline.run()
        pipeline.raise_from_status()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "excel_abs_test.json",
            golden_path=resources_dir / "excel_abs_test_golden.json",
            ignore_paths=[
                r"root\[\d+\]\['aspect'\]\['json'\]\['fieldProfiles'\]\[\d+\]\['sampleValues'\]",
            ],
        )