# Copyright (c) 2024 Microsoft Corporation. # Licensed under the MIT License """Blob Storage Tests.""" import re from graphrag.index.storage.blob_pipeline_storage import BlobPipelineStorage # cspell:disable-next-line well-known-key WELL_KNOWN_BLOB_STORAGE_KEY = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;" async def test_find(): storage = BlobPipelineStorage( connection_string=WELL_KNOWN_BLOB_STORAGE_KEY, container_name="testfind", ) try: try: items = list( storage.find(base_dir="input", file_pattern=re.compile(r".*\.txt$")) ) items = [item[0] for item in items] assert items == [] await storage.set( "input/christmas.txt", "Merry Christmas!", encoding="utf-8" ) items = list( storage.find(base_dir="input", file_pattern=re.compile(r".*\.txt$")) ) items = [item[0] for item in items] assert items == ["input/christmas.txt"] await storage.set("test.txt", "Hello, World!", encoding="utf-8") items = list(storage.find(file_pattern=re.compile(r".*\.txt$"))) items = [item[0] for item in items] assert items == ["input/christmas.txt", "test.txt"] output = await storage.get("test.txt") assert output == "Hello, World!" finally: await storage.delete("test.txt") output = await storage.get("test.txt") assert output is None finally: storage.delete_container() async def test_dotprefix(): storage = BlobPipelineStorage( connection_string=WELL_KNOWN_BLOB_STORAGE_KEY, container_name="testfind", path_prefix=".", ) try: await storage.set("input/christmas.txt", "Merry Christmas!", encoding="utf-8") items = list(storage.find(file_pattern=re.compile(r".*\.txt$"))) items = [item[0] for item in items] assert items == ["input/christmas.txt"] finally: storage.delete_container() async def test_child(): parent = BlobPipelineStorage( connection_string=WELL_KNOWN_BLOB_STORAGE_KEY, container_name="testchild", ) try: try: storage = parent.child("input") await storage.set("christmas.txt", "Merry Christmas!", encoding="utf-8") items = list(storage.find(re.compile(r".*\.txt$"))) items = [item[0] for item in items] assert items == ["christmas.txt"] await storage.set("test.txt", "Hello, World!", encoding="utf-8") items = list(storage.find(re.compile(r".*\.txt$"))) items = [item[0] for item in items] print("FOUND", items) assert items == ["christmas.txt", "test.txt"] output = await storage.get("test.txt") assert output == "Hello, World!" items = list(parent.find(re.compile(r".*\.txt$"))) items = [item[0] for item in items] print("FOUND ITEMS", items) assert items == ["input/christmas.txt", "input/test.txt"] finally: await parent.delete("input/test.txt") has_test = await parent.has("input/test.txt") assert not has_test finally: parent.delete_container()