diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json index 1833c190bb9..bfda651729c 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json @@ -31,7 +31,7 @@ "default": "trino" }, "username": { - "description": "username to connect to the Snowflake. This user should have privileges to read all the metadata in Snowflake.", + "description": "Username to connect to Trino. This user should have privileges to read all the metadata in Trino.", "type": "string" }, "password": { @@ -43,8 +43,12 @@ "description": "Host and port of the data source.", "type": "string" }, + "catalog": { + "description": "Catalog of the data source.", + "type": "string" + }, "database": { - "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Trino.", + "description": "Database of the data source. This is an optional parameter, if you would like to restrict the metadata reading to a single database. 
When left blank, OpenMetadata Ingestion attempts to scan all the databases in the selected catalog in Trino.", "type": "string" }, "connectionOptions": { @@ -53,6 +57,14 @@ "connectionArguments": { "$ref": "connectionBasicType.json#/definitions/connectionArguments" }, + "proxies": { + "description": "Proxies for the connection to the Trino data source", + "type": "object" + }, + "params": { + "description": "URL parameters for connection to the Trino data source", + "type": "object" + }, "supportedPipelineTypes": { "description": "Supported Metadata Extraction Pipelines.", "type": "string", diff --git a/docs/integrations/connectors/trino.md b/docs/integrations/connectors/trino.md index 6abdab8c52b..5322394f442 100644 --- a/docs/integrations/connectors/trino.md +++ b/docs/integrations/connectors/trino.md @@ -184,6 +184,14 @@ To specify a single database to ingest metadata from, provide the name of the da "database": "trino_db" ``` +#### **proxies (optional)** + +You can set a proxy for the connection to Trino. If this field is not included, no proxy is set. + +```javascript +"proxies": {"http": "", "https": ""} +``` + ### **5. Enable/disable the data profiler** The data profiler ingests usage information for tables. This enables you to assess the frequency of use, reliability, and other details. @@ -407,9 +415,9 @@ Then re-run the install command in [Step 2](trino.md#2.-install-the-python-modul If you encounter the following error when attempting to run the ingestion workflow in Step 12, this is probably because there is no OpenMetadata server running at http://localhost:8585. 
``` -requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8585): -Max retries exceeded with url: /api/v1/services/databaseServices/name/local_trino -(Caused by NewConnectionError(': +requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8585): +Max retries exceeded with url: /api/v1/services/databaseServices/name/local_trino +(Caused by NewConnectionError(': Failed to establish a new connection: [Errno 61] Connection refused')) ``` diff --git a/ingestion/src/metadata/ingestion/source/trino.py b/ingestion/src/metadata/ingestion/source/trino.py index cbc1abf13ce..27dc51a4bc9 100644 --- a/ingestion/src/metadata/ingestion/source/trino.py +++ b/ingestion/src/metadata/ingestion/source/trino.py @@ -11,8 +11,7 @@ import logging import sys -from typing import Iterable, Optional -from urllib.parse import quote_plus +from typing import Iterable import click from sqlalchemy.inspection import inspect @@ -20,37 +19,18 @@ from sqlalchemy.inspection import inspect from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig logger = logging.getLogger(__name__) from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, ) - - -class TrinoConfig(TrinoConnection, SQLConnectionConfig): - params: Optional[dict] = None - - def get_connection_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{quote_plus(self.username)}" - if self.password: - url += f":{quote_plus(self.password.get_secret_value())}" - url += "@" - url += f"{self.host_port}" - url += f"/{self.catalog}" - if self.params is not None: - params = "&".join( - f"{key}={quote_plus(value)}" - for (key, 
value) in self.params.items() - if value - ) - url = f"{url}?{params}" - return url +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) class TrinoSource(SQLSource): @@ -74,7 +54,12 @@ class TrinoSource(SQLSource): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = TrinoConfig.parse_obj(config_dict) + config = WorkflowSource.parse_obj(config_dict) + connection: TrinoConnection = config.serviceConnection.__root__.config + if not isinstance(connection, TrinoConnection): + raise InvalidSourceException( + f"Expected TrinoConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py index 642a1daa31c..312b4c82c09 100644 --- a/ingestion/src/metadata/utils/engines.py +++ b/ingestion/src/metadata/utils/engines.py @@ -22,7 +22,7 @@ from sqlalchemy.orm.session import Session from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.utils.source_connections import get_connection_url +from metadata.utils.source_connections import get_connection_args, get_connection_url logger = logging.getLogger("Utils") @@ -36,13 +36,10 @@ def get_engine(workflow_source: WorkflowSource, verbose: bool = False) -> Engine options = service_connection_config.connectionOptions if not options: options = {} - connect_args = service_connection_config.connectionArguments - if not connect_args: - connect_args = {} engine = create_engine( get_connection_url(service_connection_config), **options, - connect_args=connect_args, + connect_args=get_connection_args(service_connection_config), echo=verbose, ) diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 16bac3b9e49..b90812bba06 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ 
b/ingestion/src/metadata/utils/source_connections.py @@ -14,6 +14,8 @@ Hosts the singledispatch to build source URLs from functools import singledispatch from urllib.parse import quote_plus +from requests import Session + from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseConnection, ) @@ -32,6 +34,9 @@ from metadata.generated.schema.entity.services.connections.database.redshiftConn from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( SQLiteConnection, ) +from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( + TrinoConnection, +) def get_connection_url_common(connection): @@ -91,8 +96,50 @@ def _(connection: SQLiteConnection): return f"{connection.scheme.value}:///:memory:" -def get_connection_url(connection: DatabricksConnection): +@get_connection_url.register +def _(connection: TrinoConnection): + url = f"{connection.scheme.value}://" + if connection.username: + url += f"{quote_plus(connection.username)}" + if connection.password: + url += f":{quote_plus(connection.password.get_secret_value())}" + url += "@" + url += f"{connection.hostPort}" + url += f"/{connection.catalog}" + if connection.params is not None: + params = "&".join( + f"{key}={quote_plus(value)}" + for (key, value) in connection.params.items() + if value + ) + url = f"{url}?{params}" + return url + + +@get_connection_url.register +def _(connection: DatabricksConnection): url = f"{connection.scheme.value}://token:{connection.token}@{connection.hostPort}" if connection.database: url += f"/{connection.database}" return url + + +@singledispatch +def get_connection_args(connection): + if connection.connectionArguments: + return connection.connectionArguments + else: + return {} + + +@get_connection_args.register +def _(connection: TrinoConnection): + if connection.proxies: + session = Session() + session.proxies = connection.proxies + if connection.connectionArguments: 
+ return {**connection.connectionArguments, "http_session": session} + else: + return {"http_session": session} + else: + return connection.connectionArguments or {} diff --git a/ingestion/tests/unit/source_connection/test_trino_connection.py b/ingestion/tests/unit/source_connection/test_trino_connection.py new file mode 100644 index 00000000000..c59c0cf8890 --- /dev/null +++ b/ingestion/tests/unit/source_connection/test_trino_connection.py @@ -0,0 +1,59 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from unittest import TestCase + +from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( + TrinoConnection, + TrinoScheme, +) +from metadata.utils.source_connections import get_connection_args, get_connection_url + + +class TrinoConnectionTest(TestCase): + def test_connection_url_without_params(self): + expected_url = "trino://username:pass@localhost:443/catalog" + trino_conn_obj = TrinoConnection( + scheme=TrinoScheme.trino, + hostPort="localhost:443", + username="username", + password="pass", + catalog="catalog", + ) + assert expected_url == get_connection_url(trino_conn_obj) + + def test_connection_url_with_params(self): + expected_url = "trino://username:pass@localhost:443/catalog?param=value" + trino_conn_obj = TrinoConnection( + scheme=TrinoScheme.trino, + hostPort="localhost:443", + username="username", + password="pass", + catalog="catalog", + params={"param": "value"}, + ) + assert expected_url == get_connection_url(trino_conn_obj) + + def test_connection_with_proxies(self): + test_proxies = {"http": "http_proxy", "https": "https_proxy"} + trino_conn_obj = TrinoConnection( + scheme=TrinoScheme.trino, + hostPort="localhost:443", + username="username", + password="pass", + catalog="catalog", + proxies=test_proxies, + ) + assert ( + test_proxies + == get_connection_args(trino_conn_obj).get("http_session").proxies + )