diff --git a/ingestion/examples/workflows/trino.json b/ingestion/examples/workflows/trino.json index 382c31051fb..02096d67ee0 100644 --- a/ingestion/examples/workflows/trino.json +++ b/ingestion/examples/workflows/trino.json @@ -4,7 +4,8 @@ "config": { "service_name": "local_trino", "host_port": "localhost:8080", - "catalog": "system" + "catalog": "catalog_name", + "schema_name": "schema_name" } }, "sink": { diff --git a/ingestion/setup.py b/ingestion/setup.py index c63597fbabc..e8514632d07 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -121,7 +121,7 @@ plugins: Dict[str, Set[str]] = { build_options = {"includes": ["_cffi_backend"]} setup( name="openmetadata-ingestion", - version="0.4.5.dev1", + version="0.5.0.dev0", url="https://open-metadata.org/", author="OpenMetadata Committers", license="Apache License 2.0", diff --git a/ingestion/src/metadata/ingestion/source/trino.py b/ingestion/src/metadata/ingestion/source/trino.py index 49be247c54b..c733bc1e1b4 100644 --- a/ingestion/src/metadata/ingestion/source/trino.py +++ b/ingestion/src/metadata/ingestion/source/trino.py @@ -9,11 +9,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional +from typing import Iterable, Optional from urllib.parse import quote_plus -from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig -from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource +from sqlalchemy.inspection import inspect + +from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable + +from ..ometa.openmetadata_rest import MetadataServerConfig +from .sql_source import SQLConnectionConfig, SQLSource class TrinoConfig(SQLConnectionConfig): @@ -24,16 +28,15 @@ class TrinoConfig(SQLConnectionConfig): schema_name: Optional[str] def get_connection_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{quote_plus(self.username)}" - if self.password: - url += f":{quote_plus(self.password.get_secret_value())}" - url += f"{self.host_port}" + url = f"{self.scheme}://{self.host_port}" if self.catalog: url += f"/{quote_plus(self.catalog)}" if self.schema_name: url += f"/{quote_plus(self.schema_name)}" + if self.username: + url += f"?user={quote_plus(self.username)}" + if self.password: + url += f"&password={quote_plus(self.password)}" return url @@ -46,3 +49,10 @@ class TrinoSource(SQLSource): config = TrinoConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(config, metadata_config, ctx) + + def next_record(self) -> Iterable[OMetaDatabaseAndTable]: + inspector = inspect(self.engine) + if self.config.include_tables: + yield from self.fetch_tables(inspector, self.config.schema_name) + if self.config.include_views: + yield from self.fetch_views(inspector, self.config.schema_name) diff --git a/ingestion/tests/integration/trino/test_trino_crud.py b/ingestion/tests/integration/trino/test_trino_crud.py new file mode 100644 index 00000000000..d2a3c142b6b --- /dev/null +++ b/ingestion/tests/integration/trino/test_trino_crud.py @@ -0,0 +1,176 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket +import time +from typing import List +from urllib.parse import urlparse + +import pytest +import requests +from sqlalchemy.engine import create_engine +from sqlalchemy.inspection import inspect + +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Column, Table +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +def is_responsive(url): + try: + response = requests.get(url) + if response.status_code == 200: + return True + except ConnectionError: + return False + + +def is_port_open(url): + url_parts = urlparse(url) + hostname = url_parts.hostname + port = url_parts.port + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((hostname, port)) + return True + except socket.error: + return False + finally: + s.close() + + +def sleep(timeout_s): + print(f"sleeping for {timeout_s} seconds") + n = len(str(timeout_s)) + for i in range(timeout_s, 0, -1): + print(f"{i:>{n}}", end="\r", flush=True) + time.sleep(1) + print(f"{'':>{n}}", end="\n", flush=True) + + +def status(r): + if r.status_code == 200 or r.status_code == 201: + return 1 + else: + return 0 + + +def create_delete_table(client: OpenMetadata, databases: List[Database]): + columns = [ + Column(name="id", dataType="INT", dataLength=1), + Column(name="name", dataType="VARCHAR", dataLength=1), + ] + print(databases[0]) + table = CreateTableEntityRequest( + name="test1", columns=columns, database=databases[0].id.__root__ + ) + created_table = client.create_or_update(table) + if table.name.__root__ == created_table.name.__root__: + client.delete(entity=Table, entity_id=str(created_table.id.__root__)) + return 1 + else: + client.delete(entity=Table, entity_id=str(created_table.id.__root__)) + return 0 + + +def create_delete_database(client: OpenMetadata, databases: List[Database]): + data = { + "jdbc": {"connectionUrl": "trino://localhost/default", "driverClass": "jdbc"}, + "name": "temp_local_trino", + "serviceType": "Trino", + "description": "local trino env", + } + create_trino_service = CreateDatabaseServiceEntityRequest(**data) + trino_service = client.create_or_update(create_trino_service) + create_database_request = CreateDatabaseEntityRequest( + name="dwh", service=EntityReference(id=trino_service.id, type="databaseService") + ) + created_database = client.create_or_update(create_database_request) + resp = create_delete_table(client, databases) + print(resp) + client.delete(entity=Database, entity_id=str(created_database.id.__root__)) + client.delete(entity=DatabaseService, entity_id=str(trino_service.id.__root__)) + return resp + + +@pytest.fixture(scope="session") +def trino_service(docker_ip, docker_services): + """Ensure that Docker service is up and responsive.""" + port = docker_services.port_for("trino-server", 8080) + print(f"trino is running on port {port}") + timeout_s = 120 + sleep(timeout_s) + url = "trino://localhost:8080/" + docker_services.wait_until_responsive( + timeout=timeout_s, pause=0.1, check=lambda: is_port_open(url) + ) + engine = create_engine(url) + inspector = inspect(engine) + return inspector + + +def test_check_schema(trino_service): + inspector = trino_service + schemas = [] + for schema in inspector.get_schema_names(): + schemas.append(schema) + if "metadata" in schemas: + assert 1 + else: + assert 0 + + +def test_read_tables(trino_service): + inspector = trino_service + check_tables = [ + "analyze_properties", + "catalogs", + "column_properties", + "materialized_view_properties", + "materialized_views", + "schema_properties", + "table_comments", + "table_properties", + ] + tables = [] + for table in inspector.get_table_names("metadata"): + tables.append(table) + if set(tables) == set(check_tables): + assert 1 + else: + assert 0 + + +def test_check_table(): + is_responsive("http://localhost:8585/api/v1/health-check") + metadata_config = MetadataServerConfig.parse_obj( + {"api_endpoint": "http://localhost:8585/api", "auth_provider_type": "no-auth"} + ) + client = OpenMetadata(metadata_config) + databases = client.list_entities(entity=Database).entities + if len(databases) > 0: + assert create_delete_table(client, databases) + else: + assert create_delete_database(client, databases) diff --git a/ingestion/tests/integration/trino/tests/docker-compose.yml b/ingestion/tests/integration/trino/tests/docker-compose.yml new file mode 100644 index 00000000000..5a4105e7086 --- /dev/null +++ b/ingestion/tests/integration/trino/tests/docker-compose.yml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3" +services: + trino-server: + image: trinodb/trino + restart: always + ports: + - 8080:8080 \ No newline at end of file