feat: Add PipelineTemplate for ready made pipelines (#7001)

* Initial working version

* More experiments

* Use jinja2 template extensions

* Remove for_type static method

* Add ternary filter, with_kwargs

* Add indexing pipeline

* Rename with_kwargs -> with_parameters, allow cumulative invocation

* Remove ternary filter, use another approach to conditionals in default component configuration

* Massi feedback, add pydocs

* PipelineTemplateBuilder -> PipelineTemplate

* Add unit tests

* Add reno note

* Fix pylint, mypy

* PR review - Madeesh

* Add list_variables

* PR review feedback - Madeesh

* Update haystack/templates/pipelines.py

Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com>

* Add timeout, fix pylint

* Use full import path

* Update release note

* Fix failing windows test

* Update haystack/templates/source.py

Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com>

* Update haystack/templates/source.py

Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com>

* PR feedback Madeesh

* Fix pylint

---------

Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com>
This commit is contained in:
Vladimir Blagojevic 2024-02-19 13:59:48 +01:00 committed by GitHub
parent 29acffca1d
commit cb01cb4207
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 628 additions and 0 deletions

View File

@ -0,0 +1,4 @@
from haystack.templates.pipelines import PipelineTemplate
from haystack.templates.source import PredefinedTemplate, TemplateSource
__all__ = ["PipelineTemplate", "TemplateSource", "PredefinedTemplate"]

View File

@ -0,0 +1,104 @@
components:
cleaner: {{ cleaner | default({
"init_parameters": {
"remove_empty_lines": true,
"remove_extra_whitespaces": true,
"remove_regex": None,
"remove_repeated_substrings": false,
"remove_substrings": None
},
"type": "haystack.components.preprocessors.document_cleaner.DocumentCleaner"
}, true) | tojson }}
embedder: {{ embedder | default({
"init_parameters": {
"batch_size": 32,
"device": "cpu",
"embedding_separator": "\\n",
"model": "sentence-transformers/all-MiniLM-L6-v2",
"normalize_embeddings": false,
"prefix": "",
"progress_bar": true,
"suffix": "",
"token": None
},
"type": "haystack.components.embedders.sentence_transformers_document_embedder.SentenceTransformersDocumentEmbedder"
}, true) | tojson }}
# FileTypeRouter routes each file type to the matching file converter.
# The default mime types contain only text/plain; when PDF files are handled, application/pdf is added as well.
# That part is configured dynamically, based on the use_pdf_file_converter flag.
{% set default_mime_types = ["text/plain"] %}
{% set additional_mime_types = ["application/pdf"] if use_pdf_file_converter | default(false) else [] %}
{% set file_type_router_mime_types = default_mime_types + additional_mime_types %}
file_type_router: {{ file_type_router | default({
"init_parameters": {
"mime_types": file_type_router_mime_types,
},
"type": "haystack.components.routers.file_type_router.FileTypeRouter"
}, true) | tojson }}
doc_joiner: {{ doc_joiner | default({
"init_parameters": {
"join_mode": "concatenate"
},
"type": "haystack.components.joiners.document_joiner.DocumentJoiner"
}, true) | tojson }}
splitter: {{ splitter | default({
"init_parameters": {
"split_by": "sentence",
"split_length": 250,
"split_overlap": 30
},
"type": "haystack.components.preprocessors.document_splitter.DocumentSplitter"
}, true) | tojson }}
{% if use_pdf_file_converter %}
pdf_file_converter: {{ pdf_file_converter | default({
"init_parameters": {
"converter_name": "default"
},
"type": "haystack.components.converters.pypdf.PyPDFToDocument"
}, true) | tojson }}
{% endif %}
text_file_converter: {{ text_file_converter | default({
"init_parameters": {
"encoding": "utf-8"
},
"type": "haystack.components.converters.txt.TextFileToDocument"
}, true) | tojson }}
writer: {{ writer | default({
"init_parameters": {
"document_store": {
"init_parameters": {
"bm25_algorithm": "BM25Okapi",
"bm25_parameters": {},
"bm25_tokenization_regex": "(?u)\\b\\w\\w+\\b",
"embedding_similarity_function": "dot_product"
},
"type": "haystack.document_stores.in_memory.document_store.InMemoryDocumentStore"
},
"policy": "FAIL"
},
"type": "haystack.components.writers.document_writer.DocumentWriter"
}, true) | tojson }}
connections:
- receiver: text_file_converter.sources
sender: file_type_router.text/plain
- receiver: doc_joiner.documents
sender: text_file_converter.documents
{% if use_pdf_file_converter %}
- receiver: pdf_file_converter.sources
sender: file_type_router.application/pdf
- receiver: doc_joiner.documents
sender: pdf_file_converter.documents
{% endif %}
- receiver: cleaner.documents
sender: doc_joiner.documents
- receiver: splitter.documents
sender: cleaner.documents
- receiver: embedder.documents
sender: splitter.documents
- receiver: writer.documents
sender: embedder.documents
max_loops_allowed: 5
metadata: {}

View File

@ -0,0 +1,149 @@
from typing import Dict, Any, Set, Optional
import yaml
from jinja2 import meta, TemplateSyntaxError
from jinja2.nativetypes import NativeEnvironment
from haystack import Pipeline
from haystack.core.component import Component
from haystack.core.errors import PipelineValidationError
from haystack.core.serialization import component_to_dict
from haystack.templates.source import TemplateSource
class PipelineTemplate:
    """
    The PipelineTemplate class enables the straightforward creation of flexible and configurable pipelines using
    Jinja2 templated YAML files. Specifically designed to simplify the setup of complex data processing pipelines for
    a range of NLP tasks - including question answering, retriever augmented generation (RAG), document indexing,
    among others - PipelineTemplate empowers users to dynamically generate pipeline configurations from templates and
    customize components as necessary. Its design philosophy centers on providing an accessible, yet powerful, tool
    for constructing pipelines that accommodate both common use cases and specialized requirements with ease.

    The class enables two primary use cases:

    1. Building a pipeline directly using all default components specified in a predefined or custom template.
    2. Customizing pipelines by overriding default components with custom component settings, integrating
       user-provided component instances, and adjusting component parameters conditionally.

    Examples of usage:

    - **Default Build**: Instantiating a pipeline with default settings for a "question answering" (qa) task.

      ```python
      from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate

      # Create a pipeline with default components for a QA task
      ts = TemplateSource.from_predefined(PredefinedTemplate.QA)
      pipe = PipelineTemplate(ts).build()
      print(pipe.run(data={"question": "What's the capital of Bosnia and Herzegovina? Be brief"}))
      ```

    - **Custom Component Settings**: Customizing a pipeline by overriding a component, such as integrating a
      streaming-capable generator for real-time feedback.

      ```python
      from haystack.components.generators import OpenAIGenerator
      from haystack.components.generators.utils import print_streaming_chunk
      from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate

      # Customize the pipeline with a streaming-capable generator
      ts = TemplateSource.from_predefined(PredefinedTemplate.QA)
      streaming_pipe = PipelineTemplate(ts).override("generator",
                                                     OpenAIGenerator(
                                                     streaming_callback=print_streaming_chunk)).build()
      streaming_pipe.run(data={"question": "What's the capital of Germany? Tell me about it"})
      ```

    - **Customizing for Specific Tasks**: Building a pipeline for document indexing with specific components tailored
      to the task.

      ```python
      from haystack.components.embedders import SentenceTransformersDocumentEmbedder
      from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate

      # Customize the pipeline for document indexing with specific components, include PDF file converter
      ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
      ptb = PipelineTemplate(ts, template_params={"use_pdf_file_converter": True})
      ptb.override("embedder", SentenceTransformersDocumentEmbedder(progress_bar=True))
      pipe = ptb.build()

      result = pipe.run(data={
          "sources": ["some_text_file.txt", "another_pdf_file.pdf"]})
      print(result)
      ```

    The `PipelineTemplate` is designed to offer both ease of use for common pipeline configurations and the
    flexibility to customize and extend pipelines as required by advanced users and specific use cases.
    """

    template_file_extension = ".yaml.jinja2"

    def __init__(self, pipeline_template: TemplateSource, template_params: Optional[Dict[str, Any]] = None):
        """
        Initialize a PipelineTemplate.

        :param pipeline_template: The template source to use. See `TemplateSource` for available methods to load
            templates.
        :param template_params: An optional dictionary of parameters to use when rendering the pipeline template.
        :raises ValueError: If the template text is not syntactically valid Jinja2.
        """
        self.template_text = pipeline_template.template

        env = NativeEnvironment()
        try:
            self.template = env.from_string(self.template_text)
        except TemplateSyntaxError as e:
            raise ValueError(f"Invalid pipeline template, template syntax error: {e.message}") from e

        # Names the template references but does not define itself; `override` validates against this set.
        self.templated_variables = self._extract_variables(env)
        # Serialized (dictionary) form of every overridden component, keyed by component name.
        self.components: Dict[str, Any] = {}
        self.template_params = template_params or {}

    def override(self, component_name: str, component_instance: Component) -> "PipelineTemplate":
        """
        Overrides a component specified in the pipeline template with a custom component instance.

        :param component_name: The name of the component within the template to override.
        :param component_instance: The instance of the component to use as an override. Must be an instance
            of a class annotated with `@component`.
        :return: The instance of `PipelineTemplate` to allow for method chaining.
        :raises PipelineValidationError: If the `component_name` does not exist in the template or if
            `component_instance` is not a valid component.
        """
        # Only names that appear as variables in the template can be overridden.
        if component_name not in self.templated_variables:
            raise PipelineValidationError(f"Component '{component_name}' is not defined in the pipeline template")

        if not isinstance(component_instance, Component):
            raise PipelineValidationError(
                f"'{type(component_instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        # The template is rendered to YAML, so the component is stored in its dictionary form.
        self.components[component_name] = component_to_dict(component_instance)
        return self

    def list_variables(self) -> Set[str]:
        """
        Lists all templated variables in the pipeline template.

        :return: A set of strings representing the names of templated variables in the pipeline template. The set is
            a copy, so modifying it does not affect which component names `override` accepts.
        """
        return set(self.templated_variables)

    def build(self) -> Pipeline:
        """
        Constructs a `Pipeline` instance based on the template and any overridden components.

        :return: An instance of `Pipeline` constructed from the rendered template and custom component configurations.
        :raises TypeError: If the same name is present both as an overridden component and in `template_params`,
            because both dictionaries are passed to the renderer as keyword arguments.
        """
        rendered_yaml = self.template.render(**self.components, **self.template_params)
        pipeline_yaml = yaml.safe_load(rendered_yaml)
        return Pipeline.from_dict(pipeline_yaml)

    def _extract_variables(self, env: NativeEnvironment) -> Set[str]:
        """
        Extracts all undeclared variables from the template text.

        :param env: A Jinja native environment used to parse the template text.
        :return: A set of variable names the template references without declaring them.
        """
        ast = env.parse(self.template_text)
        return set(meta.find_undeclared_variables(ast))

View File

@ -0,0 +1,23 @@
components:
generator: {{ generator | default({
"init_parameters": {
"api_key": {
"env_vars": [ "OPENAI_API_KEY" ],
"strict": true,
"type": "env_var"
},
"model": "gpt-3.5-turbo"
},
"type": "haystack.components.generators.openai.OpenAIGenerator"
}, true) | tojson }}
prompt_builder: {{ prompt_builder | default({
"init_parameters": {
"template": "\nGiven the question {{question}}\nAnswer it in German only.\n\nAntwort:",
},
"type": "haystack.components.builders.prompt_builder.PromptBuilder"
}, true) | tojson }}
connections:
- receiver: generator.prompt
sender: prompt_builder.prompt
max_loops_allowed: 2
metadata: {}

View File

@ -0,0 +1,72 @@
components:
answer_builder: {{ answer_builder | default({
"init_parameters": {},
"type": "haystack.components.builders.answer_builder.AnswerBuilder"
}, true) | tojson }}
generator: {{ generator | default({
"init_parameters": {
"api_key": {
"env_vars": [ "OPENAI_API_KEY" ],
"strict": true,
"type": "env_var"
},
"model": "gpt-3.5-turbo"
},
"type": "haystack.components.generators.openai.OpenAIGenerator"
}, true) | tojson }}
retriever: {{ retriever | default({
"init_parameters": {
"document_store": {
"init_parameters": {
"bm25_algorithm": "BM25L",
"bm25_parameters": {},
"bm25_tokenization_regex": "(?u)\\b\\w\\w+\\b",
"embedding_similarity_function": "dot_product"
},
"type": "haystack.document_stores.in_memory.document_store.InMemoryDocumentStore"
},
"filters": None,
"return_embedding": false,
"scale_score": false,
"top_k": 10
},
"type": "haystack.components.retrievers.in_memory.embedding_retriever.InMemoryEmbeddingRetriever"
}, true) | tojson }}
text_embedder: {{ text_embedder | default({
"init_parameters": {
"batch_size": 32,
"device": "cpu",
"model": "sentence-transformers/all-mpnet-base-v2",
"normalize_embeddings": false,
"prefix": "",
"progress_bar": true,
"suffix": "",
"token": {
"env_vars": [ "HF_API_TOKEN" ],
"strict": false,
"type": "env_var"
}
},
"type": "haystack.components.embedders.sentence_transformers_text_embedder.SentenceTransformersTextEmbedder"
}, true) | tojson }}
prompt_builder: {{ prompt_builder | default({
"init_parameters": {
"template": "\nGiven these documents, answer the question.\n\nDocuments:\n{% for doc in documents %}\n{{ doc.content }}\n {% endfor %}\n\nQuestion: {{question}}\n\nAnswer:\n",
},
"type": "haystack.components.builders.prompt_builder.PromptBuilder"
}, true) | tojson }}
connections:
- receiver: retriever.query_embedding
sender: text_embedder.embedding
- receiver: prompt_builder.documents
sender: retriever.documents
- receiver: answer_builder.documents
sender: retriever.documents
- receiver: generator.prompt
sender: prompt_builder.prompt
- receiver: answer_builder.replies
sender: generator.replies
- receiver: answer_builder.meta
sender: generator.meta
max_loops_allowed: 100
metadata: {}

View File

@ -0,0 +1,112 @@
import re
from enum import Enum
from pathlib import Path
from typing import Union
import requests
TEMPLATE_FILE_EXTENSION = ".yaml.jinja2"
TEMPLATE_HOME_DIR = Path(__file__).resolve().parent
class PredefinedTemplate(Enum):
    """
    The pipeline templates that ship with the package.

    Pass a member to `TemplateSource.from_predefined` to obtain the `TemplateSource` that a `PipelineTemplate`
    can be created from.
    """

    # Every value is the base name of the matching template file in the templates directory,
    # so members and template files must stay in a 1-to-1 relationship.
    QA = "qa"
    RAG = "rag"
    INDEXING = "indexing"
class TemplateSource:
    """
    TemplateSource loads template content from various inputs, including strings, files, predefined templates, and URLs.
    The class provides mechanisms to load templates dynamically and checks that they contain Jinja2 syntax.

    TemplateSource is used by `PipelineTemplate` to load pipeline templates from various sources.
    For example:

    ```python
    # Load a predefined indexing pipeline template
    ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
    pipeline = PipelineTemplate(ts)

    # Load a custom pipeline template from a file
    ts = TemplateSource.from_file("path/to/custom_template.yaml.jinja2")
    pipeline = PipelineTemplate(ts)
    ```

    Similar methods are available to load templates from strings and URLs.
    """

    # Matches {{ var }}, {% block %} and {# comment #}. DOTALL lets `.` match newlines, so a construct that
    # spans several lines (for example a multi-line `{{ name | default({...}) }}` expression) is detected too.
    _JINJA2_SYNTAX_PATTERN = re.compile(r"\{\{.*?\}\}|\{%.*?%\}|\{#.*?#\}", re.DOTALL)

    def __init__(self, template: str):
        """
        Initialize a TemplateSource.

        :param template: The template string to use.
        """
        self._template = template

    @classmethod
    def from_str(cls, template_str: str) -> "TemplateSource":
        """
        Create a TemplateSource from a string.

        :param template_str: The template string to use. Must contain Jinja2 syntax.
        :return: An instance of `TemplateSource`.
        :raises ValueError: If no Jinja2 syntax is found in `template_str`.
        """
        if not cls._contains_jinja2_syntax(template_str):
            raise ValueError("The provided template does not contain Jinja2 syntax.")
        return cls(template_str)

    @classmethod
    def from_file(cls, file_path: Union[Path, str]) -> "TemplateSource":
        """
        Create a TemplateSource from a file.

        :param file_path: The path to the file containing the template. Must contain Jinja2 syntax.
        :return: An instance of `TemplateSource`.
        :raises ValueError: If the file content contains no Jinja2 syntax.
        """
        # Decode explicitly as UTF-8 so the result does not depend on the platform's default encoding.
        with open(file_path, "r", encoding="utf-8") as file:
            return cls.from_str(file.read())

    @classmethod
    def from_predefined(cls, predefined_template: PredefinedTemplate) -> "TemplateSource":
        """
        Create a TemplateSource from a predefined template. See `PredefinedTemplate` for available options.

        :param predefined_template: The predefined template to use.
        :return: An instance of `TemplateSource`.
        """
        # The enum value is the template file's base name inside the templates directory.
        template_path = TEMPLATE_HOME_DIR / f"{predefined_template.value}{TEMPLATE_FILE_EXTENSION}"
        return cls.from_file(template_path)

    @classmethod
    def from_url(cls, url: str) -> "TemplateSource":
        """
        Create a TemplateSource from a URL.

        :param url: The URL to fetch the template from. The fetched content must contain Jinja2 syntax.
        :return: An instance of `TemplateSource`.
        :raises ValueError: If the fetched content contains no Jinja2 syntax.
        """
        response = requests.get(url, timeout=10)
        # Surface HTTP error statuses instead of treating an error page as a template.
        response.raise_for_status()
        return cls.from_str(response.text)

    @property
    def template(self) -> str:
        """
        Returns the raw template string as a read-only property.
        """
        return self._template

    @staticmethod
    def _contains_jinja2_syntax(potential_jinja_template: str) -> bool:
        """
        Determines if a given string contains Jinja2 templating syntax.

        :param potential_jinja_template: The string to check for Jinja2 syntax.
        :return: `True` if Jinja2 syntax is found, otherwise `False`.
        """
        return bool(TemplateSource._JINJA2_SYNTAX_PATTERN.search(potential_jinja_template))

View File

@ -0,0 +1,42 @@
---
highlights:
- |
Introducing a flexible and dynamic approach to creating NLP pipelines with Haystack's new PipelineTemplate class!
This innovative feature utilizes Jinja2 templated YAML files, allowing users to effortlessly construct and customize
complex data processing pipelines for various NLP tasks. From question answering and document indexing to custom
pipeline requirements, the PipelineTemplate simplifies configuration and enhances adaptability. Users can now easily
override default components or integrate custom settings with simple, straightforward code.
For example, the following pipeline template can be used to create an indexing pipeline:
```python
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate
ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
pt = PipelineTemplate(ts, template_params={"use_pdf_file_converter": True})
pt.override("embedder", SentenceTransformersDocumentEmbedder(progress_bar=True))
pipe = pt.build()
result = pipe.run(data={"sources": ["some_local_dir/and_text_file.txt", "some_other_local_dir/and_pdf_file.pdf"]})
print(result)
```
In the above example, a PredefinedTemplate.INDEXING enum is used to create a pipeline with a custom instance of
SentenceTransformersDocumentEmbedder and the PDF file converter enabled. The pipeline is then run on a list of
local files and the result is printed (number of indexed documents).
We could have of course used the same PipelineTemplate class to create any other pre-defined pipeline or even a
custom pipeline with custom components and settings.
On the other hand, the following pipeline template can be used to create a pre-defined RAG pipeline:
```python
from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate
ts = TemplateSource.from_predefined(PredefinedTemplate.RAG)
pipe = PipelineTemplate(ts).build()
result = pipe.run(query="What's the meaning of life?")
print(result)
```
TemplateSource loads template content from various inputs, including strings, files, predefined templates, and URLs.
The class provides mechanisms to load templates dynamically and ensure they contain valid Jinja2 syntax.

View File

@ -0,0 +1,122 @@
import sys
import tempfile
import pytest
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.generators import HuggingFaceTGIGenerator
from haystack.core.errors import PipelineValidationError
from haystack.templates import PipelineTemplate, TemplateSource, PredefinedTemplate
@pytest.fixture
def random_valid_template():
    """
    Provide a small pipeline template with two templated components (`generator`, `prompt_builder`).

    The component entries are nested under `components` and `sender` belongs to the connection entry, so the
    rendered text is structurally valid pipeline YAML.
    """
    return """components:
  generator: {{ generator | tojson }}
  prompt_builder: {{prompt_builder}}
connections:
- receiver: generator.prompt
  sender: prompt_builder.prompt
max_loops_allowed: 2
metadata: {}
"""
class TestPipelineTemplate:
    """Tests for `TemplateSource` loading and for building pipelines with `PipelineTemplate`."""

    # --- TemplateSource ---

    def test_from_str(self):
        # A string without any Jinja2 syntax is rejected.
        with pytest.raises(ValueError):
            TemplateSource.from_str("invalid_template")

    def test_from_str_valid(self):
        # A string containing Jinja2 syntax is accepted and stored unchanged.
        ts = TemplateSource.from_str("{{ valid_template }}")
        assert ts.template == "{{ valid_template }}"

    def test_from_file_invalid_path(self):
        # A path that does not exist surfaces the underlying FileNotFoundError.
        with pytest.raises(FileNotFoundError):
            TemplateSource.from_file("invalid_path")

    @pytest.mark.skipif(sys.platform == "win32", reason="Fails on Windows CI with permission denied")
    def test_from_file_valid_path(self, random_valid_template):
        # The context manager closes (and thereby removes) the temporary file even if an assertion fails.
        with tempfile.NamedTemporaryFile(mode="w") as temp_file:
            temp_file.write(random_valid_template)
            # Flush so the content is on disk before it is read back through a second handle.
            temp_file.flush()
            ts = TemplateSource.from_file(temp_file.name)
            assert ts.template == random_valid_template

    def test_from_predefined_invalid_template(self):
        # Despite the name, this checks the happy path: a predefined template loads with non-empty content.
        ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
        assert len(ts.template) > 0

    # --- PipelineTemplate ---

    def test_override_nonexistent_component(self):
        # Overriding a name that is not a variable of the template raises PipelineValidationError.
        ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
        with pytest.raises(PipelineValidationError):
            PipelineTemplate(ts).override("nonexistent_component", SentenceTransformersDocumentEmbedder())

    def test_build_pipeline_with_default_components(self):
        # Building without overrides uses the default components specified in the template.
        ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
        pipeline = PipelineTemplate(ts).build()
        assert isinstance(pipeline, Pipeline)

        # pipeline has components
        assert pipeline.get_component("cleaner")
        assert pipeline.get_component("writer")
        assert pipeline.get_component("embedder")

        # pipeline should have inputs and outputs
        assert len(pipeline.inputs()) > 0
        assert len(pipeline.outputs()) > 0

    def test_customize_pipeline_with_overrides(self):
        # An overriding component instance carries its custom settings into the built pipeline.
        ts = TemplateSource.from_predefined(PredefinedTemplate.INDEXING)
        pt = PipelineTemplate(ts)
        pt.override("embedder", SentenceTransformersDocumentEmbedder(progress_bar=True, batch_size=64))
        pipe = pt.build()

        assert isinstance(pipe, Pipeline)
        assert pipe.get_component("embedder")
        embedder: SentenceTransformersDocumentEmbedder = pipe.get_component("embedder")
        embedder_dict = embedder.to_dict()
        assert embedder_dict["init_parameters"]["progress_bar"]
        assert embedder_dict["init_parameters"]["batch_size"] == 64

    @pytest.mark.integration
    def test_override_component(self):
        # Replaces the template's generator with a completely different generator type.
        # integration because we'll fetch the tokenizer
        pipe = (
            PipelineTemplate(TemplateSource.from_predefined(PredefinedTemplate.QA))
            .override("generator", HuggingFaceTGIGenerator())
            .build()
        )
        assert isinstance(pipe, Pipeline)
        assert pipe.get_component("generator")
        assert isinstance(pipe.get_component("generator"), HuggingFaceTGIGenerator)

    @pytest.mark.integration
    def test_building_pipeline_with_direct_template(self, random_valid_template):
        # Builds from a template given directly as a string, supplying every component via override.
        pt = PipelineTemplate(TemplateSource.from_str(random_valid_template))
        pt.override("generator", HuggingFaceTGIGenerator())
        pt.override("prompt_builder", PromptBuilder("Some fake prompt"))
        pipe = pt.build()

        assert isinstance(pipe, Pipeline)
        assert pipe.get_component("generator")
        assert isinstance(pipe.get_component("generator"), HuggingFaceTGIGenerator)
        assert pipe.get_component("prompt_builder")
        assert isinstance(pipe.get_component("prompt_builder"), PromptBuilder)