From 008a3220231ef42f3b47b1f5bbfd00c35f32fe7f Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 4 Dec 2023 16:08:53 +0100 Subject: [PATCH] feat: Add Indexing Pipeline (#6424) * Add build_indexing_pipeline utils function * Pylint fixes * Move into another package to avoid circular deps * Revert change * Revert haystack/utils/__init__.py change * Add example * Use DocumentStore type, remove typing checks --- examples/getting_started/indexing.py | 14 ++ haystack/pipeline_utils/__init__.py | 3 +- haystack/pipeline_utils/indexing.py | 223 ++++++++++++++++++ ...-ready-made-pipeline-85c1da2f8f910f9d.yaml | 4 + test/pipelines/test_indexing_pipeline.py | 69 ++++++ 5 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 examples/getting_started/indexing.py create mode 100644 haystack/pipeline_utils/indexing.py create mode 100644 releasenotes/notes/add-indexing-ready-made-pipeline-85c1da2f8f910f9d.yaml create mode 100644 test/pipelines/test_indexing_pipeline.py diff --git a/examples/getting_started/indexing.py b/examples/getting_started/indexing.py new file mode 100644 index 000000000..e54dc6c4a --- /dev/null +++ b/examples/getting_started/indexing.py @@ -0,0 +1,14 @@ +from pathlib import Path + +from haystack.document_stores import InMemoryDocumentStore +from haystack.pipeline_utils import build_indexing_pipeline + +# We support many different databases. Here we load a simple and lightweight in-memory document store. +document_store = InMemoryDocumentStore() + +# Let's now build an indexing pipeline that indexes PDFs and text files from a test folder. 
+indexing_pipeline = build_indexing_pipeline( + document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2" +) +result = indexing_pipeline.run(files=list(Path("../../test/test_files").iterdir())) +print(result) diff --git a/haystack/pipeline_utils/__init__.py b/haystack/pipeline_utils/__init__.py index 027e3bf89..3d6db9968 100644 --- a/haystack/pipeline_utils/__init__.py +++ b/haystack/pipeline_utils/__init__.py @@ -1,3 +1,4 @@ from haystack.pipeline_utils.rag import build_rag_pipeline +from haystack.pipeline_utils.indexing import build_indexing_pipeline -__all__ = ["build_rag_pipeline"] +__all__ = ["build_rag_pipeline", "build_indexing_pipeline"] diff --git a/haystack/pipeline_utils/indexing.py b/haystack/pipeline_utils/indexing.py new file mode 100644 index 000000000..cc1d28946 --- /dev/null +++ b/haystack/pipeline_utils/indexing.py @@ -0,0 +1,223 @@ +import inspect +import os +import re +from pathlib import Path +from typing import Optional, List, Any, Dict +from typing import Union, Type + +from haystack.document_stores.protocol import DocumentStore + +from haystack import Pipeline +from haystack.components.converters import TextFileToDocument +from haystack.components.embedders import SentenceTransformersDocumentEmbedder, OpenAIDocumentEmbedder +from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter +from haystack.components.routers import FileTypeRouter, DocumentJoiner +from haystack.components.writers import DocumentWriter + + +def build_indexing_pipeline( + document_store: Any, + embedding_model: Optional[str] = None, + embedding_model_kwargs: Optional[Dict[str, Any]] = None, + supported_mime_types: Optional[List[str]] = None, +): + """ + Returns a prebuilt pipeline for indexing documents into a DocumentStore. Indexing pipeline automatically detects + the file type of the input files and converts them into Documents. 
The supported file types are: .txt, + .pdf, and .html + + Example usage: + + ```python + from haystack.pipeline_utils import build_indexing_pipeline + indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance) + indexing_pipe.run(files=["path/to/file1", "path/to/file2"]) + >>> {'documents_written': 2} + ``` + + One can also pass an embedding model to the pipeline, which will then calculate embeddings for the documents + and store them in the DocumentStore. Example usage: + ```python + indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance, + embedding_model="sentence-transformers/all-mpnet-base-v2") + indexing_pipe.run(files=["path/to/file1", "path/to/file2"]) + >>> {'documents_written': 2} + ``` + + After running the indexing pipeline, the documents are indexed in the DocumentStore and can be used for querying. + + + :param document_store: An instance of a DocumentStore to index documents into. + :param embedding_model: The name of the model to use for document embeddings. + :param embedding_model_kwargs: Keyword arguments to pass to the embedding model class. + :param supported_mime_types: List of MIME types to support in the pipeline. If not given, + defaults to ["text/plain", "application/pdf", "text/html"]. + + """ + return _IndexingPipeline( + document_store=document_store, + embedding_model=embedding_model, + embedding_model_kwargs=embedding_model_kwargs, + supported_mime_types=supported_mime_types, + ) + + +class _IndexingPipeline: + """ + An internal class to simplify creation of a prebuilt pipeline for indexing documents into a DocumentStore. Indexing + pipeline automatically detects the file type of the input files and converts them into Documents. 
The supported + file types are: .txt, .pdf, and .html + """ + + def __init__( + self, + document_store: DocumentStore, + embedding_model: Optional[str] = None, + embedding_model_kwargs: Optional[Dict[str, Any]] = None, + supported_mime_types: Optional[List[str]] = None, + ): + """ + :param document_store: An instance of a DocumentStore to index documents into. + :param embedding_model: The name of the model to use for document embeddings. + :param supported_mime_types: List of MIME types to support in the pipeline. If not given, + defaults to ["text/plain", "application/pdf", "text/html"]. + """ + + if supported_mime_types is None: + supported_mime_types = ["text/plain", "application/pdf", "text/html"] + + self.pipeline = Pipeline() + self.pipeline.add_component("file_type_router", FileTypeRouter(mime_types=supported_mime_types)) + converters_used: List[str] = [] + # Add converters dynamically based on MIME types + if "text/plain" in supported_mime_types: + self.pipeline.add_component("text_file_converter", TextFileToDocument()) + self.pipeline.connect("file_type_router.text/plain", "text_file_converter.sources") + converters_used.append("text_file_converter") + + if "application/pdf" in supported_mime_types: + from haystack.components.converters import PyPDFToDocument + + self.pipeline.add_component("pdf_file_converter", PyPDFToDocument()) + self.pipeline.connect("file_type_router.application/pdf", "pdf_file_converter.sources") + converters_used.append("pdf_file_converter") + + if "text/html" in supported_mime_types: + from haystack.components.converters import HTMLToDocument + + self.pipeline.add_component("html_file_converter", HTMLToDocument()) + self.pipeline.connect("file_type_router.text/html", "html_file_converter.sources") + converters_used.append("html_file_converter") + + # Add remaining common components + self.pipeline.add_component("document_joiner", DocumentJoiner()) + self.pipeline.add_component("document_cleaner", DocumentCleaner()) + 
self.pipeline.add_component("document_splitter", DocumentSplitter()) + + # Connect converters to joiner, if they exist + for converter_name in converters_used: + self.pipeline.connect(f"{converter_name}.documents", "document_joiner.documents") + + # Connect joiner to cleaner and splitter + self.pipeline.connect("document_joiner.documents", "document_cleaner.documents") + self.pipeline.connect("document_cleaner.documents", "document_splitter.documents") + + if embedding_model: + embedder_instance = self._find_embedder(embedding_model, embedding_model_kwargs) + self.pipeline.add_component("storage_sink", DocumentWriter(document_store=document_store)) + self.pipeline.add_component("writer", embedder_instance) + self.pipeline.connect("writer", "storage_sink") + else: + self.pipeline.add_component("writer", DocumentWriter(document_store=document_store)) + + self.pipeline.connect("document_splitter.documents", "writer.documents") + + # this is more of a sanity check for the maintainer of the pipeline, to make sure that the pipeline is + # configured correctly + if len(self.pipeline.inputs()) < 1: + raise RuntimeError("IndexingPipeline needs at least one input component.") + if len(self.pipeline.outputs()) < 1: + raise RuntimeError("IndexingPipeline needs at least one output component.") + + def run(self, files: List[Union[str, Path]]) -> Dict[str, Any]: + """ + Performs indexing of the given list of documents into the DocumentStore. + :param files: A list of paths to files to index. 
+ :type files: List[Union[str, Path]] + + :return: the output of the pipeline run, which is a dictionary containing the number of documents written + """ + if not files: + return {"documents_written": 0} + input_files = self._process_files(files) + pipeline_output = self.pipeline.run(data={"file_type_router": {"sources": input_files}}) + aggregated_results = {} + # combine the results of all outputs into one dictionary + for component_result in pipeline_output.values(): + aggregated_results.update(component_result) + return aggregated_results + + def _find_embedder(self, embedding_model: str, init_kwargs: Optional[Dict[str, Any]] = None) -> Any: + embedder_patterns = { + r"^text-embedding.*": OpenAIDocumentEmbedder, + r"^sentence-transformers.*": SentenceTransformersDocumentEmbedder, + # add more patterns or adjust them here + } + embedder_class = next((val for pat, val in embedder_patterns.items() if re.match(pat, embedding_model)), None) + if not embedder_class: + raise ValueError( + f"Could not find an embedder for the given embedding model name {embedding_model}. " + f"Please provide a valid embedding model name. " + f"Valid embedder classes are {embedder_patterns.values()}." 
+ ) + return self._create_embedder(embedder_class, embedding_model, init_kwargs) + + def _create_embedder( + self, embedder_class: Type, model_name: str, init_kwargs: Optional[Dict[str, Any]] = None + ) -> Any: + init_signature = inspect.signature(embedder_class.__init__) + + kwargs = {**(init_kwargs or {})} + + # Determine the correct parameter name and set it + if "model_name_or_path" in init_signature.parameters: + kwargs["model_name_or_path"] = model_name + elif "model_name" in init_signature.parameters: + kwargs["model_name"] = model_name + else: + raise ValueError(f"Could not find a parameter for the model name in the embedder class {embedder_class}") + + # Instantiate the class + return embedder_class(**kwargs) + + def _list_files_recursively(self, path: Union[str, Path]) -> List[str]: + """ + List all files in a directory recursively as a list of strings, or return the file itself + if it's not a directory. + :param path: the path to list files from + :type path: Union[str, Path] + :return: a list of strings, where each string is a path to a file + """ + + if os.path.isfile(path): + return [str(path)] + elif os.path.isdir(path): + file_list: List[str] = [] + for root, _, files in os.walk(path): + for file in files: + file_list.append(os.path.join(root, file)) + return file_list + else: + return [] + + def _process_files(self, files: List[Union[str, Path]]) -> List[str]: + """ + Process a list of files and directories, listing all files recursively and removing duplicates. + :param files: A list of files and directories to process. + :type files: List[Union[str, Path]] + :return: A list of unique files. 
+ """ + nested_file_lists = [self._list_files_recursively(file) for file in files] + combined_files = [item for sublist in nested_file_lists for item in sublist] + unique_files = list(set(combined_files)) + return unique_files diff --git a/releasenotes/notes/add-indexing-ready-made-pipeline-85c1da2f8f910f9d.yaml b/releasenotes/notes/add-indexing-ready-made-pipeline-85c1da2f8f910f9d.yaml new file mode 100644 index 000000000..2e4e0d25c --- /dev/null +++ b/releasenotes/notes/add-indexing-ready-made-pipeline-85c1da2f8f910f9d.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add an indexing `build_indexing_pipeline` utility function diff --git a/test/pipelines/test_indexing_pipeline.py b/test/pipelines/test_indexing_pipeline.py new file mode 100644 index 000000000..4d3b57b8c --- /dev/null +++ b/test/pipelines/test_indexing_pipeline.py @@ -0,0 +1,69 @@ +import pytest + +from haystack.pipeline_utils.indexing import build_indexing_pipeline +from haystack.document_stores import InMemoryDocumentStore + + +class TestIndexingPipeline: + # indexing files without embeddings + def test_indexing_files_without_embeddings(self, test_files_path): + file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"] + document_store = InMemoryDocumentStore() + pipeline = build_indexing_pipeline(document_store=document_store) + result = pipeline.run(files=file_paths) + assert result == {"documents_written": 2} + + # indexing files with embeddings + @pytest.mark.integration + def test_indexing_files_with_embeddings(self, test_files_path): + document_store = InMemoryDocumentStore() + pipeline = build_indexing_pipeline( + document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2" + ) + file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"] + result = pipeline.run(files=file_paths) + assert result == {"documents_written": 2} + + @pytest.mark.integration + def test_indexing_dirs_with_embeddings(self, 
test_files_path): + document_store = InMemoryDocumentStore() + pipeline = build_indexing_pipeline( + document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2" + ) + file_paths = [test_files_path / "txt"] + result = pipeline.run(files=file_paths) + assert "documents_written" in result + assert result["documents_written"] >= 3 + + # indexing multiple files + def test_indexing_multiple_file_types(self, test_files_path): + document_store = InMemoryDocumentStore() + pipeline = build_indexing_pipeline(document_store=document_store) + file_paths = [ + test_files_path / "txt" / "doc_1.txt", + test_files_path / "txt" / "doc_2.txt", + test_files_path / "pdf" / "sample_pdf_1.pdf", + ] + result = pipeline.run(files=file_paths) + # pdf gets split into 2 documents + assert result == {"documents_written": 4} + + # indexing empty list of files + def test_indexing_empty_list_of_files(self): + document_store = InMemoryDocumentStore() + pipeline = build_indexing_pipeline(document_store=document_store) + result = pipeline.run(files=[]) + assert result == {"documents_written": 0} + + # embedding model is not found + def test_embedding_model_not_found(self): + document_store = InMemoryDocumentStore() + with pytest.raises(ValueError, match="Could not find an embedder"): + build_indexing_pipeline(document_store=document_store, embedding_model="invalid_model") + + @pytest.mark.integration + def test_open_ai_embedding_model(self): + document_store = InMemoryDocumentStore() + pipe = build_indexing_pipeline(document_store=document_store, embedding_model="text-embedding-ada-002") + # don't run the pipeline and waste credits, just check that it was created correctly + assert pipe is not None