Fix #1729: Connector Trino works with username. (#1732)

Implementation details
I have decided to rename schema_name to database and make it mandatory. Without database there is an error while scanning all available tables. The connector doesn't support multiple databases at the moment. It has to be tested with passwords. Trino requires SSL if you use passwords. It has to be tested with impersonation. I have removed quote_plus because I don't think it's needed.

- [x] Support username
- [ ] There is an integration test
- [ ] Support impersonation
- [ ] Support passwords
- [ ] Support tokens
- [ ] Support multiple databases
This commit is contained in:
Alberto Miorin 2021-12-13 22:05:28 +01:00 committed by GitHub
parent 324573dc76
commit 93276079ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 18 deletions

View File

@ -4,8 +4,9 @@
"config": {
"service_name": "local_trino",
"host_port": "localhost:8080",
"catalog": "catalog_name",
"schema_name": "schema_name"
"username": "user",
"catalog": "tpcds",
"database": "tiny"
}
},
"sink": {

View File

@ -9,15 +9,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, Optional
from urllib.parse import quote_plus
from typing import Iterable
from sqlalchemy.inspection import inspect
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from ..ometa.openmetadata_rest import MetadataServerConfig
from .sql_source import SQLConnectionConfig, SQLSource
from ..ometa.openmetadata_rest import MetadataServerConfig
class TrinoConfig(SQLConnectionConfig):
@ -25,18 +23,20 @@ class TrinoConfig(SQLConnectionConfig):
scheme = "trino"
service_type = "Trino"
catalog: str
schema_name: Optional[str]
database: str
def get_connection_url(self):
url = f"{self.scheme}://{self.host_port}"
if self.catalog:
url += f"/{quote_plus(self.catalog)}"
if self.schema_name:
url += f"/{quote_plus(self.schema_name)}"
if self.username:
url += f"?user={quote_plus(self.username)}"
if self.password:
url += f"&password={quote_plus(self.password)}"
url = f"{self.scheme}://"
if self.username is not None:
url += f"{self.username}"
if self.password is not None:
url += f":{self.password.get_secret_value()}"
url += "@"
url += f"{self.host_port}"
if self.catalog is not None:
url += f"/{self.catalog}"
if self.database is not None:
url += f"/{self.database}"
return url
@ -53,6 +53,6 @@ class TrinoSource(SQLSource):
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
inspector = inspect(self.engine)
if self.config.include_tables:
yield from self.fetch_tables(inspector, self.config.schema_name)
yield from self.fetch_tables(inspector, self.config.database)
if self.config.include_views:
yield from self.fetch_views(inspector, self.config.schema_name)
yield from self.fetch_views(inspector, self.config.database)

View File

@ -0,0 +1,43 @@
import json
from unittest import TestCase
from metadata.ingestion.api.workflow import Workflow
config = """
{
"source": {
"type": "trino",
"config": {
"service_name": "local_trino",
"host_port": "localhost:8080",
"username": "user",
"catalog": "tpcds",
"database": "tiny"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}
"""
class WorkflowTest(TestCase):
def test_execute_200(self):
"""
stage/file.py must be compatible with source/sample_data.py,
this test try to catch if one becomes incompatible with the other
by running a workflow that includes both of them.
"""
workflow = Workflow.create(json.loads(config))
workflow.execute()
workflow.stop()
self.assertTrue(True)