2024-12-05 13:57:43 +08:00
|
|
|
import os
|
|
|
|
from tqdm.asyncio import tqdm as tqdm_async
|
|
|
|
from dataclasses import dataclass
|
2025-01-27 09:35:50 +01:00
|
|
|
import pipmaster as pm
|
|
|
|
# Ensure pymongo is available (via pipmaster) before the
# `from pymongo import MongoClient` import further down in this module.
if not pm.is_installed("pymongo"):
    pm.install("pymongo")
|
|
|
|
|
2024-12-05 13:57:43 +08:00
|
|
|
from pymongo import MongoClient
|
2025-01-13 07:06:01 +00:00
|
|
|
from typing import Union
|
2024-12-05 13:57:43 +08:00
|
|
|
from lightrag.utils import logger
|
|
|
|
|
|
|
|
from lightrag.base import BaseKVStorage
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class MongoKVStorage(BaseKVStorage):
    """Key-value storage backed by a single MongoDB collection.

    Each instance binds the collection named after ``self.namespace`` and
    keys documents by their MongoDB ``_id``. The pymongo calls made by the
    ``async`` methods below are not awaited, so each one runs synchronously
    inside the coroutine.
    """

    def __post_init__(self):
        """Connect to MongoDB and bind the collection for this namespace.

        Connection settings are read from the environment:

        * ``MONGO_URI`` (default ``mongodb://root:root@localhost:27017/``)
        * ``MONGO_DATABASE`` (default ``LightRAG``)
        """
        # NOTE(review): the fallback URI embeds root:root credentials; consider
        # requiring MONGO_URI to be set explicitly outside local development.
        client = MongoClient(
            os.environ.get("MONGO_URI", "mongodb://root:root@localhost:27017/")
        )
        database = client.get_database(os.environ.get("MONGO_DATABASE", "LightRAG"))
        self._data = database.get_collection(self.namespace)
        logger.info(f"Use MongoDB as KV {self.namespace}")

    @staticmethod
    def _cache_key(mode: str, key: str) -> str:
        """Build the ``_id`` under which an llm_response_cache entry is stored."""
        return f"{mode}_{key}"

    async def all_keys(self) -> list[str]:
        """Return the ``_id`` of every document in the collection."""
        return [x["_id"] for x in self._data.find({}, {"_id": 1})]

    async def get_by_id(self, id):
        """Return the document whose ``_id`` equals *id*, or ``None`` if absent."""
        return self._data.find_one({"_id": id})

    async def get_by_ids(self, ids, fields=None):
        """Return the documents whose ``_id`` is contained in *ids*.

        Args:
            ids: Keys to look up. Keys with no matching document are simply
                absent from the result.
            fields: Optional iterable of field names to project. When given,
                only those fields (plus ``_id``, which MongoDB includes by
                default) are returned.

        Returns:
            A list of matching documents in the order MongoDB yields them,
            which is not guaranteed to follow the order of *ids*.
        """
        query = {"_id": {"$in": ids}}
        if fields is None:
            return list(self._data.find(query))
        return list(self._data.find(query, {field: 1 for field in fields}))

    async def filter_keys(self, data: list[str]) -> set[str]:
        """Return the subset of *data* whose keys are not yet stored.

        Args:
            data: Candidate keys.

        Returns:
            The keys from *data* for which no document with that ``_id`` exists.
        """
        # Collect existing ids into a set so each membership test below is O(1);
        # the previous list made this quadratic in the number of keys.
        existing_ids = {
            str(x["_id"]) for x in self._data.find({"_id": {"$in": data}}, {"_id": 1})
        }
        return {s for s in data if s not in existing_ids}

    async def upsert(self, data: dict[str, dict]):
        """Write *data* to the collection and return it.

        For the ``llm_response_cache`` namespace, *data* is treated as
        ``{mode: {key: value}}`` and each value is stored under
        ``"<mode>_<key>"``. ``$setOnInsert`` is used, so an existing cache
        entry is never overwritten; only new entries are written.

        For any other namespace, *data* is treated as ``{key: value}`` and each
        value's fields are merged into the document with that ``_id`` via
        ``$set``. The document is created if it does not exist.

        Side effect: the value dicts inside *data* are mutated in place to
        carry their ``_id``. In the cache namespace this happens only for
        entries that were newly inserted.

        Returns:
            The same *data* object that was passed in.
        """
        if self.namespace == "llm_response_cache":
            for mode, items in data.items():
                for k, v in tqdm_async(items.items(), desc="Upserting"):
                    key = self._cache_key(mode, k)
                    result = self._data.update_one(
                        {"_id": key}, {"$setOnInsert": v}, upsert=True
                    )
                    # upserted_id is only set when a new document was created.
                    if result.upserted_id:
                        logger.debug(f"\nInserted new document with key: {key}")
                        # v is data[mode][k], so this tags the caller's dict.
                        v["_id"] = key
        else:
            for k, v in tqdm_async(data.items(), desc="Upserting"):
                self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
                # v is data[k], so this tags the caller's dict.
                v["_id"] = k
        return data

    async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
        """Look up one llm_response_cache entry by mode and key.

        Args:
            mode: Cache mode component of the stored key.
            id: Cache key within that mode.

        Returns:
            ``{id: document}`` when the entry exists. Returns ``None`` when it
            does not exist, or when this storage is not the
            ``llm_response_cache`` namespace.
        """
        if self.namespace != "llm_response_cache":
            return None
        v = self._data.find_one({"_id": self._cache_key(mode, id)})
        if not v:
            return None
        logger.debug(f"llm_response_cache find one by:{id}")
        return {id: v}

    async def drop(self):
        """Do nothing: the underlying collection is currently left untouched.

        NOTE(review): if callers expect this to clear the namespace, it likely
        needs ``self._data.drop()`` or ``self._data.delete_many({})`` — confirm
        against the BaseKVStorage contract.
        """
|