diff --git a/examples/getting_started/rag.py b/examples/getting_started/rag.py
deleted file mode 100644
index d64ee7073..000000000
--- a/examples/getting_started/rag.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import os
-from haystack import Document
-from haystack.document_stores.in_memory import InMemoryDocumentStore
-from haystack.pipeline_utils import build_rag_pipeline
-
-API_KEY = "SET YOUR OPENAI API KEY HERE"
-
-# We support many different databases. Here we load a simple and lightweight in-memory document store.
-document_store = InMemoryDocumentStore()
-
-# Create some example documents and add them to the document store.
-documents = [
-    Document(content="My name is Jean and I live in Paris."),
-    Document(content="My name is Mark and I live in Berlin."),
-    Document(content="My name is Giorgio and I live in Rome."),
-]
-document_store.write_documents(documents)
-
-# Let's now build a simple RAG pipeline that uses a generative model to answer questions.
-rag_pipeline = build_rag_pipeline(llm_api_key=API_KEY, document_store=document_store)
-answers = rag_pipeline.run(query="Who lives in Rome?")
-print(answers.data)
diff --git a/examples/getting_started/rag_custom_data.py b/examples/getting_started/rag_custom_data.py
deleted file mode 100644
index 904a45a42..000000000
--- a/examples/getting_started/rag_custom_data.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from haystack.document_stores.in_memory import InMemoryDocumentStore
-from haystack.pipeline_utils import build_rag_pipeline, build_indexing_pipeline
-from haystack.pipeline_utils.indexing import download_files
-
-# We are model agnostic :) In this getting-started guide you can choose any OpenAI or Hugging Face TGI generation model.
-generation_model = "gpt-3.5-turbo"
-API_KEY = "sk-..."  # ADD YOUR KEY HERE
-
-# We support many different databases. Here, we load a simple and lightweight in-memory database.
-document_store = InMemoryDocumentStore()
-
-# Download example files from the web.
-files = download_files(sources=["http://www.paulgraham.com/superlinear.html"])
-
-# Pipelines are our main abstraction.
-# Here we create a pipeline that can index TXT and HTML. You can also use your own private files.
-indexing_pipeline = build_indexing_pipeline(
-    document_store=document_store,
-    embedding_model="intfloat/e5-base-v2",
-    supported_mime_types=["text/plain", "text/html"],  # "application/pdf"
-)
-indexing_pipeline.run(files=files)  # you can also supply files=[path_to_directory], which is searched recursively
-
-# RAG pipeline with a vector-based retriever + LLM
-rag_pipeline = build_rag_pipeline(
-    document_store=document_store,
-    embedding_model="intfloat/e5-base-v2",
-    generation_model=generation_model,
-    llm_api_key=API_KEY,
-)
-
-# For details, like which documents were used to generate the answer, look into the result object.
-result = rag_pipeline.run(query="What are superlinear returns and why are they important?")
-print(result.data)
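Note: this diff removes the `build_rag_pipeline` helper that both examples above rely on. For readers wondering what replaces the convenience function, the same flow can be written with explicit components. Below is a minimal sketch that mirrors the wiring of the deleted `haystack/pipeline_utils/rag.py` further down in this diff; the prompt template is copied (re-indented) from its default, and passing the API key directly as a string follows the deleted code's own usage, so treat it as an assumption against your installed Haystack 2.x version.

```python
from haystack import Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Default RAG prompt copied from the deleted resolve_prompt_template()
template = """
Given these documents, answer the question.

Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{question}}

Answer:
"""

# NOTE: the store must already contain documents with embeddings
# (see the indexing sketch later in this diff).
document_store = InMemoryDocumentStore()

pipeline = Pipeline()
pipeline.add_component(instance=SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2"), name="text_embedder")
pipeline.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="retriever")
pipeline.add_component(instance=PromptBuilder(template=template), name="prompt_builder")
pipeline.add_component(instance=OpenAIGenerator(model="gpt-3.5-turbo", api_key="sk-..."), name="llm")
pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
pipeline.connect("text_embedder", "retriever")
pipeline.connect("retriever", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.prompt")
pipeline.connect("llm.replies", "answer_builder.replies")
pipeline.connect("llm.meta", "answer_builder.meta")
pipeline.connect("retriever", "answer_builder.documents")

query = "Who lives in Rome?"
result = pipeline.run(
    {
        "text_embedder": {"text": query},
        "prompt_builder": {"question": query},
        "answer_builder": {"query": query},
    }
)
print(result["answer_builder"]["answers"][0].data)
```

The component names and connections are taken verbatim from the deleted `_RAGPipeline.__init__`.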
diff --git a/haystack/pipeline_utils/__init__.py b/haystack/pipeline_utils/__init__.py
deleted file mode 100644
index 3d6db9968..000000000
--- a/haystack/pipeline_utils/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from haystack.pipeline_utils.rag import build_rag_pipeline
-from haystack.pipeline_utils.indexing import build_indexing_pipeline
-
-__all__ = ["build_rag_pipeline", "build_indexing_pipeline"]
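Note: with this `__init__.py` gone, the `haystack.pipeline_utils` import path no longer exists. The module deleted next, `indexing.py`, also provided the small `download_files` helper used by the custom-data example; a self-contained replacement built on `LinkContentFetcher`, mirroring the deleted implementation (minus its custom user agent), might look like this sketch:

```python
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List

from haystack.components.fetchers import LinkContentFetcher


def download_files(sources: List[str]) -> List[str]:
    """Fetch URLs to temporary local files and return their paths."""
    fetcher = LinkContentFetcher()
    streams = fetcher.run(urls=sources)

    all_files = []
    for stream in streams["streams"]:
        # Pick a suffix from the reported content type, as the deleted helper did.
        suffix = ".html" if stream.meta["content_type"] == "text/html" else ".pdf"
        tmp = NamedTemporaryFile(delete=False, suffix=suffix)
        stream.to_file(Path(tmp.name))
        all_files.append(tmp.name)
    return all_files
```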
diff --git a/haystack/pipeline_utils/indexing.py b/haystack/pipeline_utils/indexing.py
deleted file mode 100644
index bccdf526a..000000000
--- a/haystack/pipeline_utils/indexing.py
+++ /dev/null
@@ -1,235 +0,0 @@
-import os
-import re
-from pathlib import Path
-from tempfile import NamedTemporaryFile
-from typing import Optional, List, Any, Dict
-from typing import Union, Type
-
-from haystack import Pipeline
-from haystack.components.converters import TextFileToDocument
-from haystack.components.embedders import SentenceTransformersDocumentEmbedder, OpenAIDocumentEmbedder
-from haystack.components.fetchers import LinkContentFetcher
-from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
-from haystack.components.routers import FileTypeRouter
-from haystack.components.joiners import DocumentJoiner
-from haystack.components.writers import DocumentWriter
-from haystack.document_stores.types import DocumentStore
-
-
-def download_files(sources: List[str]) -> List[str]:
-    """
-    Downloads a list of files from the web and returns the local paths where they are stored.
-    :param sources: A list of URLs to download.
-    :type sources: List[str]
-    :return: A list of paths to the downloaded files.
-    """
-
-    fetcher = LinkContentFetcher(user_agents=["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"])
-    streams = fetcher.run(urls=sources)
-
-    all_files = []
-    for stream in streams["streams"]:
-        file_suffix = ".html" if stream.meta["content_type"] == "text/html" else ".pdf"
-        f = NamedTemporaryFile(delete=False, suffix=file_suffix)
-        stream.to_file(Path(f.name))
-        all_files.append(f.name)
-
-    return all_files
-
-
-def build_indexing_pipeline(
-    document_store: Any,
-    embedding_model: str = "intfloat/e5-base-v2",
-    embedding_model_kwargs: Optional[Dict[str, Any]] = None,
-    supported_mime_types: Optional[List[str]] = None,
-):
-    """
-    Returns a prebuilt pipeline for indexing documents into a DocumentStore. The indexing pipeline automatically
-    detects the file type of the input files and converts them into Documents. The supported file types are .txt,
-    .pdf, and .html, but by default only .txt and .html are indexed unless the supported_mime_types parameter is set.
-
-    Example usage:
-
-    ```python
-    from haystack.pipeline_utils import build_indexing_pipeline
-    indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance)
-    indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
-    >>> {'documents_written': 2}
-    ```
-
-    One can also pass an embedding model to the pipeline, which will then calculate embeddings for the documents
-    and store them in the DocumentStore. Example usage:
-    ```python
-    indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance,
-                                            embedding_model="sentence-transformers/all-mpnet-base-v2")
-    indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
-    >>> {'documents_written': 2}
-    ```
-
-    After running the indexing pipeline, the documents are indexed in the DocumentStore and can be used for querying.
-
-
-    :param document_store: An instance of a DocumentStore to index documents into.
-    :param embedding_model: The name of the model to use for document embeddings.
-        Needs to be compatible with SentenceTransformers.
-    :param embedding_model_kwargs: Keyword arguments to pass to the embedding model class.
-    :param supported_mime_types: List of MIME types to support in the pipeline. If not given,
-        defaults to ["text/plain", "text/html"].
-
-    """
-    return _IndexingPipeline(
-        document_store=document_store,
-        embedding_model=embedding_model,
-        embedding_model_kwargs=embedding_model_kwargs,
-        supported_mime_types=supported_mime_types,
-    )
- """ - - if supported_mime_types is None: - supported_mime_types = ["text/plain", "text/html"] - - self.pipeline = Pipeline() - self.pipeline.add_component("file_type_router", FileTypeRouter(mime_types=supported_mime_types)) - converters_used: List[str] = [] - # Add converters dynamically based on MIME types - if "text/plain" in supported_mime_types: - self.pipeline.add_component("text_file_converter", TextFileToDocument()) - self.pipeline.connect("file_type_router.text/plain", "text_file_converter.sources") - converters_used.append("text_file_converter") - - if "application/pdf" in supported_mime_types: - from haystack.components.converters import PyPDFToDocument - - self.pipeline.add_component("pdf_file_converter", PyPDFToDocument()) - self.pipeline.connect("file_type_router.application/pdf", "pdf_file_converter.sources") - converters_used.append("pdf_file_converter") - - if "text/html" in supported_mime_types: - from haystack.components.converters import HTMLToDocument - - self.pipeline.add_component("html_file_converter", HTMLToDocument()) - self.pipeline.connect("file_type_router.text/html", "html_file_converter.sources") - converters_used.append("html_file_converter") - - # Add remaining common components - self.pipeline.add_component("document_joiner", DocumentJoiner()) - self.pipeline.add_component("document_cleaner", DocumentCleaner()) - self.pipeline.add_component("document_splitter", DocumentSplitter()) - - # Connect converters to joiner, if they exist - for converter_name in converters_used: - self.pipeline.connect(f"{converter_name}.documents", "document_joiner.documents") - - # Connect joiner to cleaner and splitter - self.pipeline.connect("document_joiner.documents", "document_cleaner.documents") - self.pipeline.connect("document_cleaner.documents", "document_splitter.documents") - - if embedding_model: - embedder_instance = self._find_embedder(embedding_model, embedding_model_kwargs) - self.pipeline.add_component("storage_sink", DocumentWriter(document_store=document_store)) - self.pipeline.add_component("writer", embedder_instance) - self.pipeline.connect("writer", "storage_sink") - else: - self.pipeline.add_component("writer", DocumentWriter(document_store=document_store)) - - self.pipeline.connect("document_splitter.documents", "writer.documents") - - # this is more of a sanity check for the maintainer of the pipeline, to make sure that the pipeline is - # configured correctly - if len(self.pipeline.inputs()) < 1: - raise RuntimeError("IndexingPipeline needs at least one input component.") - if len(self.pipeline.outputs()) < 1: - raise RuntimeError("IndexingPipeline needs at least one output component.") - - def run(self, files: List[Union[str, Path]]) -> Dict[str, Any]: - """ - Performs indexing of the given list of documents into the DocumentStore. - :param files: A list of paths to files to index. 
-        :type files: List[Union[str, Path]]
-
-        :return: The output of the pipeline run: a dictionary containing the number of documents written.
-        """
-        if not files:
-            return {"documents_written": 0}
-        input_files = self._process_files(files)
-        pipeline_output = self.pipeline.run(data={"file_type_router": {"sources": input_files}})
-        aggregated_results = {}
-        # combine the results of all outputs into one dictionary
-        for component_result in pipeline_output.values():
-            aggregated_results.update(component_result)
-        return aggregated_results
-
-    def _find_embedder(self, embedding_model: str, init_kwargs: Optional[Dict[str, Any]] = None) -> Any:
-        embedder_patterns = {
-            r"^text-embedding.*": OpenAIDocumentEmbedder,
-            r"^sentence-transformers.*": SentenceTransformersDocumentEmbedder,
-            r"^intfloat/.*": SentenceTransformersDocumentEmbedder,
-            # add more patterns or adjust them here
-        }
-        embedder_class = next((val for pat, val in embedder_patterns.items() if re.match(pat, embedding_model)), None)
-        if not embedder_class:
-            raise ValueError(
-                f"Could not find an embedder for the given embedding model name {embedding_model}. "
-                f"Please provide a valid embedding model name. "
-                f"Valid embedder classes are {embedder_patterns.values()}."
-            )
-        return self._create_embedder(embedder_class, embedding_model, init_kwargs)
-
-    def _create_embedder(self, embedder_class: Type, model: str, init_kwargs: Optional[Dict[str, Any]] = None) -> Any:
-        # Note: here we assume the embedder accepts a parameter called `model` that takes the model's name or path.
-        # See https://github.com/deepset-ai/haystack/issues/6534
-        kwargs = {**(init_kwargs or {}), "model": model}
-        return embedder_class(**kwargs)
-
-    def _list_files_recursively(self, path: Union[str, Path]) -> List[str]:
-        """
-        List all files in a directory recursively as a list of strings, or return the file itself
-        if it's not a directory.
-        :param path: the path to list files from
-        :type path: Union[str, Path]
-        :return: a list of strings, where each string is a path to a file
-        """
-
-        if os.path.isfile(path):
-            return [str(path)]
-        elif os.path.isdir(path):
-            file_list: List[str] = []
-            for root, _, files in os.walk(path):
-                for file in files:
-                    file_list.append(os.path.join(root, file))
-            return file_list
-        else:
-            return []
-
-    def _process_files(self, files: List[Union[str, Path]]) -> List[str]:
-        """
-        Process a list of files and directories, listing all files recursively and removing duplicates.
-        :param files: A list of files and directories to process.
-        :type files: List[Union[str, Path]]
-        :return: A list of unique files.
-        """
-        nested_file_lists = [self._list_files_recursively(file) for file in files]
-        combined_files = [item for sublist in nested_file_lists for item in sublist]
-        unique_files = list(set(combined_files))
-        return unique_files
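Note: the `_IndexingPipeline` wiring deleted above translates directly into an explicit `Pipeline`. Here is a minimal sketch for indexing .txt and .html files with embeddings, using the same components and connections as the deleted `__init__`; the file paths are placeholders:

```python
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument, HTMLToDocument
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.routers import FileTypeRouter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

pipeline = Pipeline()
# Route input files by MIME type to the matching converter
pipeline.add_component("file_type_router", FileTypeRouter(mime_types=["text/plain", "text/html"]))
pipeline.add_component("text_file_converter", TextFileToDocument())
pipeline.add_component("html_file_converter", HTMLToDocument())
pipeline.add_component("document_joiner", DocumentJoiner())
pipeline.add_component("document_cleaner", DocumentCleaner())
pipeline.add_component("document_splitter", DocumentSplitter())
pipeline.add_component("embedder", SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2"))
pipeline.add_component("writer", DocumentWriter(document_store=document_store))

pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
pipeline.connect("file_type_router.text/html", "html_file_converter.sources")
pipeline.connect("text_file_converter.documents", "document_joiner.documents")
pipeline.connect("html_file_converter.documents", "document_joiner.documents")
pipeline.connect("document_joiner.documents", "document_cleaner.documents")
pipeline.connect("document_cleaner.documents", "document_splitter.documents")
pipeline.connect("document_splitter.documents", "embedder.documents")
pipeline.connect("embedder.documents", "writer.documents")

result = pipeline.run(data={"file_type_router": {"sources": ["path/to/file1.txt", "path/to/file2.html"]}})
print(result)  # the DocumentWriter reports {"documents_written": N}
```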
- """ - nested_file_lists = [self._list_files_recursively(file) for file in files] - combined_files = [item for sublist in nested_file_lists for item in sublist] - unique_files = list(set(combined_files)) - return unique_files diff --git a/haystack/pipeline_utils/rag.py b/haystack/pipeline_utils/rag.py deleted file mode 100644 index fda4afcb6..000000000 --- a/haystack/pipeline_utils/rag.py +++ /dev/null @@ -1,204 +0,0 @@ -import re -from abc import ABC, abstractmethod -from typing import Optional, Any - -from huggingface_hub import HfApi - -from haystack import Pipeline -from haystack.components.builders.answer_builder import AnswerBuilder -from haystack.components.builders.prompt_builder import PromptBuilder -from haystack.components.embedders import SentenceTransformersTextEmbedder -from haystack.components.generators import OpenAIGenerator, HuggingFaceTGIGenerator -from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever -from haystack.dataclasses import Answer -from haystack.document_stores.types import DocumentStore -from haystack.document_stores.in_memory import InMemoryDocumentStore - - -def build_rag_pipeline( - document_store: DocumentStore, - embedding_model: str = "intfloat/e5-base-v2", - generation_model: str = "gpt-3.5-turbo", - llm_api_key: Optional[str] = None, - prompt_template: Optional[str] = None, -): - """ - Returns a prebuilt pipeline to perform retrieval augmented generation - :param document_store: An instance of a DocumentStore to read from. - :param embedding_model: The name of the model to use for embedding. Only SentenceTransformer models supported in this getting started code. - :param prompt_template: The template to use for the prompt. If not given, a default RAG template is used. - :param generation_model: The name of the model to use for generation. - Currently supporting: OpenAI generation models and Huggingface TGI models for text generation - :param llm_api_key: The API key to use for the OpenAI Language Model. If not given, the value of the - llm_api_key will be attempted to be read from the environment variable OPENAI_API_KEY. - """ - # Resolve components based on the chosen parameters - retriever = resolve_retriever(document_store) - embedder = resolve_embedder(embedding_model) - generator = resolve_generator(generation_model, llm_api_key) - prompt_template = resolve_prompt_template(prompt_template) - - # Add them to the Pipeline and connect them - pipeline = _RAGPipeline( - retriever=retriever, embedder=embedder, generator=generator, prompt_template=prompt_template - ) - return pipeline - - -class _RAGPipeline: - """ - A simple ready-made pipeline for RAG. It requires a populated document store. - """ - - def __init__(self, retriever: Any, embedder: Any, generator: Any, prompt_template: str): - """ - Initializes the pipeline. - :param retriever: The retriever to use. - :param embedder: The embedder to use. - :param generator: The generator to use. - :param prompt_template: The template to use for the prompt. 
- """ - self.pipeline = Pipeline() - self.pipeline.add_component(instance=embedder, name="text_embedder") - self.pipeline.add_component(instance=retriever, name="retriever") - self.pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder") - self.pipeline.add_component(instance=generator, name="llm") - self.pipeline.add_component(instance=AnswerBuilder(), name="answer_builder") - self.pipeline.connect("text_embedder", "retriever") - self.pipeline.connect("retriever", "prompt_builder.documents") - self.pipeline.connect("prompt_builder.prompt", "llm.prompt") - self.pipeline.connect("llm.replies", "answer_builder.replies") - self.pipeline.connect("llm.meta", "answer_builder.meta") - self.pipeline.connect("retriever", "answer_builder.documents") - - def run(self, query: str) -> Answer: - """ - Performs RAG using the given query. - - :param query: The query to ask. - :return: An Answer object. - """ - run_values = { - "prompt_builder": {"question": query}, - "answer_builder": {"query": query}, - "text_embedder": {"text": query}, - } - return self.pipeline.run(run_values)["answer_builder"]["answers"][0] - - -def resolve_embedder(embedding_model: str) -> SentenceTransformersTextEmbedder: - """ - Resolves the embedder - :param embedding_model: The embedding model to use. - """ - try: - embedder = SentenceTransformersTextEmbedder(model=embedding_model) - except Exception: - raise ValueError( - f"Embedding model: {embedding_model} is not supported. Please provide a SentenceTransformers model." - f"You can download the models through the huggingface model hub here: https://huggingface.co/sentence-transformers" - ) - return embedder - - -def resolve_retriever(document_store, retriever_class: Optional[str] = None) -> Optional[Any]: - """ - Resolves the retriever class to use for the given document store. - :param document_store: The document store to use. - :param retriever_class: The retriever class to use. If not given, it will be inferred from the document store. - """ - # first match the document store to the retriever - # TODO: add more retrievers - embedding_retriever_map = {InMemoryDocumentStore: InMemoryEmbeddingRetriever} - - retriever_clazz = ( - retriever_class or embedding_retriever_map[type(document_store)] - if type(document_store) in embedding_retriever_map - else None - ) - if not retriever_clazz: - raise ValueError( - f"Document store {type(document_store)} is not supported. Please provide a retriever class or use " - f"one of the following document stores: {list(embedding_retriever_map.keys())}" - ) - - retriever = retriever_clazz(document_store=document_store) # type: ignore - return retriever - - -def resolve_generator(generation_model: str, llm_api_key: Optional[str] = None) -> Optional[Any]: - """ - Resolves the generator to use for the given generation model. - :param generation_model: The generation model to use. - :param llm_api_key: The API key to use for the language model. - """ - generator = None - for resolver_clazz in _GeneratorResolver.get_resolvers(): - resolver = resolver_clazz() - generator = resolver.resolve(generation_model, llm_api_key) - if generator: - break - if not generator: - raise ValueError(f"Could not resolve LLM generator for the given model {generation_model}") - return generator - - -def resolve_prompt_template(prompt_template: Optional[str]) -> str: - prompt_template = ( - prompt_template - or """ - Given these documents, answer the question. 
diff --git a/test/pipelines/test_indexing_pipeline.py b/test/pipelines/test_indexing_pipeline.py
deleted file mode 100644
index ea9b33025..000000000
--- a/test/pipelines/test_indexing_pipeline.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import os
-
-import pytest
-
-from haystack.pipeline_utils.indexing import build_indexing_pipeline
-from haystack.document_stores.in_memory import InMemoryDocumentStore
-
-
-class TestIndexingPipeline:
-    # indexing files without embeddings
-    @pytest.mark.integration
-    def test_indexing_files_without_embeddings(self, test_files_path):
-        file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
-        document_store = InMemoryDocumentStore()
-        pipeline = build_indexing_pipeline(document_store=document_store)
-        result = pipeline.run(files=file_paths)
-        assert result == {"documents_written": 2}
-
-    # indexing files with embeddings
-    @pytest.mark.integration
-    def test_indexing_files_with_embeddings(self, test_files_path):
-        document_store = InMemoryDocumentStore()
-        pipeline = build_indexing_pipeline(
-            document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
-        )
-        file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
-        result = pipeline.run(files=file_paths)
-        assert result == {"documents_written": 2}
-
-    @pytest.mark.integration
-    def test_indexing_dirs_with_embeddings(self, test_files_path):
-        document_store = InMemoryDocumentStore()
-        pipeline = build_indexing_pipeline(
-            document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
-        )
-        file_paths = [test_files_path / "txt"]
-        result = pipeline.run(files=file_paths)
-        assert "documents_written" in result
-        assert result["documents_written"] >= 3
-
-    # indexing multiple files
-    @pytest.mark.integration
-    def test_indexing_multiple_file_types(self, test_files_path):
-        document_store = InMemoryDocumentStore()
-        pipeline = build_indexing_pipeline(
-            document_store=document_store, supported_mime_types=["text/plain", "application/pdf"]
-        )
-        file_paths = [
-            test_files_path / "txt" / "doc_1.txt",
-            test_files_path / "txt" / "doc_2.txt",
-            test_files_path / "pdf" / "sample_pdf_1.pdf",
-        ]
-        result = pipeline.run(files=file_paths)
-        # pdf gets split into 2 documents
-        assert result == {"documents_written": 4}
-
-    # indexing empty list of files
-    def test_indexing_empty_list_of_files(self):
-        document_store = InMemoryDocumentStore()
-        pipeline = build_indexing_pipeline(document_store=document_store)
-        result = pipeline.run(files=[])
-        assert result == {"documents_written": 0}
-
-    # embedding model is not found
-    def test_embedding_model_not_found(self):
-        document_store = InMemoryDocumentStore()
-        with pytest.raises(ValueError, match="Could not find an embedder"):
-            build_indexing_pipeline(document_store=document_store, embedding_model="invalid_model")
-
-    @pytest.mark.skipif(os.environ.get("OPENAI_API_KEY", "") == "", reason="OPENAI_API_KEY is not set")
-    @pytest.mark.integration
-    def test_open_ai_embedding_model(self):
-        document_store = InMemoryDocumentStore()
-        pipe = build_indexing_pipeline(document_store=document_store, embedding_model="text-embedding-ada-002")
-        # don't run the pipeline and waste credits, just check that it was created correctly
-        assert pipe is not None
{"documents_written": 4} - - # indexing empty list of files - def test_indexing_empty_list_of_files(self): - document_store = InMemoryDocumentStore() - pipeline = build_indexing_pipeline(document_store=document_store) - result = pipeline.run(files=[]) - assert result == {"documents_written": 0} - - # embedding model is not found - def test_embedding_model_not_found(self): - document_store = InMemoryDocumentStore() - with pytest.raises(ValueError, match="Could not find an embedder"): - build_indexing_pipeline(document_store=document_store, embedding_model="invalid_model") - - @pytest.mark.skipif(os.environ.get("OPENAI_API_KEY", "") == "", reason="OPENAI_API_KEY is not set") - @pytest.mark.integration - def test_open_ai_embedding_model(self): - document_store = InMemoryDocumentStore() - pipe = build_indexing_pipeline(document_store=document_store, embedding_model="text-embedding-ada-002") - # don't run the pipeline and waste credits, just check that it was created correctly - assert pipe is not None diff --git a/test/pipelines/test_rag_pipelines.py b/test/pipelines/test_rag_pipelines.py deleted file mode 100644 index 2cd2027dd..000000000 --- a/test/pipelines/test_rag_pipelines.py +++ /dev/null @@ -1,29 +0,0 @@ -import os - -import pytest - -from haystack.dataclasses import Answer -from haystack.document_stores.in_memory import InMemoryDocumentStore -from haystack.pipeline_utils.rag import build_rag_pipeline -from haystack.testing.factory import document_store_class - - -@pytest.mark.skipif(os.environ.get("OPENAI_API_KEY", "") == "", reason="OPENAI_API_KEY is not set") -@pytest.mark.integration -def test_rag_pipeline(mock_chat_completion): - rag_pipe = build_rag_pipeline(document_store=InMemoryDocumentStore()) - answer = rag_pipe.run(query="question") - assert isinstance(answer, Answer) - - -def test_rag_pipeline_other_docstore(): - FakeStore = document_store_class("FakeStore") - with pytest.raises(ValueError, match="InMemoryDocumentStore"): - assert build_rag_pipeline(document_store=FakeStore()) - - -def test_rag_pipeline_embedder_exist_if_model_is_given(): - rag_pipe = build_rag_pipeline( - document_store=InMemoryDocumentStore(), embedding_model="sentence-transformers/all-mpnet-base-v2" - ) - assert "text_embedder" in rag_pipe.pipeline.graph.nodes