LightRAG/lightrag/kg/milvus_impl.py

136 lines
4.4 KiB
Python
Raw Normal View History

2024-12-04 17:26:47 +08:00
import asyncio
import os
from typing import Any, final
2024-12-04 17:26:47 +08:00
from dataclasses import dataclass
import numpy as np
from lightrag.utils import logger
from ..base import BaseVectorStorage
2025-02-16 15:08:50 +01:00
import pipmaster as pm
2025-02-18 20:27:59 +01:00
# Make sure the runtime dependencies are present before importing them;
# pipmaster installs whichever one is missing, in this fixed order.
for _package in ("configparser", "pymilvus"):
    if not pm.is_installed(_package):
        pm.install(_package)

try:
    import configparser
    from pymilvus import MilvusClient
except ImportError as e:
    raise ImportError(
        "`pymilvus` library is not installed. Please install it via pip: `pip install pymilvus`."
    ) from e

# Optional settings file, read relative to the current working directory.
# ConfigParser.read() ignores files it cannot open, so a missing config.ini
# simply leaves the parser empty.
config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")
2025-02-11 03:29:40 +08:00
@final
@dataclass
class MilvusVectorDBStorage(BaseVectorStorage):
    """Vector storage that keeps embeddings in a Milvus collection.

    The collection is named after ``self.namespace``. Connection settings are
    taken from the ``MILVUS_URI`` / ``MILVUS_USER`` / ``MILVUS_PASSWORD`` /
    ``MILVUS_TOKEN`` / ``MILVUS_DB_NAME`` environment variables, falling back
    to the ``[milvus]`` section of ``config.ini``. When no URI is configured
    at all, ``milvus_lite.db`` inside ``global_config["working_dir"]`` is used.
    """

    @staticmethod
    def create_collection_if_not_exist(
        client: MilvusClient, collection_name: str, **kwargs
    ):
        """Create ``collection_name`` on ``client`` unless it already exists.

        Args:
            client: Connected Milvus client.
            collection_name: Name of the collection to look up / create.
            **kwargs: Forwarded unchanged to ``client.create_collection``
                (``__post_init__`` passes ``dimension`` this way).
        """
        if client.has_collection(collection_name):
            return
        client.create_collection(
            collection_name, max_length=64, id_type="string", **kwargs
        )

    def __post_init__(self):
        """Validate configuration, connect to Milvus and ensure the collection.

        Raises:
            ValueError: If ``cosine_better_than_threshold`` is missing from
                ``global_config["vector_db_storage_cls_kwargs"]``.
        """
        kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
        cosine_threshold = kwargs.get("cosine_better_than_threshold")
        if cosine_threshold is None:
            raise ValueError(
                "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
            )
        self.cosine_better_than_threshold = cosine_threshold

        # Each setting: environment variable first, then config.ini, then the
        # fallback (a local file for the URI, None for everything else).
        self._client = MilvusClient(
            uri=os.environ.get(
                "MILVUS_URI",
                config.get(
                    "milvus",
                    "uri",
                    fallback=os.path.join(
                        self.global_config["working_dir"], "milvus_lite.db"
                    ),
                ),
            ),
            user=os.environ.get(
                "MILVUS_USER", config.get("milvus", "user", fallback=None)
            ),
            password=os.environ.get(
                "MILVUS_PASSWORD", config.get("milvus", "password", fallback=None)
            ),
            token=os.environ.get(
                "MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
            ),
            db_name=os.environ.get(
                "MILVUS_DB_NAME", config.get("milvus", "db_name", fallback=None)
            ),
        )
        self._max_batch_size = self.global_config["embedding_batch_num"]
        MilvusVectorDBStorage.create_collection_if_not_exist(
            self._client,
            self.namespace,
            dimension=self.embedding_func.embedding_dim,
        )

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Embed the ``content`` of every record and upsert it into Milvus.

        Args:
            data: Mapping of record id to its payload. Each payload must have
                a ``"content"`` key; only keys listed in ``self.meta_fields``
                are stored alongside the id and the vector.

        Returns:
            Nothing when ``data`` is empty; otherwise whatever
            ``MilvusClient.upsert`` returns (kept for existing callers even
            though the annotation says ``None``).
        """
        logger.info(f"Inserting {len(data)} to {self.namespace}")
        if not data:
            return
        list_data: list[dict[str, Any]] = [
            {
                "id": k,
                **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
            }
            for k, v in data.items()
        ]
        contents = [v["content"] for v in data.values()]
        # Split the texts into chunks of at most _max_batch_size so that each
        # embedding call stays within the configured batch size.
        batches = [
            contents[i : i + self._max_batch_size]
            for i in range(0, len(contents), self._max_batch_size)
        ]

        # Run all embedding calls concurrently; gather() preserves batch
        # order, so the concatenated rows line up with list_data.
        embedding_tasks = [self.embedding_func(batch) for batch in batches]
        embeddings_list = await asyncio.gather(*embedding_tasks)

        embeddings = np.concatenate(embeddings_list)
        for i, d in enumerate(list_data):
            d["vector"] = embeddings[i]
        results = self._client.upsert(collection_name=self.namespace, data=list_data)
        return results

    async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
        """Embed ``query`` and return up to ``top_k`` matching records.

        The search uses the ``COSINE`` metric and passes
        ``self.cosine_better_than_threshold`` as the ``radius`` search
        parameter.

        Args:
            query: Text to search for.
            top_k: Maximum number of hits requested from Milvus.

        Returns:
            One dict per hit, containing the stored ``meta_fields`` plus
            ``"id"`` and ``"distance"``.
        """
        embedding = await self.embedding_func([query])
        results = self._client.search(
            collection_name=self.namespace,
            data=embedding,
            limit=top_k,
            output_fields=list(self.meta_fields),
            search_params={
                "metric_type": "COSINE",
                "params": {"radius": self.cosine_better_than_threshold},
            },
        )
        # Only one query vector is sent, so only the first result list is read.
        return [
            {**dp["entity"], "id": dp["id"], "distance": dp["distance"]}
            for dp in results[0]
        ]

    async def index_done_callback(self) -> None:
        """No-op hook invoked after indexing."""
        # Milvus handles persistence automatically
        pass

    async def delete_entity(self, entity_name: str) -> None:
        """Not supported by this backend.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    async def delete_entity_relation(self, entity_name: str) -> None:
        """Not supported by this backend.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError