Delete documents by ID in all document stores (#1606)

* Modify BaseDocumentStore.delete_documents() signature, implement ElasticSearch, and add tests

* Add implementation for InMemory

* Implement for SQL, FAISS and Milvus too

* Add tests for faiss and milvus

* Fix delete_all_documents

* Implement deletion by ID for weaviate

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

Co-authored-by: sarthakj2109 <54064348+sarthakj2109@users.noreply.github.com>

Co-authored-by: prafgup <prafulgupta6@gmail.com>

Co-authored-by: ankh6 <andynzemokalumu@live.be>
This commit is contained in:
Sara Zan 2021-10-19 12:30:15 +02:00 committed by GitHub
parent eb95f0e8aa
commit 575e64333c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 232 additions and 64 deletions

View File

@ -456,7 +456,7 @@ None
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents in an index. All documents are deleted if neither IDs nor filters are passed.
@ -464,8 +464,12 @@ Delete documents in an index. All documents are deleted if no filters are passed
**Arguments**:
- `index`: Index name to delete the document from.
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:
@ -737,7 +741,7 @@ None
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents in an index. All documents are deleted if neither IDs nor filters are passed.
@ -746,8 +750,12 @@ Delete documents in an index. All documents are deleted if no filters are passed
- `index`: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:
@ -954,7 +962,7 @@ None
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents in an index. All documents are deleted if neither IDs nor filters are passed.
@ -963,8 +971,12 @@ Delete documents in an index. All documents are deleted if no filters are passed
- `index`: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:
@ -1153,7 +1165,7 @@ Delete all documents from the document store.
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents from the document store. All documents are deleted if neither IDs nor filters are passed.
@ -1162,8 +1174,12 @@ Delete documents from the document store. All documents are deleted if no filter
- `index`: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:
@ -1418,7 +1434,7 @@ None
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents in an index. All documents are deleted if neither IDs nor filters are passed.
@ -1427,8 +1443,12 @@ Delete documents in an index. All documents are deleted if no filters are passed
- `index`: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
- `filters`: Optional filters to narrow down the search space.
Example: {"name": ["some", "more"], "category": ["only_one"]}
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:
@ -1787,7 +1807,7 @@ None
#### delete\_documents
```python
| delete_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None)
| delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None)
```
Delete documents in an index. All documents are deleted if neither IDs nor filters are passed.
@ -1796,8 +1816,12 @@ Delete documents in an index. All documents are deleted if no filters are passed
- `index`: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
- `ids`: Optional list of IDs to narrow down the documents to be deleted.
- `filters`: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
**Returns**:

View File

@ -245,7 +245,7 @@ class BaseDocumentStore(BaseComponent):
pass
@abstractmethod
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
pass
def run(self, documents: List[dict], index: Optional[str] = None): # type: ignore

View File

@ -991,15 +991,19 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents in an index. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from.
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
@ -1013,6 +1017,13 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
}
)
query["query"]["bool"] = {"filter": filter_clause}
if ids:
query["query"]["bool"]["must"] = {"ids": {"values": ids}}
elif ids:
if ids:
query["query"]["ids"] = {"values": ids}
else:
query["query"] = {"match_all": {}}
self.client.delete_by_query(index=index, body=query, ignore=[404])

View File

@ -397,27 +397,34 @@ class FAISSDocumentStore(SQLDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents from the document store. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
if index in self.faiss_indexes.keys():
if filters:
if not filters and not ids:
self.faiss_indexes[index].reset()
else:
affected_docs = self.get_all_documents(filters=filters)
if ids:
affected_docs = [doc for doc in affected_docs if doc.id in ids]
doc_ids = [doc.meta.get("vector_id") for doc in affected_docs if doc.meta and doc.meta.get("vector_id") is not None]
self.faiss_indexes[index].remove_ids(np.array(doc_ids, dtype="int64"))
else:
self.faiss_indexes[index].reset()
super().delete_documents(index=index, filters=filters)
super().delete_documents(index=index, ids=ids, filters=filters)
def query_by_embedding(
self,

View File

@ -389,21 +389,29 @@ class InMemoryDocumentStore(BaseDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents in an index. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
if not filters:
if not filters and not ids:
self.indexes[index] = {}
return
for doc in self.get_all_documents(filters=filters):
return
docs_to_delete = self.get_all_documents(filters=filters)
if ids:
docs_to_delete = [doc for doc in docs_to_delete if doc.id in ids]
for doc in docs_to_delete:
del self.indexes[index][doc.id]

View File

@ -389,16 +389,20 @@ class MilvusDocumentStore(SQLDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents in an index. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
:param filters: Optional filters to narrow down the search space.
Example: {"name": ["some", "more"], "category": ["only_one"]}
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
@ -406,19 +410,21 @@ class MilvusDocumentStore(SQLDocumentStore):
if status.code != Status.SUCCESS:
raise RuntimeError(f'Milvus has collection check failed: {status}')
if ok:
if filters:
existing_docs = super().get_all_documents(filters=filters, index=index)
self._delete_vector_ids_from_milvus(documents=existing_docs, index=index)
else:
if not filters and not ids:
status = self.milvus_server.drop_collection(collection_name=index)
if status.code != Status.SUCCESS:
raise RuntimeError(f'Milvus drop collection failed: {status}')
else:
affected_docs = super().get_all_documents(filters=filters, index=index)
if ids:
affected_docs = [doc for doc in affected_docs if doc.id in ids]
self._delete_vector_ids_from_milvus(documents=affected_docs, index=index)
self.milvus_server.flush([index])
self.milvus_server.compact(collection_name=index)
# Delete from SQL at the end to allow the above .get_all_documents() to work properly
super().delete_documents(index=index, filters=filters)
super().delete_documents(index=index, ids=ids, filters=filters)
def get_all_documents_generator(
self,

View File

@ -514,33 +514,39 @@ class SQLDocumentStore(BaseDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents in an index. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
if filters:
# documents_query = documents_query.join(MetaORM)
document_ids_to_delete = self.session.query(DocumentORM.id).filter_by(index=index)
for key, values in filters.items():
document_ids_to_delete = document_ids_to_delete.filter(
MetaDocumentORM.name == key,
if not filters and not ids:
self.session.query(DocumentORM).filter_by(index=index).delete(synchronize_session=False)
else:
document_ids_to_delete = self.session.query(DocumentORM.id).filter(DocumentORM.index==index)
if filters:
for key, values in filters.items():
document_ids_to_delete = document_ids_to_delete.filter(
MetaDocumentORM.name == key,
MetaDocumentORM.value.in_(values),
DocumentORM.id == MetaDocumentORM.document_id
)
DocumentORM.id == MetaDocumentORM.document_id
)
if ids:
document_ids_to_delete = document_ids_to_delete.filter(DocumentORM.id.in_(ids))
self.session.query(DocumentORM).filter(DocumentORM.id.in_(document_ids_to_delete)).delete(
synchronize_session=False)
else:
self.session.query(DocumentORM).filter_by(index=index).delete(synchronize_session=False)
self.session.commit()

View File

@ -681,26 +681,33 @@ class WeaviateDocumentStore(BaseDocumentStore):
For more details, please refer to the issue: https://github.com/deepset-ai/haystack/issues/1045
"""
)
self.delete_documents(index, filters)
self.delete_documents(index, None, filters)
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
def delete_documents(self, index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, List[str]]] = None):
"""
Delete documents in an index. All documents are deleted if no filters are passed.
:param index: Index name to delete the document from. If None, the
DocumentStore's default index (self.index) will be used.
:param ids: Optional list of IDs to narrow down the documents to be deleted.
:param filters: Optional filters to narrow down the documents to be deleted.
Example filters: {"name": ["some", "more"], "category": ["only_one"]}
Example filters: {"name": ["some", "more"], "category": ["only_one"]}.
If filters are provided along with a list of IDs, this method deletes the
intersection of the two query results (documents that match the filters and
have their ID in the list).
:return: None
"""
index = index or self.index
if filters:
docs_to_delete = self.get_all_documents(index, filters=filters)
for doc in docs_to_delete:
self.weaviate_client.data_object.delete(doc.id)
else:
if not filters and not ids:
self.weaviate_client.schema.delete_class(index)
self._create_schema_and_index_if_not_exist(index)
else:
docs_to_delete = self.get_all_documents(index, filters=filters)
if ids:
docs_to_delete = [doc for doc in docs_to_delete if doc.id in ids]
for doc in docs_to_delete:
self.weaviate_client.data_object.delete(doc.id)

View File

@ -345,6 +345,32 @@ def test_delete_documents(document_store_with_docs):
assert len(documents) == 0
def test_delete_documents_by_id(document_store_with_docs):
    """Deleting by ID removes exactly the listed documents and keeps the rest."""
    store = document_store_with_docs
    all_ids = [doc.id for doc in store.get_all_documents()]
    assert len(all_ids) == 3

    # Remove the first two documents; only the third one should survive.
    ids_to_remove = all_ids[:2]
    store.delete_documents(ids=ids_to_remove)

    remaining = store.get_all_documents()
    assert len(remaining) == 1
    assert remaining[0].id == all_ids[2]
def test_delete_documents_by_id_with_filters(document_store_with_docs):
    """IDs combined with filters delete only the intersection of both selections."""
    store = document_store_with_docs
    candidates = store.get_all_documents(filters={"meta_field": ["test1", "test2"]})
    untouched = store.get_all_documents(filters={"meta_field": ["test3"]})

    # The ID list covers "test1" and "test2", but the filter only matches "test1",
    # so just the "test1" document is expected to be removed.
    candidate_ids = [doc.id for doc in candidates]
    store.delete_documents(ids=candidate_ids, filters={"meta_field": ["test1"]})

    remaining = store.get_all_documents()
    assert len(remaining) == 2
    assert all(doc.meta["meta_field"] != "test1" for doc in remaining)

    remaining_ids = [doc.id for doc in remaining]
    assert all(doc.id in remaining_ids for doc in untouched)
def test_delete_documents_with_filters(document_store_with_docs):
document_store_with_docs.delete_documents(filters={"meta_field": ["test1", "test2"]})
documents = document_store_with_docs.get_all_documents()

View File

@ -210,6 +210,51 @@ def test_delete_docs_with_filters(document_store, retriever):
assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"}
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True)
def test_delete_docs_by_id(document_store, retriever):
    """Deleting by ID removes both the documents and their embeddings."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    all_ids = [doc.id for doc in document_store.get_all_documents()]
    removed_ids = all_ids[:3]
    document_store.delete_documents(ids=removed_ids)

    survivors = document_store.get_all_documents()
    expected_count = len(all_ids) - len(removed_ids)
    # The document count and the embedding count must shrink together.
    assert len(survivors) == expected_count
    assert document_store.get_embedding_count() == expected_count

    survivor_ids = [doc.id for doc in survivors]
    assert all(doc_id not in survivor_ids for doc_id in removed_ids)
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True)
def test_delete_docs_by_id_with_filters(document_store, retriever):
    """IDs combined with filters delete only documents matching both criteria."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    doomed_docs = document_store.get_all_documents(filters={"name": ["name_1", "name_2"]})
    ids_to_delete = [doc.id for doc in doomed_docs]
    kept_docs = document_store.get_all_documents(
        filters={"name": ["name_3", "name_4", "name_5", "name_6"]}
    )
    ids_not_to_delete = [doc.id for doc in kept_docs]

    # The filter is broader than the ID list; only the overlap (name_1, name_2) goes away.
    document_store.delete_documents(
        ids=ids_to_delete,
        filters={"name": ["name_1", "name_2", "name_3", "name_4"]},
    )

    survivors = document_store.get_all_documents()
    expected_count = len(DOCUMENTS) - len(ids_to_delete)
    assert len(survivors) == expected_count
    assert document_store.get_embedding_count() == expected_count

    for deleted_name in ("name_1", "name_2"):
        assert all(doc.meta["name"] != deleted_name for doc in survivors)

    survivor_ids = [doc.id for doc in survivors]
    assert all(doc_id in survivor_ids for doc_id in ids_not_to_delete)
@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True)
def test_pipeline(document_store, retriever):

View File

@ -314,7 +314,7 @@ def test_query(document_store_with_docs):
@pytest.mark.weaviate
@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True)
def test_delete_all_documents(document_store_with_docs):
def test_delete_documents(document_store_with_docs):
assert len(document_store_with_docs.get_all_documents()) == 3
document_store_with_docs.delete_documents()
@ -324,8 +324,36 @@ def test_delete_all_documents(document_store_with_docs):
@pytest.mark.weaviate
@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True)
def test_delete_documents_with_filters(document_store_with_docs):
document_store_with_docs.delete_all_documents(filters={"metafield": ["test1", "test2"]})
assert len(document_store_with_docs.get_all_documents()) == 3
document_store_with_docs.delete_documents(filters={"metafield": ["test1", "test2"]})
documents = document_store_with_docs.get_all_documents()
assert len(documents) == 1
assert documents[0].meta["metafield"] == "test3"
@pytest.mark.weaviate
@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True)
def test_delete_documents_by_id(document_store_with_docs):
    """Deleting by ID in Weaviate leaves only the document that was not listed."""
    store = document_store_with_docs
    assert len(store.get_all_documents()) == 3

    first_two_docs = store.get_all_documents()[:2]
    ids_to_delete = [doc.id for doc in first_two_docs]
    store.delete_documents(ids=ids_to_delete)

    remaining = store.get_all_documents()
    assert len(remaining) == 1
    assert remaining[0].id not in ids_to_delete
@pytest.mark.weaviate
@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True)
def test_delete_documents_by_id_with_filters(document_store_with_docs):
    """IDs combined with filters delete only the intersection in Weaviate."""
    store = document_store_with_docs
    candidates = store.get_all_documents(filters={"metafield": ["test1", "test2"]})
    untouched = store.get_all_documents(filters={"metafield": ["test3"]})

    # The ID list covers "test1" and "test2", but the filter only matches "test1",
    # so just the "test1" document is expected to be removed.
    candidate_ids = [doc.id for doc in candidates]
    store.delete_documents(ids=candidate_ids, filters={"metafield": ["test1"]})

    remaining = store.get_all_documents()
    assert len(remaining) == 2
    assert all(doc.meta["metafield"] != "test1" for doc in remaining)

    remaining_ids = [doc.id for doc in remaining]
    assert all(doc.id in remaining_ids for doc in untouched)