Refactor Trino Connector and allow proxy setup in trino connection config (#3810)

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
dvecerdea 2022-04-07 20:50:37 +01:00 committed by GitHub
parent 31695bfe16
commit 230fc256d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 145 additions and 37 deletions

View File

@ -31,7 +31,7 @@
"default": "trino" "default": "trino"
}, },
"username": { "username": {
"description": "username to connect to the Snowflake. This user should have privileges to read all the metadata in Snowflake.", "description": "username to connect to Trino. This user should have privileges to read all the metadata in Trino.",
"type": "string" "type": "string"
}, },
"password": { "password": {
@ -43,8 +43,12 @@
"description": "Host and port of the data source.", "description": "Host and port of the data source.",
"type": "string" "type": "string"
}, },
"catalog": {
"description": "Catalog of the data source.",
"type": "string"
},
"database": { "database": {
"description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Trino.", "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in the selected catalog in Trino.",
"type": "string" "type": "string"
}, },
"connectionOptions": { "connectionOptions": {
@ -53,6 +57,14 @@
"connectionArguments": { "connectionArguments": {
"$ref": "connectionBasicType.json#/definitions/connectionArguments" "$ref": "connectionBasicType.json#/definitions/connectionArguments"
}, },
"proxies": {
"description": "Proxies for the connection to Trino data source",
"type": "object"
},
"params": {
"description": "URL parameters for connection to the Trino data source",
"type": "object"
},
"supportedPipelineTypes": { "supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.", "description": "Supported Metadata Extraction Pipelines.",
"type": "string", "type": "string",

View File

@ -184,6 +184,14 @@ To specify a single database to ingest metadata from, provide the name of the da
"database": "trino_db" "database": "trino_db"
``` ```
#### **proxies (optional)**
You can set a proxy for the connection with trino. If this field is not included, no proxy is set.
```javascript
"proxies": {"http": "<http proxy>", "https": "<https proxy>"}
```
### **5. Enable/disable the data profiler** ### **5. Enable/disable the data profiler**
The data profiler ingests usage information for tables. This enables you to assess the frequency of use, reliability, and other details. The data profiler ingests usage information for tables. This enables you to assess the frequency of use, reliability, and other details.
@ -407,9 +415,9 @@ Then re-run the install command in [Step 2](trino.md#2.-install-the-python-modul
If you encounter the following error when attempting to run the ingestion workflow in Step 12, this is probably because there is no OpenMetadata server running at http://localhost:8585. If you encounter the following error when attempting to run the ingestion workflow in Step 12, this is probably because there is no OpenMetadata server running at http://localhost:8585.
``` ```
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8585): requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8585):
Max retries exceeded with url: /api/v1/services/databaseServices/name/local_trino Max retries exceeded with url: /api/v1/services/databaseServices/name/local_trino
(Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1031fa310>: (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1031fa310>:
Failed to establish a new connection: [Errno 61] Connection refused')) Failed to establish a new connection: [Errno 61] Connection refused'))
``` ```

View File

@ -11,8 +11,7 @@
import logging import logging
import sys import sys
from typing import Iterable, Optional from typing import Iterable
from urllib.parse import quote_plus
import click import click
from sqlalchemy.inspection import inspect from sqlalchemy.inspection import inspect
@ -20,37 +19,18 @@ from sqlalchemy.inspection import inspect
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection, TrinoConnection,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
class TrinoConfig(TrinoConnection, SQLConnectionConfig): )
params: Optional[dict] = None
def get_connection_url(self):
url = f"{self.scheme}://"
if self.username:
url += f"{quote_plus(self.username)}"
if self.password:
url += f":{quote_plus(self.password.get_secret_value())}"
url += "@"
url += f"{self.host_port}"
url += f"/{self.catalog}"
if self.params is not None:
params = "&".join(
f"{key}={quote_plus(value)}"
for (key, value) in self.params.items()
if value
)
url = f"{url}?{params}"
return url
class TrinoSource(SQLSource): class TrinoSource(SQLSource):
@ -74,7 +54,12 @@ class TrinoSource(SQLSource):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = TrinoConfig.parse_obj(config_dict) config = WorkflowSource.parse_obj(config_dict)
connection: TrinoConnection = config.serviceConnection.__root__.config
if not isinstance(connection, TrinoConnection):
raise InvalidSourceException(
f"Expected TrinoConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):

View File

@ -22,7 +22,7 @@ from sqlalchemy.orm.session import Session
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.utils.source_connections import get_connection_url from metadata.utils.source_connections import get_connection_args, get_connection_url
logger = logging.getLogger("Utils") logger = logging.getLogger("Utils")
@ -36,13 +36,10 @@ def get_engine(workflow_source: WorkflowSource, verbose: bool = False) -> Engine
options = service_connection_config.connectionOptions options = service_connection_config.connectionOptions
if not options: if not options:
options = {} options = {}
connect_args = service_connection_config.connectionArguments
if not connect_args:
connect_args = {}
engine = create_engine( engine = create_engine(
get_connection_url(service_connection_config), get_connection_url(service_connection_config),
**options, **options,
connect_args=connect_args, connect_args=get_connection_args(service_connection_config),
echo=verbose, echo=verbose,
) )

View File

@ -14,6 +14,8 @@ Hosts the singledispatch to build source URLs
from functools import singledispatch from functools import singledispatch
from urllib.parse import quote_plus from urllib.parse import quote_plus
from requests import Session
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection, ClickhouseConnection,
) )
@ -32,6 +34,9 @@ from metadata.generated.schema.entity.services.connections.database.redshiftConn
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteConnection, SQLiteConnection,
) )
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
def get_connection_url_common(connection): def get_connection_url_common(connection):
@ -91,8 +96,50 @@ def _(connection: SQLiteConnection):
return f"{connection.scheme.value}:///:memory:" return f"{connection.scheme.value}:///:memory:"
def get_connection_url(connection: DatabricksConnection): @get_connection_url.register
def _(connection: TrinoConnection):
url = f"{connection.scheme.value}://"
if connection.username:
url += f"{quote_plus(connection.username)}"
if connection.password:
url += f":{quote_plus(connection.password.get_secret_value())}"
url += "@"
url += f"{connection.hostPort}"
url += f"/{connection.catalog}"
if connection.params is not None:
params = "&".join(
f"{key}={quote_plus(value)}"
for (key, value) in connection.params.items()
if value
)
url = f"{url}?{params}"
return url
@get_connection_url.register
def _(connection: DatabricksConnection):
url = f"{connection.scheme.value}://token:{connection.token}@{connection.hostPort}" url = f"{connection.scheme.value}://token:{connection.token}@{connection.hostPort}"
if connection.database: if connection.database:
url += f"/{connection.database}" url += f"/{connection.database}"
return url return url
@singledispatch
def get_connection_args(connection):
if connection.connectionArguments:
return connection.connectionArguments
else:
return {}
@get_connection_args.register
def _(connection: TrinoConnection):
if connection.proxies:
session = Session()
session.proxies = connection.proxies
if connection.connectionArguments:
return {**connection.connectionArguments, "http_session": session}
else:
return {"http_session": session}
else:
return connection.connectionArguments

View File

@ -0,0 +1,59 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
TrinoScheme,
)
from metadata.utils.source_connections import get_connection_args, get_connection_url
class TrinoConnectionTest(TestCase):
def test_connection_url_without_params(self):
expected_url = "trino://username:pass@localhost:443/catalog"
trino_conn_obj = TrinoConnection(
scheme=TrinoScheme.trino,
hostPort="localhost:443",
username="username",
password="pass",
catalog="catalog",
)
assert expected_url == get_connection_url(trino_conn_obj)
def test_connection_url_with_params(self):
expected_url = "trino://username:pass@localhost:443/catalog?param=value"
trino_conn_obj = TrinoConnection(
scheme=TrinoScheme.trino,
hostPort="localhost:443",
username="username",
password="pass",
catalog="catalog",
params={"param": "value"},
)
assert expected_url == get_connection_url(trino_conn_obj)
def test_connection_with_proxies(self):
test_proxies = {"http": "http_proxy", "https": "https_proxy"}
trino_conn_obj = TrinoConnection(
scheme=TrinoScheme.trino,
hostPort="localhost:443",
username="username",
password="pass",
catalog="catalog",
proxies=test_proxies,
)
assert (
test_proxies
== get_connection_args(trino_conn_obj).get("http_session").proxies
)