diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json index 900fd68ee29..87fe25a7834 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json @@ -76,5 +76,5 @@ } }, "additionalProperties": false, - "required": ["hostPort", "username"] + "required": ["hostPort", "username", "catalog"] } diff --git a/ingestion/examples/airflow/dags/airflow_sample_data.py b/ingestion/examples/airflow/dags/airflow_sample_data.py index 553134589d3..3bce81f89f0 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_sample_data.py @@ -9,9 +9,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json from datetime import timedelta +import yaml from airflow import DAG try: @@ -21,10 +21,6 @@ except ModuleNotFoundError: from airflow.utils.dates import days_ago -from airflow_provider_openmetadata.lineage.callback import ( - failure_callback, - success_callback, -) from metadata.ingestion.api.workflow import Workflow default_args = { @@ -34,39 +30,29 @@ default_args = { "retries": 3, "retry_delay": timedelta(seconds=10), "execution_timeout": timedelta(minutes=60), - "on_failure_callback": failure_callback, - "on_success_callback": success_callback, } config = """ -{ - "source": { - "type": "sample_data", - "serviceName": "sample_data", - "serviceConnection": { - "config": { - "type": "SampleData", - "sampleDataFolder": "./examples/sample_data" - } - }, - "sourceConfig": {} - }, - "sink": { - "type": "metadata-rest", - "config": {} - }, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "no-auth" - } - } -} +source: + type: sample_data + serviceName: sample_data + serviceConnection: + config: + type: SampleData + sampleDataFolder: "./examples/sample_data" + sourceConfig: {} +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth """ def metadata_ingestion_workflow(): - workflow_config = json.loads(config) + workflow_config = yaml.safe_load(config) workflow = Workflow.create(workflow_config) workflow.execute() workflow.raise_from_status() diff --git a/ingestion/src/metadata/ingestion/source/presto.py b/ingestion/src/metadata/ingestion/source/presto.py index 71456efdf88..17cbb77b295 100644 --- a/ingestion/src/metadata/ingestion/source/presto.py +++ b/ingestion/src/metadata/ingestion/source/presto.py @@ -10,20 +10,29 @@ # limitations under the License. import re -from urllib.parse import quote_plus +from typing import Iterable from pyhive.sqlalchemy_presto import PrestoDialect, _type_map -from sqlalchemy import types, util +from sqlalchemy import inspect, types, util from sqlalchemy.engine import reflection +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.source.sql_source import SQLSource +from metadata.utils.filters import filter_by_schema +from metadata.utils.fqdn_generator import get_fqdn +from metadata.utils.logger import ometa_logger + +logger = ometa_logger() _type_map.update( { @@ -81,6 +90,8 @@ from metadata.generated.schema.entity.services.connections.database.prestoConnec class PrestoSource(SQLSource): def __init__(self, config, metadata_config): super().__init__(config, metadata_config) + self.schema_names = None + self.inspector = None @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -91,3 +102,42 @@ class PrestoSource(SQLSource): f"Expected PrestoConnection, but got {connection}" ) return cls(config, metadata_config) + + def _get_database(self, _) -> Database: + return Database( + name=self.service_connection.catalog, + service=EntityReference( + id=self.service.id, type=self.service_connection.type.value + ), + ) + + def prepare(self): + self.inspector = inspect(self.engine) + self.schema_names = ( + self.inspector.get_schema_names() + if not self.service_connection.database + else [self.service_connection.database] + ) + return super().prepare() + + def next_record(self) -> Iterable[OMetaDatabaseAndTable]: + for schema in self.schema_names: + self.database_source_state.clear() + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name=schema + ): + self.status.filter(schema, "Schema pattern not allowed") + continue + + if self.source_config.includeTables: + yield from self.fetch_tables(self.inspector, schema) + if self.source_config.includeViews: + logger.info("Presto includes views when fetching tables.") + if self.source_config.markDeletedTables: + schema_fqdn = get_fqdn( + DatabaseSchema, + self.config.serviceName, + self.service_connection.catalog, + schema, + ) + yield from self.delete_tables(schema_fqdn) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/conn_test.py b/ingestion/src/metadata/orm_profiler/orm/functions/conn_test.py new file mode 100644 index 00000000000..e8cb228867d --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/orm/functions/conn_test.py @@ -0,0 +1,44 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Define a vanilla connection testing function. +This will be executed as a way to make sure +that the Engine can reach and execute in the +source. +""" +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.expression import ClauseElement, Executable + +from metadata.orm_profiler.metrics.core import CACHE +from metadata.orm_profiler.orm.registry import Dialects +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class ConnTestFn(Executable, ClauseElement): + inherit_cache = CACHE + + +@compiles(ConnTestFn) +def _(*_, **__): + return "SELECT 42" + + +@compiles(ConnTestFn, Dialects.Oracle) +def _(*_, **__): + return "SELECT 42 FROM DUAL" + + +@compiles(ConnTestFn, Dialects.BigQuery) +def _(*_, **__): + return "SELECT SESSION_USER()" diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index cc7a39410ed..5312654e2ba 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -19,7 +19,7 @@ from functools import singledispatch from typing import Union import requests -from sqlalchemy import create_engine +from sqlalchemy import create_engine, func from sqlalchemy.engine.base import Engine from sqlalchemy.exc import OperationalError from sqlalchemy.orm import sessionmaker @@ -65,6 +65,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( KafkaConnection, ) +from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn from metadata.utils.connection_clients import ( DeltaLakeClient, DynamoClient, @@ -287,8 +288,8 @@ def test_connection(connection) -> None: :return: None or raise an exception if we cannot connect """ try: - with connection.connect() as _: - pass + with connection.connect() as conn: + conn.execute(ConnTestFn()) except OperationalError as err: raise SourceConnectionException( f"Connection error for {connection} - {err}. Check the connection details."