import subprocess

import pytest
import requests
from freezegun import freeze_time

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.glossary.classifier import (
    ClassificationConfig,
    DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSinkConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.trino import ConnectorDetail, TrinoConfig
from tests.test_helpers import fs_helpers, mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2021-09-23 12:00:00"

data_platform = "trino"
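

# Spins up the Trino and HiveServer2 containers from docker-compose.yml and waits
# until Trino's /v1/info endpoint reports that startup has finished.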
@pytest.fixture(scope="module")
def trino_runner(docker_compose_runner, pytestconfig):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/trino"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "trino"
    ) as docker_services:
        wait_for_port(docker_services, "testtrino", 8080)
        wait_for_port(docker_services, "testhiveserver2", 10000, timeout=120)
        docker_services.wait_until_responsive(
            timeout=30,
            pause=1,
            check=lambda: requests.get("http://localhost:5300/v1/info").json()[
                "starting"
            ]
            is False,
        )
        yield docker_services


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    return pytestconfig.rootpath / "tests/integration/trino"


@pytest.fixture(scope="module")
def loaded_trino(trino_runner):
    # Set up the container.
    command = "docker exec testhiveserver2 /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /hive_setup.sql"
    subprocess.run(command, shell=True, check=True)


@freeze_time(FROZEN_TIME)
def test_trino_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    # Run the metadata ingestion pipeline.
    with fs_helpers.isolated_filesystem(tmp_path):
        # Run the metadata ingestion pipeline for trino catalog referring to postgres database
        mce_out_file = "trino_mces.json"
        events_file = tmp_path / mce_out_file

        pipeline_config = {
            "run_id": "trino-test",
            "source": {
                "type": data_platform,
                "config": TrinoConfig(
                    host_port="localhost:5300",
                    database="postgresqldb",
                    username="foo",
                    schema_pattern=AllowDenyPattern(allow=["^librarydb"]),
                    profile_pattern=AllowDenyPattern(
                        allow=["postgresqldb.librarydb.*"]
                    ),
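                    # Enable every field-level profiling statistic so the profile
                    # aspects in the golden file are fully populated.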
                    profiling=GEProfilingConfig(
                        enabled=True,
                        include_field_null_count=True,
                        include_field_distinct_count=True,
                        include_field_min_value=True,
                        include_field_max_value=True,
                        include_field_mean_value=True,
                        include_field_median_value=True,
                        include_field_stddev_value=True,
                        include_field_quantiles=True,
                        include_field_distinct_value_frequencies=True,
                        include_field_histogram=True,
                        include_field_sample_values=True,
                    ),
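                    # Column classification is the feature under test: use the built-in
                    # "datahub" classifier, with minimum_values_threshold lowered to 1 so
                    # columns with only a few sampled values are still classified.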
                    classification=ClassificationConfig(
                        enabled=True,
                        classifiers=[
                            DynamicTypedClassifierConfig(
                                type="datahub",
                                config=DataHubClassifierConfig(
                                    minimum_values_threshold=1,
                                ),
                            )
                        ],
                        max_workers=1,
                    ),
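                    # Map the postgresqldb catalog to the details of its underlying
                    # postgres connector, so the emitted metadata can reference the
                    # source platform instance.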
                    catalog_to_connector_details={
                        "postgresqldb": ConnectorDetail(
                            connector_database="postgres",
                            platform_instance="local_server",
                        )
                    },
                ).dict(),
            },
            "sink": {
                "type": "file",
                "config": FileSinkConfig(filename=str(events_file)).dict(),
            },
        }

        # Run the metadata ingestion pipeline.
        pipeline = Pipeline.create(pipeline_config)
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status(raise_warnings=True)

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path="trino_mces.json",
            golden_path=test_resources_dir / "trino_mces_golden.json",
        )


@freeze_time(FROZEN_TIME)
def test_trino_hive_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    # Run the metadata ingestion pipeline for trino catalog referring to hive database
    mce_out_file = "trino_hive_mces.json"
    events_file = tmp_path / mce_out_file

    pipeline_config = {
        "run_id": "trino-hive-test",
        "source": {
            "type": data_platform,
            "config": TrinoConfig(
                host_port="localhost:5300",
                database="hivedb",
                username="foo",
                schema_pattern=AllowDenyPattern(allow=["^db1"]),
                classification=ClassificationConfig(
                    enabled=True,
                    classifiers=[
                        DynamicTypedClassifierConfig(
                            type="datahub",
                            config=DataHubClassifierConfig(
                                minimum_values_threshold=1,
                            ),
                        )
                    ],
                    max_workers=1,
                ),
            ).dict(),
        },
        "sink": {
            "type": "file",
            "config": FileSinkConfig(filename=str(events_file)).dict(),
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)
    # Limitation 1 - MCE contains "nullable": true for all fields in the trino database,
    # irrespective of NOT NULL constraints present in the underlying postgres database.
    # This is an issue with trino, also reported here - https://github.com/trinodb/trino/issues/6400.
    # Related: https://github.com/trinodb/trino/issues/4070

    # Limitation 2 - Dataset properties for postgres views (view definition, etc.) are not part of the MCE from trino.
    # Postgres views are exposed as tables in trino. This behaviour depends on the trino connector implementation - https://trino.io/episodes/18.html

    # Run the metadata ingestion pipeline for trino catalog referring to hive database
    # config_file = (test_resources_dir / "trino_hive_to_file.yml").resolve()
    # run_datahub_cmd(["ingest", "-c", f"{config_file}"])

    # Verify the output.
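    # transient_lastddltime, numfiles and totalsize are Hive table properties that can
    # change between runs, so they are excluded from the golden-file comparison.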
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "trino_hive_mces_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastddltime'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['numfiles'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['totalsize'\]",
        ],
    )
    # Limitation 3 - Fewer DatasetProperties are available via Trino than from the direct hive source - https://trino.io/docs/current/connector/hive.html#table-properties.


@freeze_time(FROZEN_TIME)
def test_trino_instance_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
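    # Checks that the trino-level platform_instance and the per-catalog connector
    # details are reflected in the emitted metadata.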
    mce_out_file = "trino_instance_mces.json"
    events_file = tmp_path / mce_out_file

    pipeline_config = {
        "run_id": "trino-hive-instance-test",
        "source": {
            "type": data_platform,
            "config": TrinoConfig(
                host_port="localhost:5300",
                database="hivedb",
                username="foo",
                platform_instance="production_warehouse",
                schema_pattern=AllowDenyPattern(allow=["^db1"]),
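                # Declare the connector behind the hivedb catalog as glue, with its own
                # platform instance, to exercise the per-catalog connector mapping.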
                catalog_to_connector_details={
                    "hivedb": ConnectorDetail(
                        connector_platform="glue",
                        platform_instance="local_server",
                    )
                },
            ).dict(),
        },
        "sink": {
            "type": "file",
            "config": FileSinkConfig(filename=str(events_file)).dict(),
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "trino_hive_instance_mces_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastddltime'\]",
        ],
    )