diff --git a/ingestion/examples/workflows/trino.json b/ingestion/examples/workflows/trino.json index 02096d67ee0..1a349c56bff 100644 --- a/ingestion/examples/workflows/trino.json +++ b/ingestion/examples/workflows/trino.json @@ -4,8 +4,9 @@ "config": { "service_name": "local_trino", "host_port": "localhost:8080", - "catalog": "catalog_name", - "schema_name": "schema_name" + "username": "user", + "catalog": "tpcds", + "database": "tiny" } }, "sink": { diff --git a/ingestion/src/metadata/ingestion/source/trino.py b/ingestion/src/metadata/ingestion/source/trino.py index c733bc1e1b4..bd8a08db61b 100644 --- a/ingestion/src/metadata/ingestion/source/trino.py +++ b/ingestion/src/metadata/ingestion/source/trino.py @@ -9,15 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable, Optional -from urllib.parse import quote_plus +from typing import Iterable from sqlalchemy.inspection import inspect from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable - -from ..ometa.openmetadata_rest import MetadataServerConfig from .sql_source import SQLConnectionConfig, SQLSource +from ..ometa.openmetadata_rest import MetadataServerConfig class TrinoConfig(SQLConnectionConfig): @@ -25,18 +23,20 @@ class TrinoConfig(SQLConnectionConfig): scheme = "trino" service_type = "Trino" catalog: str - schema_name: Optional[str] + database: str def get_connection_url(self): - url = f"{self.scheme}://{self.host_port}" - if self.catalog: - url += f"/{quote_plus(self.catalog)}" - if self.schema_name: - url += f"/{quote_plus(self.schema_name)}" - if self.username: - url += f"?user={quote_plus(self.username)}" - if self.password: - url += f"&password={quote_plus(self.password)}" + url = f"{self.scheme}://" + if self.username is not None: + url += f"{self.username}" + if self.password is not None: + url += f":{self.password.get_secret_value()}" + url += "@" + url += f"{self.host_port}" + if self.catalog is not None: + url += f"/{self.catalog}" + if self.database is not None: + url += f"/{self.database}" return url @@ -53,6 +53,6 @@ class TrinoSource(SQLSource): def next_record(self) -> Iterable[OMetaDatabaseAndTable]: inspector = inspect(self.engine) if self.config.include_tables: - yield from self.fetch_tables(inspector, self.config.schema_name) + yield from self.fetch_tables(inspector, self.config.database) if self.config.include_views: - yield from self.fetch_views(inspector, self.config.schema_name) + yield from self.fetch_views(inspector, self.config.database) diff --git a/ingestion/tests/unit/trino_test.py b/ingestion/tests/unit/trino_test.py new file mode 100644 index 00000000000..75fef291193 --- /dev/null +++ b/ingestion/tests/unit/trino_test.py @@ -0,0 +1,43 @@ +import json +from unittest import TestCase + +from metadata.ingestion.api.workflow import Workflow + +config = """ +{ + "source": { + "type": "trino", + "config": { + "service_name": "local_trino", + "host_port": "localhost:8080", + "username": "user", + "catalog": "tpcds", + "database": "tiny" + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + } +} +""" + + +class WorkflowTest(TestCase): + def test_execute_200(self): + """ + stage/file.py must be compatible with source/sample_data.py, + this test try to catch if one becomes incompatible with the other + by running a workflow that includes both of them. + """ + workflow = Workflow.create(json.loads(config)) + workflow.execute() + workflow.stop() + self.assertTrue(True)