feat: Add support for exasol datasource (#17166)

* Add flake.nix

* Add lockfile for flake

* Update nix environment and document usage

* Add schema for exasol connector

* Add Exasol definitions to databaseService

* Fix error in exasol connector schema

* Add additional connection options/settings to exasol connector

* Add exasol-connector to ui

* Add dependencies for exasol-connector

* Update notes

* Update ingestion code

* Add Basic Documentation for Exasol Connector

* Update flake file

* Add developer notes

* Add python script which can be used as entry point for debugging in ide

* Add config file which can be used for debugging (manual execution)

* Update debug script

* Update developer notes

* Remove old developer notes

* Add .venv to gitignore

* Update dev notes

* Update development notes

* Update ExasolSource

* Establish basic connection to Exasol DB from connector

* Update exasol connector connection settings

* Add service_spec for exasol plugin

* Remove development files

* Remove unused module

* Applied code formatter

* Update exasol dependency constraint(s)

* Add unit test for exasol connection url(s)

* Fixed test expectations for exasol connection url test(s)

* Adjust the test query for the Exasol connection test
This commit is contained in:
Nicola Coretti 2024-10-31 08:11:30 +01:00 committed by GitHub
parent a2dda5b287
commit 7ebc62dca7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 340 additions and 1 deletions

1
.gitignore vendored
View File

@ -11,6 +11,7 @@
# Created by .ignore support plugin (hsz.mobi)
# Maven
.venv
__pycache__
target/
pom.xml.tag

View File

@ -224,6 +224,7 @@ plugins: Dict[str, Set[str]] = {
"elasticsearch": {
VERSIONS["elasticsearch8"],
}, # also requires requests-aws4auth which is in base
"exasol": {"sqlalchemy_exasol>=5,<6"},
"glue": {VERSIONS["boto3"]},
"great-expectations": {VERSIONS["great-expectations"]},
"greenplum": {*COMMONS["postgres"]},

View File

@ -0,0 +1,87 @@
from typing import Optional
from urllib.parse import quote_plus
from pydantic import SecretStr
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
ExasolConnection,
)
from metadata.ingestion.connections.builders import (
create_generic_db_connection,
get_connection_args_common,
)
from metadata.ingestion.connections.test_connections import test_query
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
# Module-level logger obtained from the ingestion logging helper.
logger = ingestion_logger()
def get_connection_url(connection: ExasolConnection) -> str:
    """
    Build the SQLAlchemy connection URL for an Exasol service connection.

    The result has the shape
    ``<scheme>://[<user>:<password>@]<hostPort>[/<schema>][?<tls options>]``.
    User name, password and TLS option values are encoded with ``quote_plus``.

    The ``connection`` object is only read; it is never modified.

    Args:
        connection: Exasol connection settings. The attributes ``scheme``,
            ``username``, ``password``, ``hostPort`` and ``tls`` are read, as
            well as ``databaseSchema`` / ``database`` when the object has them.

    Returns:
        The connection URL as a string.

    Raises:
        KeyError: If ``connection.tls.value`` is not one of the TLS modes
            listed in ``tls_settings`` below.
    """
    url = f"{connection.scheme.value}://"

    if connection.username:
        # A missing/empty password is rendered as an empty string, so the
        # credentials part is always "<user>:<password>@". The value is kept
        # in a local instead of being written back onto the caller's object.
        password = (
            connection.password.get_secret_value() if connection.password else ""
        )
        url += f"{quote_plus(connection.username)}:{quote_plus(password)}@"

    url += connection.hostPort

    database_schema = getattr(connection, "databaseSchema", None)
    if database_schema:
        url += f"/{database_schema}"

    # Query parameters appended to the URL for each supported TLS mode.
    tls_settings = {
        "validate-certificate": {},
        "ignore-certificate": {"SSLCertificate": "SSL_VERIFY_NONE"},
        "disable-tls": {"SSLCertificate": "SSL_VERIFY_NONE", "ENCRYPTION": "no"},
    }
    options = tls_settings[connection.tls.value]
    if options:
        # A "/" separator is inserted before the query string only for
        # connection objects that expose a database / databaseSchema
        # attribute which is empty.
        if (hasattr(connection, "database") and not connection.database) or (
            hasattr(connection, "databaseSchema") and not connection.databaseSchema
        ):
            url += "/"
        params = "&".join(
            f"{key}={quote_plus(value)}" for key, value in options.items() if value
        )
        url = f"{url}?{params}"

    return url
def get_connection(connection: ExasolConnection) -> Engine:
    """
    Build the engine for an Exasol connection.

    Delegates to the generic DB connection builder, handing it this module's
    URL builder and the common connection-arguments helper.
    """
    engine = create_generic_db_connection(
        connection=connection,
        get_connection_args_fn=get_connection_args_common,
        get_connection_url_fn=get_connection_url,
    )
    return engine
def test_connection(
    metadata: OpenMetadata,
    engine: Engine,
    service_connection: ExasolConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test the connection by running a trivial query on ``engine``.

    This can be executed either as part of a metadata workflow or during
    an Automation Workflow. Only ``engine`` is used here; the remaining
    parameters are accepted but not read by this function.
    """
    probe_statement = "SELECT 1;"
    test_query(engine, probe_statement)

View File

@ -0,0 +1,27 @@
from typing import Optional, cast
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
ExasolConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
class ExasolSource(CommonDbSourceService):
    """
    Metadata source for Exasol, built on top of CommonDbSourceService.
    """

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """
        Validate ``config_dict`` as a workflow source and build the source.

        ``pipeline_name`` is accepted but not used by this method.

        Raises:
            InvalidSourceException: If the service connection is missing or
                its config is not an ExasolConnection.
        """
        workflow_source: WorkflowSource = WorkflowSource.model_validate(config_dict)
        service_connection = workflow_source.serviceConnection
        if service_connection is None:
            raise InvalidSourceException("Missing service connection")

        # cast() only informs type checkers; the isinstance check below is
        # the actual runtime validation.
        connection = cast(ExasolConnection, service_connection.root.config)
        if isinstance(connection, ExasolConnection):
            return cls(workflow_source, metadata)

        raise InvalidSourceException(
            f"Expected ExasolConnection, but got {connection}"
        )

View File

@ -0,0 +1,4 @@
from metadata.ingestion.source.database.exasol.metadata import ExasolSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec
# Service spec for the Exasol connector: a DefaultDatabaseSpec whose
# metadata source class is ExasolSource.
# NOTE(review): presumably looked up by this name by the ingestion framework - confirm.
ServiceSpec = DefaultDatabaseSpec(metadata_source_class=ExasolSource)

View File

@ -37,6 +37,12 @@ from metadata.generated.schema.entity.services.connections.database.druidConnect
DruidConnection,
DruidScheme,
)
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
ExasolConnection,
ExasolScheme,
ExasolType,
Tls,
)
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveConnection,
HiveScheme,
@ -1178,3 +1184,61 @@ class SourceConnectionTest(TestCase):
),
)
assert get_connection_url(oracle_conn_obj) == expected_url
def test_exasol_url(self):
    """Check the Exasol connection URL built for each TLS setting."""
    from collections import namedtuple

    from metadata.ingestion.source.database.exasol.connection import (
        get_connection_url,
    )

    TestData = namedtuple("TestData", ["comment", "kwargs", "expected"])

    def generate_test_data(
        username="admin", password="password", port=8563, hostname="localhost"
    ):
        # Settings shared by all cases; "type" and "scheme" are only passed
        # explicitly in the last two cases, the first relies on defaults.
        explicit_defaults = {
            "type": ExasolType.Exasol,
            "scheme": ExasolScheme.exa_websocket,
        }
        credentials = {
            "username": username,
            "password": password,
            "hostPort": f"{hostname}:{port}",
        }

        yield TestData(
            comment="Testing default parameters",
            kwargs={**credentials, "tls": Tls.validate_certificate},
            expected="exa+websocket://admin:password@localhost:8563",
        )
        yield TestData(
            comment="Testing the manual setting of parameters",
            kwargs={
                **explicit_defaults,
                **credentials,
                "tls": Tls.ignore_certificate,
            },
            expected="exa+websocket://admin:password@localhost:8563?SSLCertificate=SSL_VERIFY_NONE",
        )
        yield TestData(
            comment="Testing disabling TLS completely",
            kwargs={
                **explicit_defaults,
                **credentials,
                "tls": Tls.disable_tls,
            },
            expected="exa+websocket://admin:password@localhost:8563?SSLCertificate=SSL_VERIFY_NONE&ENCRYPTION=no",
        )

    # execute test cases
    for _comment, kwargs, expected in generate_test_data():
        with self.subTest(kwargs=kwargs, expected=expected):
            actual = get_connection_url(ExasolConnection(**kwargs))
            assert actual == expected

View File

@ -0,0 +1,86 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/exasolConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ExasolConnection",
"description": "Exasol Database Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.database.ExasolConnection",
"definitions": {
"exasolType": {
"description": "Service type.",
"type": "string",
"enum": [
"Exasol"
],
"default": "Exasol"
},
"exasolScheme": {
"description": "SQLAlchemy driver scheme options.",
"type": "string",
"enum": [
"exa+websocket"
],
"default": "exa+websocket"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/exasolType",
"default": "Exasol"
},
"scheme": {
"title": "Connection Scheme",
"description": "SQLAlchemy driver scheme options.",
"$ref": "#/definitions/exasolScheme",
"default": "exa+websocket"
},
"username": {
"title": "Username",
"description": "Username to connect to Exasol. This user should have privileges to read all the metadata in Exasol.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password to connect to Exasol.",
"type": "string",
"format": "password"
},
"hostPort": {
"title": "Host and Port",
"description": "Host and port of the source service.",
"type": "string",
"default": "127.0.0.1:8563"
},
"tls": {
"title": "SSL/TLS Settings",
"description": "Client SSL/TLS settings.",
"type": "string",
"enum": [
"disable-tls",
"ignore-certificate",
"validate-certificate"
],
"default": "validate-certificate"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": [
"hostPort",
"username",
"password"
]
}

View File

@ -57,7 +57,8 @@
"Iceberg",
"Teradata",
"SapErp",
"Synapse"
"Synapse",
"Exasol"
],
"javaEnums": [
{
@ -188,6 +189,9 @@
},
{
"name": "Synapse"
},
{
"name": "Exasol"
}
]
},
@ -323,6 +327,9 @@
},
{
"$ref": "./connections/database/synapseConnection.json"
},
{
"$ref": "./connections/database/exasolConnection.json"
}
]
}

View File

@ -0,0 +1,50 @@
# Exasol
In this section, we provide guides and references for using the Exasol connector.
## Requirements
* Exasol >= 7.1
## Connection Details
$$section
### Connection Scheme $(id="scheme")
SQLAlchemy driver scheme options.
$$
$$section
### Username $(id="username")
Username to connect to Exasol. This user should have privileges to read all the metadata in Exasol.
$$
$$section
### Password $(id="password")
Password of the user connecting to Exasol.
$$
$$section
### Host and Port $(id="hostPort")
This parameter specifies the host and port of the Exasol instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `localhost:8563`.
If you are running the OpenMetadata ingestion in a Docker container and your services are hosted on `localhost`, then use `host.docker.internal:8563` as the value.
$$
$$section
### SSL/TLS Settings $(id="tls")
Mode used for SSL/TLS encryption and server certificate validation:
#### validate-certificate (**default**)
Uses Transport Layer Security (TLS) and validates the server certificate using system certificate stores.
#### ignore-certificate
Uses Transport Layer Security (TLS) but disables the validation of the server certificate. This should not be used in production. It can be useful during testing with self-signed certificates.
#### disable-tls
Does not use any Transport Layer Security (TLS). Data will be sent in plain text (no encryption).
While this may be helpful in rare cases of debugging, make sure you do not use this in production.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

View File

@ -36,6 +36,7 @@ import domo from '../assets/img/service-icon-domo.png';
import doris from '../assets/img/service-icon-doris.png';
import druid from '../assets/img/service-icon-druid.png';
import dynamodb from '../assets/img/service-icon-dynamodb.png';
import exasol from '../assets/img/service-icon-exasol.png';
import fivetran from '../assets/img/service-icon-fivetran.png';
import flink from '../assets/img/service-icon-flink.png';
import gcs from '../assets/img/service-icon-gcs.png';
@ -186,6 +187,7 @@ export const ALATIONSINK = alationsink;
export const SAS = sas;
export const OPENLINEAGE = openlineage;
export const LOGO = logo;
export const EXASOL = exasol;
export const AIRFLOW = airflow;
export const PREFECT = prefect;

View File

@ -29,6 +29,7 @@ import domoDatabaseConnection from '../jsons/connectionSchemas/connections/datab
import dorisConnection from '../jsons/connectionSchemas/connections/database/dorisConnection.json';
import druidConnection from '../jsons/connectionSchemas/connections/database/druidConnection.json';
import dynamoDBConnection from '../jsons/connectionSchemas/connections/database/dynamoDBConnection.json';
import exasolConnection from '../jsons/connectionSchemas/connections/database/exasolConnection.json';
import glueConnection from '../jsons/connectionSchemas/connections/database/glueConnection.json';
import greenplumConnection from '../jsons/connectionSchemas/connections/database/greenplumConnection.json';
import hiveConnection from '../jsons/connectionSchemas/connections/database/hiveConnection.json';
@ -121,6 +122,11 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
break;
}
case DatabaseServiceType.Exasol: {
schema = exasolConnection;
break;
}
case DatabaseServiceType.Glue: {
schema = glueConnection;

View File

@ -41,6 +41,7 @@ import {
DRUID,
DYNAMODB,
ELASTIC_SEARCH,
EXASOL,
FIVETRAN,
FLINK,
GCS,
@ -311,6 +312,9 @@ class ServiceUtilClassBase {
case this.DatabaseServiceTypeSmallCase.DynamoDB:
return DYNAMODB;
case this.DatabaseServiceTypeSmallCase.Exasol:
return EXASOL;
case this.DatabaseServiceTypeSmallCase.SingleStore:
return SINGLESTORE;