Ahmet Melek fd293b3e78
feat: add elasticsearch destination connector (#2152)
Closes https://github.com/Unstructured-IO/unstructured/issues/1842
Closes https://github.com/Unstructured-IO/unstructured/issues/2202
Closes https://github.com/Unstructured-IO/unstructured/issues/2203

This PR:
- Adds Elasticsearch destination connector to be able to ingest
documents from any supported source, embed them and write the embeddings
/ documents into Elasticsearch.
- Defines an example unstructured elements schema for users to be able
to setup their unstructured elasticsearch indexes easily.
- Includes parallelized upload and lazy processing for elasticsearch
destination connector.
- Rearranges elasticsearch test helpers to source, destination, and
common folders.
- Adds util functions to be able to batch iterables in a lazy way for
uploads
- Fixes a bug where removing the optional parameter `--fields` broke the
connector due to an integer processing error.
- Fixes a bug where using an [elasticsearch
config](8fa5cbf036/unstructured/ingest/connector/elasticsearch.py (L26-L35))
for a destination connector resulted in a serialization issue when
optional parameter `--fields` was not provided.
2023-12-20 01:26:58 +00:00

38 lines
1.2 KiB
Python

import os
DATA_PATH = "scripts/elasticsearch-test-helpers/source_connector/wiki_movie_plots_small.csv"
CLUSTER_URL = "http://localhost:9200"
INDEX_NAME = "movies"
USER = os.environ["ELASTIC_USER"]
PASSWORD = os.environ["ELASTIC_PASSWORD"]
MAPPINGS = {
"properties": {
"title": {"type": "text", "analyzer": "english"},
"ethnicity": {"type": "text", "analyzer": "standard"},
"director": {"type": "text", "analyzer": "standard"},
"cast": {"type": "text", "analyzer": "standard"},
"genre": {"type": "text", "analyzer": "standard"},
"plot": {"type": "text", "analyzer": "english"},
"year": {"type": "integer"},
"wiki_page": {"type": "keyword"},
},
}
def form_elasticsearch_doc_dict(i, csv_row):
return {
"_index": INDEX_NAME,
"_id": i,
"_source": {
"title": csv_row["Title"],
"ethnicity": csv_row["Origin/Ethnicity"],
"director": csv_row["Director"],
"cast": csv_row["Cast"],
"genre": csv_row["Genre"],
"plot": csv_row["Plot"],
"year": csv_row["Release Year"],
"wiki_page": csv_row["Wiki Page"],
},
}