| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | import os.path | 
					
						
							|  |  |  | import random | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  | from time import sleep | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | import docker | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  | import pandas as pd | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | import pytest | 
					
						
							|  |  |  | import testcontainers.core.network | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  | from sqlalchemy import create_engine, insert | 
					
						
							|  |  |  | from sqlalchemy.engine import make_url | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | from tenacity import retry, stop_after_delay, wait_fixed | 
					
						
							|  |  |  | from testcontainers.core.container import DockerContainer | 
					
						
							|  |  |  | from testcontainers.core.generic import DbContainer | 
					
						
							|  |  |  | from testcontainers.minio import MinioContainer | 
					
						
							|  |  |  | from testcontainers.mysql import MySqlContainer | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-26 12:12:04 +02:00
										 |  |  | from _openmetadata_testutils.helpers.docker import try_bind | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | from metadata.generated.schema.api.services.createDatabaseService import ( | 
					
						
							|  |  |  |     CreateDatabaseServiceRequest, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( | 
					
						
							|  |  |  |     TrinoConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import ( | 
					
						
							|  |  |  |     DatabaseConnection, | 
					
						
							|  |  |  |     DatabaseService, | 
					
						
							|  |  |  |     DatabaseServiceType, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-19 09:09:35 +02:00
										 |  |  | from ..conftest import ingestion_config as base_ingestion_config | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TrinoContainer(DbContainer): | 
					
						
							|  |  |  |     def __init__( | 
					
						
							|  |  |  |         self, | 
					
						
							|  |  |  |         image: str = "trinodb/trino", | 
					
						
							|  |  |  |         port=8080, | 
					
						
							|  |  |  |         **kwargs, | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         super().__init__(image, **kwargs) | 
					
						
							|  |  |  |         self.user = "admin" | 
					
						
							|  |  |  |         self.port = port | 
					
						
							|  |  |  |         self.with_exposed_ports(port) | 
					
						
							|  |  |  |         self._built_image = f"trino:{random.randint(0, 10000)}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def start(self) -> "DbContainer": | 
					
						
							|  |  |  |         self.build() | 
					
						
							|  |  |  |         self.image = self._built_image | 
					
						
							|  |  |  |         return super().start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _connect(self) -> None: | 
					
						
							|  |  |  |         super()._connect() | 
					
						
							|  |  |  |         engine = create_engine(self.get_connection_url()) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             retry(wait=wait_fixed(1), stop=stop_after_delay(120))(engine.execute)( | 
					
						
							|  |  |  |                 "select system.runtime.nodes.node_id from system.runtime.nodes" | 
					
						
							|  |  |  |             ).fetchall() | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             engine.dispose() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _configure(self) -> None: | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def stop(self, force=True, delete_volume=True) -> None: | 
					
						
							|  |  |  |         super().stop(force, delete_volume) | 
					
						
							|  |  |  |         self._docker.client.images.remove(self._built_image) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_connection_url(self) -> str: | 
					
						
							|  |  |  |         return f"trino://{self.user}:@{self.get_container_host_ip()}:{self.get_exposed_port(self.port)}/?http_scheme=http" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def build(self): | 
					
						
							|  |  |  |         docker_client = docker.from_env() | 
					
						
							|  |  |  |         docker_client.images.build( | 
					
						
							|  |  |  |             path=os.path.dirname(__file__) + "/trino", | 
					
						
							|  |  |  |             tag=self._built_image, | 
					
						
							|  |  |  |             buildargs={"BASE_IMAGE": self.image}, | 
					
						
							|  |  |  |             rm=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class HiveMetaStoreContainer(DockerContainer): | 
					
						
							|  |  |  |     def __init__( | 
					
						
							|  |  |  |         self, | 
					
						
							|  |  |  |         image: str = "apache/hive", | 
					
						
							|  |  |  |         port=9083, | 
					
						
							|  |  |  |         **kwargs, | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         super().__init__(image, **kwargs) | 
					
						
							|  |  |  |         self.port = port | 
					
						
							|  |  |  |         self.with_exposed_ports(port) | 
					
						
							|  |  |  |         self._build_args = {} | 
					
						
							|  |  |  |         self._built_image = f"hive:{random.randint(0, 10000)}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def start(self) -> "DockerContainer": | 
					
						
							|  |  |  |         self.build() | 
					
						
							|  |  |  |         self.image = self._built_image | 
					
						
							|  |  |  |         return super().start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def with_build_args(self, key, value) -> "HiveMetaStoreContainer": | 
					
						
							|  |  |  |         self._build_args.update({key: value}) | 
					
						
							|  |  |  |         return self | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def stop(self, force=True, delete_volume=True) -> None: | 
					
						
							|  |  |  |         super().stop(force, delete_volume) | 
					
						
							|  |  |  |         self._docker.client.images.remove(self._built_image) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def build(self): | 
					
						
							|  |  |  |         docker_client = docker.from_env() | 
					
						
							|  |  |  |         docker_client.images.build( | 
					
						
							|  |  |  |             path=os.path.dirname(__file__) + "/hive", | 
					
						
							|  |  |  |             tag=self._built_image, | 
					
						
							|  |  |  |             buildargs={ | 
					
						
							|  |  |  |                 "BASE_IMAGE": self.image, | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             rm=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def docker_network(): | 
					
						
							|  |  |  |     with testcontainers.core.network.Network() as network: | 
					
						
							|  |  |  |         yield network | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def trino_container(hive_metastore_container, minio_container, docker_network): | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  |     container = ( | 
					
						
							|  |  |  |         TrinoContainer(image="trinodb/trino:418") | 
					
						
							|  |  |  |         .with_network(docker_network) | 
					
						
							|  |  |  |         .with_env( | 
					
						
							|  |  |  |             "HIVE_METASTORE_URI", | 
					
						
							|  |  |  |             f"thrift://metastore:{hive_metastore_container.port}", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         .with_env( | 
					
						
							|  |  |  |             "MINIO_ENDPOINT", | 
					
						
							|  |  |  |             f"http://minio:{minio_container.port}", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     with try_bind(container, container.port, container.port + 1) as trino: | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  |         yield trino | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def mysql_container(docker_network): | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     container = ( | 
					
						
							|  |  |  |         MySqlContainer( | 
					
						
							|  |  |  |             "mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         .with_network(docker_network) | 
					
						
							|  |  |  |         .with_network_aliases("mariadb") | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     with try_bind(container, container.port, container.port + 1) as mysql: | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  |         yield mysql | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def hive_metastore_container(mysql_container, minio_container, docker_network): | 
					
						
							|  |  |  |     with HiveMetaStoreContainer("bitsondatadev/hive-metastore:latest").with_network( | 
					
						
							|  |  |  |         docker_network | 
					
						
							|  |  |  |     ).with_network_aliases("metastore").with_env( | 
					
						
							|  |  |  |         "METASTORE_DB_HOSTNAME", "mariadb" | 
					
						
							|  |  |  |     ).with_env( | 
					
						
							|  |  |  |         "METASTORE_DB_PORT", str(mysql_container.port) | 
					
						
							|  |  |  |     ).with_env( | 
					
						
							|  |  |  |         "JDBC_CONNECTION_URL", | 
					
						
							|  |  |  |         f"jdbc:mysql://mariadb:{mysql_container.port}/{mysql_container.dbname}", | 
					
						
							|  |  |  |     ).with_env( | 
					
						
							|  |  |  |         "MINIO_ENDPOINT", | 
					
						
							|  |  |  |         f"http://minio:{minio_container.port}", | 
					
						
							|  |  |  |     ) as hive: | 
					
						
							|  |  |  |         yield hive | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def minio_container(docker_network): | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     container = ( | 
					
						
							|  |  |  |         MinioContainer().with_network(docker_network).with_network_aliases("minio") | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     with try_bind(container, container.port, container.port) as minio: | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  |         client = minio.get_client() | 
					
						
							|  |  |  |         client.make_bucket("hive-warehouse") | 
					
						
							|  |  |  |         yield minio | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="session") | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  | def create_test_data(trino_container): | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  |     engine = create_engine( | 
					
						
							|  |  |  |         make_url(trino_container.get_connection_url()).set(database="minio") | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  |     engine.execute( | 
					
						
							|  |  |  |         "create schema minio.my_schema WITH (location = 's3a://hive-warehouse/')" | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  |     data_dir = os.path.dirname(__file__) + "/data" | 
					
						
							|  |  |  |     for file in os.listdir(data_dir): | 
					
						
							|  |  |  |         df = pd.read_parquet(f"{data_dir}/{file}") | 
					
						
							|  |  |  |         for col in df.columns: | 
					
						
							|  |  |  |             if pd.api.types.is_datetime64tz_dtype(df[col]): | 
					
						
							|  |  |  |                 df[col] = df[col].dt.tz_convert(None) | 
					
						
							|  |  |  |         df.to_sql( | 
					
						
							|  |  |  |             file.replace(".parquet", ""), | 
					
						
							|  |  |  |             engine, | 
					
						
							|  |  |  |             schema="my_schema", | 
					
						
							|  |  |  |             if_exists="fail", | 
					
						
							|  |  |  |             index=False, | 
					
						
							|  |  |  |             method=custom_insert, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         sleep(1) | 
					
						
							| 
									
										
										
										
											2024-11-07 18:37:31 +01:00
										 |  |  |         engine.execute( | 
					
						
							|  |  |  |             "ANALYZE " + f'minio."my_schema"."{file.replace(".parquet", "")}"' | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     engine.execute( | 
					
						
							|  |  |  |         "CALL system.drop_stats(schema_name => 'my_schema', table_name => 'empty')" | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-05-22 17:12:00 +02:00
										 |  |  |     return | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  | def custom_insert(self, conn, keys: list[str], data_iter): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Hack pandas.io.sql.SQLTable._execute_insert_multi to retry untill rows are inserted. | 
					
						
							|  |  |  |     This is required becauase using trino with pd.to_sql in our setup us unreliable. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     rowcount = 0 | 
					
						
							|  |  |  |     max_tries = 10 | 
					
						
							|  |  |  |     try_num = 0 | 
					
						
							|  |  |  |     data = [dict(zip(keys, row)) for row in data_iter] | 
					
						
							|  |  |  |     while rowcount != len(data): | 
					
						
							|  |  |  |         if try_num >= max_tries: | 
					
						
							|  |  |  |             raise RuntimeError(f"Failed to insert data after {max_tries} tries") | 
					
						
							|  |  |  |         try_num += 1 | 
					
						
							|  |  |  |         stmt = insert(self.table).values(data) | 
					
						
							|  |  |  |         conn.execute(stmt) | 
					
						
							|  |  |  |         rowcount = conn.execute( | 
					
						
							|  |  |  |             "SELECT COUNT(*) FROM " + f'"{self.schema}"."{self.name}"' | 
					
						
							|  |  |  |         ).scalar() | 
					
						
							|  |  |  |     return rowcount | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture(scope="module") | 
					
						
							|  |  |  | def create_service_request(trino_container, tmp_path_factory): | 
					
						
							|  |  |  |     return CreateDatabaseServiceRequest( | 
					
						
							|  |  |  |         name="docker_test_" + tmp_path_factory.mktemp("trino").name, | 
					
						
							|  |  |  |         serviceType=DatabaseServiceType.Trino, | 
					
						
							|  |  |  |         connection=DatabaseConnection( | 
					
						
							|  |  |  |             config=TrinoConnection( | 
					
						
							|  |  |  |                 username=trino_container.user, | 
					
						
							|  |  |  |                 hostPort="localhost:" | 
					
						
							|  |  |  |                 + trino_container.get_exposed_port(trino_container.port), | 
					
						
							|  |  |  |                 catalog="minio", | 
					
						
							|  |  |  |                 connectionArguments={"http_scheme": "http"}, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-20 10:42:10 +02:00
										 |  |  | @pytest.fixture(scope="module") | 
					
						
							| 
									
										
										
										
											2024-08-19 09:09:35 +02:00
										 |  |  | def ingestion_config(db_service, sink_config, workflow_config, base_ingestion_config): | 
					
						
							|  |  |  |     base_ingestion_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { | 
					
						
							|  |  |  |         "excludes": [ | 
					
						
							|  |  |  |             "^information_schema$", | 
					
						
							|  |  |  |         ], | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2024-08-19 09:09:35 +02:00
										 |  |  |     return base_ingestion_config | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture(scope="module") | 
					
						
							|  |  |  | def unmask_password(): | 
					
						
							|  |  |  |     def patch_password(service: DatabaseService): | 
					
						
							|  |  |  |         return service | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return patch_password |