mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-07 13:07:22 +00:00
Feature: Cassandra SSL (#19226)
This commit is contained in:
parent
8599aab9b4
commit
4bd8994a00
@ -13,6 +13,11 @@ source:
|
||||
# token: <Token String>
|
||||
# requestTimeout: <Timeout in seconds>
|
||||
# connectTimeout: <Timeout in seconds>
|
||||
# sslMode: allow
|
||||
# sslConfig:
|
||||
# caCertificate: "CA certificate content"
|
||||
# sslCertificate: "SSL certificate content"
|
||||
# sslKey: "SSL key content"
|
||||
hostPort: localhost:9042
|
||||
sourceConfig:
|
||||
config:
|
||||
|
||||
@ -34,6 +34,7 @@ from metadata.generated.schema.entity.services.connections.database.cassandraCon
|
||||
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
|
||||
TestConnectionResult,
|
||||
)
|
||||
from metadata.ingestion.connections.builders import init_empty_connection_arguments
|
||||
from metadata.ingestion.connections.test_connections import test_connection_steps
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.database.cassandra.queries import (
|
||||
@ -77,7 +78,14 @@ def get_connection(connection: CassandraConnection):
|
||||
password=connection.authType.password.get_secret_value(),
|
||||
)
|
||||
|
||||
cluster = Cluster(**cluster_config)
|
||||
connection.connectionArguments = (
|
||||
connection.connectionArguments or init_empty_connection_arguments()
|
||||
)
|
||||
|
||||
cluster = Cluster(
|
||||
**cluster_config,
|
||||
ssl_context=connection.connectionArguments.root.get("ssl_context"),
|
||||
)
|
||||
session = cluster.connect()
|
||||
|
||||
return session
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Iceberg source helpers.
|
||||
Cassandra source helpers.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@ -57,6 +57,7 @@ class CassandraColumnParser:
|
||||
data_type = None
|
||||
array_data_type = None
|
||||
raw_data_type = ""
|
||||
|
||||
for letter in field.type:
|
||||
if letter == "<":
|
||||
if raw_data_type in ("", "frozen"):
|
||||
@ -71,12 +72,10 @@ class CassandraColumnParser:
|
||||
array_data_type = cls.datatype_mapping.get(
|
||||
raw_data_type.lower(), DataType.UNKNOWN
|
||||
)
|
||||
raw_data_type = ""
|
||||
if data_type != DataType.ARRAY:
|
||||
break
|
||||
|
||||
elif letter != ">":
|
||||
raw_data_type += letter
|
||||
raw_data_type = ""
|
||||
if data_type != DataType.ARRAY or array_data_type:
|
||||
break
|
||||
|
||||
elif letter == ">":
|
||||
if not array_data_type and data_type:
|
||||
@ -84,18 +83,18 @@ class CassandraColumnParser:
|
||||
raw_data_type.lower(), DataType.UNKNOWN
|
||||
)
|
||||
break
|
||||
else:
|
||||
if not data_type:
|
||||
data_type = cls.datatype_mapping.get(
|
||||
field.type.lower(), DataType.UNKNOWN
|
||||
)
|
||||
|
||||
else:
|
||||
raw_data_type += letter
|
||||
|
||||
if not data_type:
|
||||
data_type = cls.datatype_mapping.get(field.type.lower(), DataType.UNKNOWN)
|
||||
|
||||
column_def = {
|
||||
"name": field.column_name,
|
||||
"dataTypeDisplay": field.type,
|
||||
"dataType": data_type,
|
||||
"arrayDataType": array_data_type,
|
||||
}
|
||||
if array_data_type:
|
||||
column_def["arrayDataType"] = array_data_type
|
||||
|
||||
return Column(**column_def)
|
||||
|
||||
@ -17,6 +17,7 @@ import os
|
||||
import tempfile
|
||||
import traceback
|
||||
from functools import singledispatch, singledispatchmethod
|
||||
from ssl import CERT_REQUIRED, SSLContext
|
||||
from typing import Optional, Union, cast
|
||||
|
||||
from pydantic import SecretStr
|
||||
@ -27,6 +28,9 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i
|
||||
from metadata.generated.schema.entity.services.connections.dashboard.qlikSenseConnection import (
|
||||
QlikSenseConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.cassandraConnection import (
|
||||
CassandraConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.dorisConnection import (
|
||||
DorisConnection,
|
||||
)
|
||||
@ -55,6 +59,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.matillionCon
|
||||
MatillionConnection,
|
||||
)
|
||||
from metadata.generated.schema.security.ssl import verifySSLConfig
|
||||
from metadata.generated.schema.security.ssl.verifySSLConfig import SslMode
|
||||
from metadata.ingestion.connections.builders import init_empty_connection_arguments
|
||||
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
|
||||
from metadata.ingestion.source.connections import get_connection
|
||||
@ -206,6 +211,25 @@ class SSLManager:
|
||||
] = self.cert_file_path
|
||||
return connection
|
||||
|
||||
@setup_ssl.register(CassandraConnection)
|
||||
def _(self, connection):
|
||||
connection = cast(CassandraConnection, connection)
|
||||
|
||||
ssl_context = None
|
||||
if connection.sslMode != SslMode.disable:
|
||||
ssl_context = SSLContext()
|
||||
ssl_context.load_verify_locations(cafile=self.ca_file_path)
|
||||
ssl_context.verify_mode = CERT_REQUIRED
|
||||
ssl_context.load_cert_chain(
|
||||
certfile=self.cert_file_path, keyfile=self.key_file_path
|
||||
)
|
||||
|
||||
connection.connectionArguments = (
|
||||
connection.connectionArguments or init_empty_connection_arguments()
|
||||
)
|
||||
connection.connectionArguments.root["ssl_context"] = ssl_context
|
||||
return connection
|
||||
|
||||
|
||||
@singledispatch
|
||||
def check_ssl_and_init(_) -> Optional[SSLManager]:
|
||||
@ -289,6 +313,17 @@ def _(connection):
|
||||
return None
|
||||
|
||||
|
||||
@check_ssl_and_init.register(CassandraConnection)
|
||||
def _(connection):
|
||||
service_connection = cast(CassandraConnection, connection)
|
||||
ssl: Optional[verifySSLConfig.SslConfig] = service_connection.sslConfig
|
||||
if ssl and (ssl.root.caCertificate or ssl.root.sslCertificate or ssl.root.sslKey):
|
||||
return SSLManager(
|
||||
ca=ssl.root.caCertificate, cert=ssl.root.sslCertificate, key=ssl.root.sslKey
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def get_ssl_connection(service_config):
|
||||
try:
|
||||
# To be cleaned up as part of https://github.com/open-metadata/OpenMetadata/issues/15913
|
||||
|
||||
@ -18,6 +18,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.database.cassandra.metadata import CassandraSource
|
||||
from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource
|
||||
from metadata.utils.ssl_manager import SSLManager
|
||||
|
||||
@ -124,3 +125,73 @@ class KafkaSourceSSLTest(TestCase):
|
||||
kafka_source_with_ssl.service_connection.schemaRegistrySSL.root.sslCertificate.get_secret_value(),
|
||||
"sslCertificateData",
|
||||
)
|
||||
|
||||
|
||||
class CassandraSourceSSLTest(TestCase):
|
||||
@patch("metadata.utils.ssl_manager.SSLManager.setup_ssl")
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.cassandra.metadata.CassandraSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.source.database.cassandra.connection.get_connection")
|
||||
def test_init(self, get_connection, test_connection, setup_ssl):
|
||||
get_connection.return_value = True
|
||||
test_connection.return_value = True
|
||||
setup_ssl.side_effect = lambda x: x
|
||||
|
||||
config = WorkflowSource(
|
||||
**{
|
||||
"type": "cassandra",
|
||||
"serviceName": "local_cassandra",
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "Cassandra",
|
||||
"hostPort": "localhost:9042",
|
||||
}
|
||||
},
|
||||
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
|
||||
}
|
||||
)
|
||||
metadata = OpenMetadata(
|
||||
OpenMetadataConnection(
|
||||
hostPort="http://localhost:8585/api",
|
||||
authProvider="openmetadata",
|
||||
securityConfig=OpenMetadataJWTClientConfig(jwtToken="token"),
|
||||
)
|
||||
)
|
||||
cassandra_source = CassandraSource(config, metadata)
|
||||
self.assertIsNone(cassandra_source.ssl_manager)
|
||||
|
||||
config_with_ssl = WorkflowSource(
|
||||
**{
|
||||
"type": "cassandra",
|
||||
"serviceName": "local_cassandra",
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "Cassandra",
|
||||
"hostPort": "localhost:9042",
|
||||
"sslConfig": {
|
||||
"caCertificate": "caCertificateData",
|
||||
"sslKey": "sslKeyData",
|
||||
"sslCertificate": "sslCertificateData",
|
||||
},
|
||||
"sslMode": "allow",
|
||||
},
|
||||
},
|
||||
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
|
||||
}
|
||||
)
|
||||
cassandra_source_with_ssl = CassandraSource(config_with_ssl, metadata)
|
||||
|
||||
self.assertIsNotNone(cassandra_source_with_ssl.ssl_manager)
|
||||
self.assertEqual(
|
||||
cassandra_source_with_ssl.service_connection.sslConfig.root.caCertificate.get_secret_value(),
|
||||
"caCertificateData",
|
||||
)
|
||||
self.assertEqual(
|
||||
cassandra_source_with_ssl.service_connection.sslConfig.root.sslKey.get_secret_value(),
|
||||
"sslKeyData",
|
||||
)
|
||||
self.assertEqual(
|
||||
cassandra_source_with_ssl.service_connection.sslConfig.root.sslCertificate.get_secret_value(),
|
||||
"sslCertificateData",
|
||||
)
|
||||
|
||||
@ -18,6 +18,7 @@ Configure and schedule Cassandra metadata workflows from the OpenMetadata UI:
|
||||
|
||||
- [Requirements](#requirements)
|
||||
- [Metadata Ingestion](#metadata-ingestion)
|
||||
- [Enable Security](#securing-cassandra-connection-with-ssl-in-openmetadata)
|
||||
|
||||
{% partial file="/v1.7/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/cassandra/yaml"} /%}
|
||||
|
||||
@ -59,6 +60,19 @@ Configuration for connecting to DataStax Astra DB in the cloud.
|
||||
- **token**: The Astra DB application token used for authentication.
|
||||
- **secureConnectBundle**: File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.
|
||||
|
||||
**SSL Modes**
|
||||
|
||||
There are a couple of types of SSL modes that Cassandra supports which can be added to ConnectionArguments, they are as follows:
|
||||
- **disable**: SSL is disabled and the connection is not encrypted.
|
||||
- **allow**: SSL is used if the server requires it.
|
||||
- **prefer**: SSL is used if the server supports it.
|
||||
- **require**: SSL is required.
|
||||
- **verify-ca**: SSL must be used and the server certificate must be verified.
|
||||
- **verify-full**: SSL must be used. The server certificate must be verified, and the server hostname must match the hostname attribute on the certificate.
|
||||
|
||||
**SSL Configuration**
|
||||
|
||||
In order to integrate SSL in the Metadata Ingestion Config, the user will have to add the SSL config under sslConfig which is placed in the source.
|
||||
|
||||
{% /extraContent %}
|
||||
|
||||
@ -70,6 +84,18 @@ Configuration for connecting to DataStax Astra DB in the cloud.
|
||||
|
||||
{% /stepsContainer %}
|
||||
|
||||
## Securing Cassandra Connection with SSL in OpenMetadata
|
||||
|
||||
To establish secure connections between OpenMetadata and a Cassandra database, you can use any SSL mode provided by Cassandra, except disable.
|
||||
|
||||
Under `Advanced Config`, after selecting the SSL mode, provide the CA certificate, SSL certificate and SSL key.
|
||||
|
||||
{% image
|
||||
src="/images/v1.7/connectors/ssl_connection.png"
|
||||
alt="SSL Configuration"
|
||||
height="450px"
|
||||
caption="SSL Configuration" /%}
|
||||
|
||||
{% partial file="/v1.7/connectors/troubleshooting.md" /%}
|
||||
|
||||
{% partial file="/v1.7/connectors/database/related.md" /%}
|
||||
|
||||
@ -17,6 +17,8 @@ Configure and schedule Cassandra metadata workflows from the OpenMetadata UI:
|
||||
|
||||
- [Requirements](#requirements)
|
||||
- [Metadata Ingestion](#metadata-ingestion)
|
||||
- [Enable Security](#securing-cassandra-connection-with-ssl-in-openmetadata)
|
||||
|
||||
|
||||
{% partial file="/v1.7/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/cassandra/yaml"} /%}
|
||||
|
||||
@ -102,8 +104,36 @@ Configuration for connecting to DataStax Astra DB in the cloud.
|
||||
|
||||
{% partial file="/v1.7/connectors/yaml/workflow-config-def.md" /%}
|
||||
|
||||
#### Advanced Configuration
|
||||
|
||||
{% codeInfo srNumber=6 %}
|
||||
|
||||
**Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to database during the connection. These details must be added as Key-Value pairs.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% codeInfo srNumber=7 %}
|
||||
|
||||
The sslConfig and sslMode are used to configure the SSL (Secure Sockets Layer) connection between your application and the PostgreSQL server.
|
||||
|
||||
- **caCertificate**: Provide the path to ssl ca file.
|
||||
|
||||
- **sslCertificate**: Provide the path to ssl client certificate file (ssl_cert).
|
||||
|
||||
- **sslKey**: Provide the path to ssl client certificate file (ssl_key).
|
||||
|
||||
**sslMode**: This field controls whether a secure SSL/TLS connection will be negotiated with the server. There are several modes you can choose:
|
||||
|
||||
disable: No SSL/TLS encryption will be used; the data sent over the network is not encrypted.
|
||||
allow: The driver will try to negotiate a non-SSL connection but if the server insists on SSL, it will switch to SSL.
|
||||
prefer (the default): The driver will try to negotiate an SSL connection but if the server does not support SSL, it will switch to a non-SSL connection.
|
||||
require: The driver will try to negotiate an SSL connection. If the server does not support SSL, the driver will not fall back to a non-SSL connection.
|
||||
verify-ca: The driver will negotiate an SSL connection and verify that the server certificate is issued by a trusted certificate authority (CA).
|
||||
verify-full: The driver will negotiate an SSL connection, verify that the server certificate is issued by a trusted CA and check that the server host name matches the one in the certificate.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% /codeInfoContainer %}
|
||||
|
||||
@ -135,6 +165,17 @@ source:
|
||||
requestTimeout: <Timeout in seconds>
|
||||
connectTimeout: <Timeout in seconds>
|
||||
```
|
||||
```yaml {% srNumber=6 %}
|
||||
# connectionArguments:
|
||||
# key: value
|
||||
```
|
||||
```yaml {% srNumber=7 %}
|
||||
# sslConfig:
|
||||
# caCertificate: "path/to/ca/certificate"
|
||||
# sslCertificate: "path/to/ssl/certificate"
|
||||
# sslKey: "path/to/ssl/key"
|
||||
# sslMode: disable #allow prefer require verify-ca verify-full
|
||||
```
|
||||
|
||||
|
||||
{% partial file="/v1.7/connectors/yaml/database/source-config.md" /%}
|
||||
@ -148,3 +189,17 @@ source:
|
||||
{% /codePreview %}
|
||||
|
||||
{% partial file="/v1.7/connectors/yaml/ingestion-cli.md" /%}
|
||||
|
||||
## Securing Cassandra Connection with SSL in OpenMetadata
|
||||
|
||||
To establish secure connections between OpenMetadata and a Cassandra database, you can use any SSL mode provided by Cassandra, except disable.
|
||||
|
||||
Under `Advanced Config`, after selecting the SSL mode, provide the CA certificate, SSL certificate and SSL key.
|
||||
|
||||
```yaml
|
||||
sslMode: allow
|
||||
sslConfig:
|
||||
caCertificate: "/path/to/ca/certificate"
|
||||
sslCertificate: "/path/to/ssl/certificate"
|
||||
sslKey: "/path/to/ssl/key"
|
||||
```
|
||||
@ -49,6 +49,18 @@
|
||||
"description": "Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.",
|
||||
"type": "string"
|
||||
},
|
||||
"connectionArguments": {
|
||||
"title": "Connection Arguments",
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
|
||||
},
|
||||
"sslMode": {
|
||||
"title": "SSL Mode",
|
||||
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslMode"
|
||||
},
|
||||
"sslConfig": {
|
||||
"title": "SSL Configuration",
|
||||
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
|
||||
},
|
||||
"supportsMetadataExtraction": {
|
||||
"title": "Supports Metadata Extraction",
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||
|
||||
@ -29,13 +29,27 @@ $$
|
||||
|
||||
$$section
|
||||
### Cloud Config $(id="cloudConfig")
|
||||
|
||||
Configuration for connecting to DataStax Astra DB in the cloud.
|
||||
Configuration settings required when connecting to DataStax Astra DB in the cloud environment. These settings help establish and maintain secure connections to your cloud-hosted Cassandra database.
|
||||
$$
|
||||
|
||||
- connectTimeout: Timeout in seconds for establishing new connections to Cassandra.
|
||||
- requestTimeout: Timeout in seconds for individual Cassandra requests.
|
||||
- token: The Astra DB application token used for authentication.
|
||||
- secureConnectBundle: File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.
|
||||
$$section
|
||||
### Connect Timeout $(id="connectTimeout")
|
||||
Specifies the timeout duration in seconds for establishing new connections to Cassandra. This setting helps control how long the system should wait when attempting to create a new connection before timing out.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Request Timeout $(id="requestTimeout")
|
||||
Defines the timeout duration in seconds for individual Cassandra requests. This setting determines how long each query or operation should wait for a response before timing out.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Token $(id="token")
|
||||
The authentication token required for connecting to DataStax Astra DB. This token serves as the security credential for accessing your cloud database instance.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Secure Connect Bundle $(id="secureConnectBundle")
|
||||
The file path to the Secure Connect Bundle (.zip) file. This bundle contains the necessary certificates and configuration files required to establish a secure connection to your DataStax Astra DB instance.
|
||||
$$
|
||||
|
||||
$$section
|
||||
@ -46,3 +60,21 @@ Database Service > Database > Schema > Table
|
||||
```
|
||||
In the case of Cassandra, we won't have a Keyspace/Database as such. If you'd like to see your data in a database named something other than `default`, you can specify the name in this field.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Connection Arguments $(id="connectionArguments")
|
||||
Additional connection arguments that can be sent to the service during connection.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### SSL Mode $(id="sslMode")
|
||||
SSL Mode to connect to Cassandra instance. By default, SSL is disabled.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### SSL Configuration $(id="sslConfig")
|
||||
SSL Configuration for the Cassandra connection. This is required when SSL Mode is enabled.
|
||||
- `CA Certificate`: Path to the CA certificate file.
|
||||
- `SSL Certificate`: Path to the client certificate file.
|
||||
- `SSL Key`: Path to the client private key file.
|
||||
$$
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2024 Collate.
|
||||
* Copyright 2025 Collate.
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
@ -17,7 +17,8 @@ export interface CassandraConnection {
|
||||
/**
|
||||
* Choose Auth Config Type.
|
||||
*/
|
||||
authType?: AuthConfigurationType;
|
||||
authType?: AuthConfigurationType;
|
||||
connectionArguments?: { [key: string]: any };
|
||||
/**
|
||||
* Optional name to give to the database in OpenMetadata. If left blank, we will use default
|
||||
* as the database name.
|
||||
@ -28,6 +29,8 @@ export interface CassandraConnection {
|
||||
* host when using the `cassandra+srv` scheme.
|
||||
*/
|
||||
hostPort?: string;
|
||||
sslConfig?: Config;
|
||||
sslMode?: SSLMode;
|
||||
supportsMetadataExtraction?: boolean;
|
||||
/**
|
||||
* Service Type
|
||||
@ -82,6 +85,38 @@ export interface DataStaxAstraDBConfiguration {
|
||||
[property: string]: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Client SSL configuration
|
||||
*
|
||||
* OpenMetadata Client configured to validate SSL certificates.
|
||||
*/
|
||||
export interface Config {
|
||||
/**
|
||||
* The CA certificate used for SSL validation.
|
||||
*/
|
||||
caCertificate?: string;
|
||||
/**
|
||||
* The SSL certificate used for client authentication.
|
||||
*/
|
||||
sslCertificate?: string;
|
||||
/**
|
||||
* The private key associated with the SSL certificate.
|
||||
*/
|
||||
sslKey?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* SSL Mode to connect to database.
|
||||
*/
|
||||
export enum SSLMode {
|
||||
Allow = "allow",
|
||||
Disable = "disable",
|
||||
Prefer = "prefer",
|
||||
Require = "require",
|
||||
VerifyCA = "verify-ca",
|
||||
VerifyFull = "verify-full",
|
||||
}
|
||||
|
||||
/**
|
||||
* Service Type
|
||||
*
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user