520 lines
17 KiB
Python
Raw Normal View History

# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Build and document all supported Engines
"""
import json
import logging
import traceback
2022-04-19 17:48:55 +02:00
from functools import singledispatch
from typing import Union
import requests
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from metadata.generated.schema.entity.services.connections.connectionBasicType import (
ConnectionArguments,
ConnectionOptions,
)
from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
MetabaseConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import (
RedashConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
TableauConnection,
)
2022-04-19 17:48:55 +02:00
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
DeltaLakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
DynamoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection,
)
2022-04-22 22:04:39 +05:30
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)
from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.connection_clients import (
DeltaLakeClient,
DynamoClient,
GlueClient,
2022-04-23 20:54:25 +05:30
KafkaClient,
MetabaseClient,
RedashClient,
SalesforceClient,
SupersetClient,
TableauClient,
)
2022-04-19 17:48:55 +02:00
from metadata.utils.credentials import set_google_credentials
from metadata.utils.source_connections import get_connection_args, get_connection_url
2022-04-12 22:14:17 +02:00
from metadata.utils.timeout import timeout
# Module-level logger used by the helpers in this file; registered under the fixed name "Utils".
logger = logging.getLogger("Utils")
class SourceConnectionException(Exception):
    """
    Signals that a connection to the source could not be established.
    """
def create_generic_connection(connection, verbose: bool = False) -> Engine:
    """
    Build a SQLAlchemy Engine out of a connection object.

    :param connection: JSON Schema connection model
    :param verbose: forwarded to the engine as ``echo``
    :return: SQLAlchemy Engine
    """
    engine_options = connection.connectionOptions
    if not engine_options:
        # No options supplied: fall back to an empty options model.
        engine_options = ConnectionOptions()

    url = get_connection_url(connection)
    engine_kwargs = engine_options.dict()
    connect_args = get_connection_args(connection)
    return create_engine(
        url,
        **engine_kwargs,
        connect_args=connect_args,
        echo=verbose,
    )
2022-04-19 17:48:55 +02:00
@singledispatch
def get_connection(
    connection, verbose: bool = False
) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient, KafkaClient]:
    """
    Fallback used when no connection-specific implementation is registered:
    treat the configuration as a SQL one and build the SQLAlchemy Engine.
    """
    engine = create_generic_connection(connection, verbose)
    return engine
2022-04-19 17:48:55 +02:00
@get_connection.register
def _(connection: DatabricksConnection, verbose: bool = False):
    """
    Build the Databricks engine, copying ``httpPath`` (when set) into the
    connection arguments as ``http_path`` first.
    """
    http_path = connection.httpPath
    if http_path:
        # Make sure there is an arguments object to attach the path to.
        if not connection.connectionArguments:
            connection.connectionArguments = ConnectionArguments()
        connection.connectionArguments.http_path = http_path

    return create_generic_connection(connection, verbose)
@get_connection.register
def _(connection: SnowflakeConnection, verbose: bool = False) -> Engine:
    """
    Build the Snowflake engine, optionally authenticating with a private key.

    When ``connection.privateKey`` is set, the PEM key is loaded, re-encoded
    as unencrypted DER/PKCS8 bytes and stored under the ``private_key``
    connection argument before the generic engine is created.

    :param connection: Snowflake connection
    :param verbose: debugger or not
    :return: Engine
    """
    if not connection.privateKey:
        return create_generic_connection(connection, verbose)

    # Only imported when a private key is configured.
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization

    passphrase = (
        connection.snowflakePrivatekeyPassphrase.get_secret_value()
        if connection.snowflakePrivatekeyPassphrase
        else ""
    )
    if not passphrase:
        logger.warning(
            "Snowflake Private Key Passphrase not found, loading the private key without a password"
        )

    p_key = serialization.load_pem_private_key(
        bytes(connection.privateKey.get_secret_value(), "utf-8"),
        # cryptography expects ``None`` (not ``b""``) for an unencrypted key;
        # a non-None password on an unencrypted key raises TypeError.
        password=passphrase.encode() or None,
        backend=default_backend(),
    )
    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # NOTE(review): this replaces any connection arguments already present on
    # the connection, as the previous implementation did - confirm whether
    # user-supplied arguments should be preserved instead.
    connection.connectionArguments = {"private_key": pkb}

    return create_generic_connection(connection, verbose)
@get_connection.register
def _(connection: BigQueryConnection, verbose: bool = False) -> Engine:
    """
    Set up the GCS credentials, then build the engine.

    :param connection: BigQuery connection
    :param verbose: debugger or not
    :return: Engine
    """
    # Credentials have to be in place before the engine is created.
    gcs_credentials = connection.credentials
    set_google_credentials(gcs_credentials=gcs_credentials)

    engine = create_generic_connection(connection, verbose)
    return engine
@get_connection.register
def _(connection: DynamoDBConnection, verbose: bool = False) -> DynamoClient:
    """
    Build the DynamoDB client from the AWS configuration of the connection.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from metadata.utils.aws_client import AWSClient

    aws_client = AWSClient(connection.awsConfig)
    return aws_client.get_dynomo_client()
@get_connection.register
def _(connection: GlueConnection, verbose: bool = False) -> GlueClient:
    """
    Build the Glue client from the AWS configuration of the connection.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from metadata.utils.aws_client import AWSClient

    aws_client = AWSClient(connection.awsConfig)
    return aws_client.get_glue_client()
2022-04-19 17:48:55 +02:00
2022-04-22 22:04:39 +05:30
@get_connection.register
def _(connection: SalesforceConnection, verbose: bool = False) -> SalesforceClient:
    """
    Open a Salesforce session with username, password and security token
    and wrap it in a SalesforceClient.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from simple_salesforce import Salesforce

    username = connection.username
    password = connection.password.get_secret_value()
    security_token = connection.securityToken.get_secret_value()

    session = Salesforce(
        username,
        password=password,
        security_token=security_token,
    )
    return SalesforceClient(session)
@get_connection.register
def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient:
    """
    Create (or reuse) a Delta-enabled Spark session and wrap it in a
    DeltaLakeClient.

    The metastore is configured from ``metastoreHostPort`` when set,
    otherwise from ``metastoreFilePath`` when set.
    ``verbose`` is accepted for signature parity and is not used.
    """
    import pyspark
    from delta import configure_spark_with_delta_pip

    session_builder = pyspark.sql.SparkSession.builder.appName(connection.appName)
    session_builder = session_builder.enableHiveSupport()
    session_builder = session_builder.config(
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
    )
    session_builder = session_builder.config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )

    # Host/port takes precedence over the file path when both are present.
    if connection.metastoreHostPort:
        session_builder.config(
            "hive.metastore.uris",
            f"thrift://{connection.metastoreHostPort}",
        )
    elif connection.metastoreFilePath:
        session_builder.config(
            "spark.sql.warehouse.dir", f"{connection.metastoreFilePath}"
        )

    spark_session = configure_spark_with_delta_pip(session_builder).getOrCreate()
    return DeltaLakeClient(spark_session)
@get_connection.register
def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
    """
    Prepare Kafka Admin Client and Schema Registry Client

    The admin client is always created. When ``schemaRegistryURL`` is set, a
    schema registry client and an Avro consumer are created as well;
    otherwise both are ``None`` in the returned KafkaClient.

    The configuration dictionaries held by ``connection`` are copied before
    being extended, so the connection object is left untouched and calling
    this function again with the same connection does not feed
    consumer-only settings to the admin client.

    :param connection: Kafka connection
    :param verbose: unused, kept for signature parity
    :return: KafkaClient wrapping the admin, schema registry and consumer clients
    """
    from confluent_kafka.admin import AdminClient
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.schema_registry.schema_registry_client import (
        SchemaRegistryClient,
    )

    # Work on a copy: the connection's own config must not be mutated.
    admin_client_config = dict(connection.consumerConfig or {})
    admin_client_config["bootstrap.servers"] = connection.bootstrapServers
    admin_client = AdminClient(admin_client_config)

    schema_registry_client = None
    consumer_client = None
    if connection.schemaRegistryURL:
        schema_registry_config = dict(connection.schemaRegistryConfig or {})
        schema_registry_config["url"] = connection.schemaRegistryURL
        schema_registry_client = SchemaRegistryClient(schema_registry_config)

        # The consumer starts from the admin settings and adds its own keys.
        consumer_config = dict(admin_client_config)
        consumer_config["schema.registry.url"] = connection.schemaRegistryURL
        consumer_config["group.id"] = "openmetadata-consumer-1"
        consumer_config["auto.offset.reset"] = "earliest"
        consumer_config["enable.auto.commit"] = False
        consumer_client = AvroConsumer(consumer_config)

    return KafkaClient(
        admin_client=admin_client,
        schema_registry_client=schema_registry_client,
        consumer_client=consumer_client,
    )
def create_and_bind_session(engine: Engine) -> Session:
    """
    Create a new session bound to the given engine,
    ready to be used for our operations.
    """
    session_factory = sessionmaker(bind=engine)
    return session_factory()
2022-04-12 22:14:17 +02:00
@timeout(seconds=120)
@singledispatch
def test_connection(connection) -> None:
    """
    Default implementation is the engine to test.

    Test that we can connect to the source using the given engine by
    opening a connection and executing ``ConnTestFn``.

    :param connection: Engine to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        with connection.connect() as conn:
            conn.execute(ConnTestFn())
    except OperationalError as err:
        raise SourceConnectionException(
            f"Connection error for {connection} - {err}. Check the connection details."
        ) from err
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@test_connection.register
def _(connection: DynamoClient) -> None:
    """
    Test that we can connect to the source using the given aws resource

    :param connection: wrapper whose ``client`` is the boto service resource to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    from botocore.client import ClientError

    try:
        # boto3 resource collections are lazy: building ``tables.all()`` alone
        # sends no request, so nothing would ever fail. Iterating forces a
        # real call; ``limit(1)`` keeps it to at most one item.
        # NOTE(review): assumes ``connection.client`` is a boto3 service
        # resource, as the ``.tables`` collection access suggests.
        for _table in connection.client.tables.limit(1):
            break
    except ClientError as err:
        raise SourceConnectionException(
            f"Connection error for {connection} - {err}. Check the connection details."
        ) from err
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@test_connection.register
def _(connection: GlueClient) -> None:
    """
    Test that we can connect to the source using the given aws resource

    :param connection: wrapper whose ``client`` is the boto client to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    from botocore.client import ClientError

    try:
        connection.client.list_workflows()
    except ClientError as err:
        raise SourceConnectionException(
            f"Connection error for {connection} - {err}. Check the connection details."
        ) from err
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
2022-04-22 22:04:39 +05:30
@test_connection.register
def _(connection: SalesforceClient) -> None:
    """
    Test the Salesforce client by calling ``describe()`` on it.

    :param connection: wrapper whose ``client`` is the Salesforce session to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    from simple_salesforce.exceptions import SalesforceAuthenticationFailed

    try:
        connection.client.describe()
    except SalesforceAuthenticationFailed as err:
        raise SourceConnectionException(
            f"Connection error for {connection} - {err}. Check the connection details."
        ) from err
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
2022-04-23 20:54:25 +05:30
@test_connection.register
def _(connection: KafkaClient) -> None:
    """
    Test AdminClient.

    If exists, test the Schema Registry client as well.

    :param connection: KafkaClient holding the admin and schema registry clients
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        _ = connection.admin_client.list_topics().topics
        if connection.schema_registry_client:
            _ = connection.schema_registry_client.get_subjects()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@test_connection.register
def _(connection: DeltaLakeClient) -> None:
    """
    Test the Delta Lake client by listing the databases of its catalog.

    :param connection: wrapper whose ``client`` exposes the catalog to query
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        connection.client.catalog.listDatabases()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@get_connection.register
def _(connection: MetabaseConnection, verbose: bool = False):
    """
    Log in to Metabase and wrap the resulting session in a MetabaseClient.

    The client payload carries the original connection and the
    ``X-Metabase-Session`` header built from the login response.
    Any failure is logged and the function then returns ``None``.
    ``verbose`` is accepted for signature parity and is not used.
    """
    try:
        credentials = {
            "username": connection.username,
            "password": connection.password.get_secret_value(),
        }
        request_headers = {"Content-Type": "application/json", "Accept": "*/*"}
        login_url = connection.hostPort + "/api/session/"

        response = requests.post(
            login_url,
            data=json.dumps(credentials),
            headers=request_headers,
        )

        session_header = {"X-Metabase-Session": response.json()["id"]}
        return MetabaseClient(
            {"connection": connection, "metabase_session": session_header}
        )
    except Exception as err:
        logger.error(f"Failed to connect with error : {err}")
        logger.debug(traceback.format_exc())
@test_connection.register
def _(connection: MetabaseClient) -> None:
    """
    Test the Metabase session by requesting the dashboard listing.

    An HTTP error status (e.g. a rejected session) is treated as a failed
    connection instead of being silently accepted.

    :param connection: wrapper whose ``client`` holds the original connection
        and the session header
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        response = requests.get(
            connection.client["connection"].hostPort + "/api/dashboard",
            headers=connection.client["metabase_session"],
        )
        # Without this, a 4xx/5xx answer would count as a successful test.
        response.raise_for_status()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@get_connection.register
def _(connection: RedashConnection, verbose: bool = False):
    """
    Build a Redash API client from host and API key and wrap it in a
    RedashClient.

    Any failure is logged and the function then returns ``None``.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from redash_toolbelt import Redash

    try:
        host = connection.hostPort
        api_key = connection.apiKey.get_secret_value()
        return RedashClient(Redash(host, api_key))
    except Exception as err:
        logger.error(f"Failed to connect with error : {err}")
        logger.error(err)
@test_connection.register
def _(connection: RedashClient) -> None:
    """
    Test the Redash client by calling ``dashboards()`` on it.

    :param connection: wrapper whose ``client`` is the Redash API client to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        connection.client.dashboards()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@get_connection.register
def _(connection: SupersetConnection, verbose: bool = False):
    """
    Build the Superset REST API client for the connection and wrap it in a
    SupersetClient.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from metadata.ingestion.ometa.superset_rest import SupersetAPIClient

    api_client = SupersetAPIClient(connection)
    return SupersetClient(api_client)
@test_connection.register
def _(connection: SupersetClient) -> None:
    """
    Test the Superset client by calling ``fetch_menu()`` on it.

    :param connection: wrapper whose ``client`` is the Superset API client to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        connection.client.fetch_menu()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err
@get_connection.register
def _(connection: TableauConnection, verbose: bool = False):
    """
    Sign in to Tableau Server and wrap the session in a TableauClient.

    Username/password credentials are used when both are set; otherwise a
    personal access token is used when its name and secret are set.
    A failure while connecting or signing in is logged and the function
    then returns ``None``.
    ``verbose`` is accepted for signature parity and is not used.
    """
    from tableau_api_lib import TableauServerConnection

    env_settings = {
        "server": connection.hostPort,
        "api_version": connection.apiVersion,
        "site_name": connection.siteName,
        "site_url": connection.siteName,
    }
    # The library expects the settings nested under the environment name.
    tableau_server_config = {f"{connection.env}": env_settings}

    if connection.username and connection.password:
        auth_target = tableau_server_config[connection.env]
        auth_target["username"] = connection.username
        auth_target["password"] = connection.password.get_secret_value()
    elif (
        connection.personalAccessTokenName
        and connection.personalAccessTokenSecret.get_secret_value()
    ):
        auth_target = tableau_server_config[connection.env]
        auth_target["personal_access_token_name"] = connection.personalAccessTokenName
        auth_target[
            "personal_access_token_secret"
        ] = connection.personalAccessTokenSecret.get_secret_value()

    try:
        server_connection = TableauServerConnection(
            config_json=tableau_server_config,
            env=connection.env,
        )
        server_connection.sign_in().json()
        return TableauClient(server_connection)
    except Exception as err:  # pylint: disable=broad-except
        logger.error("%s: %s", repr(err), err)
@test_connection.register
def _(connection: TableauClient) -> None:
    """
    Test the Tableau client by requesting the server info.

    :param connection: wrapper whose ``client`` is the Tableau server connection to test
    :return: None or raise an exception if we cannot connect
    :raises SourceConnectionException: on any failure; the original error
        is attached as the cause
    """
    try:
        connection.client.server_info()
    except Exception as err:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        ) from err