diff --git a/conftest.py b/conftest.py
index 8d673d46d..a381d802f 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,6 +1,8 @@
 def pytest_addoption(parser):
     parser.addoption(
-        "--document_store_type", action="store", default="elasticsearch, faiss, sql, memory, milvus1, milvus, weaviate"
+        "--document_store_type",
+        action="store",
+        default="elasticsearch, faiss, sql, memory, milvus1, milvus, weaviate, pinecone",
     )
diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md
index 073837da5..49000b130 100644
--- a/docs/_src/api/api/document_store.md
+++ b/docs/_src/api/api/document_store.md
@@ -4689,7 +4689,7 @@ number of labels for the given index
 ## PineconeDocumentStore
 
 ```python
-class PineconeDocumentStore(SQLDocumentStore)
+class PineconeDocumentStore(BaseDocumentStore)
 ```
 
 Document store for very large scale embedding based dense retrievers like the DPR. This is a hosted document store,
@@ -4708,7 +4708,7 @@ the vector embeddings and metadata (for filtering) are indexed in a Pinecone Ind
 #### PineconeDocumentStore.\_\_init\_\_
 
 ```python
-def __init__(api_key: str, environment: str = "us-west1-gcp", sql_url: str = "sqlite:///pinecone_document_store.db", pinecone_index: Optional[pinecone.Index] = None, embedding_dim: int = 768, return_embedding: bool = False, index: str = "document", similarity: str = "cosine", replicas: int = 1, shards: int = 1, embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = "overwrite", recreate_index: bool = False, metadata_config: dict = {"indexed": []}, validate_index_sync: bool = True)
+def __init__(api_key: str, environment: str = "us-west1-gcp", pinecone_index: Optional[pinecone.Index] = None, embedding_dim: int = 768, return_embedding: bool = False, index: str = "document", similarity: str = "cosine", replicas: int = 1, shards: int = 1, embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = "overwrite", recreate_index: bool = False, metadata_config: dict = {"indexed": []}, validate_index_sync: bool = True)
 ```
 
 **Arguments**:
 
@@ -4716,8 +4716,6 @@ def __init__(api_key: str, environment: str = "us-west1-gcp", sql_url: str = "sq
 - `api_key`: Pinecone vector database API key ([https://app.pinecone.io](https://app.pinecone.io)).
 - `environment`: Pinecone cloud environment uses `"us-west1-gcp"` by default. Other GCP and AWS regions are
 supported, contact Pinecone [here](https://www.pinecone.io/contact/) if required.
-- `sql_url`: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale
-deployment, Postgres is recommended.
 - `pinecone_index`: pinecone-client Index object, an index will be initialized or loaded if not specified.
 - `embedding_dim`: The embedding vector size.
 - `return_embedding`: Whether to return document embeddings.
@@ -4743,17 +4741,57 @@ Parameter options:
 created using the config you are using for initialization. Be aware that all data in the old index will be
 lost if you choose to recreate the index. Be aware that both the document_index and the label_index will
 be recreated.
-- `metadata_config`: Which metadata fields should be indexed. Should be in the format
-`{"indexed": ["metadata-field-1", "metadata-field-2", "metadata-field-n"]}`.
-Indexing metadata fields is a prerequisite to allow filtering of documents by metadata values.
-- `validate_index_sync`: Whether to check that the document count equals the embedding count at initialization time
+- `metadata_config`: Which metadata fields should be indexed, part of the
+[selective metadata filtering](https://www.pinecone.io/docs/manage-indexes/#selective-metadata-indexing) feature.
+Should be in the format `{"indexed": ["metadata-field-1", "metadata-field-2", "metadata-field-n"]}`. By default,
+no fields are indexed.
+
+
+
+#### PineconeDocumentStore.get\_document\_count
+
+```python
+def get_document_count(filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, index: Optional[str] = None, only_documents_without_embedding: bool = False, headers: Optional[Dict[str, str]] = None) -> int
+```
+
+Return the count of documents in the document store.
+
+**Arguments**:
+
+- `filters`: Optional filters to narrow down the documents to be counted.
+Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
+operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
+`"$gte"`, `"$lt"`, `"$lte"`), or a metadata field name.
+Logical operator keys take a dictionary of metadata field names or logical operators as
+value. Metadata field names take a dictionary of comparison operators as value. Comparison
+operator keys take a single value or (in case of `"$in"`) a list of values as value.
+If no logical operator is provided, `"$and"` is used as default operation. If no comparison
+operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
+operation.
+    __Example__:
+    ```python
+    filters = {
+        "$and": {
+            "type": {"$eq": "article"},
+            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
+            "rating": {"$gte": 3},
+            "$or": {
+                "genre": {"$in": ["economy", "politics"]},
+                "publisher": {"$eq": "nytimes"}
+            }
+        }
+    }
+    ```
+- `index`: Optional index to use for the query. If not provided, the default index is used.
+- `only_documents_without_embedding`: If set to `True`, only documents without embeddings are counted.
+- `headers`: PineconeDocumentStore does not support headers.
 
 #### PineconeDocumentStore.write\_documents
 
 ```python
-def write_documents(documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 32, duplicate_documents: Optional[str] = None, headers: Optional[Dict[str, str]] = None)
+def write_documents(documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 32, duplicate_documents: Optional[str] = None, headers: Optional[Dict[str, str]] = None, labels: Optional[bool] = False)
 ```
 
 Add new documents to the DocumentStore.
 
@@ -4771,6 +4809,7 @@ Parameter options:
 - `"overwrite"`: Update any existing documents with the same ID when adding documents.
 - `"fail"`: An error is raised if the document ID of the document being added already exists.
 - `headers`: PineconeDocumentStore does not support headers.
+- `labels`: Whether the records to be written are labels rather than documents. Defaults to False.
 
 **Raises**:
 
@@ -4824,12 +4863,55 @@ operation.
 - `batch_size`: Number of documents to process at a time. When working with large number of documents,
 batching can help reduce memory footprint.
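+
+__Usage sketch__ (editor's illustration, not generated from the source; it assumes a valid Pinecone
+API key and downloads a default DPR model for the retriever):
+
+```python
+from haystack.document_stores import PineconeDocumentStore
+from haystack.nodes import DensePassageRetriever
+from haystack.schema import Document
+
+# Create the hosted index and write a document that has no embedding yet
+document_store = PineconeDocumentStore(api_key="<pinecone-api-key>", embedding_dim=768)
+document_store.write_documents([Document(content="Berlin is the capital of Germany.")])
+
+# Embed the stored documents; they move from the "no-vectors" to the "vectors" namespace
+retriever = DensePassageRetriever(document_store=document_store)
+document_store.update_embeddings(retriever=retriever)
+print(document_store.get_document_count())
+```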
+
+
+#### PineconeDocumentStore.get\_all\_documents
+
+```python
+def get_all_documents(index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, return_embedding: Optional[bool] = None, batch_size: int = 32, headers: Optional[Dict[str, str]] = None, namespace: Optional[str] = None) -> List[Document]
+```
+
+Retrieves all documents in the index.
+
+**Arguments**:
+
+- `index`: Optional index name to retrieve all documents from.
+- `filters`: Optional filters to narrow down the documents that will be retrieved.
+Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
+operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
+`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
+Logical operator keys take a dictionary of metadata field names and/or logical operators as
+value. Metadata field names take a dictionary of comparison operators as value. Comparison
+operator keys take a single value or (in case of `"$in"`) a list of values as value.
+If no logical operator is provided, `"$and"` is used as default operation. If no comparison
+operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
+operation.
+    __Example__:
+    ```python
+    filters = {
+        "$and": {
+            "type": {"$eq": "article"},
+            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
+            "rating": {"$gte": 3},
+            "$or": {
+                "genre": {"$in": ["economy", "politics"]},
+                "publisher": {"$eq": "nytimes"}
+            }
+        }
+    }
+    ```
+- `return_embedding`: Optional flag to return the embedding of the document.
+- `batch_size`: Number of documents to process at a time. When working with large number of documents,
+batching can help reduce memory footprint.
+- `headers`: Pinecone does not support headers.
+- `namespace`: Optional namespace to retrieve documents from.
 
 #### PineconeDocumentStore.get\_all\_documents\_generator
 
 ```python
-def get_all_documents_generator(index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, return_embedding: Optional[bool] = None, batch_size: int = 32, headers: Optional[Dict[str, str]] = None) -> Generator[Document, None, None]
+def get_all_documents_generator(index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, return_embedding: Optional[bool] = None, batch_size: int = 32, headers: Optional[Dict[str, str]] = None, namespace: Optional[str] = None) -> Generator[Document, None, None]
 ```
 
 Get all documents from the document store. Under-the-hood, documents are fetched in batches from the
@@ -4868,6 +4950,45 @@ operation.
 - `return_embedding`: Whether to return the document embeddings.
 - `batch_size`: When working with large number of documents, batching can help reduce memory footprint.
 - `headers`: PineconeDocumentStore does not support headers.
+- `namespace`: Optional namespace to retrieve documents from.
+
+
+
+#### PineconeDocumentStore.get\_documents\_by\_id
+
+```python
+def get_documents_by_id(ids: List[str], index: Optional[str] = None, batch_size: int = 32, headers: Optional[Dict[str, str]] = None, return_embedding: Optional[bool] = None, namespace: Optional[str] = None) -> List[Document]
+```
+
+Retrieves all documents in the index using their IDs.
+
+**Arguments**:
+
+- `ids`: List of IDs to retrieve.
+- `index`: Optional index name to retrieve the documents from.
+- `batch_size`: Number of documents to retrieve at a time.
When working with large number of documents,
+batching can help reduce memory footprint.
+- `headers`: Pinecone does not support headers.
+- `return_embedding`: Optional flag to return the embedding of the document.
+- `namespace`: Optional namespace to retrieve documents from.
+
+
+
+#### PineconeDocumentStore.get\_document\_by\_id
+
+```python
+def get_document_by_id(id: str, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, return_embedding: Optional[bool] = None, namespace: Optional[str] = None) -> Document
+```
+
+Returns a single Document retrieved using an ID.
+
+**Arguments**:
+
+- `id`: ID string to retrieve.
+- `index`: Optional index name to retrieve the document from.
+- `headers`: Pinecone does not support headers.
+- `return_embedding`: Optional flag to return the embedding of the document.
+- `namespace`: Optional namespace to retrieve documents from.
@@ -4879,22 +5000,35 @@ def get_embedding_count(index: Optional[str] = None, filters: Optional[Dict[str,
 
 Return the count of embeddings in the document store.
 
+**Arguments**:
+
+- `index`: Optional index name to count embeddings in.
+- `filters`: Filters are not supported for `get_embedding_count` in Pinecone.
+
 
 #### PineconeDocumentStore.update\_document\_meta
 
 ```python
-def update_document_meta(id: str, meta: Dict[str, str], index: str = None)
+def update_document_meta(id: str, meta: Dict[str, str], namespace: Optional[str] = None, index: Optional[str] = None)
 ```
 
-Update the metadata dictionary of a document by specifying its string id
+Update the metadata dictionary of a document by specifying its string ID.
+
+**Arguments**:
+
+- `id`: ID of the Document to update.
+- `meta`: Dictionary of new metadata.
+- `namespace`: Optional namespace to update documents from. If not specified, defaults to the embedding
+namespace (vectors) if it exists, otherwise the document namespace (no-vectors).
+- `index`: Optional index name to update documents from.
 
 
 #### PineconeDocumentStore.delete\_documents
 
 ```python
-def delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, headers: Optional[Dict[str, str]] = None)
+def delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, headers: Optional[Dict[str, str]] = None, drop_ids: Optional[bool] = True, namespace: Optional[str] = None)
 ```
 
 Delete documents from the document store.
 
@@ -4904,6 +5038,8 @@ Delete documents from the document store.
 - `index`: Index name to delete the documents from. If `None`, the
 DocumentStore's default index (`self.index`) will be used.
 - `ids`: Optional list of IDs to narrow down the documents to be deleted.
+- `namespace`: Optional namespace to delete documents from. If not specified, defaults to the embedding
+namespace (vectors) if it exists, otherwise the document namespace (no-vectors).
 - `filters`: Optional filters to narrow down the documents to be deleted.
 Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
 operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
 `"$gte"`, `"$lt"`, `"$lte"`), or a metadata field name.
 Logical operator keys take a dictionary of metadata field names or logical operators as
 value. Metadata field names take a dictionary of comparison operators as value. Comparison
 operator keys take a single value or (in case of `"$in"`) a list of values as value.
 If no logical operator is provided, `"$and"` is used as default operation. If no comparison
 operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
 operation.
@@ -4929,6 +5065,14 @@ operation.
     }
     ```
 - `headers`: PineconeDocumentStore does not support headers.
+- `drop_ids`: Specifies if the locally stored IDs should be deleted. The default
+is True.
+
+**Returns**:
+
+None
@@ -4953,7 +5097,7 @@ None
 #### PineconeDocumentStore.query\_by\_embedding
 
 ```python
-def query_by_embedding(query_emb: np.ndarray, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, top_k: int = 10, index: Optional[str] = None, return_embedding: Optional[bool] = None, headers: Optional[Dict[str, str]] = None, scale_score: bool = True) -> List[Document]
+def query_by_embedding(query_emb: np.ndarray, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, top_k: int = 10, index: Optional[str] = None, return_embedding: Optional[bool] = None, headers: Optional[Dict[str, str]] = None, scale_score: bool = True, namespace: Optional[str] = None) -> List[Document]
 ```
 
 Find the document that is most similar to the provided `query_emb` by using a vector similarity metric.
 
@@ -5038,7 +5182,47 @@ Otherwise raw similarity scores (e.g. cosine or dot_product) will be used.
 def load(cls)
 ```
 
-Default class method used for loading indexes. Not applicable to the PineconeDocumentStore.
+Default class method used for loading indexes. Not applicable to PineconeDocumentStore.
+
+
+
+#### PineconeDocumentStore.delete\_labels
+
+```python
+def delete_labels(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, batch_size: int = 32)
+```
+
+Default class method used for deleting labels. Not supported by PineconeDocumentStore.
+
+
+
+#### PineconeDocumentStore.get\_all\_labels
+
+```python
+def get_all_labels(index=None, filters: Optional[dict] = None, headers: Optional[Dict[str, str]] = None)
+```
+
+Default class method used for getting all labels.
+
+
+
+#### PineconeDocumentStore.get\_label\_count
+
+```python
+def get_label_count(index: Optional[str] = None, headers: Optional[Dict[str, str]] = None)
+```
+
+Default class method used for counting labels. Not supported by PineconeDocumentStore.
+
+
+
+#### PineconeDocumentStore.write\_labels
+
+```python
+def write_labels(labels, index=None, headers: Optional[Dict[str, str]] = None)
+```
+
+Default class method used for writing labels.
diff --git a/haystack/document_stores/filter_utils.py b/haystack/document_stores/filter_utils.py
index 42cd62699..a74cec32f 100644
--- a/haystack/document_stores/filter_utils.py
+++ b/haystack/document_stores/filter_utils.py
@@ -6,6 +6,7 @@ from sqlalchemy.sql import select
 from sqlalchemy import and_, or_
 
 from haystack.document_stores.utils import convert_date_to_rfc3339
+from haystack.errors import FilterError
 
 
 def nested_defaultdict() -> defaultdict:
@@ -460,7 +461,8 @@ class InOperation(ComparisonOperation):
     # is only initialized with lists, but changing the type annotation would mean duplicating __init__
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, List]]:
-        assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$in' operation requires comparison value to be a list.")
         return {"terms": {self.field_name: self.comparison_value}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -470,7 +472,8 @@ class InOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]:
         filter_dict: Dict[str, Union[str, List[Dict]]] = {"operator": "Or", "operands": []}
-        assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$in' operation requires comparison value to be a list.")
         for value in self.comparison_value:
             comp_value_type, comp_value = self._get_weaviate_datatype(value)
             assert isinstance(filter_dict["operands"], list)  # Necessary for mypy
@@ -481,7 +484,8 @@ class InOperation(ComparisonOperation):
         return filter_dict
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, List]]:
-        assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$in' operation requires comparison value to be a list.")
         return {self.field_name: {"$in": self.comparison_value}}
 
     def invert(self) -> "NinOperation":
@@ -499,7 +503,8 @@ class NeOperation(ComparisonOperation):
         return fields[self.field_name] != self.comparison_value
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Dict[str, Union[str, int, float, bool]]]]]:
-        assert not isinstance(self.comparison_value, list), "Use '$nin' operation for lists as comparison values."
+        if isinstance(self.comparison_value, list):
+            raise FilterError("Use '$nin' operation for lists as comparison values.")
         return {"bool": {"must_not": {"term": {self.field_name: self.comparison_value}}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -530,7 +535,8 @@ class NinOperation(ComparisonOperation):
     # is only initialized with lists, but changing the type annotation would mean duplicating __init__
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Dict[str, List]]]]:
-        assert isinstance(self.comparison_value, list), "'$nin' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$nin' operation requires comparison value to be a list.")
         return {"bool": {"must_not": {"terms": {self.field_name: self.comparison_value}}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -540,7 +546,8 @@ class NinOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]:
         filter_dict: Dict[str, Union[str, List[Dict]]] = {"operator": "And", "operands": []}
-        assert isinstance(self.comparison_value, list), "'$nin' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$nin' operation requires comparison value to be a list.")
         for value in self.comparison_value:
             comp_value_type, comp_value = self._get_weaviate_datatype(value)
             assert isinstance(filter_dict["operands"], list)  # Necessary for mypy
@@ -551,7 +558,8 @@ class NinOperation(ComparisonOperation):
         return filter_dict
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, List]]:
-        assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list."
+        if not isinstance(self.comparison_value, list):
+            raise FilterError("'$nin' operation requires comparison value to be a list.")
         return {self.field_name: {"$nin": self.comparison_value}}
 
     def invert(self) -> "InOperation":
@@ -569,7 +577,8 @@ class GtOperation(ComparisonOperation):
         return fields[self.field_name] > self.comparison_value
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Union[str, float, int]]]]:
-        assert not isinstance(self.comparison_value, list), "Comparison value for '$gt' operation must not be a list."
+        if isinstance(self.comparison_value, list):
+            raise FilterError("Comparison value for '$gt' operation must not be a list.")
         return {"range": {self.field_name: {"gt": self.comparison_value}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -579,13 +588,13 @@ class GtOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]:
         comp_value_type, comp_value = self._get_weaviate_datatype()
-        assert not isinstance(comp_value, list), "Comparison value for '$gt' operation must not be a list."
+        if isinstance(comp_value, list):
+            raise FilterError("Comparison value for '$gt' operation must not be a list.")
         return {"path": [self.field_name], "operator": "GreaterThan", comp_value_type: comp_value}
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]:
-        assert not isinstance(
-            self.comparison_value, (list, str)
-        ), "Comparison value for '$gt' operation must be a float or int."
+        if not isinstance(self.comparison_value, (float, int)):
+            raise FilterError("Comparison value for '$gt' operation must be a float or int.")
         return {self.field_name: {"$gt": self.comparison_value}}
 
     def invert(self) -> "LteOperation":
@@ -603,7 +612,8 @@ class GteOperation(ComparisonOperation):
         return fields[self.field_name] >= self.comparison_value
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Union[str, float, int]]]]:
-        assert not isinstance(self.comparison_value, list), "Comparison value for '$gte' operation must not be a list."
+        if isinstance(self.comparison_value, list):
+            raise FilterError("Comparison value for '$gte' operation must not be a list.")
         return {"range": {self.field_name: {"gte": self.comparison_value}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -613,13 +623,13 @@ class GteOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]:
         comp_value_type, comp_value = self._get_weaviate_datatype()
-        assert not isinstance(comp_value, list), "Comparison value for '$gte' operation must not be a list."
+        if isinstance(comp_value, list):
+            raise FilterError("Comparison value for '$gte' operation must not be a list.")
         return {"path": [self.field_name], "operator": "GreaterThanEqual", comp_value_type: comp_value}
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]:
-        assert not isinstance(
-            self.comparison_value, (list, str)
-        ), "Comparison value for '$gte' operation must be a float or int."
+        if not isinstance(self.comparison_value, (float, int)):
+            raise FilterError("Comparison value for '$gte' operation must be a float or int.")
         return {self.field_name: {"$gte": self.comparison_value}}
 
     def invert(self) -> "LtOperation":
@@ -637,7 +647,8 @@ class LtOperation(ComparisonOperation):
         return fields[self.field_name] < self.comparison_value
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Union[str, float, int]]]]:
-        assert not isinstance(self.comparison_value, list), "Comparison value for '$lt' operation must not be a list."
+        if isinstance(self.comparison_value, list):
+            raise FilterError("Comparison value for '$lt' operation must not be a list.")
         return {"range": {self.field_name: {"lt": self.comparison_value}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -647,13 +658,13 @@ class LtOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]:
         comp_value_type, comp_value = self._get_weaviate_datatype()
-        assert not isinstance(comp_value, list), "Comparison value for '$lt' operation must not be a list."
+        if isinstance(comp_value, list):
+            raise FilterError("Comparison value for '$lt' operation must not be a list.")
         return {"path": [self.field_name], "operator": "LessThan", comp_value_type: comp_value}
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]:
-        assert not isinstance(
-            self.comparison_value, (list, str)
-        ), "Comparison value for '$lt' operation must be a float or int."
+        if not isinstance(self.comparison_value, (float, int)):
+            raise FilterError("Comparison value for '$lt' operation must be a float or int.")
         return {self.field_name: {"$lt": self.comparison_value}}
 
     def invert(self) -> "GteOperation":
@@ -671,7 +682,8 @@ class LteOperation(ComparisonOperation):
         return fields[self.field_name] <= self.comparison_value
 
     def convert_to_elasticsearch(self) -> Dict[str, Dict[str, Dict[str, Union[str, float, int]]]]:
-        assert not isinstance(self.comparison_value, list), "Comparison value for '$lte' operation must not be a list."
+        if isinstance(self.comparison_value, list):
+            raise FilterError("Comparison value for '$lte' operation must not be a list.")
         return {"range": {self.field_name: {"lte": self.comparison_value}}}
 
     def convert_to_sql(self, meta_document_orm):
@@ -681,13 +693,13 @@ class LteOperation(ComparisonOperation):
 
     def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]:
         comp_value_type, comp_value = self._get_weaviate_datatype()
-        assert not isinstance(comp_value, list), "Comparison value for '$lte' operation must not be a list."
+        if isinstance(comp_value, list):
+            raise FilterError("Comparison value for '$lte' operation must not be a list.")
         return {"path": [self.field_name], "operator": "LessThanEqual", comp_value_type: comp_value}
 
     def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]:
-        assert not isinstance(
-            self.comparison_value, (list, str)
-        ), "Comparison value for '$lte' operation must be a float or int."
+        if not isinstance(self.comparison_value, (float, int)):
+            raise FilterError("Comparison value for '$lte' operation must be a float or int.")
         return {self.field_name: {"$lte": self.comparison_value}}
 
     def invert(self) -> "GtOperation":
diff --git a/haystack/document_stores/pinecone.py b/haystack/document_stores/pinecone.py
index cb797bd7f..89b41f825 100644
--- a/haystack/document_stores/pinecone.py
+++ b/haystack/document_stores/pinecone.py
@@ -1,16 +1,17 @@
-from typing import TYPE_CHECKING, Union, List, Optional, Dict, Generator
+from typing import TYPE_CHECKING, Set, Union, List, Optional, Dict, Generator, Any
 
 import logging
+from itertools import islice
 
 import pinecone
 import numpy as np
 from tqdm.auto import tqdm
 
-from haystack.schema import Document
-from haystack.document_stores.sql import SQLDocumentStore
-from haystack.document_stores.base import get_batches_from_generator
+from haystack.schema import Document, Label, Answer, Span
+from haystack.document_stores import BaseDocumentStore
+
 from haystack.document_stores.filter_utils import LogicalFilterClause
-from haystack.errors import DocumentStoreError
+from haystack.errors import PineconeDocumentStoreError, DuplicateDocumentError
 
 if TYPE_CHECKING:
     from haystack.nodes.retriever import BaseRetriever
@@ -19,7 +20,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class PineconeDocumentStore(SQLDocumentStore):
+def _sanitize_index_name(index: Optional[str]) -> Optional[str]:
+    if index:
+        return index.replace("_", "-").lower()
+    return None
+
+
+class PineconeDocumentStore(BaseDocumentStore):
     """
     Document store for very large scale embedding based dense retrievers like the DPR. This is a hosted document store,
     this means that your vectors will not be stored locally but in the cloud. This means that the similarity
@@ -40,7 +47,6 @@ class PineconeDocumentStore(SQLDocumentStore):
         self,
         api_key: str,
         environment: str = "us-west1-gcp",
-        sql_url: str = "sqlite:///pinecone_document_store.db",
         pinecone_index: Optional[pinecone.Index] = None,
         embedding_dim: int = 768,
         return_embedding: bool = False,
@@ -59,8 +65,6 @@ class PineconeDocumentStore(SQLDocumentStore):
        :param api_key: Pinecone vector database API key ([https://app.pinecone.io](https://app.pinecone.io)).
        :param environment: Pinecone cloud environment uses `"us-west1-gcp"` by default. Other GCP and AWS regions are
            supported, contact Pinecone [here](https://www.pinecone.io/contact/) if required.
-        :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale
-            deployment, Postgres is recommended.
        :param pinecone_index: pinecone-client Index object, an index will be initialized or loaded if not specified.
        :param embedding_dim: The embedding vector size.
        :param return_embedding: Whether to return document embeddings.
@@ -87,12 +91,17 @@ class PineconeDocumentStore(SQLDocumentStore):
            created using the config you are using for initialization. Be aware that all data in the old index will be
            lost if you choose to recreate the index. Be aware that both the document_index and the label_index will
            be recreated.
-        :param metadata_config: Which metadata fields should be indexed. Should be in the format
-            `{"indexed": ["metadata-field-1", "metadata-field-2", "metadata-field-n"]}`.
-            Indexing metadata fields is a prerequisite to allow filtering of documents by metadata values.
-        :param validate_index_sync: Whether to check that the document count equals the embedding count at initialization time
+        :param metadata_config: Which metadata fields should be indexed, part of the
+            [selective metadata filtering](https://www.pinecone.io/docs/manage-indexes/#selective-metadata-indexing) feature.
+            Should be in the format `{"indexed": ["metadata-field-1", "metadata-field-2", "metadata-field-n"]}`. By default,
+            no fields are indexed.
         """
         # Connect to Pinecone server using python client binding
+        if not api_key:
+            raise PineconeDocumentStoreError(
+                "Pinecone requires an API key, please provide one. https://app.pinecone.io"
+            )
+
         pinecone.init(api_key=api_key, environment=environment)
         self._api_key = api_key
@@ -109,16 +118,23 @@ class PineconeDocumentStore(SQLDocumentStore):
             "Please set similarity to one of the above."
         )
 
-        self.index = index
+        self.similarity = similarity
+        self.index: str = self._index_name(index)
         self.embedding_dim = embedding_dim
         self.return_embedding = return_embedding
         self.embedding_field = embedding_field
         self.progress_bar = progress_bar
         self.duplicate_documents = duplicate_documents
+        self.document_namespace = "no-vectors"
+        self.embedding_namespace = "vectors"
 
         # Pinecone index params
         self.replicas = replicas
         self.shards = shards
+
+        # Add necessary metadata fields to metadata_config (on a copy, so the shared default argument is not mutated)
+        fields = ["label-id", "query"]
+        metadata_config = dict(metadata_config)
+        metadata_config["indexed"] = list(metadata_config.get("indexed", [])) + fields
         self.metadata_config = metadata_config
 
         # Initialize dictionary of index connections
@@ -126,17 +142,19 @@ class PineconeDocumentStore(SQLDocumentStore):
         self.return_embedding = return_embedding
         self.embedding_field = embedding_field
 
+        # Initialize dictionary to store temporary set of document IDs
+        self.all_ids: dict = {}
+        # Dummy query to be used during searches
+        self.dummy_query = [0.0] * self.embedding_dim
+
         self.progress_bar = progress_bar
 
-        clean_index = self._sanitize_index_name(index)
-        super().__init__(url=sql_url, index=clean_index, duplicate_documents=duplicate_documents)
-
         if pinecone_index:
-            self.pinecone_indexes[clean_index] = pinecone_index
+            self.pinecone_indexes[self.index] = pinecone_index
         else:
-            self.pinecone_indexes[clean_index] = self._create_index(
+            self.pinecone_indexes[self.index] = self._create_index(
                 embedding_dim=self.embedding_dim,
-                index=clean_index,
+                index=self.index,
                 metric_type=self.metric_type,
                 replicas=self.replicas,
                 shards=self.shards,
@@ -144,11 +162,20 @@ class PineconeDocumentStore(SQLDocumentStore):
                 metadata_config=self.metadata_config,
             )
 
-        if validate_index_sync:
-            self._validate_index_sync()
+        super().__init__()
 
-    def _sanitize_index_name(self, index: str) -> str:
-        return index.replace("_", "-").lower()
+    def _add_local_ids(self, index: str, ids: list):
+        """
+        Add all document IDs to the set of all IDs.
+        """
+        if index not in self.all_ids:
+            self.all_ids[index] = set()
+        self.all_ids[index] = self.all_ids[index].union(set(ids))
+
+    def _index_name(self, index: Optional[str]) -> str:
+        index = _sanitize_index_name(index) or self.index
+        return index
 
     def _create_index(
         self,
@@ -158,18 +185,16 @@ class PineconeDocumentStore(SQLDocumentStore):
         replicas: Optional[int] = 1,
         shards: Optional[int] = 1,
         recreate_index: bool = False,
-        metadata_config: dict = {},
+        metadata_config: dict = {"indexed": []},
     ):
         """
         Create a new index for storing documents in case an index with the name doesn't exist already.
""" - index = index or self.index - index = self._sanitize_index_name(index) + index = self._index_name(index) if recreate_index: self.delete_index(index) - super().delete_labels() # Skip if already exists if index in self.pinecone_indexes.keys(): @@ -195,17 +220,75 @@ class PineconeDocumentStore(SQLDocumentStore): # return index connection return index_connection - def _validate_index_sync(self): + def get_document_count( + self, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + index: Optional[str] = None, + only_documents_without_embedding: bool = False, + headers: Optional[Dict[str, str]] = None, + ) -> int: """ - This check ensures the correct document database was loaded. If it fails, make sure you provided the same path - to the SQL database as when you created the original Pinecone index. + Return the count of embeddings in the document store. + :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`), or a metadata field name. + Logical operator keys take a dictionary of metadata field names or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` + :param index: Optional index to use for the query. If not provided, the default index is used. + :param only_documents_without_embedding: If set to `True`, only documents without embeddings are counted. + :param headers: PineconeDocumentStore does not support headers. """ - if not self.get_document_count() == self.get_embedding_count(): - raise DocumentStoreError( - "The number of documents present in the SQL database does not " - "match the number of embeddings in Pinecone. Make sure your Pinecone " - "index aligns to the same database that was used when creating the " - "original index." + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." 
+            )
+
+        pinecone_syntax_filter = LogicalFilterClause.parse(filters).convert_to_pinecone() if filters else None
+
+        stats = self.pinecone_indexes[index].describe_index_stats(filter=pinecone_syntax_filter)
+        # Document count is total number of vectors across all namespaces (no-vectors + vectors)
+        count = 0
+        for namespace in stats["namespaces"].keys():
+            if not (only_documents_without_embedding and "no-vectors" not in namespace):
+                count += stats["namespaces"][namespace]["vector_count"]
+        return count
+
+    def _validate_index_sync(self, index: Optional[str] = None):
+        """
+        This check ensures the correct number of documents and embeddings are found in the
+        Pinecone database.
+        """
+        if self.get_document_count(index=index) != self.get_embedding_count(index=index):
+            raise PineconeDocumentStoreError(
+                f"The number of documents present in Pinecone ({self.get_document_count(index=index)}) "
+                "does not match the number of embeddings in Pinecone "
+                f"({self.get_embedding_count(index=index)}). This can happen if a document store "
+                "instance is deleted during write operations. Call "
+                "the `update_embeddings` method to fix it."
             )
 
     def write_documents(
@@ -215,6 +298,7 @@ class PineconeDocumentStore(SQLDocumentStore):
         batch_size: int = 32,
         duplicate_documents: Optional[str] = None,
         headers: Optional[Dict[str, str]] = None,
+        labels: Optional[bool] = False,
     ):
         """
         Add new documents to the DocumentStore.
@@ -231,13 +315,13 @@ class PineconeDocumentStore(SQLDocumentStore):
                                     - `"overwrite"`: Update any existing documents with the same ID when adding documents.
                                     - `"fail"`: An error is raised if the document ID of the document being added already exists.
        :param headers: PineconeDocumentStore does not support headers.
+        :param labels: Whether the records to be written are labels rather than documents. Defaults to False.
        :raises DuplicateDocumentError: Exception triggered on duplicate document.
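+        __Example__ (editor's illustrative sketch, assuming an already initialized `document_store`):
+            ```python
+            from haystack.schema import Document
+
+            document_store.write_documents(
+                [Document(content="Paris is the capital of France.")], duplicate_documents="skip"
+            )
+            ```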
""" if headers: raise NotImplementedError("PineconeDocumentStore does not support headers.") - index = index or self.index - index = self._sanitize_index_name(index) + index = self._index_name(index) duplicate_documents = duplicate_documents or self.duplicate_documents assert ( duplicate_documents in self.duplicate_documents_options @@ -251,6 +335,7 @@ class PineconeDocumentStore(SQLDocumentStore): replicas=self.replicas, shards=self.shards, recreate_index=False, + metadata_config=self.metadata_config, ) field_map = self._create_document_field_map() @@ -260,28 +345,69 @@ class PineconeDocumentStore(SQLDocumentStore): ) if len(document_objects) > 0: add_vectors = False if document_objects[0].embedding is None else True + # If these are not labels, we need to find the correct namespace + if not labels: + # If not adding vectors we use document namespace + namespace = self.embedding_namespace if add_vectors else self.document_namespace + else: + namespace = "labels" + if not add_vectors: + # To store documents in Pinecone, we use dummy embeddings (to be replaced with real embeddings later) + embeddings_to_index = np.zeros((batch_size, self.embedding_dim), dtype="float32") + # Convert embeddings to list objects + embeddings = [embed.tolist() if embed is not None else None for embed in embeddings_to_index] with tqdm( total=len(document_objects), disable=not self.progress_bar, position=0, desc="Writing Documents" ) as progress_bar: for i in range(0, len(document_objects), batch_size): - ids = [doc.id for doc in document_objects[i : i + batch_size]] - metadata = [doc.meta for doc in document_objects[i : i + batch_size]] + document_batch = document_objects[i : i + batch_size] + ids = [doc.id for doc in document_batch] + # If duplicate_documents set to skip or fail, we need to check for existing documents + if duplicate_documents in ["skip", "fail"]: + existing_documents = self.get_documents_by_id(ids=ids, index=index, namespace=namespace) + # First check for documents in current batch that exist in the index + if len(existing_documents) > 0: + if duplicate_documents == "skip": + # If we should skip existing documents, we drop the ids that already exist + skip_ids = [doc.id for doc in existing_documents] + # We need to drop the affected document objects from the batch + document_batch = [doc for doc in document_batch if doc.id not in skip_ids] + # Now rebuild the ID list + ids = [doc.id for doc in document_batch] + progress_bar.update(len(skip_ids)) + elif duplicate_documents == "fail": + # Otherwise, we raise an error + raise DuplicateDocumentError( + f"Document ID {existing_documents[0].id} already exists in index {index}" + ) + # Now check for duplicate documents within the batch itself + if len(ids) != len(set(ids)): + if duplicate_documents in "skip": + # We just keep the first instance of each duplicate document + ids = [] + temp_document_batch = [] + for doc in document_batch: + if doc.id not in ids: + ids.append(doc.id) + temp_document_batch.append(doc) + document_batch = temp_document_batch + elif duplicate_documents == "fail": + # Otherwise, we raise an error + raise DuplicateDocumentError(f"Duplicate document IDs found in batch: {ids}") + metadata = [{"content": doc.content, **doc.meta} for doc in document_objects[i : i + batch_size]] if add_vectors: embeddings = [doc.embedding for doc in document_objects[i : i + batch_size]] embeddings_to_index = np.array(embeddings, dtype="float32") - if self.similarity == "cosine": + # Normalize embeddings inplace 
                        self.normalize_embedding(embeddings_to_index)
                        # Convert embeddings to list objects
-                    embeddings = [embed.tolist() if embed is not None else None for embed in embeddings]
-                    data_to_write_to_pinecone = zip(ids, embeddings, metadata)
-                    # Metadata fields and embeddings are stored in Pinecone
-                    self.pinecone_indexes[index].upsert(vectors=data_to_write_to_pinecone)
-
-                    docs_to_write_to_sql = document_objects[i : i + batch_size]
-                    super(PineconeDocumentStore, self).write_documents(
-                        docs_to_write_to_sql, index=index, duplicate_documents=duplicate_documents
-                    )
+                        embeddings = [embed.tolist() if embed is not None else None for embed in embeddings_to_index]
+                    data_to_write_to_pinecone = zip(ids, embeddings, metadata)
+                    # Metadata fields and embeddings are stored in Pinecone
+                    self.pinecone_indexes[index].upsert(vectors=data_to_write_to_pinecone, namespace=namespace)
+                    # Add IDs to ID list
+                    self._add_local_ids(index, ids)
                     progress_bar.update(batch_size)
             progress_bar.close()
 
@@ -334,15 +460,12 @@ class PineconeDocumentStore(SQLDocumentStore):
        :param batch_size: Number of documents to process at a time. When working with large number of documents,
            batching can help reduce memory footprint.
        """
-        index = index or self.index
-        index = self._sanitize_index_name(index)
-
+        index = self._index_name(index)
         if index not in self.pinecone_indexes:
             raise ValueError(
                 f"Couldn't find the index '{index}' in Pinecone. Try to init the "
                 f"PineconeDocumentStore() again ..."
             )
-
         document_count = self.get_document_count(index=index, filters=filters)
         if document_count == 0:
             logger.warning("Calling DocumentStore.update_embeddings() on an empty index")
 
         logger.info(f"Updating embeddings for {document_count} docs...")
 
-        result = self._query(
-            index=index,
-            vector_ids=None,
-            batch_size=batch_size,
-            filters=filters,
-            only_documents_without_embedding=not update_existing_embeddings,
+        # If the embedding namespace is empty or the user does not want to update existing embeddings, we use document namespace
+        if self.get_embedding_count(index=index) == 0 or not update_existing_embeddings:
+            namespace = self.document_namespace
+        else:
+            # Else, we use the embedding namespace as this is the primary namespace for embeddings
+            namespace = self.embedding_namespace
+
+        documents = self.get_all_documents_generator(
+            index=index, namespace=namespace, filters=filters, return_embedding=False, batch_size=batch_size
         )
-        batched_documents = get_batches_from_generator(result, batch_size)
+
         with tqdm(
             total=document_count, disable=not self.progress_bar, position=0, unit=" docs", desc="Updating Embedding"
         ) as progress_bar:
-            for document_batch in batched_documents:
+            for _ in range(0, document_count, batch_size):
+                document_batch = list(islice(documents, batch_size))
                 embeddings = retriever.embed_documents(document_batch)  # type: ignore
                 assert len(document_batch) == len(embeddings)
 
                 metadata = []
                 ids = []
                 for doc in document_batch:
-                    metadata.append(doc.meta)
+                    metadata.append({"content": doc.content, **doc.meta})
                     ids.append(doc.id)
-                # update existing vectors in pinecone index
-                self.pinecone_indexes[index].upsert(vectors=zip(ids, embeddings, metadata))
-
+                # Update existing vectors in pinecone index
+                self.pinecone_indexes[index].upsert(
+                    vectors=zip(ids, embeddings, metadata), namespace=self.embedding_namespace
+                )
+                # Delete existing vectors from document namespace if they exist there
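+                # (the same IDs were just upserted into the embeddings namespace above, so removing them
+                # here keeps get_document_count, which sums both namespaces, from double counting)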
+                self.delete_documents(index=index, ids=ids, namespace=self.document_namespace)
+                # Add these vector IDs to local store
+                self._add_local_ids(index, ids)
                progress_bar.set_description_str("Documents Processed")
                progress_bar.update(batch_size)
 
@@ -388,15 +520,55 @@ class PineconeDocumentStore(SQLDocumentStore):
         return_embedding: Optional[bool] = None,
         batch_size: int = 32,
         headers: Optional[Dict[str, str]] = None,
+        namespace: Optional[str] = None,
     ) -> List[Document]:
+        """
+        Retrieves all documents in the index.
+
+        :param index: Optional index name to retrieve all documents from.
+        :param filters: Optional filters to narrow down the documents that will be retrieved.
+            Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
+            operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
+            `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
+            Logical operator keys take a dictionary of metadata field names and/or logical operators as
+            value. Metadata field names take a dictionary of comparison operators as value. Comparison
+            operator keys take a single value or (in case of `"$in"`) a list of values as value.
+            If no logical operator is provided, `"$and"` is used as default operation. If no comparison
+            operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
+            operation.
+                __Example__:
+                ```python
+                filters = {
+                    "$and": {
+                        "type": {"$eq": "article"},
+                        "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
+                        "rating": {"$gte": 3},
+                        "$or": {
+                            "genre": {"$in": ["economy", "politics"]},
+                            "publisher": {"$eq": "nytimes"}
+                        }
+                    }
+                }
+                ```
+        :param return_embedding: Optional flag to return the embedding of the document.
+        :param batch_size: Number of documents to process at a time. When working with large number of documents,
+            batching can help reduce memory footprint.
+        :param headers: Pinecone does not support headers.
+        :param namespace: Optional namespace to retrieve documents from.
+        """
         if headers:
             raise NotImplementedError("PineconeDocumentStore does not support headers.")
 
+        if namespace is None:
+            if self.get_embedding_count(index=index) > 0:
+                namespace = self.embedding_namespace
+            else:
+                namespace = self.document_namespace
+
         result = self.get_all_documents_generator(
-            index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size
+            index=index, namespace=namespace, filters=filters, return_embedding=return_embedding, batch_size=batch_size
         )
-        documents = list(result)
+        documents: List[Document] = list(result)
         return documents
 
     def get_all_documents_generator(
@@ -406,6 +578,7 @@ class PineconeDocumentStore(SQLDocumentStore):
         return_embedding: Optional[bool] = None,
         batch_size: int = 32,
         headers: Optional[Dict[str, str]] = None,
+        namespace: Optional[str] = None,
     ) -> Generator[Document, None, None]:
         """
         Get all documents from the document store. Under-the-hood, documents are fetched in batches from the
@@ -441,22 +614,138 @@ class PineconeDocumentStore(SQLDocumentStore):
        :param return_embedding: Whether to return the document embeddings.
        :param batch_size: When working with large number of documents, batching can help reduce memory footprint.
        :param headers: PineconeDocumentStore does not support headers.
+        :param namespace: Optional namespace to retrieve documents from.
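+        __Example__ (editor's illustrative sketch, assuming an already initialized `document_store`):
+            ```python
+            for doc in document_store.get_all_documents_generator(batch_size=100):
+                print(doc.id, doc.meta)
+            ```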
""" if headers: raise NotImplementedError("PineconeDocumentStore does not support headers.") + if return_embedding is None: return_embedding = self.return_embedding - index = index or self.index - index = self._sanitize_index_name(index) - documents = super(PineconeDocumentStore, self).get_all_documents_generator( - index=index, filters=filters, batch_size=batch_size, return_embedding=False - ) + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." + ) - for doc in documents: - if return_embedding: - self._attach_embedding_to_document(document=doc, index=index) - yield doc + if namespace is None: + if self.get_embedding_count(index=index) > 0: + namespace = self.embedding_namespace + else: + namespace = self.document_namespace + + ids = self._get_all_document_ids(index=index, namespace=namespace, filters=filters, batch_size=batch_size) + for i in range(0, len(ids), batch_size): + i_end = min(len(ids), i + batch_size) + documents = self.get_documents_by_id( + ids=ids[i:i_end], + index=index, + namespace=namespace, + batch_size=batch_size, + return_embedding=return_embedding, + ) + for doc in documents: + yield doc + + def _get_all_document_ids( + self, + index: Optional[str] = None, + namespace: Optional[str] = None, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + batch_size: int = 32, + ) -> List[str]: + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." + ) + + if namespace is None: + if self.get_embedding_count(index=index) > 0: + namespace = self.embedding_namespace + else: + namespace = self.document_namespace + + document_count = self.get_document_count(index=index) + + if index not in self.all_ids: + self.all_ids[index] = set() + if len(self.all_ids[index]) == document_count and filters is None: + # We have all of the IDs and don't need to extract from Pinecone + return list(self.all_ids[index]) + else: + # Otherwise we must query and extract IDs from the original namespace, then move the retrieved embeddings + # to a temporary namespace and query again for new items. We repeat this process until all embeddings + # have been retrieved. 
+            target_namespace = f"{namespace}-copy"
+            all_ids: Set[str] = set()
+            vector_id_matrix = ["dummy-id"]
+            with tqdm(
+                total=document_count, disable=not self.progress_bar, position=0, unit=" ids", desc="Retrieving IDs"
+            ) as progress_bar:
+                while len(vector_id_matrix) != 0:
+                    # Retrieve IDs from Pinecone
+                    vector_id_matrix = self._get_ids(
+                        index=index, namespace=namespace, batch_size=batch_size, filters=filters
+                    )
+                    # Save IDs
+                    all_ids = all_ids.union(set(vector_id_matrix))
+                    # Move these IDs to new namespace
+                    self._move_documents_by_id_namespace(
+                        ids=vector_id_matrix,
+                        index=index,
+                        source_namespace=namespace,
+                        target_namespace=target_namespace,
+                        batch_size=batch_size,
+                    )
+                    progress_bar.set_description_str("Retrieved IDs")
+                    progress_bar.update(len(set(vector_id_matrix)))
+            # Now move all documents back to source namespace
+            self._namespace_cleanup(index)
+            self._add_local_ids(index, list(all_ids))
+            return list(all_ids)
+
+    def _move_documents_by_id_namespace(
+        self,
+        ids: List[str],
+        index: Optional[str] = None,
+        source_namespace: Optional[str] = "vectors",
+        target_namespace: Optional[str] = "copy",
+        batch_size: int = 32,
+    ):
+        index = self._index_name(index)
+        if index not in self.pinecone_indexes:
+            raise PineconeDocumentStoreError(
+                f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running "
+                f"'update_embeddings()' to create and populate an index."
+            )
+
+        if source_namespace == target_namespace:
+            raise PineconeDocumentStoreError(
+                f"Source namespace '{source_namespace}' cannot be the same as target namespace '{target_namespace}'."
+            )
+        with tqdm(
+            total=len(ids), disable=not self.progress_bar, position=0, unit=" docs", desc="Moving Documents"
+        ) as progress_bar:
+            for i in range(0, len(ids), batch_size):
+                i_end = min(len(ids), i + batch_size)
+                # TODO: break early when the batch is empty (i == i_end)
+                id_batch = ids[i:i_end]
+                # Retrieve documents from source_namespace
+                result = self.pinecone_indexes[index].fetch(ids=id_batch, namespace=source_namespace)
+                vector_id_matrix = result["vectors"].keys()
+                meta_matrix = [result["vectors"][_id]["metadata"] for _id in vector_id_matrix]
+                embedding_matrix = [result["vectors"][_id]["values"] for _id in vector_id_matrix]
+                data_to_write_to_pinecone = list(zip(vector_id_matrix, embedding_matrix, meta_matrix))
+                # Store metadata and embeddings in new target_namespace
+                self.pinecone_indexes[index].upsert(vectors=data_to_write_to_pinecone, namespace=target_namespace)
+                # Delete vectors from source_namespace
+                self.delete_documents(index=index, ids=ids[i:i_end], namespace=source_namespace, drop_ids=False)
+                progress_bar.set_description_str("Documents Moved")
+                progress_bar.update(len(id_batch))
 
     def get_documents_by_id(
         self,
         ids: List[str],
         index: Optional[str] = None,
         batch_size: int = 32,
         headers: Optional[Dict[str, str]] = None,
         return_embedding: Optional[bool] = None,
+        namespace: Optional[str] = None,
     ) -> List[Document]:
+        """
+        Retrieves all documents in the index using their IDs.
+
+        :param ids: List of IDs to retrieve.
+        :param index: Optional index name to retrieve the documents from.
+        :param batch_size: Number of documents to retrieve at a time. When working with large number of documents,
+            batching can help reduce memory footprint.
+        :param headers: Pinecone does not support headers.
+        :param return_embedding: Optional flag to return the embedding of the document.
+        :param namespace: Optional namespace to retrieve documents from.
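+        __Example__ (editor's illustrative sketch, assuming an already initialized `document_store`):
+            ```python
+            docs = document_store.get_documents_by_id(ids=["doc-1", "doc-2"], return_embedding=True)
+            ```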
+ """ if headers: raise NotImplementedError("PineconeDocumentStore does not support headers.") @@ -473,47 +774,121 @@ class PineconeDocumentStore(SQLDocumentStore): if return_embedding is None: return_embedding = self.return_embedding - index = index or self.index - index = self._sanitize_index_name(index) + if namespace is None: + if self.get_embedding_count(index=index) > 0: + namespace = self.embedding_namespace + else: + namespace = self.document_namespace - documents = super().get_documents_by_id(ids=ids, index=index, batch_size=batch_size) - if return_embedding: - for doc in documents: - self._attach_embedding_to_document(document=doc, index=index) + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." + ) + + documents = [] + for i in range(0, len(ids), batch_size): + i_end = min(len(ids), i + batch_size) + id_batch = ids[i:i_end] + result = self.pinecone_indexes[index].fetch(ids=id_batch, namespace=namespace) + + vector_id_matrix = [] + meta_matrix = [] + embedding_matrix = [] + for _id in result["vectors"].keys(): + vector_id_matrix.append(_id) + meta_matrix.append(result["vectors"][_id]["metadata"]) + if return_embedding: + embedding_matrix.append(result["vectors"][_id]["values"]) + if return_embedding: + values = embedding_matrix + else: + values = None + document_batch = self._get_documents_by_meta( + vector_id_matrix, meta_matrix, values=values, index=index, return_embedding=return_embedding + ) + documents.extend(document_batch) return documents + def get_document_by_id( + self, + id: str, + index: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + return_embedding: Optional[bool] = None, + namespace: str = None, + ) -> Document: + """ + Returns a single Document retrieved using an ID. + + :param id: ID string to retrieve. + :param index: Optional index name to retrieve all documents from. + :param headers: Pinecone does not support headers. + :param return_embedding: Optional flag to return the embedding of the document. + :param namespace: Optional namespace to retrieve documents from. + """ + documents = self.get_documents_by_id( + ids=[id], namespace=namespace, index=index, headers=headers, return_embedding=return_embedding + ) + return documents[0] + def get_embedding_count( self, index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None ) -> int: """ Return the count of embeddings in the document store. + + :param index: Optional index name to retrieve all documents from. + :param filters: Filters are not supported for `get_embedding_count` in Pinecone. """ if filters: raise NotImplementedError("Filters are not supported for get_embedding_count in PineconeDocumentStore") - index = index or self.index - index = self._sanitize_index_name(index) - if not self.pinecone_indexes.get(index, False): - raise ValueError(f"No index named {index} found in Pinecone.") + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." 
+            )

         stats = self.pinecone_indexes[index].describe_index_stats()
-        # if no namespace return zero
-        count = stats["namespaces"][""]["vector_count"] if "" in stats["namespaces"] else 0
+        # If there is no embeddings namespace, return zero
+        if self.embedding_namespace in stats["namespaces"]:
+            count = stats["namespaces"][self.embedding_namespace]["vector_count"]
+        else:
+            count = 0
         return count

-    def update_document_meta(self, id: str, meta: Dict[str, str], index: str = None):
+    def update_document_meta(self, id: str, meta: Dict[str, str], namespace: str = None, index: str = None):  # type: ignore
         """
-        Update the metadata dictionary of a document by specifying its string id
-        """
-        index = index or self.index
-        index = self._sanitize_index_name(index)
-        if index in self.pinecone_indexes:
-            doc = self.get_documents_by_id(ids=[id], index=index, return_embedding=True)[0]
-            if doc.embedding is not None:
-                self.pinecone_indexes[index].upsert(vectors=([id], [doc.embedding.tolist()], [meta]))
+        Update the metadata dictionary of a document by specifying its string ID.

-        super().update_document_meta(id=id, meta=meta, index=index)
+        :param id: ID of the Document to update.
+        :param meta: Dictionary of new metadata.
+        :param namespace: Optional namespace to update the document in. If not specified, it defaults to the
+            embedding namespace (vectors) if it exists, otherwise to the document namespace (no-vectors).
+        :param index: Optional name of the index holding the document.
+        """
+
+        index = self._index_name(index)
+        if index not in self.pinecone_indexes:
+            raise PineconeDocumentStoreError(
+                f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running "
+                f"'update_embeddings()' to create and populate an index."
+            )
+
+        if namespace is None:
+            if self.get_embedding_count(index=index) > 0:
+                namespace = self.embedding_namespace
+            else:
+                namespace = self.document_namespace
+
+        doc = self.get_documents_by_id(ids=[id], index=index, return_embedding=True)[0]
+        if doc.embedding is not None:
+            meta = {"content": doc.content, **meta}
+            self.pinecone_indexes[index].upsert(vectors=[(id, doc.embedding.tolist(), meta)], namespace=namespace)

     def delete_documents(
         self,
@@ -521,6 +896,8 @@ class PineconeDocumentStore(SQLDocumentStore):
         ids: Optional[List[str]] = None,
         filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None,
         headers: Optional[Dict[str, str]] = None,
+        drop_ids: Optional[bool] = True,
+        namespace: Optional[str] = None,
     ):
         """
         Delete documents from the document store.
@@ -528,6 +905,8 @@ class PineconeDocumentStore(SQLDocumentStore):
         :param index: Index name to delete the documents from. If `None`, the
             DocumentStore's default index (`self.index`) will be used.
         :param ids: Optional list of IDs to narrow down the documents to be deleted.
+        :param namespace: Optional namespace to delete documents from. If not specified, it defaults to the
+            embedding namespace (vectors) if that namespace holds any vectors, otherwise to the document
+            namespace (no-vectors).
         :param filters: Optional filters to narrow down the documents for which embeddings are to be updated.
             Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
             operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
@@ -553,24 +932,51 @@ class PineconeDocumentStore(SQLDocumentStore):
            }
            ```
         :param headers: PineconeDocumentStore does not support headers.
+        :param drop_ids: Specifies whether the locally stored IDs should be deleted as well. Defaults to True.
+        :return: None
         """
         if headers:
             raise NotImplementedError("PineconeDocumentStore does not support headers.")

-        index = index or self.index
-        index = self._sanitize_index_name(index)
-        if index in self.pinecone_indexes:
-            if ids is None and filters is None:
-                self.pinecone_indexes[index].delete(delete_all=True)
+        if namespace is None:
+            if self.get_embedding_count(index=index) > 0:
+                namespace = self.embedding_namespace
             else:
-                affected_docs = self.get_all_documents(filters=filters, return_embedding=False)
-                if ids:
-                    affected_docs = [doc for doc in affected_docs if doc.id in ids]
+                namespace = self.document_namespace
-                doc_ids = [doc.id for doc in affected_docs]
-                self.pinecone_indexes[index].delete(ids=doc_ids)
+        index = self._index_name(index)
+        if index not in self.pinecone_indexes:
+            raise PineconeDocumentStoreError(
+                f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running "
+                f"'update_embeddings()' to create and populate an index."
+            )
-        super().delete_documents(index=index, ids=ids, filters=filters)
+        pinecone_syntax_filter = LogicalFilterClause.parse(filters).convert_to_pinecone() if filters else None
+
+        if ids is None and pinecone_syntax_filter is None:
+            # If there are no filters or IDs, we delete everything
+            self.pinecone_indexes[index].delete(delete_all=True, namespace=namespace)
+            id_values = list(self.all_ids[index])
+        else:
+            if ids is None:
+                # In this case we identify all IDs that satisfy the filter condition
+                id_values = self._get_all_document_ids(index=index, namespace=namespace, filters=pinecone_syntax_filter)
+            else:
+                id_values = ids
+                if pinecone_syntax_filter:
+                    # We must first identify the IDs that satisfy the filter condition
+                    docs = self.get_all_documents(index=index, namespace=namespace, filters=pinecone_syntax_filter)
+                    filter_ids = [doc.id for doc in docs]
+                    # Find the intersection
+                    id_values = list(set(id_values).intersection(set(filter_ids)))
+            if len(id_values) > 0:
+                # Now we delete
+                self.pinecone_indexes[index].delete(ids=id_values, namespace=namespace)
+        if drop_ids:
+            self.all_ids[index] = self.all_ids[index].difference(set(id_values))

     def delete_index(self, index: str):
         """
@@ -579,13 +985,14 @@ class PineconeDocumentStore(SQLDocumentStore):
         :param index: The name of the index to delete.
         :return: None
         """
-        index = self._sanitize_index_name(index)
+        index = self._index_name(index)
         if index in pinecone.list_indexes():
             pinecone.delete_index(index)
             logger.info(f"Index '{index}' deleted.")
         if index in self.pinecone_indexes:
             del self.pinecone_indexes[index]
-        super().delete_index(index)
+        if index in self.all_ids:
+            self.all_ids[index] = set()

     def query_by_embedding(
         self,
@@ -596,6 +1003,7 @@ class PineconeDocumentStore(SQLDocumentStore):
         return_embedding: Optional[bool] = None,
         headers: Optional[Dict[str, str]] = None,
         scale_score: bool = True,
+        namespace: Optional[str] = None,
     ) -> List[Document]:
         """
         Find the document that is most similar to the provided `query_emb` by using a vector similarity metric.
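Taken together, the namespace handling and filter conversion above give `delete_documents` intersection semantics when both `ids` and `filters` are supplied. Here is a minimal caller-side sketch; the store construction, API key, and metadata fields are assumed for illustration only:

```python
import os

from haystack.document_stores.pinecone import PineconeDocumentStore

doc_store = PineconeDocumentStore(
    api_key=os.environ.get("PINECONE_API_KEY", "fake-haystack-test-key"),  # assumed credentials
    embedding_dim=768,
    index="document",
)

# With both `ids` and `filters`, only documents matching *both* are deleted:
# the filter is converted to Pinecone syntax, the matching IDs are fetched,
# and the intersection with the supplied IDs is removed.
doc_store.delete_documents(
    ids=["doc-1", "doc-2", "doc-3"],
    filters={"genre": {"$eq": "economy"}},
)

# With neither `ids` nor `filters`, the whole namespace is cleared via
# `delete_all=True` and the locally tracked IDs are dropped as well.
doc_store.delete_documents()

# `drop_ids=False` keeps the local ID registry intact; the internal
# `_move_documents_by_id_namespace` helper relies on this when shuttling
# vectors between namespaces.
doc_store.delete_documents(ids=["doc-1"], drop_ids=False)
```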
@@ -676,14 +1084,11 @@ class PineconeDocumentStore(SQLDocumentStore): return_embedding = self.return_embedding self._limit_check(top_k, include_values=return_embedding) - if filters: - filters = LogicalFilterClause.parse(filters).convert_to_pinecone() - - index = index or self.index - index = self._sanitize_index_name(index) + pinecone_syntax_filter = LogicalFilterClause.parse(filters).convert_to_pinecone() if filters else None + index = self._index_name(index) if index not in self.pinecone_indexes: - raise DocumentStoreError( + raise PineconeDocumentStoreError( f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " f"'update_embeddings()' to create and populate an index." ) @@ -692,18 +1097,39 @@ class PineconeDocumentStore(SQLDocumentStore): if self.similarity == "cosine": self.normalize_embedding(query_emb) - res = self.pinecone_indexes[index].query(query_emb.tolist(), top_k=top_k, include_values=False, filter=filters) + if namespace is None: + namespace = self.embedding_namespace + + res = self.pinecone_indexes[index].query( + query_emb.tolist(), + namespace=namespace, + top_k=top_k, + include_values=return_embedding, + include_metadata=True, + filter=pinecone_syntax_filter, + ) score_matrix = [] vector_id_matrix = [] + meta_matrix = [] + embedding_matrix = [] for match in res["matches"]: score_matrix.append(match["score"]) vector_id_matrix.append(match["id"]) - documents = self.get_documents_by_id(vector_id_matrix, index=index, return_embedding=return_embedding) + meta_matrix.append(match["metadata"]) + if return_embedding: + embedding_matrix.append(match["values"]) + if return_embedding: + values = embedding_matrix + else: + values = None + documents = self._get_documents_by_meta( + vector_id_matrix, meta_matrix, values=values, index=index, return_embedding=return_embedding + ) # assign query score to each document scores_for_vector_ids: Dict[str, float] = {str(v_id): s for v_id, s in zip(vector_id_matrix, score_matrix)} - for i, doc in enumerate(documents): + for doc in documents: score = scores_for_vector_ids[doc.id] if scale_score: score = self.scale_to_unit_interval(score, self.similarity) @@ -711,12 +1137,55 @@ class PineconeDocumentStore(SQLDocumentStore): return documents - def _attach_embedding_to_document(self, document: Document, index: str): + def _get_documents_by_meta( + self, + ids: List[str], + metadata: List[dict], + values: Optional[List[List[float]]] = None, + namespace: Optional[str] = None, + index: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + return_embedding: Optional[bool] = None, + ) -> List[Document]: + + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + if return_embedding is None: + return_embedding = self.return_embedding + + if namespace is None: + if self.get_embedding_count(index=index) > 0: + namespace = self.embedding_namespace + else: + namespace = self.document_namespace + + index = self._index_name(index) + + # extract ID, content, and metadata to create Documents + documents = [] + for _id, meta in zip(ids, metadata): + content = meta.pop("content") + doc = Document(id=_id, content=content, meta=meta) + documents.append(doc) + if return_embedding: + if values is None: + # If no embedding values are provided, we must request the embeddings from Pinecone + for doc in documents: + self._attach_embedding_to_document(document=doc, index=index, namespace=namespace) + else: + # If embedding values are given, we just add + for doc, embedding in 
zip(documents, values): + doc.embedding = np.asarray(embedding, dtype=np.float32) + + return documents + + def _attach_embedding_to_document(self, document: Document, index: str, namespace: str): """ Fetches the Document's embedding from the specified Pinecone index and attaches it to the Document's embedding field. """ - result = self.pinecone_indexes[index].fetch(ids=[document.id]) + result = self.pinecone_indexes[index].fetch(ids=[document.id], namespace=namespace) if result["vectors"].get(document.id, False): embedding = result["vectors"][document.id].get("values", None) document.embedding = np.asarray(embedding, dtype=np.float32) @@ -727,20 +1196,343 @@ class PineconeDocumentStore(SQLDocumentStore): """ if include_values: if top_k > self.top_k_limit_vectors: - raise DocumentStoreError( + raise PineconeDocumentStoreError( f"PineconeDocumentStore allows requests of no more than {self.top_k_limit_vectors} records " f"when returning embedding values. This request is attempting to return {top_k} records." ) else: if top_k > self.top_k_limit: - raise DocumentStoreError( + raise PineconeDocumentStoreError( f"PineconeDocumentStore allows requests of no more than {self.top_k_limit} records. " f"This request is attempting to return {top_k} records." ) + def _list_namespaces(self, index: str) -> List[str]: + """ + Returns a list of namespaces. + """ + res = self.pinecone_indexes[index].describe_index_stats() + namespaces = res["namespaces"].keys() + return namespaces + + def _check_exists(self, id: str, index: str, namespace: str) -> bool: + """ + Checks if the specified ID exists in the specified index and namespace. + """ + res = self.pinecone_indexes[index].fetch(ids=[id], namespace=namespace) + return bool(res["vectors"].get(id, False)) + + def _namespace_cleanup(self, index: str, batch_size: int = 32): + """ + Searches for any "-copy" namespaces and shifts vectors back to the original namespace. + """ + namespaces = self._list_namespaces(index) + namespaces = [name for name in namespaces if name[-5:] == "-copy"] + + with tqdm( + total=len(namespaces), + disable=not self.progress_bar, + position=0, + unit=" namespaces", + desc="Cleaning Namespace", + ) as progress_bar: + for namespace in namespaces: + target_namespace = namespace[:-5] + while True: + # Retrieve IDs from Pinecone + vector_id_matrix = self._get_ids(index=index, namespace=namespace, batch_size=batch_size) + # Once we reach final item, we break + if len(vector_id_matrix) == 0: + break + # Move these IDs to new namespace + self._move_documents_by_id_namespace( + ids=vector_id_matrix, + index=index, + source_namespace=namespace, + target_namespace=target_namespace, + batch_size=batch_size, + ) + progress_bar.set_description_str("Cleaned Namespace") + progress_bar.update(1) + + def _get_ids( + self, + index: str, + namespace: str, + batch_size: int = 32, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + ) -> List[str]: + """ + Retrieves a list of IDs that satisfy a particular filter condition (or any) using + a dummy query embedding. 
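+
+        Pinecone exposes no endpoint for listing vector IDs directly, so this helper issues a query with a
+        placeholder embedding (`self.dummy_query`) and collects the IDs of the returned matches; since `top_k`
+        is set to `batch_size`, each call yields at most `batch_size` IDs.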
+ """ + pinecone_syntax_filter = LogicalFilterClause.parse(filters).convert_to_pinecone() if filters else None + + # Retrieve embeddings from Pinecone + try: + res = self.pinecone_indexes[index].query( + self.dummy_query, + namespace=namespace, + top_k=batch_size, + include_values=False, + include_metadata=False, + filter=pinecone_syntax_filter, + ) + except pinecone.ApiException as e: + raise PineconeDocumentStoreError( + f"The API returned an exception.\nReason: {e.reason}\nHeaders: {e.headers}\nBody: {e.body}" + ) from e + + ids = [] + for match in res["matches"]: + ids.append(match["id"]) + return ids + @classmethod def load(cls): """ - Default class method used for loading indexes. Not applicable to the PineconeDocumentStore. + Default class method used for loading indexes. Not applicable to PineconeDocumentStore. """ raise NotImplementedError("load method not supported for PineconeDocumentStore") + + def _meta_for_pinecone(self, meta: Dict[str, Any]) -> Dict[str, Any]: + """ + Converts the meta dictionary to a format that can be stored in Pinecone. + """ + # Replace any None values with empty strings + for key, value in meta.items(): + if value is None: + meta[key] = "" + return meta + + def _pinecone_meta_format(self, meta: Dict[str, Any]) -> Dict[str, Any]: + """ + Converts the meta extracted from Pinecone into a better format for Python. + """ + # Replace any empty strings with None values + for key, value in meta.items(): + if value == "": + meta[key] = None + return meta + + def _label_to_meta(self, labels: list) -> dict: + """ + Converts a list of labels to a dictionary of ID: metadata mappings. + """ + metadata = {} + for label in labels: + # Get main labels data + meta = { + "label-id": label.id, + "query": label.query, + "label-answer-answer": label.answer.answer, + "label-answer-type": label.answer.type, + "label-answer-score": label.answer.score, + "label-answer-context": label.answer.context, + "label-answer-document-id": label.answer.document_id, + "label-is-correct-answer": label.is_correct_answer, + "label-is-correct-document": label.is_correct_document, + "label-document-content": label.document.content, + "label-document-id": label.document.id, + "label-no-answer": label.no_answer, + "label-origin": label.origin, + "label-created-at": label.created_at, + "label-updated-at": label.updated_at, + "label-pipeline-id": label.pipeline_id, + } + # Get offset data + if label.answer.offsets_in_document: + meta["label-answer-offsets-in-document-start"] = label.answer.offsets_in_document[0].start + meta["label-answer-offsets-in-document-end"] = label.answer.offsets_in_document[0].end + else: + meta["label-answer-offsets-in-document-start"] = None + meta["label-answer-offsets-in-document-end"] = None + if label.answer.offsets_in_context: + meta["label-answer-offsets-in-context-start"] = label.answer.offsets_in_context[0].start + meta["label-answer-offsets-in-context-end"] = label.answer.offsets_in_context[0].end + else: + meta["label-answer-offsets-in-context-start"] = None + meta["label-answer-offsets-in-context-end"] = None + metadata[label.id] = meta + metadata = self._meta_for_pinecone(metadata) + return metadata + + def _meta_to_labels(self, documents: List[Document]) -> List[Label]: + """ + Converts a list of metadata dictionaries to a list of Labels. 
+ """ + labels = [] + for doc in documents: + label_meta = {k: v for k, v in doc.meta.items() if k[:6] == "label-" or k == "query"} + other_meta = {k: v for k, v in doc.meta.items() if k[:6] != "label-" and k != "query"} + # Create document + doc = Document( + id=label_meta["label-document-id"], + content=doc.content, + meta=other_meta, + score=doc.score, + embedding=doc.embedding, + ) + # Extract offsets + offsets: Dict[str, Optional[List[Span]]] = {"document": None, "context": None} + for mode in offsets.keys(): + if label_meta[f"label-answer-offsets-in-{mode}-start"] is not None: + offsets[mode] = [ + Span( + label_meta[f"label-answer-offsets-in-{mode}-start"], + label_meta[f"label-answer-offsets-in-{mode}-end"], + ) + ] + # if label_meta["label-answer-answer"] is None: + # label_meta["label-answer-answer"] = "" + answer = Answer( + answer=label_meta["label-answer-answer"] + or "", # If we leave as None a schema validation error will be thrown + type=label_meta["label-answer-type"], + score=label_meta["label-answer-score"], + context=label_meta["label-answer-context"], + offsets_in_document=offsets["document"], + offsets_in_context=offsets["context"], + document_id=label_meta["label-answer-document-id"], + meta=other_meta, + ) + label = Label( + id=label_meta["label-id"], + query=label_meta["query"], + document=doc, + answer=answer, + no_answer=label_meta["label-no-answer"], + pipeline_id=label_meta["label-pipeline-id"], + created_at=label_meta["label-created-at"], + updated_at=label_meta["label-updated-at"], + is_correct_answer=label_meta["label-is-correct-answer"], + is_correct_document=label_meta["label-is-correct-document"], + origin=label_meta["label-origin"], + meta={}, + filters=None, + ) + labels.append(label) + return labels + + def delete_labels( + self, + index: Optional[str] = None, + ids: Optional[List[str]] = None, + filters: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + batch_size: int = 32, + ): + """ + Default class method used for deleting labels. Not supported by PineconeDocumentStore. + """ + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." 
+ ) + + pinecone_syntax_filter = LogicalFilterClause.parse(filters).convert_to_pinecone() if filters else None + + i = 0 + dummy_query = np.asarray(self.dummy_query) + # Set label namespace + namespace = "labels" + + while True: + if ids is None: + # Iteratively upsert new records without the labels metadata + docs = self.query_by_embedding( + dummy_query, + filters=pinecone_syntax_filter, + top_k=batch_size, + index=index, + return_embedding=True, + namespace=namespace, + ) + update_ids = [doc.id for doc in docs] + else: + i_end = min(i + batch_size, len(ids)) + update_ids = ids[i:i_end] + if pinecone_syntax_filter: + pinecone_syntax_filter["label-id"] = {"$in": update_ids} + else: + pinecone_syntax_filter = {"label-id": {"$in": update_ids}} + # Retrieve embeddings and metadata for the batch of documents + docs = self.query_by_embedding( + dummy_query, + filters=pinecone_syntax_filter, + top_k=batch_size, + index=index, + return_embedding=True, + namespace=namespace, + ) + # Apply filter to update IDs, finding intersection + update_ids = list(set(update_ids).intersection({doc.id for doc in docs})) + i = i_end + if len(update_ids) == 0: + break + # Delete the documents + self.delete_documents(ids=update_ids, index=index, namespace=namespace) + + def get_all_labels(self, index=None, filters: Optional[dict] = None, headers: Optional[Dict[str, str]] = None): + """ + Default class method used for getting all labels. + """ + index = self._index_name(index) + if index not in self.pinecone_indexes: + raise PineconeDocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." + ) + + documents = self.get_all_documents(index=index, filters=filters, headers=headers, namespace="labels") + for doc in documents: + doc.meta = self._pinecone_meta_format(doc.meta) + labels = self._meta_to_labels(documents) + return labels + + def get_label_count(self, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None): + """ + Default class method used for counting labels. Not supported by PineconeDocumentStore. + """ + raise NotImplementedError("Labels are not supported by PineconeDocumentStore.") + + def write_labels(self, labels, index=None, headers: Optional[Dict[str, str]] = None): + """ + Default class method used for writing labels. 
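+
+        Labels are flattened to metadata via `_label_to_meta` and stored in the dedicated "labels" namespace:
+        metadata of label IDs that already exist there is updated in place, while the remaining labels are
+        written as new documents through `write_documents(..., labels=True)`.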
+ """ + index = self._index_name(index) + if index not in self.pinecone_indexes: + self.pinecone_indexes[index] = self._create_index( + embedding_dim=self.embedding_dim, + index=index, + metric_type=self.metric_type, + replicas=self.replicas, + shards=self.shards, + recreate_index=False, + metadata_config=self.metadata_config, + ) + + # Convert Label objects to dictionary of metadata + metadata = self._label_to_meta(labels) + ids = list(metadata.keys()) + # Set label namespace + namespace = "labels" + # Check if vectors exist in the namespace + existing_documents = self.get_documents_by_id(ids=ids, index=index, namespace=namespace, return_embedding=True) + if len(existing_documents) != 0: + # If they exist, we loop through and partial update their metadata with the new labels + existing_ids = [doc.id for doc in existing_documents] + for _id in existing_ids: + meta = self._meta_for_pinecone(metadata[_id]) + self.pinecone_indexes[index].update(id=_id, set_metadata=meta, namespace=namespace) + # After update, we delete the ID from the metadata list + del metadata[_id] + # If there are any remaining IDs, we create new documents with the remaining metadata + if len(metadata) != 0: + documents = [] + for _id, meta in metadata.items(): + metadata[_id] = self._meta_for_pinecone(meta) + documents.append(Document(id=_id, content=meta["label-document-content"], meta=meta)) + self.write_documents(documents, index=index, labels=True) diff --git a/haystack/errors.py b/haystack/errors.py index bc81faf0f..3b471b2e8 100644 --- a/haystack/errors.py +++ b/haystack/errors.py @@ -76,6 +76,20 @@ class DocumentStoreError(HaystackError): super().__init__(message=message) +class FilterError(DocumentStoreError): + """Exception for issues that occur building complex filters""" + + def __init__(self, message: Optional[str] = None): + super().__init__(message=message) + + +class PineconeDocumentStoreError(DocumentStoreError): + """Exception for issues that occur in a Pinecone document store""" + + def __init__(self, message: Optional[str] = None): + super().__init__(message=message) + + class DuplicateDocumentError(DocumentStoreError, ValueError): """Exception for Duplicate document""" diff --git a/haystack/json-schemas/haystack-pipeline-master.schema.json b/haystack/json-schemas/haystack-pipeline-master.schema.json index 06a6ca15d..1277ca5ba 100644 --- a/haystack/json-schemas/haystack-pipeline-master.schema.json +++ b/haystack/json-schemas/haystack-pipeline-master.schema.json @@ -1700,11 +1700,6 @@ "default": "us-west1-gcp", "type": "string" }, - "sql_url": { - "title": "Sql Url", - "default": "sqlite:///pinecone_document_store.db", - "type": "string" - }, "pinecone_index": { "title": "Pinecone Index", "default": null, diff --git a/pyproject.toml b/pyproject.toml index 95cd54170..3e44f7f2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,7 +128,7 @@ weaviate = [ "weaviate-client==3.6.0", ] only-pinecone = [ - "pinecone-client", + "pinecone-client>=2.0.11,<3", ] pinecone = [ "farm-haystack[sql,only-pinecone]", diff --git a/test/conftest.py b/test/conftest.py index 84463e834..9524d4223 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -171,7 +171,7 @@ def pytest_collection_modifyitems(config, items): "pinecone", "opensearch", ]: - if cur_doc_store in keywords and cur_doc_store not in document_store_types_to_run: + if keywords and cur_doc_store in keywords and cur_doc_store not in document_store_types_to_run: skip_docstore = pytest.mark.skip( reason=f'{cur_doc_store} is disabled. 
Enable via pytest --document_store_type="{cur_doc_store}"' ) @@ -180,15 +180,11 @@ def pytest_collection_modifyitems(config, items): if "milvus1" in keywords and not milvus1: skip_milvus1 = pytest.mark.skip(reason="Skipping Tests for 'milvus1', as Milvus2 seems to be installed.") item.add_marker(skip_milvus1) + elif "milvus" in keywords and milvus1: skip_milvus = pytest.mark.skip(reason="Skipping Tests for 'milvus', as Milvus1 seems to be installed.") item.add_marker(skip_milvus) - # Skip PineconeDocumentStore if PINECONE_API_KEY not in environment variables - # if not os.environ.get("PINECONE_API_KEY", False) and "pinecone" in keywords: - # skip_pinecone = pytest.mark.skip(reason="PINECONE_API_KEY not in environment variables.") - # item.add_marker(skip_pinecone) - # # Empty mocks, as a base for unit tests. @@ -987,7 +983,7 @@ def get_document_store( elif document_store_type == "pinecone": document_store = PineconeDocumentStore( - api_key=os.environ.get("PINECONE_API_KEY"), + api_key=os.environ.get("PINECONE_API_KEY") or "fake-haystack-test-key", embedding_dim=embedding_dim, embedding_field=embedding_field, index=index, diff --git a/test/document_stores/test_document_store.py b/test/document_stores/test_document_store.py index a7b11cfca..5db4f6163 100644 --- a/test/document_stores/test_document_store.py +++ b/test/document_stores/test_document_store.py @@ -209,25 +209,25 @@ def test_get_all_documents_large_quantities(document_store: BaseDocumentStore): def test_get_all_document_filter_duplicate_text_value(document_store: BaseDocumentStore): documents = [ - Document(content="Doc1", meta={"f1": "0"}, id_hash_keys=["meta"]), - Document(content="Doc1", meta={"f1": "1", "meta_id": "0"}, id_hash_keys=["meta"]), - Document(content="Doc2", meta={"f3": "0"}, id_hash_keys=["meta"]), + Document(content="Doc1", meta={"meta_field": "0"}, id_hash_keys=["meta"]), + Document(content="Doc1", meta={"meta_field": "1", "name": "file.txt"}, id_hash_keys=["meta"]), + Document(content="Doc2", meta={"name": "file_2.txt"}, id_hash_keys=["meta"]), ] document_store.write_documents(documents) - documents = document_store.get_all_documents(filters={"f1": ["1"]}) + documents = document_store.get_all_documents(filters={"meta_field": ["1"]}) assert documents[0].content == "Doc1" assert len(documents) == 1 - assert {d.meta["meta_id"] for d in documents} == {"0"} + assert {d.meta["name"] for d in documents} == {"file.txt"} - documents = document_store.get_all_documents(filters={"f1": ["0"]}) + documents = document_store.get_all_documents(filters={"meta_field": ["0"]}) assert documents[0].content == "Doc1" assert len(documents) == 1 - assert documents[0].meta.get("meta_id") is None + assert documents[0].meta.get("name") is None - documents = document_store.get_all_documents(filters={"f3": ["0"]}) + documents = document_store.get_all_documents(filters={"name": ["file_2.txt"]}) assert documents[0].content == "Doc2" assert len(documents) == 1 - assert documents[0].meta.get("meta_id") is None + assert documents[0].meta.get("meta_field") is None def test_get_all_documents_with_correct_filters(document_store_with_docs): @@ -266,9 +266,8 @@ def test_get_all_documents_with_incorrect_filter_value(document_store_with_docs) assert len(documents) == 0 -@pytest.mark.parametrize( - "document_store_with_docs", ["elasticsearch", "sql", "weaviate", "memory", "pinecone"], indirect=True -) +# See test_pinecone.py +@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch", "sql", "weaviate", "memory"], indirect=True) def 
test_extended_filter(document_store_with_docs): # Test comparison operators individually documents = document_store_with_docs.get_all_documents(filters={"meta_field": {"$eq": "test1"}}) diff --git a/test/document_stores/test_pinecone.py b/test/document_stores/test_pinecone.py new file mode 100644 index 000000000..6419fdad8 --- /dev/null +++ b/test/document_stores/test_pinecone.py @@ -0,0 +1,337 @@ +from typing import List, Union, Dict, Any + +import os +from datetime import datetime +from inspect import getmembers, isclass, isfunction + +import pytest + +from haystack.document_stores.pinecone import PineconeDocumentStore +from haystack.schema import Document +from haystack.errors import FilterError + + +from ..mocks import pinecone as pinecone_mock +from ..conftest import SAMPLES_PATH + + +# Set metadata fields used during testing for PineconeDocumentStore meta_config +META_FIELDS = ["meta_field", "name", "date", "numeric_field", "odd_document"] + + +# +# FIXME This class should extend the base Document Store test class once it exists. +# At that point some of the fixtures will be duplicate, so review them. +# +class TestPineconeDocumentStore: + + # Fixtures + + @pytest.fixture + def doc_store(self, monkeypatch, request) -> PineconeDocumentStore: + """ + This fixture provides an empty document store and takes care of cleaning up after each test + """ + # If it's a unit test, mock Pinecone + if not "integration" in request.keywords: + for fname, function in getmembers(pinecone_mock, isfunction): + monkeypatch.setattr(f"pinecone.{fname}", function, raising=False) + for cname, class_ in getmembers(pinecone_mock, isclass): + monkeypatch.setattr(f"pinecone.{cname}", class_, raising=False) + + return PineconeDocumentStore( + api_key=os.environ.get("PINECONE_API_KEY") or "fake-pinecone-test-key", + embedding_dim=768, + embedding_field="embedding", + index="haystack_tests", + similarity="cosine", + recreate_index=True, + metadata_config={"indexed": META_FIELDS}, + ) + + @pytest.fixture + def doc_store_with_docs(self, doc_store: PineconeDocumentStore, docs: List[Document]) -> PineconeDocumentStore: + """ + This fixture provides a pre-populated document store and takes care of cleaning up after each test + """ + doc_store.write_documents(docs) + return doc_store + + @pytest.fixture + def docs_all_formats(self) -> List[Union[Document, Dict[str, Any]]]: + return [ + # metafield at the top level for backward compatibility + { + "content": "My name is Paul and I live in New York", + "meta_field": "test-1", + "name": "file_1.txt", + "date": "2019-10-01", + "numeric_field": 5.0, + "odd_document": True, + }, + # "dict" format + { + "content": "My name is Carla and I live in Berlin", + "meta": { + "meta_field": "test-2", + "name": "file_2.txt", + "date": "2020-03-01", + "numeric_field": 5.5, + "odd_document": False, + }, + }, + # Document object + Document( + content="My name is Christelle and I live in Paris", + meta={ + "meta_field": "test-3", + "name": "file_3.txt", + "date": "2018-10-01", + "numeric_field": 4.5, + "odd_document": True, + }, + ), + Document( + content="My name is Camila and I live in Madrid", + meta={ + "meta_field": "test-4", + "name": "file_4.txt", + "date": "2021-02-01", + "numeric_field": 3.0, + "odd_document": False, + }, + ), + Document( + content="My name is Matteo and I live in Rome", + meta={ + "meta_field": "test-5", + "name": "file_5.txt", + "date": "2019-01-01", + "numeric_field": 0.0, + "odd_document": True, + }, + ), + # Without meta + Document(content="My name is Ahmed and 
I live in Cairo"), + ] + + @pytest.fixture + def docs(self, docs_all_formats: List[Union[Document, Dict[str, Any]]]) -> List[Document]: + return [Document.from_dict(doc) if isinstance(doc, dict) else doc for doc in docs_all_formats] + + # + # Tests + # + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_eq(self, doc_store_with_docs: PineconeDocumentStore): + eq_docs = doc_store_with_docs.get_all_documents(filters={"meta_field": {"$eq": "test-1"}}) + normal_docs = doc_store_with_docs.get_all_documents(filters={"meta_field": "test-1"}) + assert eq_docs == normal_docs + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_in(self, doc_store_with_docs: PineconeDocumentStore): + in_docs = doc_store_with_docs.get_all_documents(filters={"meta_field": {"$in": ["test-1", "test-2", "n.a."]}}) + normal_docs = doc_store_with_docs.get_all_documents(filters={"meta_field": ["test-1", "test-2", "n.a."]}) + assert in_docs == normal_docs + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_ne(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents(filters={"meta_field": {"$ne": "test-1"}}) + assert all("test-1" != d.meta.get("meta_field", None) for d in retrieved_docs) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_nin(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents( + filters={"meta_field": {"$nin": ["test-1", "test-2", "n.a."]}} + ) + assert {"test-1", "test-2"}.isdisjoint({d.meta.get("meta_field", None) for d in retrieved_docs}) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. 
+ def test_get_all_documents_extended_filter_gt(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents(filters={"numeric_field": {"$gt": 3.0}}) + assert all(d.meta["numeric_field"] > 3.0 for d in retrieved_docs) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_gte(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents(filters={"numeric_field": {"$gte": 3.0}}) + assert all(d.meta["numeric_field"] >= 3.0 for d in retrieved_docs) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_lt(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents(filters={"numeric_field": {"$lt": 3.0}}) + assert all(d.meta["numeric_field"] < 3.0 for d in retrieved_docs) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_lte(self, doc_store_with_docs: PineconeDocumentStore): + retrieved_docs = doc_store_with_docs.get_all_documents(filters={"numeric_field": {"$lte": 3.0}}) + assert all(d.meta["numeric_field"] <= 3.0 for d in retrieved_docs) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_dates(self, doc_store_with_docs: PineconeDocumentStore): + filters = {"date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}} + + with pytest.raises(FilterError, match=r"Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. 
+ def test_get_all_documents_extended_filter_compound_dates_and_other_field_explicit( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters = { + "$and": { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "name": {"$in": ["file_5.txt", "file_3.txt"]}, + } + } + + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_dates_and_other_field_simplified( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters_simplified = { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "name": ["file_5.txt", "file_3.txt"], + } + + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters_simplified) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_dates_and_or_explicit( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters = { + "$and": { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "$or": {"name": {"$in": ["file_5.txt", "file_3.txt"]}, "numeric_field": {"$lte": 5.0}}, + } + } + + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_dates_and_or_simplified( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters_simplified = { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "$or": {"name": ["file_5.txt", "file_3.txt"], "numeric_field": {"$lte": 5.0}}, + } + + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters_simplified) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. 
+ def test_get_all_documents_extended_filter_compound_dates_and_or_and_not_explicit( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters = { + "$and": { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "$or": { + "name": {"$in": ["file_5.txt", "file_3.txt"]}, + "$and": {"numeric_field": {"$lte": 5.0}, "$not": {"meta_field": {"$eq": "test-2"}}}, + }, + } + } + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_dates_and_or_and_not_simplified( + self, doc_store_with_docs: PineconeDocumentStore + ): + filters_simplified = { + "date": {"$lte": "2020-12-31", "$gte": "2019-01-01"}, + "$or": { + "name": ["file_5.txt", "file_3.txt"], + "$and": {"numeric_field": {"$lte": 5.0}, "$not": {"meta_field": "test-2"}}, + }, + } + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters_simplified) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. + def test_get_all_documents_extended_filter_compound_nested_not(self, doc_store_with_docs: PineconeDocumentStore): + # Test nested logical operations within "$not", important as we apply De Morgan's laws in Weaviatedocstore + filters = { + "$not": { + "$or": { + "$and": {"numeric_field": {"$gt": 3.0}, "meta_field": {"$ne": "test-3"}}, + "$not": {"date": {"$lt": "2020-01-01"}}, + } + } + } + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]t' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) + + @pytest.mark.pinecone + # NOTE: Pinecone does not support dates, so it can't do lte or gte on date fields. When a new release introduces this feature, + # the entire family of test_get_all_documents_extended_filter_* tests will become identical to the one present in the + # base document store suite, and can be removed from here. 
+ def test_get_all_documents_extended_filter_compound_same_level_not( + self, doc_store_with_docs: PineconeDocumentStore + ): + # Test same logical operator twice on same level, important as we apply De Morgan's laws in Weaviatedocstore + filters = { + "$or": [ + {"$and": {"meta_field": {"$in": ["test-1", "test-2"]}, "date": {"$gte": "2020-01-01"}}}, + {"$and": {"meta_field": {"$in": ["test-3", "test-4"]}, "date": {"$lt": "2020-01-01"}}}, + ] + } + + with pytest.raises(FilterError, match="Comparison value for '\$[l|g]te' operation must be a float or int."): + doc_store_with_docs.get_all_documents(filters=filters) diff --git a/test/mocks/pinecone.py b/test/mocks/pinecone.py index f733bc277..388a08850 100644 --- a/test/mocks/pinecone.py +++ b/test/mocks/pinecone.py @@ -2,8 +2,10 @@ from typing import Optional, List, Dict, Union import logging + logger = logging.getLogger(__name__) + # Mock Pinecone instance CONFIG: dict = {"api_key": None, "environment": None, "indexes": {}} @@ -84,6 +86,24 @@ class Index: include_values: bool = False, include_metadata: bool = False, filter: Optional[dict] = None, + ): + return self.query_filter( + vector=vector, + top_k=top_k, + namespace=namespace, + include_values=include_values, + include_metadata=include_metadata, + filter=filter, + ) + + def query_filter( + self, + vector: List[float], + top_k: int, + namespace: str = "", + include_values: bool = False, + include_metadata: bool = False, + filter: Optional[dict] = None, ): assert len(vector) == self.index_config.dimension response: dict = {"matches": []} @@ -92,6 +112,7 @@ class Index: else: records = self.index_config.namespaces[namespace] namespace_ids = list(records.keys())[:top_k] + for _id in namespace_ids: match = {"id": _id} if include_values: @@ -99,6 +120,7 @@ class Index: if include_metadata: match["metadata"] = records[_id]["metadata"].copy() match["score"] = 0.0 + if filter is None or ( filter is not None and self._filter(records[_id]["metadata"], filter, top_level=True) ): @@ -258,7 +280,7 @@ class Index: # We find the intersect between the IDs and filtered IDs id_list = set(id_list).intersection(filter_ids) records = self.index_config.namespaces[namespace] - for _id in records.keys(): + for _id in list(records.keys()): # list() is needed to be able to del below if _id in id_list: del records[_id] else:
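As a closing illustration, here is a rough sketch of the label round trip these changes enable. It reuses the `doc_store` from the earlier sketch, and the field values are purely illustrative:

```python
from haystack.schema import Answer, Document, Label

# A hypothetical user-feedback label.
label = Label(
    query="Where does Carla live?",
    document=Document(content="My name is Carla and I live in Berlin"),
    answer=Answer(answer="Berlin"),
    is_correct_answer=True,
    is_correct_document=True,
    origin="user-feedback",
)

# write_labels() flattens each Label into metadata (via _label_to_meta) and
# stores it in the dedicated "labels" namespace of the index.
doc_store.write_labels([label], index="document")

# get_all_labels() reads the metadata back and rebuilds Label objects
# (via _meta_to_labels); get_label_count(), by contrast, is not implemented.
labels = doc_store.get_all_labels(index="document")
```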