diff --git a/api/core/rag/datasource/vdb/qdrant/qdrant_vector.py b/api/core/rag/datasource/vdb/qdrant/qdrant_vector.py
index 8ce194c683..05fa73011a 100644
--- a/api/core/rag/datasource/vdb/qdrant/qdrant_vector.py
+++ b/api/core/rag/datasource/vdb/qdrant/qdrant_vector.py
@@ -47,6 +47,7 @@ class QdrantConfig(BaseModel):
     grpc_port: int = 6334
     prefer_grpc: bool = False
     replication_factor: int = 1
+    write_consistency_factor: int = 1
 
     def to_qdrant_params(self):
         if self.endpoint and self.endpoint.startswith("path:"):
@@ -127,6 +128,7 @@ class QdrantVector(BaseVector):
                 hnsw_config=hnsw_config,
                 timeout=int(self._client_config.timeout),
                 replication_factor=self._client_config.replication_factor,
+                write_consistency_factor=self._client_config.write_consistency_factor,
             )
 
             # create group_id payload index
diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py
index 67a4a515b1..00080b0fae 100644
--- a/api/core/rag/datasource/vdb/vector_factory.py
+++ b/api/core/rag/datasource/vdb/vector_factory.py
@@ -1,3 +1,5 @@
+import logging
+import time
 from abc import ABC, abstractmethod
 from typing import Any, Optional
 
@@ -13,6 +15,8 @@ from extensions.ext_database import db
 from extensions.ext_redis import redis_client
 from models.dataset import Dataset, Whitelist
 
+logger = logging.getLogger(__name__)
+
 
 class AbstractVectorFactory(ABC):
     @abstractmethod
@@ -173,8 +177,21 @@ class Vector:
 
     def create(self, texts: Optional[list] = None, **kwargs):
         if texts:
-            embeddings = self._embeddings.embed_documents([document.page_content for document in texts])
-            self._vector_processor.create(texts=texts, embeddings=embeddings, **kwargs)
+            start = time.time()
+            logger.info(f"start embedding {len(texts)} texts {start}")
+            batch_size = 1000
+            # ceil-divide so the "batch X/Y" logs report the true batch count
+            total_batches = (len(texts) + batch_size - 1) // batch_size
+            for i in range(0, len(texts), batch_size):
+                batch = texts[i : i + batch_size]
+                batch_start = time.time()
+                logger.info(f"Processing batch {i // batch_size + 1}/{total_batches} ({len(batch)} texts)")
+                batch_embeddings = self._embeddings.embed_documents([document.page_content for document in batch])
+                logger.info(
+                    f"Embedding batch {i // batch_size + 1}/{total_batches} took {time.time() - batch_start:.3f}s"
+                )
+                self._vector_processor.create(texts=batch, embeddings=batch_embeddings, **kwargs)
+            logger.info(f"Embedding {len(texts)} texts took {time.time() - start:.3f}s")
 
     def add_texts(self, documents: list[Document], **kwargs):
         if kwargs.get("duplicate_check", False):