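"""Integration tests for the DataHub Iceberg ingestion source.

Each test spins up the docker-compose environment under
tests/integration/iceberg/ (a tabulario/spark-iceberg container plus the
Iceberg REST catalog it talks to), populates Iceberg tables via spark-submit,
runs a metadata ingestion pipeline, and checks the emitted metadata against
golden files.
"""
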
import subprocess
from typing import Any, Dict
from unittest.mock import patch

import pytest
from freezegun import freeze_time

from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    run_and_get_pipeline,
    validate_all_providers_have_committed_successfully,
)

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"

# These paths change from one run of the spark-iceberg docker container to the
# next, and the FROZEN_TIME does not apply to them.
PATHS_IN_GOLDEN_FILE_TO_IGNORE = [
    r"root\[\d+\].+\['customProperties'\]\['created-at'\]",
    r"root\[\d+\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]",
    r"root\[\d+\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]",
    r"root\[\d+\].+\['customProperties'\]\['snapshot-id'\]",
    r"root\[\d+\].+\['customProperties'\]\['manifest-list'\]",
]


@pytest.fixture(autouse=True, scope="module")
def remove_docker_image():
    yield

    # The tabulario/spark-iceberg image is pretty large, so we remove it after the test.
    cleanup_image("tabulario/spark-iceberg")


def spark_submit(file_path: str, args: str = "") -> None:
    """Run a pyspark script inside the spark-iceberg container via spark-submit."""
    command = f"docker exec spark-iceberg spark-submit {file_path} {args}"
    ret = subprocess.run(command, shell=True, capture_output=True)
    assert ret.returncode == 0


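# Identical to test_iceberg_ingest below, except that it uses the
# iceberg_multiprocessing_to_file.yml recipe (presumably enabling the source's
# multi-worker/multiprocessing options).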
@freeze_time(FROZEN_TIME)
def test_multiprocessing_iceberg_ingest(
    docker_compose_runner, pytestconfig, tmp_path, mock_time
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (
            test_resources_dir / "iceberg_multiprocessing_to_file.yml"
        ).resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_ingest_mcps_golden.json",
        )


@freeze_time(FROZEN_TIME)
def test_iceberg_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (test_resources_dir / "iceberg_to_file.yml").resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_ingest_mcps_golden.json",
        )


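# The stateful-ingestion test below creates two tables, runs the pipeline to
# record a checkpoint, drops one table, and then verifies that the second run's
# checkpoint no longer contains the dropped table and that the corresponding
# Status(removed=true) MCPs match the deleted-table golden file.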
@freeze_time(FROZEN_TIME)
def test_iceberg_stateful_ingest(
    docker_compose_runner, pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg"
    platform_instance = "test_platform_instance"

    pipeline_config_dict: Dict[str, Any] = {
        "source": {
            "type": "iceberg",
            "config": {
                "catalog": {
                    "default": {
                        "type": "rest",
                        "uri": "http://localhost:8181",
                        "s3.access-key-id": "admin",
                        "s3.secret-access-key": "password",
                        "s3.region": "us-east-1",
                        "warehouse": "s3a://warehouse/wh/",
                        "s3.endpoint": "http://localhost:9000",
                    },
                },
                "user_ownership_property": "owner",
                "group_ownership_property": "owner",
                "platform_instance": f"{platform_instance}",
                # enable stateful ingestion
                "stateful_ingestion": {
                    "enabled": True,
                    "remove_stale_metadata": True,
                    "fail_safe_threshold": 100.0,
                    "state_provider": {
                        "type": "datahub",
                        "config": {"datahub_api": {"server": GMS_SERVER}},
                    },
                },
            },
        },
        "sink": {
            # we are not really interested in the resulting events for this test
            "type": "console"
        },
        "pipeline_name": "test_pipeline",
    }

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services, patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate two tables.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")
        spark_submit("/home/iceberg/setup/create.py", "nyc.another_taxis")

        # Both checkpoint and reporting will use the same mocked graph instance.
        mock_checkpoint.return_value = mock_datahub_graph

        # Do the first run of the pipeline and get the default job's checkpoint.
        pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
        checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)

        assert checkpoint1
        assert checkpoint1.state

        # Capture MCPs of second run to validate Status(removed=true)
        deleted_mcps_path = f"{tmp_path}/iceberg_deleted_mcps.json"
        pipeline_config_dict["sink"]["type"] = "file"
        pipeline_config_dict["sink"]["config"] = {"filename": deleted_mcps_path}

        # Run the delete.py pyspark file to delete the table.
        spark_submit("/home/iceberg/setup/delete.py")

        # Do the second run of the pipeline.
        pipeline_run2 = run_and_get_pipeline(pipeline_config_dict)
        checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)

        assert checkpoint2
        assert checkpoint2.state

        # Perform all assertions on the states. The deleted table should not be
        # part of the second state.
        state1 = checkpoint1.state
        state2 = checkpoint2.state
        difference_urns = list(
            state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
        )

        assert len(difference_urns) == 1

        urn1 = "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.taxis,PROD)"

        assert urn1 in difference_urns

        # Validate that all providers have committed successfully.
        validate_all_providers_have_committed_successfully(
            pipeline=pipeline_run1, expected_providers=1
        )
        validate_all_providers_have_committed_successfully(
            pipeline=pipeline_run2, expected_providers=1
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=deleted_mcps_path,
            golden_path=test_resources_dir / "iceberg_deleted_table_mcps_golden.json",
        )


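# Same flow as test_iceberg_ingest, but using the iceberg_profile_to_file.yml
# recipe (which presumably enables profiling) and the profiling golden file.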
@freeze_time(FROZEN_TIME)
def test_iceberg_profiling(docker_compose_runner, pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/iceberg/"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "iceberg"
    ) as docker_services:
        wait_for_port(docker_services, "spark-iceberg", 8888, timeout=120)

        # Run the create.py pyspark file to populate the table.
        spark_submit("/home/iceberg/setup/create.py", "nyc.taxis")

        # Run the metadata ingestion pipeline.
        config_file = (test_resources_dir / "iceberg_profile_to_file.yml").resolve()
        run_datahub_cmd(
            ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
        )

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            ignore_paths=PATHS_IN_GOLDEN_FILE_TO_IGNORE,
            output_path=tmp_path / "iceberg_mcps.json",
            golden_path=test_resources_dir / "iceberg_profile_mcps_golden.json",
        )