import re
import subprocess
from typing import Dict, Sequence

import pytest
import requests
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
from tests.test_helpers import fs_helpers
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2021-09-23 12:00:00"

data_platform = "hive-metastore"


@pytest.fixture(scope="module")
def hive_metastore_runner(docker_compose_runner, pytestconfig):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/hive-metastore"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "hive-metastore"
    ) as docker_services:
        wait_for_port(docker_services, "presto", 8080)
        wait_for_port(docker_services, "hiveserver2", 10000, timeout=120)
        # Poll the Presto info endpoint until the server reports it has finished starting.
        docker_services.wait_until_responsive(
            timeout=30,
            pause=1,
            check=lambda: requests.get("http://localhost:5300/v1/info").json()[
                "starting"
            ]
            is False,
        )

        yield docker_services


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    return pytestconfig.rootpath / "tests/integration/hive-metastore"


@pytest.fixture(scope="module")
def loaded_hive_metastore(hive_metastore_runner):
    # Set up the container.
    command = "docker exec hiveserver2 /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /hive_setup.sql"
    subprocess.run(command, shell=True, check=True)
    # Create a Presto view so that view ingestion can be tested.
    command = "docker exec presto-cli /usr/bin/presto --server presto:8080 --catalog hivedb --execute 'CREATE VIEW db1.array_struct_test_presto_view as select * from db1.array_struct_test'"
    subprocess.run(command, shell=True, check=True)


@freeze_time(FROZEN_TIME)
@pytest.mark.parametrize(
    "mode,use_catalog_subtype,use_dataset_pascalcase_subtype,include_catalog_name_in_ids,simplify_nested_field_paths,"
    "test_suffix",
    [
        ("hive", False, False, False, False, "_1"),
        ("presto-on-hive", True, True, False, False, "_2"),
        ("hive", False, False, True, False, "_3"),
        ("presto-on-hive", True, True, True, False, "_4"),
        ("hive", False, False, False, True, "_5"),
    ],
)
def test_hive_metastore_ingest(
    loaded_hive_metastore,
    test_resources_dir,
    pytestconfig,
    tmp_path,
    mock_time,
    mode,
    use_catalog_subtype,
    use_dataset_pascalcase_subtype,
    include_catalog_name_in_ids,
    simplify_nested_field_paths,
    test_suffix,
):
    # Run the metadata ingestion pipeline.
    with fs_helpers.isolated_filesystem(tmp_path):
        # Run the metadata ingestion pipeline for presto catalog referring to postgres database
        mce_out_file = f"hive_metastore_mces{test_suffix}.json"
        events_file = tmp_path / mce_out_file

        pipeline_config: Dict = {
            "run_id": "hive-metastore-test",
            "source": {
                "type": data_platform,
                "config": {
                    "host_port": "localhost:5432",
                    "metastore_db_name": "metastore",
                    "database_pattern": {"allow": ["db1"]},
                    "username": "postgres",
                    "scheme": "postgresql+psycopg2",
                    "include_views": True,
                    "include_tables": True,
                    "include_catalog_name_in_ids": include_catalog_name_in_ids,
                    "schema_pattern": {"allow": ["^public"]},
                    "mode": mode,
                    "use_catalog_subtype": use_catalog_subtype,
                    "use_dataset_pascalcase_subtype": use_dataset_pascalcase_subtype,
                    "simplify_nested_field_paths": simplify_nested_field_paths,
                },
            },
            "sink": {
                "type": "file",
                "config": {
                    "filename": str(events_file),
                },
            },
        }

        # Run the metadata ingestion pipeline.
        pipeline = Pipeline.create(pipeline_config)
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status(raise_warnings=True)

        # Run the metadata ingestion pipeline for presto catalog referring to hive database
        # config_file = (test_resources_dir / "presto_on_hive_to_file.yml").resolve()
        # run_datahub_cmd(["ingest", "-c", f"{config_file}"])

        ignore_paths: Sequence[str] = [
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastDdlTime'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['numfiles'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['totalsize'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['create_date'\]",
        ]

        ignore_paths_v2: Sequence[str] = [
            "/customProperties/create_date",
            "/customProperties/transient_lastDdlTime",
            "/customProperties/numfiles",
            "/customProperties/totalsize",
        ]

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=f"hive_metastore_mces{test_suffix}.json",
            golden_path=test_resources_dir
            / f"hive_metastore_mces_golden{test_suffix}.json",
            ignore_paths=ignore_paths,
            ignore_paths_v2=ignore_paths_v2,
        )


@freeze_time(FROZEN_TIME)
def test_hive_metastore_instance_ingest(
    loaded_hive_metastore, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    instance = "production_warehouse"
    platform = "hive"
    mce_out_file = "hive_metastore_instance_mces.json"
    events_file = tmp_path / mce_out_file

    pipeline_config: Dict = {
        "run_id": "hive-metastore-test-2",
        "source": {
            "type": data_platform,
            "config": {
                "host_port": "localhost:5432",
                "database": "metastore",
                "username": "postgres",
                "scheme": "postgresql+psycopg2",
                "include_views": True,
                "include_tables": True,
                "schema_pattern": {"allow": ["^public"]},
                "platform_instance": "production_warehouse",
            },
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": str(events_file),
            },
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Assert that all events generated have instance-specific URNs.
    urn_pattern = "^" + re.escape(
        f"urn:li:dataset:(urn:li:dataPlatform:{platform},{instance}."
    )
    assert (
        mce_helpers.assert_mce_entity_urn(
            "ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        >= 0
    ), "There should be at least one match"

    assert (
        mce_helpers.assert_mcp_entity_urn(
            "ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        >= 0
    ), "There should be at least one MCP"

    # All dataset entities emitted must have a dataPlatformInstance aspect emitted,
    # and there must be at least one entity emitted.
    assert (
        mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="dataPlatformInstance",
            aspect_field_matcher={
                "instance": f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:{platform},{instance})"
            },
            file=events_file,
        )
        >= 1
    )