graphrag/tests/integration/storage/test_blob_pipeline_storage.py
Nathan Evans f5b4d2fea5
Ci streamline (#988)
* Remove excess vars from gh-pages build

* Delete redundant javascript ci

* Pull apart testing CI

* Clean up integration tests build

* Move storage tests to integration CI

* Take py 3.10 out of smoke tests matrix

* Use minimum supported python version for most tests

* Re-run main CI on any test change

* Add Josh and Kenny to author list

* Update auto-resolve perms
2024-08-21 15:16:15 -06:00

97 lines
3.5 KiB
Python

# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""Blob Storage Tests."""
import re
from graphrag.index.storage.blob_pipeline_storage import BlobPipelineStorage
# Connection string shared by every test in this module. It targets a blob
# endpoint on localhost (127.0.0.1:10000) using the `devstoreaccount1` account,
# so a storage service must be listening there for these tests to run.
# NOTE(review): this looks like the publicly documented development-emulator
# credential rather than a real secret — confirm against the emulator docs.
# cspell:disable-next-line well-known-key
WELL_KNOWN_BLOB_STORAGE_KEY = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
async def test_find():
    """Exercise ``find``, ``set``, ``get`` and ``delete`` on the "testfind" container.

    Steps checked, in order:
      1. ``find`` under ``base_dir="input"`` reports nothing before any write.
      2. After writing ``input/christmas.txt`` the same search reports it.
      3. After writing ``test.txt`` an unscoped search reports both entries.
      4. ``get`` returns the text that was stored.
      5. Once ``test.txt`` is deleted, ``get`` returns ``None``.

    The container is removed at the end whether or not the checks pass.
    """
    txt_only = re.compile(r".*\.txt$")
    storage = BlobPipelineStorage(
        connection_string=WELL_KNOWN_BLOB_STORAGE_KEY,
        container_name="testfind",
    )

    def matching_names(**find_args):
        # Each result from find() is indexable; element 0 is the value the
        # assertions below compare against.
        return [hit[0] for hit in storage.find(file_pattern=txt_only, **find_args)]

    try:
        try:
            # Nothing has been written yet.
            assert matching_names(base_dir="input") == []

            await storage.set(
                "input/christmas.txt", "Merry Christmas!", encoding="utf-8"
            )
            assert matching_names(base_dir="input") == ["input/christmas.txt"]

            await storage.set("test.txt", "Hello, World!", encoding="utf-8")
            # No base_dir: both the nested and the top-level entry are expected.
            assert matching_names() == ["input/christmas.txt", "test.txt"]

            content = await storage.get("test.txt")
            assert content == "Hello, World!"
        finally:
            # Runs even when a check above failed, and verifies the delete itself.
            await storage.delete("test.txt")
            content = await storage.get("test.txt")
            assert content is None
    finally:
        storage.delete_container()
async def test_dotprefix():
    """Check that ``path_prefix="."`` does not show up in the names ``find`` reports.

    Writes ``input/christmas.txt`` through a storage configured with a "."
    path prefix and asserts that an unscoped ``find`` for ``*.txt`` reports
    exactly ``input/christmas.txt``. The container is removed afterwards
    whether or not the assertion passes.
    """
    storage = BlobPipelineStorage(
        connection_string=WELL_KNOWN_BLOB_STORAGE_KEY,
        # Dedicated container: test_find creates and deletes "testfind", so
        # reusing that name would make the two tests share mutable state
        # (collisions under parallel runs, or a recreate right after a delete).
        container_name="testdotprefix",
        path_prefix=".",
    )
    try:
        await storage.set("input/christmas.txt", "Merry Christmas!", encoding="utf-8")
        # Each result from find() is indexable; element 0 is the reported name.
        items = [item[0] for item in storage.find(file_pattern=re.compile(r".*\.txt$"))]
        assert items == ["input/christmas.txt"]
    finally:
        storage.delete_container()
async def test_child():
    """Verify that ``child("input")`` scopes operations beneath the parent's "input/" path.

    Steps checked, in order:
      1. A file written through the child is reported by the child's ``find``
         under its bare name (``christmas.txt``).
      2. A second file written through the child is reported alongside the first.
      3. ``get`` through the child returns the stored text.
      4. The parent's ``find`` reports the same files with the ``input/`` prefix.
      5. After the parent deletes ``input/test.txt``, ``has`` reports it as absent.

    The "testchild" container is removed at the end whether or not the checks pass.
    """
    txt_pattern = re.compile(r".*\.txt$")
    parent = BlobPipelineStorage(
        connection_string=WELL_KNOWN_BLOB_STORAGE_KEY,
        container_name="testchild",
    )
    try:
        try:
            storage = parent.child("input")

            await storage.set("christmas.txt", "Merry Christmas!", encoding="utf-8")
            # Each result from find() is indexable; element 0 is the reported name.
            items = [item[0] for item in storage.find(txt_pattern)]
            assert items == ["christmas.txt"]

            await storage.set("test.txt", "Hello, World!", encoding="utf-8")
            items = [item[0] for item in storage.find(txt_pattern)]
            # No debug print needed: a failing assert already reports both lists.
            assert items == ["christmas.txt", "test.txt"]

            output = await storage.get("test.txt")
            assert output == "Hello, World!"

            # Seen from the parent, the same files carry the child's path.
            items = [item[0] for item in parent.find(txt_pattern)]
            assert items == ["input/christmas.txt", "input/test.txt"]
        finally:
            # Runs even when a check above failed, and verifies the delete itself.
            await parent.delete("input/test.txt")
            has_test = await parent.has("input/test.txt")
            assert not has_test
    finally:
        parent.delete_container()