import subprocess

import pytest
import requests
from freezegun import freeze_time

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.glossary.classifier import (
    ClassificationConfig,
    DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSinkConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.trino import ConnectorDetail, TrinoConfig
from tests.test_helpers import fs_helpers, mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = " 2021-09-23 12:00:00 "
data_platform = " trino "

@pytest.fixture(scope="module")
def trino_runner(docker_compose_runner, pytestconfig):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/trino"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "trino"
    ) as docker_services:
        wait_for_port(docker_services, "testtrino", 8080)
        wait_for_port(docker_services, "testhiveserver2", 10000, timeout=120)
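        # Host port 5300 is where the compose setup appears to publish trino's
        # 8080; poll the coordinator's /v1/info endpoint until it stops
        # reporting "starting" before handing the services to the tests.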
        docker_services.wait_until_responsive(
            timeout=30,
            pause=1,
            check=lambda: requests.get("http://localhost:5300/v1/info").json()[
                "starting"
            ]
            is False,
        )
        yield docker_services


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    return pytestconfig.rootpath / "tests/integration/trino"


@pytest.fixture(scope="module")
def loaded_trino(trino_runner):
    # Set up the container.
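    # /hive_setup.sql is assumed to ship with the test's docker setup; running it
    # through beeline seeds the Hive tables that the tests below ingest via trino.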
command = " docker exec testhiveserver2 /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /hive_setup.sql "
subprocess . run ( command , shell = True , check = True )


@freeze_time(FROZEN_TIME)
def test_trino_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    # Ingest from the trino catalog that refers to the postgres database.
    with fs_helpers.isolated_filesystem(tmp_path):
        mce_out_file = "trino_mces.json"
        events_file = tmp_path / mce_out_file
        pipeline_config = {
            "run_id": "trino-test",
            "source": {
                "type": data_platform,
                "config": TrinoConfig(
                    host_port="localhost:5300",
                    database="postgresqldb",
                    username="foo",
                    schema_pattern=AllowDenyPattern(allow=["^librarydb"]),
                    profile_pattern=AllowDenyPattern(
                        allow=["postgresqldb.librarydb.*"]
                    ),
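                    # Every field-level metric is enabled below so the golden
                    # file covers the full GE profiling surface.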
                    profiling=GEProfilingConfig(
                        enabled=True,
                        include_field_null_count=True,
                        include_field_distinct_count=True,
                        include_field_min_value=True,
                        include_field_max_value=True,
                        include_field_mean_value=True,
                        include_field_median_value=True,
                        include_field_stddev_value=True,
                        include_field_quantiles=True,
                        include_field_distinct_value_frequencies=True,
                        include_field_histogram=True,
                        include_field_sample_values=True,
                    ),
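                    # Classify column values with the DataHub classifier; the low
                    # minimum_values_threshold lets classification run even on
                    # columns with very few sample values.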
                    classification=ClassificationConfig(
                        enabled=True,
                        classifiers=[
                            DynamicTypedClassifierConfig(
                                type="datahub",
                                config=DataHubClassifierConfig(
                                    minimum_values_threshold=1,
                                ),
                            )
                        ],
                        max_workers=1,
                    ),
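                    # Describe the connector behind the "postgresqldb" catalog so
                    # ingestion can reference the underlying postgres instance.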
                    catalog_to_connector_details={
                        "postgresqldb": ConnectorDetail(
                            connector_database="postgres",
                            platform_instance="local_server",
                        )
                    },
                ).dict(),
            },
            "sink": {
                "type": "file",
                "config": FileSinkConfig(filename=str(events_file)).dict(),
            },
        }

        # Run the metadata ingestion pipeline.
        pipeline = Pipeline.create(pipeline_config)
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status(raise_warnings=True)

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path="trino_mces.json",
            golden_path=test_resources_dir / "trino_mces_golden.json",
        )


@freeze_time(FROZEN_TIME)
def test_trino_hive_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    # Run the metadata ingestion pipeline for the trino catalog that refers to
    # the hive database.
    mce_out_file = "trino_hive_mces.json"
    events_file = tmp_path / mce_out_file
    pipeline_config = {
        "run_id": "trino-hive-test",
        "source": {
            "type": data_platform,
            "config": TrinoConfig(
                host_port="localhost:5300",
                database="hivedb",
                username="foo",
                schema_pattern=AllowDenyPattern(allow=["^db1"]),
                classification=ClassificationConfig(
                    enabled=True,
                    classifiers=[
                        DynamicTypedClassifierConfig(
                            type="datahub",
                            config=DataHubClassifierConfig(
                                minimum_values_threshold=1,
                            ),
                        )
                    ],
                    max_workers=1,
                ),
            ).dict(),
        },
        "sink": {
            "type": "file",
            "config": FileSinkConfig(filename=str(events_file)).dict(),
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Limitation 1 - The MCE contains "nullable": true for every field in the trino
    # database, irrespective of NOT NULL constraints in the underlying postgres
    # database. This is a trino issue, reported at
    # https://github.com/trinodb/trino/issues/6400 (related:
    # https://github.com/trinodb/trino/issues/4070).
    # Limitation 2 - Dataset properties for postgres views (view definition, etc.)
    # are not part of the MCE from trino, because postgres views are exposed as
    # plain tables in trino. This depends on the trino connector implementation -
    # https://trino.io/episodes/18.html

    # Alternative: run the ingestion for the trino catalog referring to the hive
    # database through the CLI instead.
    # config_file = (test_resources_dir / "trino_hive_to_file.yml").resolve()
    # run_datahub_cmd(["ingest", "-c", f"{config_file}"])

    # Verify the output.
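    # transient_lastddltime, numfiles, and totalsize are Hive-managed table stats
    # that vary between runs, so they are excluded from the golden-file comparison.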
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "trino_hive_mces_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastddltime'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['numfiles'\]",
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['totalsize'\]",
        ],
    )
    # Limitation 3 - Fewer DatasetProperties are available through trino than from
    # the direct hive source -
    # https://trino.io/docs/current/connector/hive.html#table-properties.


@freeze_time(FROZEN_TIME)
def test_trino_instance_ingest(
    loaded_trino, test_resources_dir, pytestconfig, tmp_path, mock_time
):
mce_out_file = " trino_instance_mces.json "
events_file = tmp_path / mce_out_file
pipeline_config = {
" run_id " : " trino-hive-instance-test " ,
" source " : {
" type " : data_platform ,
" config " : TrinoConfig (
host_port = " localhost:5300 " ,
database = " hivedb " ,
username = " foo " ,
platform_instance = " production_warehouse " ,
schema_pattern = AllowDenyPattern ( allow = [ " ^db1 " ] ) ,
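                # Describe the connector behind the "hivedb" catalog as glue so
                # emitted upstream references point at that platform instance.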
                catalog_to_connector_details={
                    "hivedb": ConnectorDetail(
                        connector_platform="glue",
                        platform_instance="local_server",
                    )
                },
            ).dict(),
        },
        "sink": {
            "type": "file",
            "config": FileSinkConfig(filename=str(events_file)).dict(),
        },
    }

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(pipeline_config)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=events_file,
        golden_path=test_resources_dir / "trino_hive_instance_mces_golden.json",
        ignore_paths=[
            r"root\[\d+\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['customProperties'\]\['transient_lastddltime'\]",
        ],
    )