| 
									
										
										
										
											2021-12-01 12:46:28 +05:30
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2021-08-02 15:08:30 +05:30
										 |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | import socket | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | from typing import List | 
					
						
							|  |  |  | from urllib.parse import urlparse | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-07 11:00:35 +05:30
										 |  |  | import pytest | 
					
						
							|  |  |  | import requests | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  | from sqlalchemy.engine import create_engine | 
					
						
							|  |  |  | from sqlalchemy.inspection import inspect | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest | 
					
						
							|  |  |  | from metadata.generated.schema.api.data.createTable import CreateTableRequest | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | from metadata.generated.schema.api.services.createDatabaseService import ( | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     CreateDatabaseServiceRequest, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | from metadata.generated.schema.entity.data.database import Database | 
					
						
							|  |  |  | from metadata.generated.schema.entity.data.table import Column, Table | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | from metadata.generated.schema.type.entityReference import EntityReference | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | def is_responsive(url): | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         response = requests.get(url) | 
					
						
							|  |  |  |         if response.status_code == 200: | 
					
						
							|  |  |  |             return True | 
					
						
							|  |  |  |     except ConnectionError: | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  | def is_port_open(url): | 
					
						
							|  |  |  |     url_parts = urlparse(url) | 
					
						
							|  |  |  |     hostname = url_parts.hostname | 
					
						
							|  |  |  |     port = url_parts.port | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							|  |  |  |         s.connect((hostname, port)) | 
					
						
							|  |  |  |         return True | 
					
						
							|  |  |  |     except socket.error: | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         s.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  | def sleep(timeout_s): | 
					
						
							|  |  |  |     print(f"sleeping for {timeout_s} seconds") | 
					
						
							|  |  |  |     n = len(str(timeout_s)) | 
					
						
							|  |  |  |     for i in range(timeout_s, 0, -1): | 
					
						
							|  |  |  |         print(f"{i:>{n}}", end="\r", flush=True) | 
					
						
							|  |  |  |         time.sleep(1) | 
					
						
							|  |  |  |     print(f"{'':>{n}}", end="\n", flush=True) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | def status(r): | 
					
						
							|  |  |  |     if r.status_code == 200 or r.status_code == 201: | 
					
						
							|  |  |  |         return 1 | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         return 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | def create_delete_table(client: OpenMetadata, databases: List[Database]): | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     columns = [ | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  |         Column(name="id", dataType="INT", dataLength=1), | 
					
						
							|  |  |  |         Column(name="name", dataType="VARCHAR", dataLength=1), | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     ] | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |     db_ref = EntityReference( | 
					
						
							|  |  |  |         id=databases[0].id, name=databases[0].name.__root__, type="database" | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-02-04 12:39:08 -08:00
										 |  |  |     table = CreateTableRequest(name="test1", columns=columns, database=db_ref) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |     created_table = client.create_or_update(table) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     if table.name.__root__ == created_table.name.__root__: | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |         client.delete(entity=Table, entity_id=str(created_table.id.__root__)) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |         return 1 | 
					
						
							|  |  |  |     else: | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |         client.delete(entity=Table, entity_id=str(created_table.id.__root__)) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |         return 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | def create_delete_database(client: OpenMetadata, databases: List[Database]): | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     data = { | 
					
						
							| 
									
										
										
										
											2022-01-21 22:06:14 -08:00
										 |  |  |         "databaseConnection": {"hostPort": "localhost"}, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         "name": "temp_local_hive", | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  |         "serviceType": "Hive", | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         "description": "local hive env", | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     create_hive_service = CreateDatabaseServiceRequest(**data) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |     hive_service = client.create_or_update(create_hive_service) | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     create_database_request = CreateDatabaseRequest( | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         name="dwh", service=EntityReference(id=hive_service.id, type="databaseService") | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |     created_database = client.create_or_update(create_database_request) | 
					
						
							|  |  |  |     resp = create_delete_table(client, databases) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     print(resp) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |     client.delete(entity=Database, entity_id=str(created_database.id.__root__)) | 
					
						
							|  |  |  |     client.delete(entity=DatabaseService, entity_id=str(hive_service.id.__root__)) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     return resp | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture(scope="session") | 
					
						
							|  |  |  | def hive_service(docker_ip, docker_services): | 
					
						
							|  |  |  |     """Ensure that Docker service is up and responsive.""" | 
					
						
							|  |  |  |     port = docker_services.port_for("hive-server", 10000) | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  |     print(f"HIVE is running on port {port}") | 
					
						
							|  |  |  |     timeout_s = 120 | 
					
						
							|  |  |  |     sleep(timeout_s) | 
					
						
							|  |  |  |     url = "hive://localhost:10000/" | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     docker_services.wait_until_responsive( | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  |         timeout=timeout_s, pause=0.1, check=lambda: is_port_open(url) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     ) | 
					
						
							|  |  |  |     engine = create_engine(url) | 
					
						
							|  |  |  |     inspector = inspect(engine) | 
					
						
							|  |  |  |     return inspector | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-07 11:00:35 +05:30
										 |  |  | def test_check_schema(hive_service): | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     inspector = hive_service | 
					
						
							|  |  |  |     schemas = [] | 
					
						
							|  |  |  |     for schema in inspector.get_schema_names(): | 
					
						
							|  |  |  |         schemas.append(schema) | 
					
						
							|  |  |  |     if "default" in schemas: | 
					
						
							|  |  |  |         assert 1 | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         assert 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_read_tables(hive_service): | 
					
						
							|  |  |  |     inspector = hive_service | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     check_tables = [ | 
					
						
							|  |  |  |         "metadata_array_struct_test", | 
					
						
							|  |  |  |         "metadata_struct_test", | 
					
						
							|  |  |  |         "metadata_test_table", | 
					
						
							|  |  |  |         "test_check", | 
					
						
							|  |  |  |     ] | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     tables = [] | 
					
						
							|  |  |  |     for schema in inspector.get_schema_names(): | 
					
						
							|  |  |  |         for table in inspector.get_table_names(schema): | 
					
						
							|  |  |  |             tables.append(table) | 
					
						
							|  |  |  |             if set(tables) == set(check_tables): | 
					
						
							|  |  |  |                 assert 1 | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 assert 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-07 11:00:35 +05:30
										 |  |  | def test_check_table(): | 
					
						
							| 
									
										
										
										
											2022-05-27 03:09:13 -07:00
										 |  |  |     is_responsive("http://localhost:8586/healthcheck") | 
					
						
							| 
									
										
										
										
											2021-11-01 08:32:34 -07:00
										 |  |  |     metadata_config = MetadataServerConfig.parse_obj( | 
					
						
							|  |  |  |         {"api_endpoint": "http://localhost:8585/api", "auth_provider_type": "no-auth"} | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |     client = OpenMetadata(metadata_config) | 
					
						
							|  |  |  |     databases = client.list_entities(entity=Database).entities | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     if len(databases) > 0: | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |         assert create_delete_table(client, databases) | 
					
						
							| 
									
										
										
										
											2021-08-01 14:27:44 -07:00
										 |  |  |     else: | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  |         assert create_delete_database(client, databases) |