From 09f928b01b5d6c0cc30799078182ceb386e741f3 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Fri, 22 Apr 2022 22:04:07 +0530 Subject: [PATCH] fixes to trino connector (#4377) Co-authored-by: Onkar Ravgan --- ingestion/examples/workflows/trino.json | 23 ++++++---- .../src/metadata/ingestion/source/trino.py | 42 ++++++++++++------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/ingestion/examples/workflows/trino.json b/ingestion/examples/workflows/trino.json index e74671f3095..6d5762180ad 100644 --- a/ingestion/examples/workflows/trino.json +++ b/ingestion/examples/workflows/trino.json @@ -8,11 +8,21 @@ "hostPort": "localhost:8080", "username": "user", "catalog": "tpcds", - "database": "tiny" + "database": "tiny", + "connectionOptions": {}, + "connectionArguments": {} } }, "sourceConfig": { - "config": {} + "config": { + "enableDataProfiler": true, + "generateSampleData": false, + "tableFilterPattern": { + "includes": [ + "customer.*" + ] + } + } } }, "sink": { @@ -20,11 +30,10 @@ "config": { } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/ingestion/source/trino.py b/ingestion/src/metadata/ingestion/source/trino.py index 0174c8c17b0..72283465509 100644 --- a/ingestion/src/metadata/ingestion/source/trino.py +++ b/ingestion/src/metadata/ingestion/source/trino.py @@ -16,21 +16,23 @@ from typing import Iterable import click from sqlalchemy.inspection import inspect -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.source.sql_source import SQLSource - -logger = logging.getLogger(__name__) - +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.source.sql_source import SQLSource +from metadata.utils.filters import filter_by_schema +from metadata.utils.fqdn_generator import get_fqdn + +logger = logging.getLogger(__name__) class TrinoSource(SQLSource): @@ -66,21 +68,29 @@ class TrinoSource(SQLSource): self.inspector = inspect(self.engine) self.schema_names = ( self.inspector.get_schema_names() - if not self.config.database - else [self.config.database] + if not self.service_connection.database + else [self.service_connection.database] ) return super().prepare() def next_record(self) -> Iterable[OMetaDatabaseAndTable]: for schema in self.schema_names: self.database_source_state.clear() - if not self.sql_config.schema_filter_pattern.included(schema): + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name=schema + ): self.status.filter(schema, "Schema pattern not allowed") continue - if self.config.include_tables: + + if self.source_config.includeTables: yield from self.fetch_tables(self.inspector, schema) - if self.config.include_views: + if self.source_config.includeViews: yield from self.fetch_views(self.inspector, schema) - if self.config.mark_deleted_tables_as_deleted: - schema_fqdn = f"{self.config.service_name}.{schema}" + if self.source_config.markDeletedTables: + schema_fqdn = get_fqdn( + DatabaseSchema, + self.config.serviceName, + self.service_connection.database, + schema, + ) yield from self.delete_tables(schema_fqdn)