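"""Integration tests for the hive-metastore ingestion source.

The fixtures below spin up a Presto + HiveServer2 stack with a Postgres-backed
metastore via docker-compose and load sample tables plus a Presto view; the
tests then run the ingestion pipeline against that stack and compare the
emitted metadata events with golden files.
"""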

import re
import subprocess
from typing import Dict, Sequence

import pytest
import requests
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
from tests.test_helpers import fs_helpers
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2021-09-23 12:00:00"

data_platform = "hive-metastore"


@pytest.fixture(scope="module")
def hive_metastore_runner(docker_compose_runner, pytestconfig):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/hive-metastore"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "hive-metastore"
    ) as docker_services:
        wait_for_port(docker_services, "presto", 8080)
        wait_for_port(docker_services, "hiveserver2", 10000, timeout=120)
        docker_services.wait_until_responsive(
            timeout=30,
            pause=1,
            check=lambda: requests.get("http://localhost:5300/v1/info").json()[
                "starting"
            ]
            is False,
        )
        yield docker_services


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    return pytestconfig.rootpath / "tests/integration/hive-metastore"


@pytest.fixture(scope="module")
def loaded_hive_metastore(hive_metastore_runner):
    # Set up the container: create the test tables by running the Hive setup script.
    command = "docker exec hiveserver2 /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /hive_setup.sql"
    subprocess.run(command, shell=True, check=True)

    # Create a Presto view on top of one of the tables so view ingestion is covered.
    command = "docker exec presto-cli /usr/bin/presto --server presto:8080 --catalog hivedb --execute 'CREATE VIEW db1.array_struct_test_presto_view as select * from db1.array_struct_test'"
    subprocess.run(command, shell=True, check=True)


@freeze_time(FROZEN_TIME)
@pytest.mark.parametrize(
    "mode,use_catalog_subtype,use_dataset_pascalcase_subtype,include_catalog_name_in_ids,simplify_nested_field_paths,"
    "test_suffix",
    [
        ("hive", False, False, False, False, "_1"),
        ("presto-on-hive", True, True, False, False, "_2"),
        ("hive", False, False, True, False, "_3"),
        ("presto-on-hive", True, True, True, False, "_4"),
        ("hive", False, False, False, True, "_5"),
    ],
)
def test_hive_metastore_ingest(
    loaded_hive_metastore,
    test_resources_dir,
    pytestconfig,
    tmp_path,
    mock_time,
    mode,
    use_catalog_subtype,
    use_dataset_pascalcase_subtype,
    include_catalog_name_in_ids,
    simplify_nested_field_paths,
    test_suffix,
):
    # Run the metadata ingestion pipeline against the catalog whose metastore is
    # backed by the Postgres database, writing the events to a local file.
    with fs_helpers.isolated_filesystem(tmp_path):
        mce_out_file = f"hive_metastore_mces{test_suffix}.json"
        events_file = tmp_path / mce_out_file
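
        # Note that the source connects directly to the metastore's backing
        # PostgreSQL database (localhost:5432, postgresql+psycopg2), not to
        # HiveServer2.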
        pipeline_config: Dict = {
            "run_id": "hive-metastore-test",
            "source": {
                "type": data_platform,
                "config": {
                    "host_port": "localhost:5432",
                    "metastore_db_name": "metastore",
                    "database_pattern": {"allow": ["db1"]},
                    "username": "postgres",
                    "scheme": "postgresql+psycopg2",
                    "include_views": True,
                    "include_tables": True,
                    "include_catalog_name_in_ids": include_catalog_name_in_ids,
                    "schema_pattern": {"allow": ["^public"]},
                    "mode": mode,
                    "use_catalog_subtype": use_catalog_subtype,
                    "use_dataset_pascalcase_subtype": use_dataset_pascalcase_subtype,
                    "simplify_nested_field_paths": simplify_nested_field_paths,
                },
            },
            "sink": {
                "type": "file",
                "config": {
                    "filename": str(events_file),
                },
            },
        }

        # Run the metadata ingestion pipeline.
        pipeline = Pipeline.create(pipeline_config)
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status(raise_warnings=True)

        # Run the metadata ingestion pipeline for presto catalog referring to hive database
        # config_file = (test_resources_dir / "presto_on_hive_to_file.yml").resolve()
        # run_datahub_cmd(["ingest", "-c", f"{config_file}"])
        ignore_paths: Sequence[str] = [
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastDdlTime'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['numfiles'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['totalsize'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['create_date'\]",
        ]
        ignore_paths_v2: Sequence[str] = [
            "/customProperties/create_date",
            "/customProperties/transient_lastDdlTime",
            "/customProperties/numfiles",
            "/customProperties/totalsize",
        ]

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=f"hive_metastore_mces{test_suffix}.json",
            golden_path=test_resources_dir
            / f"hive_metastore_mces_golden{test_suffix}.json",
            ignore_paths=ignore_paths,
            ignore_paths_v2=ignore_paths_v2,
        )


@freeze_time(FROZEN_TIME)
def test_hive_metastore_instance_ingest(
    loaded_hive_metastore, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    instance = "production_warehouse"
    platform = "hive"
    mce_out_file = "hive_metastore_instance_mces.json"
    events_file = tmp_path / mce_out_file
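
    # Ingest with platform_instance set so that the emitted dataset URNs and
    # dataPlatformInstance aspects reference the instance, as asserted below.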
    pipeline_config: Dict = {
        "run_id": "hive-metastore-test-2",
        "source": {
            "type": data_platform,
            "config": {
                "host_port": "localhost:5432",
                "database": "metastore",
                "username": "postgres",
                "scheme": "postgresql+psycopg2",
                "include_views": True,
                "include_tables": True,
                "schema_pattern": {"allow": ["^public"]},
                "platform_instance": "production_warehouse",
            },
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": str(events_file),
            },
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Assert that all generated events have instance-specific URNs.
    urn_pattern = "^" + re.escape(
        f"urn:li:dataset:(urn:li:dataPlatform:{platform},{instance}."
    )
    assert (
        mce_helpers.assert_mce_entity_urn(
            "ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        >= 0
    ), "There should be at least one match"

    assert (
        mce_helpers.assert_mcp_entity_urn(
            "ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        >= 0
    ), "There should be at least one MCP"

    # All emitted dataset entities must have a dataPlatformInstance aspect,
    # and at least one dataset entity must be emitted.
    assert (
        mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="dataPlatformInstance",
            aspect_field_matcher={
                "instance": f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:{platform},{instance})"
            },
            file=events_file,
        )
        >= 1
    )