mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-11-11 15:23:41 +00:00
Bulk insert in sql document stores (#2264)
* bulk_save_objects
* use bulk_insert_mappings in sql document store
* bug fix : write_document overwrite
* update documentation write_documents
* Update Documentation & Code Style
* small changes for readability
* bug fix: missing quotation marks

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Sara Zan <sara.zanzottera@deepset.ai>
This commit is contained in:
parent
a97a9d2b48
commit
004e7f33af
@@ -1790,7 +1790,8 @@ documents for evaluation can be indexed in a separate index than the documents f
|
|||||||
- `duplicate_documents`: Handle duplicates document based on parameter options.
|
- `duplicate_documents`: Handle duplicates document based on parameter options.
|
||||||
Parameter options : ( 'skip','overwrite','fail')
|
Parameter options : ( 'skip','overwrite','fail')
|
||||||
skip: Ignore the duplicates documents
|
skip: Ignore the duplicates documents
|
||||||
overwrite: Update any existing documents with the same ID when adding documents.
|
overwrite: Update any existing documents with the same ID when adding documents
|
||||||
|
but is considerably slower (default).
|
||||||
fail: an error is raised if the document ID of the document being added already
|
fail: an error is raised if the document ID of the document being added already
|
||||||
exists.
|
exists.
|
||||||
|
|
||||||
|
|||||||
@@ -369,7 +369,8 @@ class SQLDocumentStore(BaseDocumentStore):
|
|||||||
:param duplicate_documents: Handle duplicates document based on parameter options.
|
:param duplicate_documents: Handle duplicates document based on parameter options.
|
||||||
Parameter options : ( 'skip','overwrite','fail')
|
Parameter options : ( 'skip','overwrite','fail')
|
||||||
skip: Ignore the duplicates documents
|
skip: Ignore the duplicates documents
|
||||||
overwrite: Update any existing documents with the same ID when adding documents.
|
overwrite: Update any existing documents with the same ID when adding documents
|
||||||
|
but is considerably slower (default).
|
||||||
fail: an error is raised if the document ID of the document being added already
|
fail: an error is raised if the document ID of the document being added already
|
||||||
exists.
|
exists.
|
||||||
|
|
||||||
@@ -392,24 +393,30 @@ class SQLDocumentStore(BaseDocumentStore):
|
|||||||
documents=document_objects, index=index, duplicate_documents=duplicate_documents
|
documents=document_objects, index=index, duplicate_documents=duplicate_documents
|
||||||
)
|
)
|
||||||
for i in range(0, len(document_objects), batch_size):
|
for i in range(0, len(document_objects), batch_size):
|
||||||
|
docs_orm = []
|
||||||
for doc in document_objects[i : i + batch_size]:
|
for doc in document_objects[i : i + batch_size]:
|
||||||
meta_fields = doc.meta or {}
|
meta_fields = doc.meta or {}
|
||||||
vector_id = meta_fields.pop("vector_id", None)
|
vector_id = meta_fields.pop("vector_id", None)
|
||||||
meta_orms = [MetaDocumentORM(name=key, value=value) for key, value in meta_fields.items()]
|
meta_orms = [MetaDocumentORM(name=key, value=value) for key, value in meta_fields.items()]
|
||||||
doc_orm = DocumentORM(
|
doc_mapping = {
|
||||||
id=doc.id,
|
"id": doc.id,
|
||||||
content=doc.to_dict()["content"],
|
"content": doc.to_dict()["content"],
|
||||||
content_type=doc.content_type,
|
"content_type": doc.content_type,
|
||||||
vector_id=vector_id,
|
"vector_id": vector_id,
|
||||||
meta=meta_orms,
|
"meta": meta_orms,
|
||||||
index=index,
|
"index": index,
|
||||||
)
|
}
|
||||||
if duplicate_documents == "overwrite":
|
if duplicate_documents == "overwrite":
|
||||||
|
doc_orm = DocumentORM(**doc_mapping)
|
||||||
# First old meta data cleaning is required
|
# First old meta data cleaning is required
|
||||||
self.session.query(MetaDocumentORM).filter_by(document_id=doc.id).delete()
|
self.session.query(MetaDocumentORM).filter_by(document_id=doc.id).delete()
|
||||||
self.session.merge(doc_orm)
|
self.session.merge(doc_orm)
|
||||||
else:
|
else:
|
||||||
self.session.add(doc_orm)
|
docs_orm.append(doc_mapping)
|
||||||
|
|
||||||
|
if docs_orm:
|
||||||
|
self.session.bulk_insert_mappings(DocumentORM, docs_orm)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.session.commit()
|
self.session.commit()
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user