61 lines
2.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import time
from opensearchpy import OpenSearch
# Document count the destination index must report (compared with client.count() below).
N_ELEMENTS = 5
# Text used for the initial text query and expected back from the vector search.
EXPECTED_TEXT = "To Whom it May Concern:"

if __name__ == "__main__":
    # Every search/count request below targets this index.
    index_name = "ingest-test-destination"

    print("Connecting to the OpenSearch cluster.")
    # TLS is enabled, but certificate verification and its warning are turned off.
    # NOTE(review): presumably a local test cluster with default credentials and a
    # self-signed certificate -- confirm before pointing this at anything else.
    client = OpenSearch(
        hosts=[{"host": "localhost", "port": 9247}],
        http_auth=("admin", "admin"),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
    )
    print(client.info())

    # Step 1: poll with a plain text query until a document shows up, then keep its
    # embedding vector so it can be used as the probe for the vector search.
    initial_query = {"query": {"simple_query_string": {"fields": ["text"], "query": EXPECTED_TEXT}}}
    initial_embeddings = None
    timeout_s = 9
    sleep_s = 1
    # monotonic() is unaffected by system clock adjustments, unlike time.time().
    start = time.monotonic()
    while time.monotonic() - start < timeout_s:
        results = client.search(index=index_name, body=initial_query)
        hits = results["hits"]["hits"]
        if hits:
            print(f"found results after {time.monotonic() - start}s")
            initial_embeddings = hits[0]["_source"]["embeddings"]
            break
        print(f"Waiting {sleep_s}s before checking again")
        time.sleep(sleep_s)
    else:
        # Reached only when the loop condition expired without hitting `break`.
        raise TimeoutError(
            f"timed out after {round(timeout_s, 3)}s trying to get results from opensearch"
        )

    # Step 2: a k-NN search with that same vector must return the expected text.
    query = {"size": 1, "query": {"knn": {"embeddings": {"vector": initial_embeddings, "k": 1}}}}
    vector_search = client.search(index=index_name, body=query)
    vector_hits = vector_search["hits"]["hits"]
    # Fail with a readable message instead of a bare IndexError on an empty result.
    assert vector_hits, "OpenSearch dest check failed: vector search returned no hits."
    found_text = vector_hits[0]["_source"]["text"]
    assert found_text == EXPECTED_TEXT, (
        "OpenSearch dest check failed: Did not find "
        f"{EXPECTED_TEXT} in via vector search, instead: {found_text}."
    )
    print("OpenSearch vector search test was successful.")

    # Step 3: the index must contain exactly the expected number of documents.
    count = client.count(index=index_name)["count"]
    # The message parts are parenthesized so both belong to the assert; previously
    # the second string was a separate statement and never appeared on failure.
    assert int(count) == N_ELEMENTS, (
        "OpenSearch dest check failed: "
        f"got {count} items in index, expected {N_ELEMENTS} items in index."
    )
    print(f"OpenSearch destination test was successful with {count} items being uploaded.")