mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-29 04:20:00 +00:00
115 lines
3.3 KiB
Python
115 lines
3.3 KiB
Python
import os
|
|
import subprocess
|
|
|
|
import boto3
|
|
import freezegun
|
|
import pytest
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
pytestmark = pytest.mark.integration_batch_2
|
|
|
|
FROZEN_TIME = "2020-04-14 07:00:00"
|
|
MINIO_PORT = 9000
|
|
|
|
|
|
def is_minio_up(container_name: str) -> bool:
|
|
"""A cheap way to figure out if postgres is responsive on a container"""
|
|
|
|
cmd = f"docker logs {container_name} 2>&1 | grep '1 Online'"
|
|
ret = subprocess.run(
|
|
cmd,
|
|
shell=True,
|
|
)
|
|
return ret.returncode == 0
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def test_resources_dir(pytestconfig):
|
|
return pytestconfig.rootpath / "tests/integration/delta_lake"
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def minio_runner(docker_compose_runner, pytestconfig, test_resources_dir):
|
|
container_name = "minio_test"
|
|
with docker_compose_runner(
|
|
test_resources_dir / "docker-compose.yml", container_name
|
|
) as docker_services:
|
|
wait_for_port(
|
|
docker_services,
|
|
container_name,
|
|
MINIO_PORT,
|
|
timeout=120,
|
|
checker=lambda: is_minio_up(container_name),
|
|
)
|
|
yield docker_services
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def s3_bkt(minio_runner):
|
|
s3 = boto3.resource(
|
|
"s3",
|
|
endpoint_url=f"http://localhost:{MINIO_PORT}",
|
|
aws_access_key_id="miniouser",
|
|
aws_secret_access_key="miniopassword",
|
|
)
|
|
bkt = s3.Bucket("my-test-bucket")
|
|
bkt.create()
|
|
return bkt
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def populate_minio(pytestconfig, s3_bkt):
|
|
test_resources_dir = (
|
|
pytestconfig.rootpath / "tests/integration/delta_lake/test_data/"
|
|
)
|
|
|
|
for root, _dirs, files in os.walk(test_resources_dir):
|
|
for file in files:
|
|
full_path = os.path.join(root, file)
|
|
rel_path = os.path.relpath(full_path, test_resources_dir)
|
|
s3_bkt.upload_file(full_path, rel_path)
|
|
yield
|
|
|
|
|
|
@freezegun.freeze_time("2023-01-01 00:00:00+00:00")
|
|
def test_delta_lake_ingest(pytestconfig, tmp_path, test_resources_dir):
|
|
# Run the metadata ingestion pipeline.
|
|
pipeline = Pipeline.create(
|
|
{
|
|
"run_id": "delta-lake-test",
|
|
"source": {
|
|
"type": "delta-lake",
|
|
"config": {
|
|
"env": "DEV",
|
|
"base_path": "s3://my-test-bucket/delta_tables/sales",
|
|
"s3": {
|
|
"aws_config": {
|
|
"aws_access_key_id": "miniouser",
|
|
"aws_secret_access_key": "miniopassword",
|
|
"aws_endpoint_url": f"http://localhost:{MINIO_PORT}",
|
|
"aws_region": "us-east-1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"sink": {
|
|
"type": "file",
|
|
"config": {
|
|
"filename": f"{tmp_path}/delta_lake_minio_mces.json",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
pipeline.run()
|
|
pipeline.raise_from_status()
|
|
|
|
# Verify the output.
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=tmp_path / "delta_lake_minio_mces.json",
|
|
golden_path=test_resources_dir / "delta_lake_minio_mces_golden.json",
|
|
)
|