mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-08-31 11:56:35 +00:00
feat: Add Indexing Pipeline (#6424)
* Add build_indexing_pipeline utils function * Pylint fixes * Move into another package to avoid circular deps * Revert change * Revert haystack/utils/__init__.py change * Add example * Use DocumentStore type, remove typing checks
This commit is contained in:
parent
8267058c05
commit
008a322023
14
examples/getting_started/indexing.py
Normal file
14
examples/getting_started/indexing.py
Normal file
@ -0,0 +1,14 @@
|
||||
from pathlib import Path
|
||||
|
||||
from haystack.document_stores import InMemoryDocumentStore
|
||||
from haystack.pipeline_utils import build_indexing_pipeline
|
||||
|
||||
# We support many different databases. Here we load a simple and lightweight in-memory document store.
|
||||
document_store = InMemoryDocumentStore()
|
||||
|
||||
# Let's now build indexing pipeline that indexes PDFs and text files from a test folder.
|
||||
indexing_pipeline = build_indexing_pipeline(
|
||||
document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
|
||||
)
|
||||
result = indexing_pipeline.run(files=list(Path("../../test/test_files").iterdir()))
|
||||
print(result)
|
@ -1,3 +1,4 @@
|
||||
from haystack.pipeline_utils.rag import build_rag_pipeline
|
||||
from haystack.pipeline_utils.indexing import build_indexing_pipeline
|
||||
|
||||
__all__ = ["build_rag_pipeline"]
|
||||
__all__ = ["build_rag_pipeline", "build_indexing_pipeline"]
|
||||
|
223
haystack/pipeline_utils/indexing.py
Normal file
223
haystack/pipeline_utils/indexing.py
Normal file
@ -0,0 +1,223 @@
|
||||
import inspect
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Any, Dict
|
||||
from typing import Union, Type
|
||||
|
||||
from haystack.document_stores.protocol import DocumentStore
|
||||
|
||||
from haystack import Pipeline
|
||||
from haystack.components.converters import TextFileToDocument
|
||||
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, OpenAIDocumentEmbedder
|
||||
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
|
||||
from haystack.components.routers import FileTypeRouter, DocumentJoiner
|
||||
from haystack.components.writers import DocumentWriter
|
||||
|
||||
|
||||
def build_indexing_pipeline(
|
||||
document_store: Any,
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_model_kwargs: Optional[Dict[str, Any]] = None,
|
||||
supported_mime_types: Optional[List[str]] = None,
|
||||
):
|
||||
"""
|
||||
Returns a prebuilt pipeline for indexing documents into a DocumentStore. Indexing pipeline automatically detects
|
||||
the file type of the input files and converts them into Documents. The supported file types are: .txt,
|
||||
.pdf, and .html
|
||||
|
||||
Example usage:
|
||||
|
||||
```python
|
||||
from haystack.utils import build_indexing_pipeline
|
||||
indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance)
|
||||
indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
|
||||
>>> {'documents_written': 2}
|
||||
```
|
||||
|
||||
One can also pass an embedding model to the pipeline, which will then calculate embeddings for the documents
|
||||
and store them in the DocumentStore. Example usage:
|
||||
```python
|
||||
indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance,
|
||||
embedding_model="sentence-transformers/all-mpnet-base-v2")
|
||||
indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
|
||||
>>> {'documents_written': 2}
|
||||
```
|
||||
|
||||
After running indexing pipeline, the documents are indexed in the DocumentStore and can be used for querying.
|
||||
|
||||
|
||||
:param document_store: An instance of a DocumentStore to index documents into.
|
||||
:param embedding_model: The name of the model to use for document embeddings.
|
||||
:param embedding_model_kwargs: Keyword arguments to pass to the embedding model class.
|
||||
:param supported_mime_types: List of MIME types to support in the pipeline. If not given,
|
||||
defaults to ["text/plain", "application/pdf", "text/html"].
|
||||
|
||||
"""
|
||||
return _IndexingPipeline(
|
||||
document_store=document_store,
|
||||
embedding_model=embedding_model,
|
||||
embedding_model_kwargs=embedding_model_kwargs,
|
||||
supported_mime_types=supported_mime_types,
|
||||
)
|
||||
|
||||
|
||||
class _IndexingPipeline:
|
||||
"""
|
||||
An internal class to simplify creation of prebuilt pipeline for indexing documents into a DocumentStore. Indexing
|
||||
pipeline automatically detect the file type of the input files and converts them into Documents. The supported
|
||||
file types are: .txt, .pdf, and .html
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
document_store: DocumentStore,
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_model_kwargs: Optional[Dict[str, Any]] = None,
|
||||
supported_mime_types: Optional[List[str]] = None,
|
||||
):
|
||||
"""
|
||||
:param document_store: An instance of a DocumentStore to index documents into.
|
||||
:param embedding_model: The name of the model to use for document embeddings.
|
||||
:param supported_mime_types: List of MIME types to support in the pipeline. If not given,
|
||||
defaults to ["text/plain", "application/pdf", "text/html"].
|
||||
"""
|
||||
|
||||
if supported_mime_types is None:
|
||||
supported_mime_types = ["text/plain", "application/pdf", "text/html"]
|
||||
|
||||
self.pipeline = Pipeline()
|
||||
self.pipeline.add_component("file_type_router", FileTypeRouter(mime_types=supported_mime_types))
|
||||
converters_used: List[str] = []
|
||||
# Add converters dynamically based on MIME types
|
||||
if "text/plain" in supported_mime_types:
|
||||
self.pipeline.add_component("text_file_converter", TextFileToDocument())
|
||||
self.pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
|
||||
converters_used.append("text_file_converter")
|
||||
|
||||
if "application/pdf" in supported_mime_types:
|
||||
from haystack.components.converters import PyPDFToDocument
|
||||
|
||||
self.pipeline.add_component("pdf_file_converter", PyPDFToDocument())
|
||||
self.pipeline.connect("file_type_router.application/pdf", "pdf_file_converter.sources")
|
||||
converters_used.append("pdf_file_converter")
|
||||
|
||||
if "text/html" in supported_mime_types:
|
||||
from haystack.components.converters import HTMLToDocument
|
||||
|
||||
self.pipeline.add_component("html_file_converter", HTMLToDocument())
|
||||
self.pipeline.connect("file_type_router.text/html", "html_file_converter.sources")
|
||||
converters_used.append("html_file_converter")
|
||||
|
||||
# Add remaining common components
|
||||
self.pipeline.add_component("document_joiner", DocumentJoiner())
|
||||
self.pipeline.add_component("document_cleaner", DocumentCleaner())
|
||||
self.pipeline.add_component("document_splitter", DocumentSplitter())
|
||||
|
||||
# Connect converters to joiner, if they exist
|
||||
for converter_name in converters_used:
|
||||
self.pipeline.connect(f"{converter_name}.documents", "document_joiner.documents")
|
||||
|
||||
# Connect joiner to cleaner and splitter
|
||||
self.pipeline.connect("document_joiner.documents", "document_cleaner.documents")
|
||||
self.pipeline.connect("document_cleaner.documents", "document_splitter.documents")
|
||||
|
||||
if embedding_model:
|
||||
embedder_instance = self._find_embedder(embedding_model, embedding_model_kwargs)
|
||||
self.pipeline.add_component("storage_sink", DocumentWriter(document_store=document_store))
|
||||
self.pipeline.add_component("writer", embedder_instance)
|
||||
self.pipeline.connect("writer", "storage_sink")
|
||||
else:
|
||||
self.pipeline.add_component("writer", DocumentWriter(document_store=document_store))
|
||||
|
||||
self.pipeline.connect("document_splitter.documents", "writer.documents")
|
||||
|
||||
# this is more of a sanity check for the maintainer of the pipeline, to make sure that the pipeline is
|
||||
# configured correctly
|
||||
if len(self.pipeline.inputs()) < 1:
|
||||
raise RuntimeError("IndexingPipeline needs at least one input component.")
|
||||
if len(self.pipeline.outputs()) < 1:
|
||||
raise RuntimeError("IndexingPipeline needs at least one output component.")
|
||||
|
||||
def run(self, files: List[Union[str, Path]]) -> Dict[str, Any]:
|
||||
"""
|
||||
Performs indexing of the given list of documents into the DocumentStore.
|
||||
:param files: A list of paths to files to index.
|
||||
:type files: List[Union[str, Path]]
|
||||
|
||||
:return: the output of the pipeline run, which is a dictionary containing the number of documents written
|
||||
"""
|
||||
if not files:
|
||||
return {"documents_written": 0}
|
||||
input_files = self._process_files(files)
|
||||
pipeline_output = self.pipeline.run(data={"file_type_router": {"sources": input_files}})
|
||||
aggregated_results = {}
|
||||
# combine the results of all outputs into one dictionary
|
||||
for component_result in pipeline_output.values():
|
||||
aggregated_results.update(component_result)
|
||||
return aggregated_results
|
||||
|
||||
def _find_embedder(self, embedding_model: str, init_kwargs: Optional[Dict[str, Any]] = None) -> Any:
|
||||
embedder_patterns = {
|
||||
r"^text-embedding.*": OpenAIDocumentEmbedder,
|
||||
r"^sentence-transformers.*": SentenceTransformersDocumentEmbedder,
|
||||
# add more patterns or adjust them here
|
||||
}
|
||||
embedder_class = next((val for pat, val in embedder_patterns.items() if re.match(pat, embedding_model)), None)
|
||||
if not embedder_class:
|
||||
raise ValueError(
|
||||
f"Could not find an embedder for the given embedding model name {embedding_model}. "
|
||||
f"Please provide a valid embedding model name. "
|
||||
f"Valid embedder classes are {embedder_patterns.values()}."
|
||||
)
|
||||
return self._create_embedder(embedder_class, embedding_model, init_kwargs)
|
||||
|
||||
def _create_embedder(
|
||||
self, embedder_class: Type, model_name: str, init_kwargs: Optional[Dict[str, Any]] = None
|
||||
) -> Any:
|
||||
init_signature = inspect.signature(embedder_class.__init__)
|
||||
|
||||
kwargs = {**(init_kwargs or {})}
|
||||
|
||||
# Determine the correct parameter name and set it
|
||||
if "model_name_or_path" in init_signature.parameters:
|
||||
kwargs["model_name_or_path"] = model_name
|
||||
elif "model_name" in init_signature.parameters:
|
||||
kwargs["model_name"] = model_name
|
||||
else:
|
||||
raise ValueError(f"Could not find a parameter for the model name in the embedder class {embedder_class}")
|
||||
|
||||
# Instantiate the class
|
||||
return embedder_class(**kwargs)
|
||||
|
||||
def _list_files_recursively(self, path: Union[str, Path]) -> List[str]:
|
||||
"""
|
||||
List all files in a directory recursively as a list of strings, or return the file itself
|
||||
if it's not a directory.
|
||||
:param path: the path to list files from
|
||||
:type path: Union[str, Path]
|
||||
:return: a list of strings, where each string is a path to a file
|
||||
"""
|
||||
|
||||
if os.path.isfile(path):
|
||||
return [str(path)]
|
||||
elif os.path.isdir(path):
|
||||
file_list: List[str] = []
|
||||
for root, _, files in os.walk(path):
|
||||
for file in files:
|
||||
file_list.append(os.path.join(root, file))
|
||||
return file_list
|
||||
else:
|
||||
return []
|
||||
|
||||
def _process_files(self, files: List[Union[str, Path]]) -> List[str]:
|
||||
"""
|
||||
Process a list of files and directories, listing all files recursively and removing duplicates.
|
||||
:param files: A list of files and directories to process.
|
||||
:type files: List[Union[str, Path]]
|
||||
:return: A list of unique files.
|
||||
"""
|
||||
nested_file_lists = [self._list_files_recursively(file) for file in files]
|
||||
combined_files = [item for sublist in nested_file_lists for item in sublist]
|
||||
unique_files = list(set(combined_files))
|
||||
return unique_files
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add a indexing `build_indexing_pipeline` utility function
|
69
test/pipelines/test_indexing_pipeline.py
Normal file
69
test/pipelines/test_indexing_pipeline.py
Normal file
@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
|
||||
from haystack.pipeline_utils.indexing import build_indexing_pipeline
|
||||
from haystack.document_stores import InMemoryDocumentStore
|
||||
|
||||
|
||||
class TestIndexingPipeline:
|
||||
# indexing files without embeddings
|
||||
def test_indexing_files_without_embeddings(self, test_files_path):
|
||||
file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipeline = build_indexing_pipeline(document_store=document_store)
|
||||
result = pipeline.run(files=file_paths)
|
||||
assert result == {"documents_written": 2}
|
||||
|
||||
# indexing files with embeddings
|
||||
@pytest.mark.integration
|
||||
def test_indexing_files_with_embeddings(self, test_files_path):
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipeline = build_indexing_pipeline(
|
||||
document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
|
||||
)
|
||||
file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
|
||||
result = pipeline.run(files=file_paths)
|
||||
assert result == {"documents_written": 2}
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_indexing_dirs_with_embeddings(self, test_files_path):
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipeline = build_indexing_pipeline(
|
||||
document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
|
||||
)
|
||||
file_paths = [test_files_path / "txt"]
|
||||
result = pipeline.run(files=file_paths)
|
||||
assert "documents_written" in result
|
||||
assert result["documents_written"] >= 3
|
||||
|
||||
# indexing multiple files
|
||||
def test_indexing_multiple_file_types(self, test_files_path):
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipeline = build_indexing_pipeline(document_store=document_store)
|
||||
file_paths = [
|
||||
test_files_path / "txt" / "doc_1.txt",
|
||||
test_files_path / "txt" / "doc_2.txt",
|
||||
test_files_path / "pdf" / "sample_pdf_1.pdf",
|
||||
]
|
||||
result = pipeline.run(files=file_paths)
|
||||
# pdf gets split into 2 documents
|
||||
assert result == {"documents_written": 4}
|
||||
|
||||
# indexing empty list of files
|
||||
def test_indexing_empty_list_of_files(self):
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipeline = build_indexing_pipeline(document_store=document_store)
|
||||
result = pipeline.run(files=[])
|
||||
assert result == {"documents_written": 0}
|
||||
|
||||
# embedding model is not found
|
||||
def test_embedding_model_not_found(self):
|
||||
document_store = InMemoryDocumentStore()
|
||||
with pytest.raises(ValueError, match="Could not find an embedder"):
|
||||
build_indexing_pipeline(document_store=document_store, embedding_model="invalid_model")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_open_ai_embedding_model(self):
|
||||
document_store = InMemoryDocumentStore()
|
||||
pipe = build_indexing_pipeline(document_store=document_store, embedding_model="text-embedding-ada-002")
|
||||
# don't run the pipeline and waste credits, just check that it was created correctly
|
||||
assert pipe is not None
|
Loading…
x
Reference in New Issue
Block a user