Add steps in test_connection function (#10408)

* Add steps in test_connection funstion

* Add steps in test_connection funstion

* Add steps in test_connection funstion

* Fix: Pylint issue

* Status response changefor statuscode 200

---------

Co-authored-by: Nahuel <nahuel@getcollate.io>
This commit is contained in:
Milan Bariya 2023-03-10 10:02:26 +05:30 committed by GitHub
parent 81dec813a0
commit bc4abc44ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 348 additions and 58 deletions

View File

@ -12,12 +12,19 @@
"""
Source connection handler
"""
from functools import partial
from pydomo import Domo
from metadata.generated.schema.entity.services.connections.dashboard.domoDashboardConnection import (
DomoDashboardConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
def get_connection(connection: DomoDashboardConnection) -> Domo:
@ -36,12 +43,20 @@ def get_connection(connection: DomoDashboardConnection) -> Domo:
raise SourceConnectionException(msg)
def test_connection(domo: Domo, _) -> None:
def test_connection(domo: Domo, _) -> TestConnectionResult:
"""
Test connection
"""
try:
domo.page_list()
except Exception as exc:
msg = f"Unknown error connecting with {domo}: {exc}."
raise SourceConnectionException(msg)
def custom_executor():
result = domo.page_list()
return list(result)
steps = [
TestConnectionStep(
function=partial(custom_executor),
name="Get Dashboard",
),
]
return test_connection_steps(steps)

View File

@ -13,6 +13,7 @@
Source connection handler
"""
import json
from functools import partial
from typing import Any, Dict
import requests
@ -20,7 +21,12 @@ import requests
from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
MetabaseConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
def get_connection(connection: MetabaseConnection) -> Dict[str, Any]:
@ -50,15 +56,24 @@ def get_connection(connection: MetabaseConnection) -> Dict[str, Any]:
raise SourceConnectionException(msg) from exc
def test_connection(client, _) -> None:
def test_connection(client, _) -> TestConnectionResult:
"""
Test connection
"""
try:
requests.get( # pylint: disable=missing-timeout
def custom_executor():
result = requests.get( # pylint: disable=missing-timeout
client["connection"].hostPort + "/api/dashboard",
headers=client["metabase_session"],
)
except Exception as exc:
msg = f"Unknown error connecting with {client}: {exc}."
raise SourceConnectionException(msg) from exc
return list(result)
steps = [
TestConnectionStep(
function=partial(custom_executor),
name="Get Dashboard",
),
]
return test_connection_steps(steps)

View File

@ -23,6 +23,7 @@ from metadata.generated.schema.entity.services.connections.dashboard.tableauConn
)
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
@ -84,7 +85,7 @@ def get_connection(connection: TableauConnection) -> TableauServerConnection:
)
def test_connection(client: TableauServerConnection, _) -> None:
def test_connection(client: TableauServerConnection, _) -> TestConnectionResult:
"""
Test connection
"""
@ -112,4 +113,4 @@ def test_connection(client: TableauServerConnection, _) -> None:
),
]
test_connection_steps(steps)
return test_connection_steps(steps)

View File

@ -12,12 +12,19 @@
"""
Source connection handler
"""
from functools import partial
from pydomo import Domo
from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import (
DomoDatabaseConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
def get_connection(connection: DomoDatabaseConnection) -> Domo:
@ -36,12 +43,20 @@ def get_connection(connection: DomoDatabaseConnection) -> Domo:
raise SourceConnectionException(msg)
def test_connection(domo: Domo, _) -> None:
def test_connection(domo: Domo, _) -> TestConnectionResult:
"""
Test connection
"""
try:
domo.datasets.list()
except Exception as exc:
msg = f"Unknown error connecting with {domo}: {exc}."
raise SourceConnectionException(msg)
def custom_executor():
result = domo.datasets.list()
return list(result)
steps = [
TestConnectionStep(
function=partial(custom_executor),
name="Get Tables",
),
]
return test_connection_steps(steps)

View File

@ -69,6 +69,7 @@ class DomodatabaseSource(DatabaseServiceSource):
self.domo_client = get_connection(self.service_connection)
self.client = DomoClient(self.service_connection)
super().__init__()
self.test_connection()
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):

View File

@ -12,14 +12,19 @@
"""
Source connection handler
"""
from botocore.client import ClientError
from functools import partial
from sqlalchemy.engine import Engine
from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
def get_connection(connection: GlueConnection) -> Engine:
@ -29,17 +34,35 @@ def get_connection(connection: GlueConnection) -> Engine:
return AWSClient(connection.awsConfig).get_glue_client()
def test_connection(client, _) -> None:
def test_connection(client, _) -> TestConnectionResult:
"""
Test connection
"""
try:
paginator = client.get_paginator("get_databases")
paginator.paginate()
except ClientError as err:
msg = f"Connection error for {client}: {err}. Check the connection details."
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {client}: {exc}."
raise SourceConnectionException(msg) from exc
def custom_executor_for_database():
paginator = client.get_paginator("get_databases")
return list(paginator.paginate())
def custom_executor_for_table():
paginator = client.get_paginator("get_databases")
paginator_response = paginator.paginate()
for page in paginator_response:
for schema in page["DatabaseList"]:
database_name = schema["Name"]
paginator = client.get_paginator("get_tables")
tables = paginator.paginate(DatabaseName=database_name)
return list(tables)
return None
steps = [
TestConnectionStep(
function=partial(custom_executor_for_database),
name="Get Databases and Schemas",
),
TestConnectionStep(
function=partial(custom_executor_for_table),
name="Get Tables",
),
]
return test_connection_steps(steps)

View File

@ -72,6 +72,7 @@ class GlueSource(DatabaseServiceSource):
self.status = SQLSourceStatus()
self.glue = get_connection(self.service_connection)
super().__init__()
self.test_connection()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -20,6 +20,7 @@ import oracledb
from oracledb.exceptions import DatabaseError
from pydantic import SecretStr
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleConnection,
@ -31,7 +32,11 @@ from metadata.ingestion.connections.builders import (
get_connection_args_common,
get_connection_options_dict,
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_db_common,
)
from metadata.utils.logger import ingestion_logger
CX_ORACLE_LIB_VERSION = "8.3.0"
@ -102,8 +107,25 @@ def get_connection(connection: OracleConnection) -> Engine:
)
def test_connection(engine: Engine, _) -> None:
def test_connection(engine: Engine, _) -> TestConnectionResult:
"""
Test connection
"""
test_connection_db_common(engine)
inspector = inspect(engine)
steps = [
TestConnectionStep(
function=inspector.get_schema_names,
name="Get Schemas",
),
TestConnectionStep(
function=inspector.get_table_names,
name="Get Tables",
),
TestConnectionStep(
function=inspector.get_view_names,
name="Get Views",
mandatory=False,
),
]
return test_connection_db_common(engine, steps)

View File

@ -12,9 +12,11 @@
"""
Source connection handler
"""
from functools import partial
from urllib.parse import quote_plus
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.prestoConnection import (
PrestoConnection,
@ -23,7 +25,11 @@ from metadata.ingestion.connections.builders import (
create_generic_db_connection,
get_connection_args_common,
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_db_common,
)
def get_connection_url(connection: PrestoConnection) -> str:
@ -52,8 +58,40 @@ def get_connection(connection: PrestoConnection) -> Engine:
)
def test_connection(engine: Engine, _) -> None:
def test_connection(engine: Engine, _) -> TestConnectionResult:
"""
Test connection
"""
test_connection_db_common(engine)
inspector = inspect(engine)
def custom_executor(engine, statement):
cursor = engine.execute(statement)
return list(cursor.all())
def custom_executor_for_table():
schema_name = inspector.get_schema_names()
if schema_name:
for schema in schema_name:
table_name = inspector.get_table_names(schema)
return table_name
return None
steps = [
TestConnectionStep(
function=partial(
custom_executor,
statement="SHOW CATALOGS",
engine=engine,
),
name="Get Databases",
),
TestConnectionStep(
function=inspector.get_schema_names,
name="Get Schemas",
),
TestConnectionStep(
function=partial(custom_executor_for_table),
name="Get Tables",
),
]
return test_connection_db_common(engine, steps)

View File

@ -12,7 +12,10 @@
"""
Source connection handler
"""
from functools import partial
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
@ -22,7 +25,11 @@ from metadata.ingestion.connections.builders import (
get_connection_args_common,
get_connection_url_common,
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_db_common,
)
def get_connection(connection: SingleStoreConnection) -> Engine:
@ -36,8 +43,52 @@ def get_connection(connection: SingleStoreConnection) -> Engine:
)
def test_connection(engine: Engine, _) -> None:
def test_connection(engine: Engine, _) -> TestConnectionResult:
"""
Test connection
"""
test_connection_db_common(engine)
inspector = inspect(engine)
def custom_executor_for_table():
schema_name = inspector.get_schema_names()
if schema_name:
for schema in schema_name:
if schema not in (
"cluster",
"information_schema",
"memsql",
"s2_dataset_martech_3db3efe6",
):
table_name = inspector.get_table_names(schema)
return table_name
return None
def custom_executor_for_view():
schema_name = inspector.get_schema_names()
if schema_name:
for schema in schema_name:
if schema not in (
"cluster",
"information_schema",
"memsql",
"s2_dataset_martech_3db3efe6",
):
view_name = inspector.get_view_names(schema)
return view_name
return None
steps = [
TestConnectionStep(
function=inspector.get_schema_names,
name="Get Schemas",
),
TestConnectionStep(
function=partial(custom_executor_for_table),
name="Get Tables",
),
TestConnectionStep(
function=partial(custom_executor_for_view),
name="Get Views",
),
]
return test_connection_db_common(engine, steps)

View File

@ -12,10 +12,12 @@
"""
Source connection handler
"""
from functools import partial
from urllib.parse import quote_plus
from requests import Session
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
@ -26,7 +28,12 @@ from metadata.ingestion.connections.builders import (
init_empty_connection_arguments,
)
from metadata.ingestion.connections.secrets import connection_with_options_secrets
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_db_common,
)
from metadata.ingestion.source.database.trino.queries import TRINO_GET_DATABASE
def get_connection_url(connection: TrinoConnection) -> str:
@ -73,8 +80,53 @@ def get_connection(connection: TrinoConnection) -> Engine:
)
def test_connection(engine: Engine, _) -> None:
def test_connection(engine: Engine, _) -> TestConnectionResult:
"""
Test connection
"""
test_connection_db_common(engine)
inspector = inspect(engine)
def custom_executor(engine, statement):
cursor = engine.execute(statement)
return list(cursor.all())
def custom_executor_for_table():
schema_name = inspector.get_schema_names()
if schema_name:
for schema in schema_name:
table_name = inspector.get_table_names(schema)
return table_name
return None
def custom_executor_for_view():
schema_name = inspector.get_schema_names()
if schema_name:
for schema in schema_name:
view_name = inspector.get_view_names(schema)
return view_name
return None
steps = [
TestConnectionStep(
function=partial(
custom_executor,
statement=TRINO_GET_DATABASE,
engine=engine,
),
name="Get Databases",
),
TestConnectionStep(
function=inspector.get_schema_names,
name="Get Schemas",
),
TestConnectionStep(
function=partial(custom_executor_for_table),
name="Get Tables",
),
TestConnectionStep(
function=partial(custom_executor_for_view),
name="Get Views",
mandatory=False,
),
]
return test_connection_db_common(engine, steps)

View File

@ -25,3 +25,7 @@ TRINO_TABLE_COMMENTS = textwrap.dedent(
and "comment" is not null
"""
)
TRINO_GET_DATABASE = """
SHOW CATALOGS
"""

View File

@ -12,7 +12,10 @@
"""
Source connection handler
"""
from functools import partial
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.verticaConnection import (
VerticaConnection,
@ -22,7 +25,12 @@ from metadata.ingestion.connections.builders import (
get_connection_args_common,
get_connection_url_common,
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_db_common,
)
from metadata.ingestion.source.database.vertica.queries import VERTICA_LIST_DATABASES
def get_connection(connection: VerticaConnection) -> Engine:
@ -36,8 +44,37 @@ def get_connection(connection: VerticaConnection) -> Engine:
)
def test_connection(engine: Engine, _) -> None:
def test_connection(engine: Engine, _) -> TestConnectionResult:
"""
Test connection
"""
test_connection_db_common(engine)
inspector = inspect(engine)
def custom_executor(engine, statement):
cursor = engine.execute(statement)
return list(cursor.all())
steps = [
TestConnectionStep(
function=partial(
custom_executor,
statement=VERTICA_LIST_DATABASES,
engine=engine,
),
name="Get Databases",
),
TestConnectionStep(
function=inspector.get_schema_names,
name="Get Schemas",
),
TestConnectionStep(
function=inspector.get_table_names,
name="Get Tables",
),
TestConnectionStep(
function=inspector.get_view_names,
name="Get Views",
mandatory=False,
),
]
return test_connection_db_common(engine, steps)

View File

@ -12,13 +12,20 @@
"""
Source connection handler
"""
from functools import partial
from pydomo import Domo
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.entity.services.connections.pipeline.domoPipelineConnection import (
DomoPipelineConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
def get_connection(connection: DomoPipelineConnection) -> Domo:
@ -32,12 +39,20 @@ def get_connection(connection: DomoPipelineConnection) -> Domo:
raise SourceConnectionException(msg)
def test_connection(connection: Domo, _) -> None:
def test_connection(connection: Domo, _) -> TestConnectionResult:
"""
Test connection
"""
try:
connection.get_pipelines()
except Exception as exc:
msg = f"Unknown error while extracting pipeline from domo: {exc}."
raise SourceConnectionException(msg)
def custom_executor():
result = connection.get_pipelines()
return list(result)
steps = [
TestConnectionStep(
function=partial(custom_executor),
name="Get Pipeline",
),
]
return test_connection_steps(steps)

View File

@ -239,7 +239,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
String connectionPayload = JsonUtils.pojoToJson(testServiceConnection);
response = post(statusUrl, connectionPayload);
if (response.statusCode() == 200) {
return Response.status(200, response.body()).build();
return Response.status(response.statusCode()).entity(response.body()).build();
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());