import re
import subprocess

import pytest
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port

FROZEN_TIME = "2020-04-14 07:00:00"
data_platform = "hive"

pytestmark = pytest.mark.integration_batch_1
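
# All tests below freeze wall-clock time so that timestamps embedded in the
# emitted events stay deterministic and comparable to the checked-in golden
# files.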


@pytest.fixture(scope="module")
def hive_runner(docker_compose_runner, pytestconfig):
    """Start the Hive docker-compose stack and wait for HiveServer2 to accept connections."""
    test_resources_dir = pytestconfig.rootpath / "tests/integration/hive"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "hive"
    ) as docker_services:
        wait_for_port(docker_services, "testhiveserver2", 10000, timeout=120)
        yield docker_services


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    return pytestconfig.rootpath / "tests/integration/hive"


@pytest.fixture(scope="module")
def loaded_hive(hive_runner):
    # Seed the container: hive_setup.sql is expected to be mounted into the
    # container by the compose file and creates the test databases/tables.
    command = "docker exec testhiveserver2 /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /hive_setup.sql"
    subprocess.run(command, shell=True, check=True)


def base_pipeline_config(events_file, db=None):
    return {
        "run_id": "hive-test",
        "source": {
            "type": data_platform,
            "config": {
                "scheme": "hive",
                "database": db,
                "host_port": "localhost:10000",
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": str(events_file)},
        },
    }
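

# For reference, base_pipeline_config(events_file, "db1") builds the in-memory
# equivalent of a file-based recipe roughly like the following (a sketch; the
# sink filename is illustrative):
#
#   source:
#     type: hive
#     config:
#       scheme: hive
#       database: db1
#       host_port: localhost:10000
#   sink:
#     type: file
#     config:
#       filename: /tmp/.../test_hive_ingest.json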


@freeze_time(FROZEN_TIME)
def test_hive_ingest(
    loaded_hive, pytestconfig, test_resources_dir, tmp_path, mock_time
):
    mce_out_file = "test_hive_ingest.json"
    events_file = tmp_path / mce_out_file

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(base_pipeline_config(events_file, "db1"))
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Verify against the golden file. Timestamps in customProperties are
    # ignored, as is nativeDataType for root[6] (limitation: the native data
    # type for union columns does not show up as expected).
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "hive_mces_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com\.linkedin\.pegasus2avro\.metadata\.snapshot\.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com\.linkedin\.pegasus2avro\.dataset\.DatasetProperties'\]\['customProperties'\]\['.*Time.*'\]",
            r"root\[6\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]\[\d+\]\['nativeDataType'\]",
        ],
    )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_hive_ingest_all_db(
    loaded_hive, pytestconfig, test_resources_dir, tmp_path, mock_time
):
    mce_out_file = "test_hive_ingest.json"
    events_file = tmp_path / mce_out_file

    # Run the metadata ingestion pipeline without pinning a database, so that
    # all databases are ingested.
    pipeline = Pipeline.create(base_pipeline_config(events_file))
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Verify against the golden file, with the same ignores as above
    # (limitation: the native data type for union columns does not show up as
    # expected).
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "hive_mces_all_db_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com\.linkedin\.pegasus2avro\.metadata\.snapshot\.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com\.linkedin\.pegasus2avro\.dataset\.DatasetProperties'\]\['customProperties'\]\['.*Time.*'\]",
            r"root\[6\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]\[\d+\]\['nativeDataType'\]",
        ],
    )


@freeze_time(FROZEN_TIME)
def test_hive_instance_check(loaded_hive, test_resources_dir, tmp_path, pytestconfig):
    instance: str = "production_warehouse"

    # Run the metadata ingestion pipeline with a platform_instance configured.
    mce_out_file = "test_hive_instance.json"
    events_file = tmp_path / mce_out_file
    pipeline_config = base_pipeline_config(events_file, "db1")
    pipeline_config["source"]["config"]["platform_instance"] = instance
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Assert that all events generated have instance-specific urns.
    urn_pattern = "^" + re.escape(
        f"urn:li:dataset:(urn:li:dataPlatform:{data_platform},{instance}."
    )
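    # re.escape keeps the literal "(" and "." in the prefix from acting as
    # regex metacharacters. An example of a matching urn (the table name is
    # illustrative):
    #   urn:li:dataset:(urn:li:dataPlatform:hive,production_warehouse.db1.my_table,PROD)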
    mce_helpers.assert_mce_entity_urn(
        "ALL",
        entity_type="dataset",
        regex_pattern=urn_pattern,
        file=events_file,
    )
    mce_helpers.assert_mcp_entity_urn(
        "ALL",
        entity_type="dataset",
        regex_pattern=urn_pattern,
        file=events_file,
    )

    # Every dataset entity emitted must carry a dataPlatformInstance aspect,
    # and at least one entity must be emitted.
    assert (
        mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="dataPlatformInstance",
            aspect_field_matcher={
                "instance": f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:{data_platform},{instance})"
            },
            file=events_file,
        )
        >= 1
    )
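

# To run this module locally (a sketch; assumes Docker and the repo's
# integration-test dependencies are available):
#   pytest -m integration_batch_1 tests/integration/hive/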