graphrag/tests/integration/storage/test_cosmosdb_storage.py

110 lines
3.6 KiB
Python
Raw Normal View History

Add Cosmos DB storage/cache option (#1431) * added cosmosdb constructor and database methods * added rest of abstract method headers * added cosmos db container methods * implemented has and delete methods * finished implementing abstract class methods * integrated class into storage factory * integrated cosmosdb class into cache factory * added support for new config file fields * replaced primary key cosmosdb initialization with connection strings * modified cosmosdb setter to require json * Fix non-default emitters * Format * Ruff * ruff * first successful run of cosmosdb indexing * removed extraneous container_name setting * require base_dir to be typed as str * reverted merged changed from closed branch * removed nested try statement * readded initial non-parquet emitter fix * added basic support for parquet emitter using internal conversions * merged with main and resolved conflicts * fixed more merge conflicts * added cosmosdb functionality to query pipeline * tested query for cosmosdb * collapsed cosmosdb schema to use minimal containers and databases * simplified create_database and create_container functions * ruff fixes and semversioner * spellcheck and ci fixes * updated pyproject toml and lock file * apply fixes after merge from main * add temporary comments * refactor cache factory * refactored storage factory * minor formatting * update dictionary * fix spellcheck typo * fix default value * fix pydantic model defaults * update pydantic models * fix init_content * cleanup how factory passes parameters to file storage * remove unnecessary output file type * update pydantic model * cleanup code * implemented clear method * fix merge from main * add test stub for cosmosdb * regenerate lock file * modified set method to collapse parquet rows * modified get method to collapse parquet rows * updated has and delete methods and docstrings to adhere to new schema * added prefix helper function * replaced delimiter for prefixed id * verified empty tests are 
passing * fix merges from main * add find test * update cicd step name * tested querying for new schema * resolved errors from merge conflicts * refactored set method to handle cache in new schema * refactored get method to handle cache in new schema * force unique ids to be written to cosmos for nodes * found bug with has and delete methods * modified has and delete to work with cache in new schema * fix the merge from main * minor typo fixes * update lock file * spellcheck fix * fix init function signature * minor formatting updates * remove https protocol * change localhost to 127.0.0.1 address * update pytest to use bacj engine * verified cache tests * improved speed of has function * resolved pytest error with find function * added test for child method * make container_name variable private as _container_name * minor variable name fix * cleanup cosmos pytest and make the cosmosdb storage class operations more efficient * update cicd to use different cosmosdb emulator * test with http protocol * added pytest for clear() * add longer timeout for cosmosdb emulator startup * revert http connection back to https * add comments to cicd code for future dev usage * set container and database clients to None upon deletion * ruff changes * add comments to cicd code * removed unneeded None statements and ruff fixes * more ruff fixes * Update test_run.py * remove unnecessary call to delete container * ruff format updates * Reverted test_run.py * fix ruff formatter errors * cleanup variable names to be more consistent * remove extra semversioner file * revert pydantic model changes * revert pydantic model change * revert pydantic model change * re-enable inline formatting rule * update documentation in dev guide --------- Co-authored-by: Alonso Guevara <alonsog@microsoft.com> Co-authored-by: Josh Bradley <joshbradley@microsoft.com>
2024-12-19 14:43:21 -05:00
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""CosmosDB Storage Tests."""
import json
import re
import sys
import pytest
from graphrag.storage.cosmosdb_pipeline_storage import CosmosDBPipelineStorage
# Connection string for the local Cosmos DB emulator listening on 127.0.0.1:8081,
# using the emulator's well-known account key.
# cspell:disable-next-line well-known-key
WELL_KNOWN_COSMOS_CONNECTION_STRING = "AccountEndpoint=https://127.0.0.1:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="

# The Cosmos DB emulator is only available on Windows runners at this time,
# so every test in this module is skipped on any other platform.
_RUNNING_ON_WINDOWS = sys.platform.startswith("win")
if not _RUNNING_ON_WINDOWS:
    pytest.skip(
        "encountered windows-only tests -- will skip for now",
        allow_module_level=True,
    )
async def test_find():
    """Exercise set/find/get/has/delete against a dedicated Cosmos DB container.

    All documents written here are removed by ``storage.clear()`` in the
    ``finally`` clause, so cleanup happens even if an assertion fails midway.
    """
    storage = CosmosDBPipelineStorage(
        connection_string=WELL_KNOWN_COSMOS_CONNECTION_STRING,
        database_name="testfind",
        container_name="testfindcontainer",
    )
    json_pattern = re.compile(r".*\.json$")

    def find_json_keys() -> list[str]:
        # The first element of each find() result is the item key. Sort the keys
        # so the assertions do not depend on the order find() yields results in.
        return sorted(item[0] for item in storage.find(file_pattern=json_pattern))

    try:
        assert find_json_keys() == []

        json_content = {
            "content": "Merry Christmas!",
        }
        await storage.set(
            "christmas.json", json.dumps(json_content), encoding="utf-8"
        )
        assert find_json_keys() == ["christmas.json"]

        json_content = {
            "content": "Hello, World!",
        }
        await storage.set("test.json", json.dumps(json_content), encoding="utf-8")
        assert find_json_keys() == ["christmas.json", "test.json"]

        output = await storage.get("test.json")
        output_json = json.loads(output)
        assert output_json["content"] == "Hello, World!"

        assert await storage.has("christmas.json") is True
        assert await storage.has("easter.json") is False

        # Deleting a key must make later reads return None. This is checked in
        # the main flow rather than in a `finally`, so that a failure here can
        # never replace (and hide) an earlier assertion error.
        await storage.delete("test.json")
        assert await storage.get("test.json") is None
    finally:
        await storage.clear()
async def test_child():
    """Check that child() returns an object whose exact type is CosmosDBPipelineStorage."""
    parent_storage = CosmosDBPipelineStorage(
        connection_string=WELL_KNOWN_COSMOS_CONNECTION_STRING,
        database_name="testchild",
        container_name="testchildcontainer",
    )
    try:
        # Exact-type check (not isinstance): a subclass would not satisfy it.
        assert type(parent_storage.child("child")) is CosmosDBPipelineStorage
    finally:
        await parent_storage.clear()
async def test_clear():
    """Verify clear() removes stored documents and resets the Cosmos DB clients.

    After ``clear()``, ``find()`` returns no JSON keys, ``get()`` returns None
    for every previously written key, and the private container and database
    client attributes are reset to None.
    """
    storage = CosmosDBPipelineStorage(
        connection_string=WELL_KNOWN_COSMOS_CONNECTION_STRING,
        database_name="testclear",
        container_name="testclearcontainer",
    )
    # Documents written before clearing; the dict preserves insertion order,
    # so they are written and read back in the same order each run.
    documents = {
        "christmas.json": {"content": "Merry Christmas!"},
        "easter.json": {"content": "Happy Easter!"},
    }
    try:
        for key, json_content in documents.items():
            await storage.set(key, json.dumps(json_content), encoding="utf-8")

        await storage.clear()

        items = [
            item[0] for item in storage.find(file_pattern=re.compile(r".*\.json$"))
        ]
        assert items == []
        for key in documents:
            assert await storage.get(key) is None
        assert storage._container_client is None  # noqa: SLF001
        assert storage._database_client is None  # noqa: SLF001
    finally:
        # Runs clear() again so cleanup still happens if a step above failed
        # before the in-body clear() was reached.
        await storage.clear()