Roman Isecke b777864296
feat: Migrate over fsspec connectors (#3066)
### Description
Move over all fsspec connectors to the new framework

Variety of bug fixes found and fixed in this PR as well:
* custom json mixin being used for the enhanced dataclass would break if
typing was quoted. That was fixed. A check was also added to the
enhanced dataclass to prevent `InitVar` from being used in the root
dataclass since this breaks serialization.
* hashing for partitioner was using the filename of the raw file being
partitioned rather than the file name of the file data generated from
indexing. This means that mutliple files could result in the same
partition hash when recursive flag is passed in. This was updated to use
the file data file name instead.

---------

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>
Co-authored-by: rbiseck3 <rbiseck3@users.noreply.github.com>
2024-06-05 19:12:06 +00:00

61 lines
2.0 KiB
Python
Executable File

#!/usr/bin/env python3
import time
from opensearchpy import OpenSearch
N_ELEMENTS = 5
EXPECTED_TEXT = "To Whom it May Concern:"
if __name__ == "__main__":
print("Connecting to the OpenSearch cluster.")
client = OpenSearch(
hosts=[{"host": "localhost", "port": 9247}],
http_auth=("admin", "admin"),
use_ssl=True,
verify_certs=False,
ssl_show_warn=False,
)
print(client.info())
initial_query = {"query": {"simple_query_string": {"fields": ["text"], "query": EXPECTED_TEXT}}}
initial_embeddings = None
timeout_s = 9
sleep_s = 1
start = time.time()
found = False
while time.time() - start < timeout_s and not found:
results = client.search(index="ingest-test-destination", body=initial_query)
hits = results["hits"]["hits"]
if hits:
print(f"found results after {time.time() - start}s")
initial_embeddings = hits[0]["_source"]["embeddings"]
found = True
break
print(f"Waiting {sleep_s}s before checking again")
time.sleep(sleep_s)
if not found:
raise TimeoutError(
f"timed out after {round(timeout_s, 3)}s trying to get results from opensearch"
)
query = {"size": 1, "query": {"knn": {"embeddings": {"vector": initial_embeddings, "k": 1}}}}
vector_search = client.search(index="ingest-test-destination", body=query)
found_text = vector_search["hits"]["hits"][0]["_source"]["text"]
assert found_text == EXPECTED_TEXT, (
f"OpenSearch dest check failed: Did not find "
f"{EXPECTED_TEXT} in via vector search, instead: {found_text}."
)
print("OpenSearch vector search test was successful.")
count = client.count(index="ingest-test-destination")["count"]
assert int(count) == N_ELEMENTS, "OpenSearch dest check failed:"
f"got {count} items in index, expected {N_ELEMENTS} items in index."
print(f"OpenSearch destination test was successful with {count} items being uploaded.")