Improve MongoDB vector index handling with workspace support

- Add workspace-specific index naming
- Store index name as instance variable
This commit is contained in:
yangdx 2025-07-07 03:19:41 +08:00
parent 1d24e8ca3c
commit 907f2313cd

View File

@ -1318,6 +1318,7 @@ class MongoGraphStorage(BaseGraphStorage):
class MongoVectorDBStorage(BaseVectorStorage):
db: AsyncDatabase | None = field(default=None)
_data: AsyncCollection | None = field(default=None)
_index_name: str = field(default="", init=False)
def __init__(
self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
@ -1355,6 +1356,14 @@ class MongoVectorDBStorage(BaseVectorStorage):
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
# Set index name based on workspace for backward compatibility
if effective_workspace:
# Use collection-specific index name for workspaced collections to avoid conflicts
self._index_name = f"vector_knn_index_{self.namespace}"
else:
# Keep original index name for backward compatibility with existing deployments
self._index_name = "vector_knn_index"
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:
@ -1384,13 +1393,11 @@ class MongoVectorDBStorage(BaseVectorStorage):
async def create_vector_index_if_not_exists(self):
"""Creates an Atlas Vector Search index."""
try:
index_name = "vector_knn_index"
indexes_cursor = await self._data.list_search_indexes()
indexes = await indexes_cursor.to_list(length=None)
for index in indexes:
if index["name"] == index_name:
logger.debug("vector index already exist")
if index["name"] == self._index_name:
logger.info(f"vector index {self._index_name} already exist")
return
search_index_model = SearchIndexModel(
@ -1404,15 +1411,15 @@ class MongoVectorDBStorage(BaseVectorStorage):
}
]
},
name=index_name,
name=self._index_name,
type="vectorSearch",
)
await self._data.create_search_index(search_index_model)
logger.info("Vector index created successfully.")
logger.info(f"Vector index {self._index_name} created successfully.")
except PyMongoError as _:
logger.debug("vector index already exist")
except PyMongoError as e:
logger.error(f"Error creating vector index {self._index_name}: {e}")
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"Inserting {len(data)} to {self.namespace}")
@ -1467,7 +1474,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
pipeline = [
{
"$vectorSearch": {
"index": "vector_knn_index", # Ensure this matches the created index name
"index": self._index_name, # Use stored index name for consistency
"path": "vector",
"queryVector": query_vector,
"numCandidates": 100, # Adjust for performance