diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py index 2f923a22cbe..1478e8798c6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py @@ -12,12 +12,19 @@ """ Source connection handler """ +from functools import partial + from pydomo import Domo from metadata.generated.schema.entity.services.connections.dashboard.domoDashboardConnection import ( DomoDashboardConnection, ) -from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.connections.test_connections import ( + SourceConnectionException, + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) def get_connection(connection: DomoDashboardConnection) -> Domo: @@ -36,12 +43,20 @@ def get_connection(connection: DomoDashboardConnection) -> Domo: raise SourceConnectionException(msg) -def test_connection(domo: Domo, _) -> None: +def test_connection(domo: Domo, _) -> TestConnectionResult: """ Test connection """ - try: - domo.page_list() - except Exception as exc: - msg = f"Unknown error connecting with {domo}: {exc}." - raise SourceConnectionException(msg) + + def custom_executor(): + result = domo.page_list() + return list(result) + + steps = [ + TestConnectionStep( + function=partial(custom_executor), + name="Get Dashboard", + ), + ] + + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py index 9db0ae2b12d..621ad13d5ab 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py @@ -13,6 +13,7 @@ Source connection handler """ import json +from functools import partial from typing import Any, Dict import requests @@ -20,7 +21,12 @@ import requests from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import ( MetabaseConnection, ) -from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.connections.test_connections import ( + SourceConnectionException, + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) def get_connection(connection: MetabaseConnection) -> Dict[str, Any]: @@ -50,15 +56,24 @@ def get_connection(connection: MetabaseConnection) -> Dict[str, Any]: raise SourceConnectionException(msg) from exc -def test_connection(client, _) -> None: +def test_connection(client, _) -> TestConnectionResult: """ Test connection """ - try: - requests.get( # pylint: disable=missing-timeout + + def custom_executor(): + result = requests.get( # pylint: disable=missing-timeout client["connection"].hostPort + "/api/dashboard", headers=client["metabase_session"], ) - except Exception as exc: - msg = f"Unknown error connecting with {client}: {exc}." - raise SourceConnectionException(msg) from exc + + return list(result) + + steps = [ + TestConnectionStep( + function=partial(custom_executor), + name="Get Dashboard", + ), + ] + + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py index 8021cdca975..7382ecd0f51 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py @@ -23,6 +23,7 @@ from metadata.generated.schema.entity.services.connections.dashboard.tableauConn ) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, + TestConnectionResult, TestConnectionStep, test_connection_steps, ) @@ -84,7 +85,7 @@ def get_connection(connection: TableauConnection) -> TableauServerConnection: ) -def test_connection(client: TableauServerConnection, _) -> None: +def test_connection(client: TableauServerConnection, _) -> TestConnectionResult: """ Test connection """ @@ -112,4 +113,4 @@ def test_connection(client: TableauServerConnection, _) -> None: ), ] - test_connection_steps(steps) + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py index 948a909f0b4..22d75b5f6b5 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py @@ -12,12 +12,19 @@ """ Source connection handler """ +from functools import partial + from pydomo import Domo from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import ( DomoDatabaseConnection, ) -from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.connections.test_connections import ( + SourceConnectionException, + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) def get_connection(connection: DomoDatabaseConnection) -> Domo: @@ -36,12 +43,20 @@ def get_connection(connection: DomoDatabaseConnection) -> Domo: raise SourceConnectionException(msg) -def test_connection(domo: Domo, _) -> None: +def test_connection(domo: Domo, _) -> TestConnectionResult: """ Test connection """ - try: - domo.datasets.list() - except Exception as exc: - msg = f"Unknown error connecting with {domo}: {exc}." - raise SourceConnectionException(msg) + + def custom_executor(): + result = domo.datasets.list() + return list(result) + + steps = [ + TestConnectionStep( + function=partial(custom_executor), + name="Get Tables", + ), + ] + + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index f08912d92e9..089869f8a55 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -69,6 +69,7 @@ class DomodatabaseSource(DatabaseServiceSource): self.domo_client = get_connection(self.service_connection) self.client = DomoClient(self.service_connection) super().__init__() + self.test_connection() @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/database/glue/connection.py b/ingestion/src/metadata/ingestion/source/database/glue/connection.py index 9ae19708fe7..16c67097926 100644 --- a/ingestion/src/metadata/ingestion/source/database/glue/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/connection.py @@ -12,14 +12,19 @@ """ Source connection handler """ -from botocore.client import ClientError +from functools import partial + from sqlalchemy.engine import Engine from metadata.clients.aws_client import AWSClient from metadata.generated.schema.entity.services.connections.database.glueConnection import ( GlueConnection, ) -from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) def get_connection(connection: GlueConnection) -> Engine: @@ -29,17 +34,35 @@ def get_connection(connection: GlueConnection) -> Engine: return AWSClient(connection.awsConfig).get_glue_client() -def test_connection(client, _) -> None: +def test_connection(client, _) -> TestConnectionResult: """ Test connection """ - try: - paginator = client.get_paginator("get_databases") - paginator.paginate() - except ClientError as err: - msg = f"Connection error for {client}: {err}. Check the connection details." - raise SourceConnectionException(msg) from err - except Exception as exc: - msg = f"Unknown error connecting with {client}: {exc}." - raise SourceConnectionException(msg) from exc + def custom_executor_for_database(): + paginator = client.get_paginator("get_databases") + return list(paginator.paginate()) + + def custom_executor_for_table(): + paginator = client.get_paginator("get_databases") + paginator_response = paginator.paginate() + for page in paginator_response: + for schema in page["DatabaseList"]: + database_name = schema["Name"] + paginator = client.get_paginator("get_tables") + tables = paginator.paginate(DatabaseName=database_name) + return list(tables) + return None + + steps = [ + TestConnectionStep( + function=partial(custom_executor_for_database), + name="Get Databases and Schemas", + ), + TestConnectionStep( + function=partial(custom_executor_for_table), + name="Get Tables", + ), + ] + + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index 57054783f22..4ff4466fc23 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -72,6 +72,7 @@ class GlueSource(DatabaseServiceSource): self.status = SQLSourceStatus() self.glue = get_connection(self.service_connection) super().__init__() + self.test_connection() @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/connection.py b/ingestion/src/metadata/ingestion/source/database/oracle/connection.py index eccca8e15a1..ac08a7cbd25 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/connection.py @@ -20,6 +20,7 @@ import oracledb from oracledb.exceptions import DatabaseError from pydantic import SecretStr from sqlalchemy.engine import Engine +from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( OracleConnection, @@ -31,7 +32,11 @@ from metadata.ingestion.connections.builders import ( get_connection_args_common, get_connection_options_dict, ) -from metadata.ingestion.connections.test_connections import test_connection_db_common +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_db_common, +) from metadata.utils.logger import ingestion_logger CX_ORACLE_LIB_VERSION = "8.3.0" @@ -102,8 +107,25 @@ def get_connection(connection: OracleConnection) -> Engine: ) -def test_connection(engine: Engine, _) -> None: +def test_connection(engine: Engine, _) -> TestConnectionResult: """ Test connection """ - test_connection_db_common(engine) + inspector = inspect(engine) + + steps = [ + TestConnectionStep( + function=inspector.get_schema_names, + name="Get Schemas", + ), + TestConnectionStep( + function=inspector.get_table_names, + name="Get Tables", + ), + TestConnectionStep( + function=inspector.get_view_names, + name="Get Views", + mandatory=False, + ), + ] + return test_connection_db_common(engine, steps) diff --git a/ingestion/src/metadata/ingestion/source/database/presto/connection.py b/ingestion/src/metadata/ingestion/source/database/presto/connection.py index 99045519b89..a8c0906ce2c 100644 --- a/ingestion/src/metadata/ingestion/source/database/presto/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/presto/connection.py @@ -12,9 +12,11 @@ """ Source connection handler """ +from functools import partial from urllib.parse import quote_plus from sqlalchemy.engine import Engine +from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.services.connections.database.prestoConnection import ( PrestoConnection, @@ -23,7 +25,11 @@ from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, ) -from metadata.ingestion.connections.test_connections import test_connection_db_common +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_db_common, +) def get_connection_url(connection: PrestoConnection) -> str: @@ -52,8 +58,40 @@ def get_connection(connection: PrestoConnection) -> Engine: ) -def test_connection(engine: Engine, _) -> None: +def test_connection(engine: Engine, _) -> TestConnectionResult: """ Test connection """ - test_connection_db_common(engine) + inspector = inspect(engine) + + def custom_executor(engine, statement): + cursor = engine.execute(statement) + return list(cursor.all()) + + def custom_executor_for_table(): + schema_name = inspector.get_schema_names() + if schema_name: + for schema in schema_name: + table_name = inspector.get_table_names(schema) + return table_name + return None + + steps = [ + TestConnectionStep( + function=partial( + custom_executor, + statement="SHOW CATALOGS", + engine=engine, + ), + name="Get Databases", + ), + TestConnectionStep( + function=inspector.get_schema_names, + name="Get Schemas", + ), + TestConnectionStep( + function=partial(custom_executor_for_table), + name="Get Tables", + ), + ] + return test_connection_db_common(engine, steps) diff --git a/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py b/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py index e1aeaf4b70b..b837e8106c4 100644 --- a/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py @@ -12,7 +12,10 @@ """ Source connection handler """ +from functools import partial + from sqlalchemy.engine import Engine +from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, @@ -22,7 +25,11 @@ from metadata.ingestion.connections.builders import ( get_connection_args_common, get_connection_url_common, ) -from metadata.ingestion.connections.test_connections import test_connection_db_common +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_db_common, +) def get_connection(connection: SingleStoreConnection) -> Engine: @@ -36,8 +43,52 @@ def get_connection(connection: SingleStoreConnection) -> Engine: ) -def test_connection(engine: Engine, _) -> None: +def test_connection(engine: Engine, _) -> TestConnectionResult: """ Test connection """ - test_connection_db_common(engine) + inspector = inspect(engine) + + def custom_executor_for_table(): + schema_name = inspector.get_schema_names() + if schema_name: + for schema in schema_name: + if schema not in ( + "cluster", + "information_schema", + "memsql", + "s2_dataset_martech_3db3efe6", + ): + table_name = inspector.get_table_names(schema) + return table_name + return None + + def custom_executor_for_view(): + schema_name = inspector.get_schema_names() + if schema_name: + for schema in schema_name: + if schema not in ( + "cluster", + "information_schema", + "memsql", + "s2_dataset_martech_3db3efe6", + ): + view_name = inspector.get_view_names(schema) + return view_name + return None + + steps = [ + TestConnectionStep( + function=inspector.get_schema_names, + name="Get Schemas", + ), + TestConnectionStep( + function=partial(custom_executor_for_table), + name="Get Tables", + ), + TestConnectionStep( + function=partial(custom_executor_for_view), + name="Get Views", + ), + ] + return test_connection_db_common(engine, steps) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/connection.py b/ingestion/src/metadata/ingestion/source/database/trino/connection.py index d94ce4a89f2..8a523b3ef0a 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/connection.py @@ -12,10 +12,12 @@ """ Source connection handler """ +from functools import partial from urllib.parse import quote_plus from requests import Session from sqlalchemy.engine import Engine +from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, @@ -26,7 +28,12 @@ from metadata.ingestion.connections.builders import ( init_empty_connection_arguments, ) from metadata.ingestion.connections.secrets import connection_with_options_secrets -from metadata.ingestion.connections.test_connections import test_connection_db_common +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_db_common, +) +from metadata.ingestion.source.database.trino.queries import TRINO_GET_DATABASE def get_connection_url(connection: TrinoConnection) -> str: @@ -73,8 +80,53 @@ def get_connection(connection: TrinoConnection) -> Engine: ) -def test_connection(engine: Engine, _) -> None: +def test_connection(engine: Engine, _) -> TestConnectionResult: """ Test connection """ - test_connection_db_common(engine) + inspector = inspect(engine) + + def custom_executor(engine, statement): + cursor = engine.execute(statement) + return list(cursor.all()) + + def custom_executor_for_table(): + schema_name = inspector.get_schema_names() + if schema_name: + for schema in schema_name: + table_name = inspector.get_table_names(schema) + return table_name + return None + + def custom_executor_for_view(): + schema_name = inspector.get_schema_names() + if schema_name: + for schema in schema_name: + view_name = inspector.get_view_names(schema) + return view_name + return None + + steps = [ + TestConnectionStep( + function=partial( + custom_executor, + statement=TRINO_GET_DATABASE, + engine=engine, + ), + name="Get Databases", + ), + TestConnectionStep( + function=inspector.get_schema_names, + name="Get Schemas", + ), + TestConnectionStep( + function=partial(custom_executor_for_table), + name="Get Tables", + ), + TestConnectionStep( + function=partial(custom_executor_for_view), + name="Get Views", + mandatory=False, + ), + ] + return test_connection_db_common(engine, steps) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/queries.py b/ingestion/src/metadata/ingestion/source/database/trino/queries.py index 104dfcb62c1..39677e9ebd5 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/queries.py @@ -25,3 +25,7 @@ TRINO_TABLE_COMMENTS = textwrap.dedent( and "comment" is not null """ ) + +TRINO_GET_DATABASE = """ +SHOW CATALOGS +""" diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/connection.py b/ingestion/src/metadata/ingestion/source/database/vertica/connection.py index 4d863a7dcf4..5d0c7448f91 100644 --- a/ingestion/src/metadata/ingestion/source/database/vertica/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/vertica/connection.py @@ -12,7 +12,10 @@ """ Source connection handler """ +from functools import partial + from sqlalchemy.engine import Engine +from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.services.connections.database.verticaConnection import ( VerticaConnection, @@ -22,7 +25,12 @@ from metadata.ingestion.connections.builders import ( get_connection_args_common, get_connection_url_common, ) -from metadata.ingestion.connections.test_connections import test_connection_db_common +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_db_common, +) +from metadata.ingestion.source.database.vertica.queries import VERTICA_LIST_DATABASES def get_connection(connection: VerticaConnection) -> Engine: @@ -36,8 +44,37 @@ def get_connection(connection: VerticaConnection) -> Engine: ) -def test_connection(engine: Engine, _) -> None: +def test_connection(engine: Engine, _) -> TestConnectionResult: """ Test connection """ - test_connection_db_common(engine) + inspector = inspect(engine) + + def custom_executor(engine, statement): + cursor = engine.execute(statement) + return list(cursor.all()) + + steps = [ + TestConnectionStep( + function=partial( + custom_executor, + statement=VERTICA_LIST_DATABASES, + engine=engine, + ), + name="Get Databases", + ), + TestConnectionStep( + function=inspector.get_schema_names, + name="Get Schemas", + ), + TestConnectionStep( + function=inspector.get_table_names, + name="Get Tables", + ), + TestConnectionStep( + function=inspector.get_view_names, + name="Get Views", + mandatory=False, + ), + ] + return test_connection_db_common(engine, steps) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py index a41c94163db..5c7171f03b8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py @@ -12,13 +12,20 @@ """ Source connection handler """ +from functools import partial + from pydomo import Domo from metadata.clients.domo_client import DomoClient from metadata.generated.schema.entity.services.connections.pipeline.domoPipelineConnection import ( DomoPipelineConnection, ) -from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.connections.test_connections import ( + SourceConnectionException, + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) def get_connection(connection: DomoPipelineConnection) -> Domo: @@ -32,12 +39,20 @@ def get_connection(connection: DomoPipelineConnection) -> Domo: raise SourceConnectionException(msg) -def test_connection(connection: Domo, _) -> None: +def test_connection(connection: Domo, _) -> TestConnectionResult: """ Test connection """ - try: - connection.get_pipelines() - except Exception as exc: - msg = f"Unknown error while extracting pipeline from domo: {exc}." - raise SourceConnectionException(msg) + + def custom_executor(): + result = connection.get_pipelines() + return list(result) + + steps = [ + TestConnectionStep( + function=partial(custom_executor), + name="Get Pipeline", + ), + ] + + return test_connection_steps(steps) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index ddbed458b91..3e5c175826a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -239,7 +239,7 @@ public class AirflowRESTClient extends PipelineServiceClient { String connectionPayload = JsonUtils.pojoToJson(testServiceConnection); response = post(statusUrl, connectionPayload); if (response.statusCode() == 200) { - return Response.status(200, response.body()).build(); + return Response.status(response.statusCode()).entity(response.body()).build(); } } catch (Exception e) { throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());