Imri Paran b92b950060
Fix 18434: feat(statistics-profiler): use statistics tables to profile trino tables (#18433)
* feat(statistics-profiler): use statistics tables to profile trino tables

- implemented the collaborative root class
- added the "useStatistics" profiler parameter
- added the "supportsStatistics" database connection property
- implemented the ProfilerWithStatistics and StoredStatisticsSource to add this functionality to specific profilers
- implemented TrinoStoredStatisticsSource for specific trino statistics logic

* added ABC to terminal classes in collaborative root

* fixed docstring for TestSuiteInterface

* reverted unintended changes

* typo
2024-11-07 18:37:31 +01:00

271 lines
8.3 KiB
Python

import os.path
import random
from time import sleep
import docker
import pandas as pd
import pytest
import testcontainers.core.network
from sqlalchemy import create_engine, insert
from sqlalchemy.engine import make_url
from tenacity import retry, stop_after_delay, wait_fixed
from testcontainers.core.container import DockerContainer
from testcontainers.core.generic import DbContainer
from testcontainers.minio import MinioContainer
from testcontainers.mysql import MySqlContainer
from _openmetadata_testutils.helpers.docker import try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from ..conftest import ingestion_config as base_ingestion_config
class TrinoContainer(DbContainer):
    """Testcontainer running a Trino server built from a local Dockerfile.

    On ``start()`` a derived image is built from the ``./trino`` directory next
    to this file (using the configured ``image`` as the build base) and tagged
    with a random suffix so concurrent test sessions do not collide. The image
    is removed again in ``stop()``.
    """

    def __init__(
        self,
        image: str = "trinodb/trino",
        port=8080,
        **kwargs,
    ):
        super().__init__(image, **kwargs)
        # Trino default install accepts any user; "admin" with empty password.
        self.user = "admin"
        self.port = port
        self.with_exposed_ports(port)
        # Randomized tag for the locally-built image; cleaned up in stop().
        self._built_image = f"trino:{random.randint(0, 10000)}"

    def start(self) -> "DbContainer":
        # Build the derived image first, then start the container from it.
        self.build()
        self.image = self._built_image
        return super().start()

    def _connect(self) -> None:
        super()._connect()
        engine = create_engine(self.get_connection_url())
        try:
            # Poll (1s interval, up to 120s) until the cluster reports its
            # nodes, i.e. Trino is actually ready to serve queries.
            retry(wait=wait_fixed(1), stop=stop_after_delay(120))(engine.execute)(
                "select system.runtime.nodes.node_id from system.runtime.nodes"
            ).fetchall()
        finally:
            engine.dispose()

    def _configure(self) -> None:
        # No extra env configuration needed; overrides the DbContainer hook.
        pass

    def stop(self, force=True, delete_volume=True) -> None:
        super().stop(force, delete_volume)
        # Remove the throwaway image created in build().
        self._docker.client.images.remove(self._built_image)

    def get_connection_url(self) -> str:
        # Empty password, plain HTTP, against the host-mapped port.
        return f"trino://{self.user}:@{self.get_container_host_ip()}:{self.get_exposed_port(self.port)}/?http_scheme=http"

    def build(self):
        """Build the derived Trino image from the local ./trino build context."""
        docker_client = docker.from_env()
        docker_client.images.build(
            path=os.path.dirname(__file__) + "/trino",
            tag=self._built_image,
            buildargs={"BASE_IMAGE": self.image},
            rm=True,
        )
class HiveMetaStoreContainer(DockerContainer):
    """Testcontainer running a Hive metastore built from a local Dockerfile.

    On ``start()`` a derived image is built from the ``./hive`` directory next
    to this file (using the configured ``image`` as the build base) and tagged
    with a random suffix so concurrent test sessions do not collide. The image
    is removed again in ``stop()``.
    """

    def __init__(
        self,
        image: str = "apache/hive",
        port=9083,
        **kwargs,
    ):
        super().__init__(image, **kwargs)
        self.port = port
        self.with_exposed_ports(port)
        # Extra docker build args registered via with_build_args().
        self._build_args = {}
        # Randomized tag for the locally-built image; cleaned up in stop().
        self._built_image = f"hive:{random.randint(0, 10000)}"

    def start(self) -> "DockerContainer":
        # Build the derived image first, then start the container from it.
        self.build()
        self.image = self._built_image
        return super().start()

    def with_build_args(self, key, value) -> "HiveMetaStoreContainer":
        """Register an additional docker build arg passed to build()."""
        self._build_args.update({key: value})
        return self

    def stop(self, force=True, delete_volume=True) -> None:
        super().stop(force, delete_volume)
        # Remove the throwaway image created in build().
        self._docker.client.images.remove(self._built_image)

    def build(self):
        """Build the derived Hive image from the local ./hive build context."""
        docker_client = docker.from_env()
        docker_client.images.build(
            path=os.path.dirname(__file__) + "/hive",
            tag=self._built_image,
            buildargs={
                "BASE_IMAGE": self.image,
                # Fix: args collected by with_build_args() were previously
                # never forwarded to the image build, making the method a no-op.
                **self._build_args,
            },
            rm=True,
        )
@pytest.fixture(scope="session")
def docker_network():
    """Session-wide docker network so containers can reach each other by alias."""
    network = testcontainers.core.network.Network()
    with network as net:
        yield net
@pytest.fixture(scope="session")
def trino_container(hive_metastore_container, minio_container, docker_network):
    """Trino server wired (by network alias) to the metastore and minio."""
    metastore_uri = f"thrift://metastore:{hive_metastore_container.port}"
    minio_endpoint = f"http://minio:{minio_container.port}"

    container = TrinoContainer(image="trinodb/trino:418")
    container = container.with_network(docker_network)
    container = container.with_env("HIVE_METASTORE_URI", metastore_uri)
    container = container.with_env("MINIO_ENDPOINT", minio_endpoint)

    with try_bind(container, container.port, container.port + 1) as trino:
        yield trino
@pytest.fixture(scope="session")
def mysql_container(docker_network):
    """MariaDB backing store for the Hive metastore, aliased as 'mariadb'."""
    container = MySqlContainer(
        "mariadb:10.6.16",
        username="admin",
        password="admin",
        dbname="metastore_db",
    )
    # Fluent setters mutate and return the same container instance.
    container.with_network(docker_network)
    container.with_network_aliases("mariadb")
    with try_bind(container, container.port, container.port + 1) as mysql:
        yield mysql
@pytest.fixture(scope="session")
def hive_metastore_container(mysql_container, minio_container, docker_network):
    """Hive metastore (alias 'metastore') backed by MariaDB and minio."""
    jdbc_url = f"jdbc:mysql://mariadb:{mysql_container.port}/{mysql_container.dbname}"

    container = HiveMetaStoreContainer("bitsondatadev/hive-metastore:latest")
    # Fluent setters mutate and return the same container instance.
    container.with_network(docker_network)
    container.with_network_aliases("metastore")
    container.with_env("METASTORE_DB_HOSTNAME", "mariadb")
    container.with_env("METASTORE_DB_PORT", str(mysql_container.port))
    container.with_env("JDBC_CONNECTION_URL", jdbc_url)
    container.with_env("MINIO_ENDPOINT", f"http://minio:{minio_container.port}")

    with container as hive:
        yield hive
@pytest.fixture(scope="session")
def minio_container(docker_network):
    """Minio object store (alias 'minio') with the hive warehouse bucket."""
    container = MinioContainer()
    # Fluent setters mutate and return the same container instance.
    container.with_network(docker_network)
    container.with_network_aliases("minio")
    with try_bind(container, container.port, container.port) as minio:
        # Pre-create the bucket the hive connector writes table data into.
        minio.get_client().make_bucket("hive-warehouse")
        yield minio
@pytest.fixture(scope="session")
def create_test_data(trino_container):
    """Load the parquet files under ./data into minio.my_schema via Trino.

    Each parquet file becomes one table named after the file. Statistics are
    collected (ANALYZE) for every table, then dropped again for the 'empty'
    table so tests can exercise the no-statistics code path.
    """
    engine = create_engine(
        make_url(trino_container.get_connection_url()).set(database="minio")
    )
    engine.execute(
        "create schema minio.my_schema WITH (location = 's3a://hive-warehouse/')"
    )
    data_dir = os.path.dirname(__file__) + "/data"
    for file in os.listdir(data_dir):
        df = pd.read_parquet(f"{data_dir}/{file}")
        for col in df.columns:
            # Trino/hive cannot ingest tz-aware pandas timestamps; convert to
            # UTC and drop the timezone. The isinstance check replaces the
            # deprecated pd.api.types.is_datetime64tz_dtype helper.
            if isinstance(df[col].dtype, pd.DatetimeTZDtype):
                df[col] = df[col].dt.tz_convert(None)
        df.to_sql(
            file.replace(".parquet", ""),
            engine,
            schema="my_schema",
            if_exists="fail",
            index=False,
            # custom_insert retries until the row count matches (see below).
            method=custom_insert,
        )
        sleep(1)
        # Collect table statistics so the stats-based profiler has data.
        engine.execute(
            "ANALYZE " + f'minio."my_schema"."{file.replace(".parquet", "")}"'
        )
    # The 'empty' table must have no stats for the negative test case.
    engine.execute(
        "CALL system.drop_stats(schema_name => 'my_schema', table_name => 'empty')"
    )
    return
def custom_insert(self, conn, keys: list[str], data_iter):
    """
    Hack pandas.io.sql.SQLTable._execute_insert_multi to retry until all rows
    are inserted.
    This is required because using trino with pd.to_sql in our setup is
    unreliable.
    """
    # NOTE: `self` is a pandas SQLTable — this function is passed as the
    # `method=` callback to DataFrame.to_sql, which invokes it bound-style.
    rowcount = 0
    max_tries = 10
    try_num = 0
    data = [dict(zip(keys, row)) for row in data_iter]
    # Re-issue the full multi-row insert until the table's row count matches
    # the batch size (or we run out of attempts).
    while rowcount != len(data):
        if try_num >= max_tries:
            raise RuntimeError(f"Failed to insert data after {max_tries} tries")
        try_num += 1
        stmt = insert(self.table).values(data)
        conn.execute(stmt)
        # Verify by counting rows rather than trusting the driver's result.
        rowcount = conn.execute(
            "SELECT COUNT(*) FROM " + f'"{self.schema}"."{self.name}"'
        ).scalar()
    return rowcount
@pytest.fixture(scope="module")
def create_service_request(trino_container, tmp_path_factory):
    """Request payload registering the dockerized Trino as a database service."""
    exposed_port = trino_container.get_exposed_port(trino_container.port)
    trino_config = TrinoConnection(
        username=trino_container.user,
        hostPort="localhost:" + exposed_port,
        catalog="minio",
        connectionArguments={"http_scheme": "http"},
    )
    # Unique service name per test module via a throwaway tmp dir name.
    service_name = "docker_test_" + tmp_path_factory.mktemp("trino").name
    return CreateDatabaseServiceRequest(
        name=service_name,
        serviceType=DatabaseServiceType.Trino,
        connection=DatabaseConnection(config=trino_config),
    )
@pytest.fixture(scope="module")
def ingestion_config(db_service, sink_config, workflow_config, base_ingestion_config):
    """Base ingestion config with the information_schema schema excluded."""
    source_config = base_ingestion_config["source"]["sourceConfig"]["config"]
    source_config["schemaFilterPattern"] = {"excludes": ["^information_schema$"]}
    return base_ingestion_config
@pytest.fixture(scope="module")
def unmask_password():
    """No-op password patcher: the Trino test service has no masked secrets."""

    def patch_password(service: DatabaseService):
        # Identity function — nothing to restore on the connection config.
        return service

    return patch_password