Support for AWS-delegated credentials for Elasticsearch (#6644)

* Support for AWS-delegated credentials for Elasticsearch

* FIX: Based on comments

* FIX: Based on comments

* FIX: Based on comments
This commit is contained in:
Milan Bariya 2022-08-09 12:36:16 +05:30 committed by GitHub
parent cf2cb6d531
commit c38ab8c8aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 2 deletions

View File

@ -81,7 +81,7 @@ plugins: Dict[str, Set[str]] = {
},
"dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"},
"druid": {"pydruid>=0.6.2"},
"elasticsearch": {"elasticsearch==7.13.1"},
"elasticsearch": {"elasticsearch==7.13.1", "requests-aws4auth==1.1.2"},
"glue": {"boto3~=1.19.12"},
"dynamodb": {"boto3~=1.19.12"},
"hive": {

View File

@ -16,8 +16,10 @@ import traceback
from datetime import datetime
from typing import List, Optional
from elasticsearch import Elasticsearch
import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.connection import create_ssl_context
from requests_aws4auth import AWS4Auth
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.dashboard import Dashboard
@ -128,6 +130,8 @@ class ElasticSearchConfig(ConfigModel):
timeout: int = 30
ca_certs: Optional[str] = None
recreate_indexes: Optional[bool] = False
use_AWS_credentials: Optional[bool] = False
region_name: Optional[str] = None
class ElasticsearchSink(Sink[Entity]):
@ -173,6 +177,18 @@ class ElasticsearchSink(Sink[Entity]):
ssl_context=ssl_context,
ca_certs=self.config.ca_certs,
)
if self.config.use_AWS_credentials:
credentials = boto3.Session().get_credentials()
region_from_boto3 = boto3.Session().region_name()
http_auth = AWS4Auth(
region=self.config.region_name
if self.config.region_name
else region_from_boto3,
service="es",
refreshable_credentials=credentials,
)
self.elasticsearch_client.http_auth = http_auth
self.elasticsearch_client.connection_class = RequestsHttpConnection
if self.config.index_tables:
self._check_or_create_index(