diff --git a/ingestion/examples/workflows/presto.json b/ingestion/examples/workflows/presto.json index 7c04e0dd96e..5b04f24feeb 100644 --- a/ingestion/examples/workflows/presto.json +++ b/ingestion/examples/workflows/presto.json @@ -1,22 +1,31 @@ { "source": { "type": "presto", - "config": { - "service_name": "local_presto", - "host_port": "localhost:8080", - "catalog": "tpcds" + "serviceName": "local_presto", + "serviceConnection": { + "config": { + "type": "Presto", + "hostPort": "localhost:8080", + "catalog": "tpcds", + "username": "admin", + "password": "password", + "database": "database" + } + }, + "sourceConfig": { + "config": { + "generateSampleData": false + } } }, "sink": { "type": "metadata-rest", - "config": { - } + "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/ingestion/ometa/superset_rest.py b/ingestion/src/metadata/ingestion/ometa/superset_rest.py index f3e32ca9f72..9a1dafb6b0f 100644 --- a/ingestion/src/metadata/ingestion/ometa/superset_rest.py +++ b/ingestion/src/metadata/ingestion/ometa/superset_rest.py @@ -16,13 +16,6 @@ import logging from pydantic import SecretStr -from metadata.config.common import ConfigModel -from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import ( - SupersetConnection, -) -from metadata.generated.schema.entity.services.dashboardService import ( - DashboardServiceType, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) diff --git a/ingestion/src/metadata/ingestion/source/oracle.py b/ingestion/src/metadata/ingestion/source/oracle.py index ce5f7b202de..639f198261a 100644 --- a/ingestion/src/metadata/ingestion/source/oracle.py +++ b/ingestion/src/metadata/ingestion/source/oracle.py @@ -27,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig class OracleSource(SQLSource): diff --git a/ingestion/src/metadata/ingestion/source/presto.py b/ingestion/src/metadata/ingestion/source/presto.py index 89d4b4fa9e3..7d11f57d370 100644 --- a/ingestion/src/metadata/ingestion/source/presto.py +++ b/ingestion/src/metadata/ingestion/source/presto.py @@ -19,8 +19,11 @@ from sqlalchemy.engine import reflection from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig _type_map.update( { @@ -75,26 +78,16 @@ from metadata.generated.schema.entity.services.connections.database.prestoConnec ) -class PrestoConfig(PrestoConnection, SQLConnectionConfig): - def get_connection_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{quote_plus(self.username)}" - if self.password: - url += f":{quote_plus(self.password.get_secret_value())}" - url += "@" - url += f"{self.hostPort}" - url += f"/{self.catalog}" - if self.database: - url += f"?schema={quote_plus(self.database)}" - return url - - class PrestoSource(SQLSource): def __init__(self, config, metadata_config): super().__init__(config, metadata_config) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = PrestoConfig.parse_obj(config_dict) + config = WorkflowSource.parse_obj(config_dict) + connection: PrestoConnection = config.serviceConnection.__root__.config + if not isinstance(connection, PrestoConnection): + raise InvalidSourceException( + f"Expected PrestoConnection, but got {connection}" + ) return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index d2da18e9541..57084a0a4d3 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -258,21 +258,25 @@ class SQLSource(Source[OMetaDatabaseAndTable]): schema_names = inspector.get_schema_names() for schema in schema_names: # clear any previous source database state - self.database_source_state.clear() - if filter_by_schema( - self.source_config.schemaFilterPattern, schema_name=schema - ): - self.status.filter(schema, "Schema pattern not allowed") - continue + try: + self.database_source_state.clear() + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name=schema + ): + self.status.filter(schema, "Schema pattern not allowed") + continue - if self.source_config.includeTables: - yield from self.fetch_tables(inspector, schema) + if self.source_config.includeTables: + yield from self.fetch_tables(inspector, schema) - if self.source_config.includeViews: - yield from self.fetch_views(inspector, schema) - if self.source_config.markDeletedTables: - schema_fqdn = f"{self.config.serviceName}.{schema}" - yield from self.delete_tables(schema_fqdn) + if self.source_config.includeViews: + yield from self.fetch_views(inspector, schema) + if self.source_config.markDeletedTables: + schema_fqdn = f"{self.config.serviceName}.{schema}" + yield from self.delete_tables(schema_fqdn) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) def fetch_tables( self, inspector: Inspector, schema: str diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 2341307e7de..52426ff99d5 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -43,6 +43,9 @@ from metadata.generated.schema.entity.services.connections.database.oracleConnec from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( PostgresConnection, ) +from metadata.generated.schema.entity.services.connections.database.prestoConnection import ( + PrestoConnection, +) from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, ) @@ -167,6 +170,21 @@ def _(connection: DatabricksConnection): return url +@get_connection_url.register +def _(connection: PrestoConnection): + url = f"{connection.scheme.value}://" + if connection.username: + url += f"{quote_plus(connection.username)}" + if connection.password: + url += f":{quote_plus(connection.password.get_secret_value())}" + url += "@" + url += f"{connection.hostPort}" + url += f"/{connection.catalog}" + if connection.database: + url += f"?schema={quote_plus(connection.database)}" + return url + + @singledispatch def get_connection_args(connection): if connection.connectionArguments: