fixes to trino connector (#4377)

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-04-22 22:04:07 +05:30 committed by GitHub
parent 777e9e8097
commit 09f928b01b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 23 deletions

View File

@ -8,11 +8,21 @@
"hostPort": "localhost:8080", "hostPort": "localhost:8080",
"username": "user", "username": "user",
"catalog": "tpcds", "catalog": "tpcds",
"database": "tiny" "database": "tiny",
"connectionOptions": {},
"connectionArguments": {}
} }
}, },
"sourceConfig": { "sourceConfig": {
"config": {} "config": {
"enableDataProfiler": true,
"generateSampleData": false,
"tableFilterPattern": {
"includes": [
"customer.*"
]
}
}
} }
}, },
"sink": { "sink": {
@ -20,11 +30,10 @@
"config": { "config": {
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -16,21 +16,23 @@ from typing import Iterable
import click import click
from sqlalchemy.inspection import inspect from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
OpenMetadataConnection,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource
logger = logging.getLogger(__name__)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection, TrinoConnection,
) )
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.filters import filter_by_schema
from metadata.utils.fqdn_generator import get_fqdn
logger = logging.getLogger(__name__)
class TrinoSource(SQLSource): class TrinoSource(SQLSource):
@ -66,21 +68,29 @@ class TrinoSource(SQLSource):
self.inspector = inspect(self.engine) self.inspector = inspect(self.engine)
self.schema_names = ( self.schema_names = (
self.inspector.get_schema_names() self.inspector.get_schema_names()
if not self.config.database if not self.service_connection.database
else [self.config.database] else [self.service_connection.database]
) )
return super().prepare() return super().prepare()
def next_record(self) -> Iterable[OMetaDatabaseAndTable]: def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
for schema in self.schema_names: for schema in self.schema_names:
self.database_source_state.clear() self.database_source_state.clear()
if not self.sql_config.schema_filter_pattern.included(schema): if filter_by_schema(
self.source_config.schemaFilterPattern, schema_name=schema
):
self.status.filter(schema, "Schema pattern not allowed") self.status.filter(schema, "Schema pattern not allowed")
continue continue
if self.config.include_tables:
if self.source_config.includeTables:
yield from self.fetch_tables(self.inspector, schema) yield from self.fetch_tables(self.inspector, schema)
if self.config.include_views: if self.source_config.includeViews:
yield from self.fetch_views(self.inspector, schema) yield from self.fetch_views(self.inspector, schema)
if self.config.mark_deleted_tables_as_deleted: if self.source_config.markDeletedTables:
schema_fqdn = f"{self.config.service_name}.{schema}" schema_fqdn = get_fqdn(
DatabaseSchema,
self.config.serviceName,
self.service_connection.database,
schema,
)
yield from self.delete_tables(schema_fqdn) yield from self.delete_tables(schema_fqdn)