
import subprocess
from typing import Any, Dict
from unittest.mock import patch

import pytest
from freezegun import freeze_time

from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    run_and_get_pipeline,
    validate_all_providers_have_committed_successfully,
)
pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
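
# The stateful-ingestion test points its state provider at this GMS endpoint and
# patches the underlying DataHubGraph with a mock, so no live server is required.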
# These values change from one run of the Iceberg docker setup to the next, and
# FROZEN_TIME does not apply to them.
PATHS_IN_GOLDEN_FILE_TO_IGNORE = [
    r"root\[\d+\].+\['customProperties'\]\['created-at'\]",
    r"root\[\d+\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]",
    r"root\[\d+\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]",
    r"root\[\d+\].+\['customProperties'\]\['snapshot-id'\]",
    r"root\[\d+\].+\['customProperties'\]\['manifest-list'\]",
]

@pytest.fixture(autouse=True, scope="module")
def remove_docker_image():
    yield

    # The tabulario/spark-iceberg image is pretty large, so we remove it after the test.
    cleanup_image("tabulario/spark-iceberg")
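
# Helper: run a PySpark script inside the spark-iceberg container via `docker exec`.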
def spark_submit(file_path: str, args: str = "") -> None:
    command = f"docker exec spark-iceberg spark-submit {file_path} {args}"
    ret = subprocess.run(command, shell=True, capture_output=True)
    assert ret.returncode == 0, ret.stderr
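

# Same ingest-and-compare flow as test_iceberg_ingest below, but driven by the
# multiprocessing recipe (iceberg_multiprocessing_to_file.yml).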
@freeze_time(FROZEN_TIME)
def test_multiprocessing_iceberg_ingest(
    docker_compose_runner, pytestconfig, tmp_path, mock_time
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (
            test_resources_dir / "iceberg_multiprocessing_to_file.yml"
        ).resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_ingest_mcps_golden.json",
        )
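

# Baseline ingest: populate nyc.taxis, run the file-sink recipe, and compare the
# emitted MCPs against the golden file.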
@freeze_time(FROZEN_TIME)
def test_iceberg_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (test_resources_dir / "iceberg_to_file.yml").resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_ingest_mcps_golden.json",
        )
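

# Stateful ingestion: ingest two tables, delete one, re-run the pipeline, and verify
# that the checkpoint diff and the emitted Status(removed=true) MCPs reflect the
# deleted table.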
@freeze_time(FROZEN_TIME)
def test_iceberg_stateful_ingest(
    docker_compose_runner, pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg"
    platform_instance = "test_platform_instance"

    pipeline_config_dict: Dict[str, Any] = {
        "source": {
            "type": "iceberg",
            "config": {
                "catalog": {
                    "default": {
                        "type": "rest",
                        "uri": "http://localhost:8181",
                        "s3.access-key-id": "admin",
                        "s3.secret-access-key": "password",
                        "s3.region": "us-east-1",
                        "warehouse": "s3a://warehouse/wh/",
                        "s3.endpoint": "http://localhost:9000",
                    },
                },
                "user_ownership_property": "owner",
                "group_ownership_property": "owner",
                "platform_instance": f"{platform_instance}",
                # enable stateful ingestion
                "stateful_ingestion": {
                    "enabled": True,
                    "remove_stale_metadata": True,
                    "fail_safe_threshold": 100.0,
                    "state_provider": {
                        "type": "datahub",
                        "config": {"datahub_api": {"server": GMS_SERVER}},
                    },
                },
            },
        },
        "sink": {
            # we are not really interested in the resulting events for this test
            "type": "console"
        },
        "pipeline_name": "test_pipeline",
    }
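
    # Bring up the services, patch the DataHubGraph used by the checkpointing provider
    # with the mocked graph, and run the pipeline twice.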
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services, patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate two tables.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")
        spark_submit("/home/iceberg/setup/create.py", "nyc.another_taxis")

        # Both checkpoint and reporting will use the same mocked graph instance.
        mock_checkpoint.return_value = mock_datahub_graph

        # Do the first run of the pipeline and get the default job's checkpoint.
        pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
        checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
        assert checkpoint1
        assert checkpoint1.state

        # Capture MCPs of the second run to validate Status(removed=true).
        deleted_mcps_path = f"{tmp_path}/iceberg_deleted_mcps.json"
        pipeline_config_dict["sink"]["type"] = "file"
        pipeline_config_dict["sink"]["config"] = {"filename": deleted_mcps_path}

        # Run the delete.py pyspark file to delete the table.
        spark_submit("/home/iceberg/setup/delete.py")

        # Do the second run of the pipeline.
        pipeline_run2 = run_and_get_pipeline(pipeline_config_dict)
        checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
        assert checkpoint2
        assert checkpoint2.state

        # Perform all assertions on the states. The deleted table should not be
        # part of the second state.
        state1 = checkpoint1.state
        state2 = checkpoint2.state
        difference_urns = list(
            state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
        )

        assert len(difference_urns) == 1
        urn1 = "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.taxis,PROD)"
        assert urn1 in difference_urns

        # Validate that all providers have committed successfully.
        validate_all_providers_have_committed_successfully(
            pipeline=pipeline_run1, expected_providers=1
        )
        validate_all_providers_have_committed_successfully(
            pipeline=pipeline_run2, expected_providers=1
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=deleted_mcps_path,
            golden_path=test_resources_dir / "iceberg_deleted_table_mcps_golden.json",
        )
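

# Profiling: same single-table setup, but runs the profiling recipe
# (iceberg_profile_to_file.yml) and compares against its own golden file.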
@freeze_time(FROZEN_TIME)
def test_iceberg_profiling(docker_compose_runner, pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (test_resources_dir / "iceberg_profile_to_file.yml").resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_profile_mcps_golden.json",
        )