David Potter bc791d53f4
feat: add opensearch source and destination connector (#2349)
Adds OpenSearch as a source and destination.

Since OpenSearch is a fork of Elasticsearch, these connectors rely
heavily on inheriting the Elasticsearch connectors whenever possible.

- Adds OpenSearch source connector to be able to ingest documents from
OpenSearch.
- Adds OpenSearch destination connector to be able to ingest documents
from any supported source, embed them and write the embeddings /
documents into OpenSearch.
- Defines an example unstructured elements schema so that users can
easily set up their unstructured OpenSearch indexes.

---------

Co-authored-by: potter-potter <david.potter@gmail.com>
2024-01-17 04:31:49 +00:00

46 lines
1.5 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
from opensearchpy import OpenSearch
N_ELEMENTS = 5
EXPECTED_TEXT = "To Whom it May Concern:"
INDEX_NAME = "ingest-test-destination"


def first_hit_source(response):
    """Return the ``_source`` document of the top hit in a search response.

    Args:
        response: The dict returned by ``client.search``; it is read through
            ``response["hits"]["hits"]``.

    Returns:
        The ``_source`` mapping of the first hit, or ``None`` when the
        response contains no hits.
    """
    hits = response.get("hits", {}).get("hits", [])
    if not hits:
        return None
    return hits[0].get("_source")


def fail(reason):
    """Abort the script with a non-zero exit status and a uniform message prefix."""
    sys.exit(f"OpenSearch dest check failed: {reason}")


def main():
    """Check the documents written to the OpenSearch destination index.

    The check runs in two stages:

    1. Find the document containing ``EXPECTED_TEXT`` with a text query, then
       reuse its stored embedding as a k-NN query vector and require that the
       nearest neighbour carries the same text.
    2. Require that the index holds exactly ``N_ELEMENTS`` documents.

    Any failed stage terminates the process through ``sys.exit`` with an
    explanatory message.
    """
    print("Connecting to the OpenSearch cluster.")
    # NOTE(review): admin/admin with certificate verification disabled —
    # presumably the defaults of a local test container; confirm this script
    # is never pointed at a real cluster.
    client = OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_auth=("admin", "admin"),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
    )
    print(client.info())

    # Stage 1a: locate the reference document by its text.
    initial_query = {"query": {"simple_query_string": {"fields": ["text"], "query": EXPECTED_TEXT}}}
    initial_result = client.search(index=INDEX_NAME, body=initial_query)
    initial_source = first_hit_source(initial_result)
    if initial_source is None:
        fail(f"Did not find {EXPECTED_TEXT} via text search.")
    initial_embeddings = initial_source.get("embeddings")
    if not initial_embeddings:
        fail(f"Document matching {EXPECTED_TEXT} has no embeddings.")

    # Stage 1b: the document's own embedding must retrieve that document.
    query = {"size": 1, "query": {"knn": {"embeddings": {"vector": initial_embeddings, "k": 1}}}}
    vector_search = client.search(index=INDEX_NAME, body=query)
    nearest_source = first_hit_source(vector_search)
    # Explicit comparisons rather than ``assert`` so the checks still run
    # when Python is started with -O.
    if nearest_source is None or nearest_source.get("text") != EXPECTED_TEXT:
        fail(f"Did not find {EXPECTED_TEXT} via vector search.")
    print("OpenSearch vector search test was successful.")

    # Stage 2: the number of uploaded elements must match exactly.
    count = int(client.count(index=INDEX_NAME)["count"])
    if count != N_ELEMENTS:
        fail(f"got {count} items in index, expected {N_ELEMENTS} items in index.")
    print(f"OpenSearch destination test was successful with {count} items being uploaded.")


if __name__ == "__main__":
    main()