diff --git a/ingestion/setup.py b/ingestion/setup.py index 8841f5f673c..b8c018e0833 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -81,7 +81,7 @@ plugins: Dict[str, Set[str]] = { }, "dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"}, "druid": {"pydruid>=0.6.2"}, - "elasticsearch": {"elasticsearch==7.13.1"}, + "elasticsearch": {"elasticsearch==7.13.1", "requests-aws4auth==1.1.2"}, "glue": {"boto3~=1.19.12"}, "dynamodb": {"boto3~=1.19.12"}, "hive": { diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index e546e773146..f95bedbb909 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -16,8 +16,10 @@ import traceback from datetime import datetime from typing import List, Optional -from elasticsearch import Elasticsearch +import boto3 +from elasticsearch import Elasticsearch, RequestsHttpConnection from elasticsearch.connection import create_ssl_context +from requests_aws4auth import AWS4Auth from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -128,6 +130,8 @@ class ElasticSearchConfig(ConfigModel): timeout: int = 30 ca_certs: Optional[str] = None recreate_indexes: Optional[bool] = False + use_AWS_credentials: Optional[bool] = False + region_name: Optional[str] = None class ElasticsearchSink(Sink[Entity]): @@ -173,6 +177,18 @@ class ElasticsearchSink(Sink[Entity]): ssl_context=ssl_context, ca_certs=self.config.ca_certs, ) + if self.config.use_AWS_credentials: + credentials = boto3.Session().get_credentials() + region_from_boto3 = boto3.Session().region_name() + http_auth = AWS4Auth( + region=self.config.region_name + if self.config.region_name + else region_from_boto3, + service="es", + refreshable_credentials=credentials, + ) + self.elasticsearch_client.http_auth = http_auth + self.elasticsearch_client.connection_class = RequestsHttpConnection if self.config.index_tables: self._check_or_create_index(