ElasticSearch: Fix ES connection (#13919)

This commit is contained in:
Mayur Singal 2023-11-13 14:10:11 +05:30 committed by GitHub
parent 759bd84162
commit b34111ea00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 537 additions and 50 deletions

View File

@ -8,4 +8,21 @@ SET dqdts.json = JSON_INSERT(
)
WHERE dqdts.extension = 'testCase.testCaseResult'
AND JSON_EXTRACT(dqdts.json, '$.timestamp') REGEXP '^[0-9]{10}$'
;
;
-- update elasticsearch connection
UPDATE search_service_entity
SET json = JSON_INSERT(
JSON_REMOVE(json, '$.connection.config.caCert'),
'$.connection.config.sslConfig',
JSON_OBJECT(
'certificates',
JSON_OBJECT(
'caCertPath',
JSON_EXTRACT(json, '$.connection.config.caCert')
)
)
)
WHERE
serviceType = 'ElasticSearch'
AND JSON_EXTRACT(json, '$.connection.config.caCert') IS NOT NULL;

View File

@ -20,3 +20,23 @@ SET json = jsonb_set(
)
WHERE dqdts.extension = 'testCase.testCaseResult'
AND (json->>'timestamp') ~ '^[0-9]{10}$';
UPDATE search_service_entity
SET json = JSONB_SET(
json::jsonb,
'{connection,config}',
json::jsonb #> '{connection,config}' #- '{caCert}' ||
jsonb_build_object(
'sslConfig',
jsonb_build_object(
'certificates',
jsonb_build_object('caCertPath', json #> '{connection,config,caCert}')
)
),
true
)
WHERE
serviceType = 'ElasticSearch'
AND json #> '{connection,config,caCert}' IS NOT NULL;

View File

@ -12,21 +12,130 @@
"""
Source connection handler
"""
import ssl
from pathlib import Path
from typing import Optional
from elasticsearch8 import Elasticsearch
from httpx import create_ssl_context
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ApiAuthentication,
from metadata.generated.schema.entity.services.connections.common.sslCertPaths import (
SslCertificatesByPath,
)
from metadata.generated.schema.entity.services.connections.common.sslCertValues import (
SslCertificatesByValues,
)
from metadata.generated.schema.entity.services.connections.common.sslConfig import (
SslConfig,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearch.apiAuth import (
ApiKeyAuthentication,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearch.basicAuth import (
BasicAuthentication,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ElasticsearchConnection,
)
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.helpers import init_staging_dir
CA_CERT_FILE_NAME = "root.pem"
CLIENT_CERT_FILE_NAME = "client.pem"
KEY_CERT_FILE_NAME = "client_key.pem"
def _clean_cert_value(cert_data: str) -> str:
return cert_data.replace("\\n", "\n")
def write_data_to_file(file_path: Path, cert_data: str) -> None:
with open(
file_path,
"w+",
encoding=UTF_8,
) as file:
data = _clean_cert_value(cert_data)
file.write(data)
def _handle_ssl_context_by_value(ssl_config: SslConfig):
ca_cert = False
client_cert = None
private_key = None
init_staging_dir(ssl_config.certificates.stagingDir)
if ssl_config.certificates.caCertValue:
ca_cert = Path(ssl_config.certificates.stagingDir, CA_CERT_FILE_NAME)
write_data_to_file(
ca_cert, ssl_config.certificates.caCertValue.get_secret_value()
)
if ssl_config.certificates.clientCertValue:
client_cert = Path(ssl_config.certificates.stagingDir, CLIENT_CERT_FILE_NAME)
write_data_to_file(
client_cert,
ssl_config.certificates.clientCertValue.get_secret_value(),
)
if ssl_config.certificates.privateKeyValue:
private_key = Path(ssl_config.certificates.stagingDir, KEY_CERT_FILE_NAME)
write_data_to_file(
private_key,
ssl_config.certificates.privateKeyValue.get_secret_value(),
)
return ca_cert, client_cert, private_key
def _handle_ssl_context_by_path(ssl_config: SslConfig):
ca_cert = False
if ssl_config.certificates.caCertPath:
ca_cert = ssl_config.certificates.caCertPath
client_cert = ssl_config.certificates.clientCertPath
private_key = ssl_config.certificates.privateKeyPath
return ca_cert, client_cert, private_key
def get_ssl_context(ssl_config: SslConfig) -> ssl.SSLContext:
"""
Method to get SSL Context
"""
ca_cert = False
client_cert = None
private_key = None
cert_chain = None
if not ssl_config.certificates:
return None
if isinstance(ssl_config.certificates, SslCertificatesByValues):
ca_cert, client_cert, private_key = _handle_ssl_context_by_value(
ssl_config=ssl_config
)
elif isinstance(ssl_config.certificates, SslCertificatesByPath):
ca_cert, client_cert, private_key = _handle_ssl_context_by_path(
ssl_config=ssl_config
)
if client_cert and private_key:
cert_chain = (client_cert, private_key)
elif client_cert:
cert_chain = client_cert
else:
cert_chain = None
if ca_cert or cert_chain:
ssl_context = create_ssl_context(
cert=cert_chain,
verify=ca_cert,
)
return ssl_context
return ssl._create_unverified_context() # pylint: disable=protected-access
def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
@ -35,6 +144,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
"""
basic_auth = None
api_key = None
ssl_context = None
if (
isinstance(connection.authType, BasicAuthentication)
and connection.authType.username
@ -46,7 +156,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
else None,
)
if isinstance(connection.authType, ApiAuthentication):
if isinstance(connection.authType, ApiKeyAuthentication):
if connection.authType.apiKeyId and connection.authType.apiKey:
api_key = (
connection.authType.apiKeyId,
@ -58,12 +168,15 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
if connection.sslConfig:
ssl_context = get_ssl_context(connection.sslConfig)
return Elasticsearch(
connection.hostPort,
http_auth=basic_auth,
api_key=api_key,
ca_certs=connection.caCert,
**connection.connectionArguments.__root__
ssl_context=ssl_context,
**connection.connectionArguments.__root__,
)

View File

@ -11,6 +11,8 @@
"""
Elasticsearch source to extract metadata
"""
import shutil
from pathlib import Path
from typing import Any, Iterable, Optional
from elasticsearch8 import Elasticsearch
@ -124,3 +126,11 @@ class ElasticsearchSource(SearchServiceSource):
),
)
)
def close(self):
try:
if Path(self.service_connection.sslConfig.certificates.stagingDir).exists():
shutil.rmtree(self.service_connection.sslConfig.certificates.stagingDir)
except AttributeError:
pass
return super().close()

View File

@ -121,6 +121,8 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
except Exception as exc:
logger.warning(f"Error trying to close the step {step} due to [{exc}]")
self.source.close()
@property
def timer(self) -> RepeatedTimer:
"""

View File

@ -51,7 +51,34 @@ We extract ElasticSearch's metadata by using its [API](https://www.elastic.co/gu
2. API Key Authentication
- API Key: API Key to connect to ElasticSearch required when API Key Authentication is enabled on ElasticSearch.
- API Key Id: Enter API Key ID In case of API Key Authentication if there is any API Key ID associated with the API Key, otherwise this field can be left blank.
- **Client Certificate Path**: In case the SSL is enabled on your ElasticSearch instance and CA certificate is required for authentication, then specify the path of certificate in this field. NOTE: In case of docker deployment you need to store this certificate accessible to OpenMetadata Ingestion docker container, you can do it via copying the certificate to the docker container or store it in the volume associate with the OpenMetadata Ingestion container.
- **SSL Certificates**:
1. SSL Certificates By Path
- CA Certificate Path: This field specifies the path of CA certificate required for authentication.
- Client Certificate Path: This field specifies the path of Clint certificate required for authentication.
- Private Key Path: This field specifies the path of Clint Key/Private Key required for authentication.
2. SSL Certificates By Value
- CA Certificate Value: This field specifies the value of CA certificate required for authentication.
- Client Certificate Value: This field specifies the value of Clint certificate required for authentication.
- Private Key Value: This field specifies the value of Clint Key/Private Key required for authentication.
- Staging Directory Path: This field specifies the path to temporary staging directory, where the certificates will be stored temporarily during the ingestion process, which will de deleted once the ingestion job is over.
- when you are using this approach make sure you are passing the key in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
- **Connection Timeout in Seconds**: Connection timeout configuration for communicating with ElasticSearch APIs.
{% /extraContent %}

View File

@ -78,7 +78,34 @@ This is a sample config for ElasticSearch:
{% /codeInfo %}
{% codeInfo srNumber=4 %}
**caCert**: In case the SSL is enabled on your ElasticSearch instance and CA certificate is required for authentication, then specify the path of certificate in this field. NOTE: In case of docker deployment you need to store this certificate accessible to OpenMetadata Ingestion docker container, you can do it via copying the certificate to the docker container or store it in the volume associate with the OpenMetadata Ingestion container.
- **sslConfig**:
1. SSL Certificates By Path
- caCertPath: This field specifies the path of CA certificate required for authentication.
- clientCertPath: This field specifies the path of Clint certificate required for authentication.
- privateKeyPath: This field specifies the path of Clint Key/Private Key required for authentication.
2. SSL Certificates By Value
- caCertValue: This field specifies the value of CA certificate required for authentication.
- clientCertValue: This field specifies the value of Clint certificate required for authentication.
- privateKeyValue: This field specifies the value of Clint Key/Private Key required for authentication.
- stagingDir: This field specifies the path to temporary staging directory, where the certificates will be stored temporarily during the ingestion process, which will de deleted once the ingestion job is over.
- when you are using this approach make sure you are passing the key in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
{% /codeInfo %}
@ -117,7 +144,17 @@ source:
# apiKey: <api key>
```
```yaml {% srNumber=4 %}
caCert: /path/to/http_ca.crt
sslConfig:
certificates:
caCertPath: /path/to/http_ca.crt
clientCertPath: /path/to/http_ca.crt
privateKeyPath: /path/to/http_ca.crt
# pass certificate values
# caCertValue: -----BEGIN CERTIFICATE-----\n....\n.....\n-----END CERTIFICATE-----\n
# clientCertValue: -----BEGIN CERTIFICATE-----\n....\n...-----END CERTIFICATE-----\n
# privateKeyValue: -----BEGIN RSA PRIVATE KEY-----\n....\n....\n-----END RSA PRIVATE KEY-----\n
# stagingDir: /tmp/stage
```
```yaml {% srNumber=5 %}
connectionTimeoutSecs: 30

View File

@ -32,6 +32,7 @@ import org.openmetadata.schema.services.connections.database.TrinoConnection;
import org.openmetadata.schema.services.connections.database.datalake.GCSConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.services.connections.pipeline.AirflowConnection;
import org.openmetadata.schema.services.connections.search.ElasticSearchConnection;
import org.openmetadata.schema.services.connections.storage.GcsConnection;
/** Factory class to get a `ClassConverter` based on the service class. */
@ -56,6 +57,7 @@ public final class ClassConverterFactory {
Map.entry(GCSConfig.class, new GCPConfigClassConverter()),
Map.entry(GCPCredentials.class, new GcpCredentialsClassConverter()),
Map.entry(GcsConnection.class, new GcpConnectionClassConverter()),
Map.entry(ElasticSearchConnection.class, new ElasticSearchConnectionClassConverter()),
Map.entry(LookerConnection.class, new LookerConnectionClassConverter()),
Map.entry(OpenMetadataConnection.class, new OpenMetadataConnectionClassConverter()),
Map.entry(SSOAuthMechanism.class, new SSOAuthMechanismClassConverter()),

View File

@ -0,0 +1,41 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.secrets.converter;
import java.util.List;
import org.openmetadata.schema.services.connections.search.ElasticSearchConnection;
import org.openmetadata.schema.services.connections.search.elasticSearch.ESAPIAuth;
import org.openmetadata.schema.services.connections.search.elasticSearch.ESBasicAuth;
import org.openmetadata.service.util.JsonUtils;
/** Converter class to get an `ElasticSearchConnection` object. */
public class ElasticSearchConnectionClassConverter extends ClassConverter {
private static final List<Class<?>> CONFIG_SOURCE_CLASSES = List.of(ESBasicAuth.class, ESAPIAuth.class);
//
public ElasticSearchConnectionClassConverter() {
super(ElasticSearchConnection.class);
}
@Override
public Object convert(Object object) {
ElasticSearchConnection elasticSearchConnection =
(ElasticSearchConnection) JsonUtils.convertValue(object, this.clazz);
tryToConvert(elasticSearchConnection.getAuthType(), CONFIG_SOURCE_CLASSES)
.ifPresent(elasticSearchConnection::setAuthType);
return elasticSearchConnection;
}
}

View File

@ -23,6 +23,7 @@ import org.openmetadata.schema.services.connections.database.TrinoConnection;
import org.openmetadata.schema.services.connections.database.datalake.GCSConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.services.connections.pipeline.AirflowConnection;
import org.openmetadata.schema.services.connections.search.ElasticSearchConnection;
import org.openmetadata.schema.services.connections.storage.GcsConnection;
public class ClassConverterFactoryTest {
@ -39,6 +40,7 @@ public class ClassConverterFactoryTest {
DbtPipeline.class,
GCSConfig.class,
GcsConnection.class,
ElasticSearchConnection.class,
LookerConnection.class,
OpenMetadataConnection.class,
SSOAuthMechanism.class,
@ -55,6 +57,6 @@ public class ClassConverterFactoryTest {
@Test
void testClassConvertedMapIsNotModified() {
assertEquals(18, ClassConverterFactory.getConverterMap().size());
assertEquals(19, ClassConverterFactory.getConverterMap().size());
}
}

View File

@ -0,0 +1,26 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/search/elasticSearch/apiAuth.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SSL Certificates By Path",
"description": "SSL Certificates By Path",
"javaType": "org.openmetadata.schema.services.common.SSLCertPaths",
"type": "object",
"properties": {
"caCertPath": {
"title": "CA Certificate Path",
"description": "CA Certificate Path",
"type": "string"
},
"clientCertPath": {
"title": "Client Certificate Path",
"description": "Client Certificate Path",
"type": "string"
},
"privateKeyPath": {
"title": "Private Key Path",
"description": "Private Key Path",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,35 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/search/elasticSearch/apiAuth.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SSL Certificates By Values",
"description": "SSL Certificates By Values",
"javaType": "org.openmetadata.schema.services.common.SSLCertValues",
"type": "object",
"properties": {
"caCertValue": {
"title": "CA Certificate Value",
"description": "CA Certificate Value",
"type": "string",
"format": "password"
},
"clientCertValue": {
"title": "Client Certificate Value",
"description": "Client Certificate Value",
"type": "string",
"format": "password"
},
"privateKeyValue": {
"title": "Private Key Value",
"description": "Private Key Value",
"type": "string",
"format": "password"
},
"stagingDir": {
"title": "Staging Directory Path",
"description": "Staging Directory Path",
"type": "string",
"default": "/tmp/openmetadata-certs"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,26 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/search/elasticSearch/apiAuth.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SSL Config",
"description": "SSL Config",
"javaType": "org.openmetadata.schema.services.common.SSLConfig",
"type": "object",
"properties": {
"certificates": {
"type":"object",
"title": "SSL Certificates",
"description": "SSL Certificates",
"oneOf": [
{
"$ref": "./sslCertPaths.json"
},
{
"$ref": "./sslCertValues.json"
}
]
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,22 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/search/elasticSearch/apiAuth.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "API Key Authentication",
"description": "API Key Authentication for ElasticSearch",
"javaType": "org.openmetadata.schema.services.connections.search.elasticSearch.ESAPIAuth",
"type": "object",
"properties": {
"apiKey": {
"title": "API Key",
"description": "Elastic Search API Key for API Authentication",
"type": "string",
"format": "password"
},
"apiKeyId": {
"title": "API Key ID",
"description": "Elastic Search API Key ID for API Authentication",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,22 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/search/elasticSearch/basicAuth.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Basic Authentication",
"description": "Basic Auth Configuration for ElasticSearch",
"javaType": "org.openmetadata.schema.services.connections.search.elasticSearch.ESBasicAuth",
"type": "object",
"properties": {
"username": {
"title": "Username",
"description": "Elastic Search Username for Login",
"type": "string"
},
"password": {
"title": "Password",
"description": "Elastic Search Password for Login",
"type": "string",
"format": "password"
}
},
"additionalProperties": false
}

View File

@ -11,40 +11,6 @@
"type": "string",
"enum": ["ElasticSearch"],
"default": "ElasticSearch"
},
"basicAuthentication": {
"title": "Basic Authentication",
"properties": {
"username": {
"title": "Username",
"description": "Elastic Search Username for Login",
"type": "string"
},
"password": {
"title": "Password",
"description": "Elastic Search Password for Login",
"type": "string",
"format": "password"
}
},
"type": "object"
},
"apiAuthentication": {
"title": "API Key Authentication",
"type": "object",
"properties": {
"apiKey": {
"title": "API Key",
"description": "Elastic Search API Key for API Authentication",
"type": "string",
"format": "password"
},
"apiKeyId": {
"title": "API Key ID",
"description": "Elastic Search API Key ID for API Authentication",
"type": "string"
}
}
}
},
"properties": {
@ -65,17 +31,16 @@
"description": "Choose Auth Config Type.",
"oneOf": [
{
"$ref": "#/definitions/basicAuthentication"
"$ref": "./elasticSearch/basicAuth.json"
},
{
"$ref": "#/definitions/apiAuthentication"
"$ref": "./elasticSearch/apiAuth.json"
}
]
},
"caCert": {
"title": "Client Certificate Path",
"description": "Path to CA Cert File",
"type": "string"
"sslConfig": {
"title": "SSL Config",
"$ref": "../common/sslConfig.json"
},
"connectionTimeoutSecs": {
"title": "Connection Timeout in Seconds",

View File

@ -27,3 +27,123 @@ $$section
### Password $(id="password")
Password of the user account to connect with ElasticSearch.
$$
$$section
### API Key $(id="apiKey")
API Key to connect to ElasticSearch required when API Key Authentication is enabled on ElasticSearch.
$$
$$section
### API Key ID $(id="apiKeyId")
Enter API Key ID In case of API Key Authentication if there is any API Key ID associated with the API Key, otherwise this field can be left blank.
$$
$$section
### SSL Certificates $(id="certificates")
If you have SSL/TLS enable on for your ElasticSearch you will need to pass the relevant SSL certificates in order to communicate with the ElasticSearch instance. You can either provide the where these certificates are stored or you can provide the direct value of these certificates.
$$
$$section
### CA Certificate Path $(id="caCertPath")
This field specifies the path of CA certificate required for authentication.
$$
$$section
### Client Certificate Path $(id="clientCertPath")
This field specifies the path of Clint certificate required for authentication.
$$
$$section
### Private Key Path $(id="privateKeyPath")
This field specifies the path of Clint Key/Private Key required for authentication.
$$
$$section
### CA Certificate Value $(id="caCertValue")
This field specifies the value of CA certificate required for authentication.
Make sure you are passing the value of certificate in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
```
$$
$$section
### Client Certificate Value $(id="clientCertValue")
This field specifies the value of client certificate required for authentication.
Make sure you are passing the value in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
```
$$
$$section
### Private Key Value $(id="privateKeyValue")
This field specifies the value of private key required for authentication.
Make sure you are passing the key in a correct format. If your certificate looks like this:
```
-----BEGIN RSA PRIVATE KEY-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END RSA PRIVATE KEY-----
```
You will have to replace new lines with `\n` and the final private key that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
```
$$
$$section
### Staging Directory Path $(id="stagingDir")
This field specifies the path to temporary staging directory, where the certificates will be stored temporarily during the ingestion process, which will de deleted once the ingestion job is over.
$$