diff --git a/haystack/nodes/__init__.py b/haystack/nodes/__init__.py index 15ca52ce3..457c335b7 100644 --- a/haystack/nodes/__init__.py +++ b/haystack/nodes/__init__.py @@ -23,7 +23,7 @@ from haystack.nodes.file_converter import ( ) from haystack.nodes.image_to_text import TransformersImageToText from haystack.nodes.label_generator import PseudoLabelGenerator -from haystack.nodes.other import Docs2Answers, JoinDocuments, RouteDocuments, JoinAnswers, DocumentMerger +from haystack.nodes.other import Docs2Answers, JoinDocuments, RouteDocuments, JoinAnswers, DocumentMerger, Shaper from haystack.nodes.preprocessor import BasePreProcessor, PreProcessor from haystack.nodes.prompt import PromptNode, PromptTemplate, PromptModel from haystack.nodes.query_classifier import SklearnQueryClassifier, TransformersQueryClassifier diff --git a/haystack/nodes/other/__init__.py b/haystack/nodes/other/__init__.py index 39f9dfce5..a92291f4e 100644 --- a/haystack/nodes/other/__init__.py +++ b/haystack/nodes/other/__init__.py @@ -4,3 +4,4 @@ from haystack.nodes.other.route_documents import RouteDocuments from haystack.nodes.other.join_answers import JoinAnswers from haystack.nodes.other.join import JoinNode from haystack.nodes.other.document_merger import DocumentMerger +from haystack.nodes.other.shaper import Shaper diff --git a/haystack/nodes/other/shaper.py b/haystack/nodes/other/shaper.py new file mode 100644 index 000000000..bb6933bb6 --- /dev/null +++ b/haystack/nodes/other/shaper.py @@ -0,0 +1,408 @@ +from typing import Optional, List, Dict, Any, Tuple, Union, Callable + +import logging + +from haystack.nodes.base import BaseComponent +from haystack.schema import Document, Answer, MultiLabel + + +logger = logging.getLogger(__name__) + + +def rename(value: Any) -> Tuple[Any]: + """ + Identity function. Can be used to rename values in the invocation context without changing them. 
+ + Example: + + ```python + assert rename(1) == (1, ) + ``` + """ + return (value,) + + +def value_to_list(value: Any, target_list: List[Any]) -> Tuple[List[Any]]: + """ + Transforms a value into a list containing this value as many times as the length of the target list. + + Example: + + ```python + assert value_to_list(value=1, target_list=list(range(5))) == ([1, 1, 1, 1, 1], ) + ``` + """ + return ([value] * len(target_list),) + + +def join_lists(lists: List[List[Any]]) -> Tuple[List[Any]]: + """ + Joins the passed lists into a single one. + + Example: + + ```python + assert join_lists(lists=[[1, 2, 3], [4, 5]]) == ([1, 2, 3, 4, 5], ) + ``` + """ + merged_list = [] + for inner_list in lists: + merged_list += inner_list + return (merged_list,) + + +def join_strings(strings: List[str], delimiter: str = " ") -> Tuple[str]: + """ + Transforms a list of strings into a single string. The content of this string + is the content of all original strings separated by the delimiter you specify. + + Example: + + ```python + assert join_strings(strings=["first", "second", "third"], delimiter=" - ") == ("first - second - third", ) + ``` + """ + return (delimiter.join(strings),) + + +def join_documents(documents: List[Document], delimiter: str = " ") -> Tuple[List[Document]]: + """ + Transforms a list of documents into a list containing a single Document. The content of this list + is the content of all original documents separated by the delimiter you specify. + + All metadata is dropped. (TODO: fix) + + Example: + + ```python + assert join_documents( + documents=[ + Document(content="first"), + Document(content="second"), + Document(content="third") + ], + delimiter=" - " + ) == ([Document(content="first - second - third")], ) + ``` + """ + return ([Document(content=delimiter.join([d.content for d in documents]))],) + + +def strings_to_answers(strings: List[str]) -> Tuple[List[Answer]]: + """ + Transforms a list of strings into a list of Answers. 
+
+    Example:
+
+    ```python
+    assert strings_to_answers(strings=["first", "second", "third"]) == ([
+        Answer(answer="first"),
+        Answer(answer="second"),
+        Answer(answer="third"),
+    ], )
+    ```
+    """
+    return ([Answer(answer=string, type="generative") for string in strings],)
+
+
+def answers_to_strings(answers: List[Answer]) -> Tuple[List[str]]:
+    """
+    Extracts the `answer` field of each Answer and returns a list of strings.
+
+    Example:
+
+    ```python
+    assert answers_to_strings(
+            answers=[
+                Answer(answer="first"),
+                Answer(answer="second"),
+                Answer(answer="third")
+            ]
+        ) == (["first", "second", "third"],)
+    ```
+    """
+    return ([answer.answer for answer in answers],)
+
+
+def strings_to_documents(
+    strings: List[str],
+    meta: Union[List[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]] = None,
+    id_hash_keys: Optional[List[str]] = None,
+) -> Tuple[List[Document]]:
+    """
+    Transforms a list of strings into a list of Documents. If you pass the metadata in a single
+    dictionary, all Documents get the same metadata. If you pass the metadata as a list, the length of this list
+    must be the same as the length of the list of strings, and each Document gets its own metadata.
+    You can specify `id_hash_keys` only once and it gets assigned to all Documents.
+
+    Example:
+
+    ```python
+    assert strings_to_documents(
+            strings=["first", "second", "third"],
+            meta=[{"position": i} for i in range(3)],
+            id_hash_keys=['content', 'meta']
+        ) == ([
+            Document(content="first", meta={"position": 0}, id_hash_keys=['content', 'meta']),
+            Document(content="second", meta={"position": 1}, id_hash_keys=['content', 'meta']),
+            Document(content="third", meta={"position": 2}, id_hash_keys=['content', 'meta'])
+        ], )
+    ```
+    """
+    all_metadata: List[Optional[Dict[str, Any]]]
+    if isinstance(meta, dict):
+        all_metadata = [meta] * len(strings)
+    elif isinstance(meta, list):
+        if len(meta) != len(strings):
+            raise ValueError(
+                f"Not enough metadata dictionaries. 
strings_to_documents received {len(strings)} and {len(meta)} metadata dictionaries."
+            )
+        all_metadata = meta
+    else:
+        all_metadata = [None] * len(strings)
+
+    return ([Document(content=string, meta=m, id_hash_keys=id_hash_keys) for string, m in zip(strings, all_metadata)],)
+
+
+def documents_to_strings(documents: List[Document]) -> Tuple[List[str]]:
+    """
+    Extracts the content field of Documents and returns a list of strings.
+
+    Example:
+
+    ```python
+    assert documents_to_strings(
+            documents=[
+                Document(content="first"),
+                Document(content="second"),
+                Document(content="third")
+            ]
+        ) == (["first", "second", "third"],)
+    ```
+    """
+    return ([doc.content for doc in documents],)
+
+
+REGISTERED_FUNCTIONS: Dict[str, Callable[..., Tuple[Any]]] = {
+    "rename": rename,
+    "value_to_list": value_to_list,
+    "join_lists": join_lists,
+    "join_strings": join_strings,
+    "join_documents": join_documents,
+    "strings_to_answers": strings_to_answers,
+    "answers_to_strings": answers_to_strings,
+    "strings_to_documents": strings_to_documents,
+    "documents_to_strings": documents_to_strings,
+}
+
+
+class Shaper(BaseComponent):
+
+    """
+    Shaper is a component that can invoke arbitrary, registered functions on the invocation context
+    (query, documents, and so on) of a pipeline. It then passes the new or modified variables further down the pipeline.
+
+    Using YAML configuration, the Shaper component is initialized with functions to invoke on pipeline invocation
+    context.
+
+    For example, in the YAML snippet below:
+    ```yaml
+    components:
+    - name: shaper
+      type: Shaper
+      params:
+        func: value_to_list
+        inputs:
+            value: query
+            target_list: documents
+        outputs: [questions]
+    ```
+    Shaper component is initialized with a directive to invoke the function `value_to_list` on the variable `query` and to store
+    the result in the invocation context variable `questions`. All other invocation context variables are passed down
+    the pipeline as they are.
+
+    Shaper is especially useful for pipelines with PromptNodes, where we need to modify the invocation
+    context to match the templates of PromptNodes.
+
+    You can use multiple Shaper components in a pipeline to modify the invocation context as needed.
+
+    `Shaper` supports the following functions:
+
+    - `rename`
+    - `value_to_list`
+    - `join_strings`
+    - `join_documents`
+    - `join_lists`
+    - `strings_to_answers`
+    - `answers_to_strings`
+    - `strings_to_documents`
+    - `documents_to_strings`
+
+    See their descriptions in the code for details about their inputs, outputs, and other parameters.
+    """
+
+    outgoing_edges = 1
+
+    def __init__(
+        self,
+        func: str,
+        outputs: List[str],
+        inputs: Optional[Dict[str, Union[List[str], str]]] = None,
+        params: Optional[Dict[str, Any]] = None,
+    ):
+        """
+        Initializes the Shaper component.
+
+        Some examples:
+
+        ```yaml
+        - name: shaper
+          type: Shaper
+          params:
+            func: value_to_list
+            inputs:
+                value: query
+                target_list: documents
+            outputs:
+                - questions
+        ```
+        This node takes the content of `query` and creates a list that contains the value of `query` `len(documents)` times.
+        This list is stored in the invocation context under the key `questions`.
+
+        ```yaml
+        - name: shaper
+          type: Shaper
+          params:
+            func: join_documents
+            inputs:
+                documents: documents
+            params:
+              delimiter: ' - '
+            outputs:
+                - documents
+        ```
+        This node overwrites the content of `documents` in the invocation context with a list containing a single Document
+        whose content is the concatenation of all the original Documents. So if `documents` contained
+        `[Document("A"), Document("B"), Document("C")]`, this shaper overwrites it with `[Document("A - B - C")]`
+
+        ```yaml
+        - name: shaper
+          type: Shaper
+          params:
+            func: join_strings
+            params:
+              strings: ['a', 'b', 'c']
+              delimiter: ' . 
'
+            outputs:
+                - single_string
+
+        - name: shaper
+          type: Shaper
+          params:
+            func: strings_to_documents
+            inputs:
+              strings: single_string
+            params:
+              meta:
+                name: 'my_file.txt'
+            outputs:
+                - single_document
+        ```
+        These two nodes, executed one after the other, first add a key in the invocation context called `single_string`
+        that contains `a . b . c`, and then create another key called `single_document` that contains instead
+        `[Document(content="a . b . c", meta={'name': 'my_file.txt'})]`.
+
+        :param func: The function to apply.
+        :param inputs: Maps the function's input kwargs to the key-value pairs in the invocation context.
+            For example, `value_to_list` expects the `value` and `target_list` parameters, so `inputs` might contain:
+            `{'value': 'query', 'target_list': 'documents'}`. It doesn't need to contain all keyword args, see `params`.
+        :param params: Maps the function's input kwargs to some fixed values. For example, `value_to_list` expects
+            `value` and `target_list` parameters, so `params` might contain
+            `{'value': 'A', 'target_list': [1, 1, 1, 1]}` and the node's output is `["A", "A", "A", "A"]`.
+            It doesn't need to contain all keyword args, see `inputs`.
+            You can use params to provide fallback values for arguments of `run` that you're not sure exist.
+            So if you need `query` to exist, you can provide a fallback value in the params, which will be used only if `query`
+            is not passed to this node by the pipeline.
+        :param outputs: The keys under which to store the outputs in the invocation context. The length of the outputs must match
+            the number of outputs produced by the function invoked.
+ """ + super().__init__() + self.function = REGISTERED_FUNCTIONS[func] + self.outputs = outputs + self.inputs = inputs or {} + self.params = params or {} + + def run( # type: ignore + self, + query: Optional[str] = None, + file_paths: Optional[List[str]] = None, + labels: Optional[MultiLabel] = None, + documents: Optional[List[Document]] = None, + meta: Optional[dict] = None, + invocation_context: Optional[Dict[str, Any]] = None, + ) -> Tuple[Dict, str]: + invocation_context = invocation_context or {} + if query and "query" not in invocation_context.keys(): + invocation_context["query"] = query + + if file_paths and "file_paths" not in invocation_context.keys(): + invocation_context["file_paths"] = file_paths + + if labels and "labels" not in invocation_context.keys(): + invocation_context["labels"] = labels + + if documents and "documents" not in invocation_context.keys(): + invocation_context["documents"] = documents + + if meta and "meta" not in invocation_context.keys(): + invocation_context["meta"] = meta + + input_values: Dict[str, Any] = {} + for key, value in self.inputs.items(): + if isinstance(value, list): + input_values[key] = [] + for v in value: + if v in invocation_context.keys() and v is not None: + input_values[key].append(invocation_context[v]) + else: + if value in invocation_context.keys() and value is not None: + input_values[key] = invocation_context[value] + + input_values = {**self.params, **input_values} + try: + logger.debug( + "Shaper is invoking this function: %s(%s)", + self.function.__name__, + ", ".join([f"{key}={value}" for key, value in input_values.items()]), + ) + output_values = self.function(**input_values) + except TypeError as e: + raise ValueError( + "Shaper couldn't apply the function to your inputs and parameters. " + "Check the above stacktrace and make sure you provided all the correct inputs, parameters, " + "and parameter types." 
+ ) from e + + for output_key, output_value in zip(self.outputs, output_values): + invocation_context[output_key] = output_value + + results = {"invocation_context": invocation_context} + if output_key in ["query", "file_paths", "labels", "documents", "meta"]: + results[output_key] = output_value + + return results, "output_1" + + def run_batch( # type: ignore + self, + query: Optional[str] = None, + file_paths: Optional[List[str]] = None, + labels: Optional[MultiLabel] = None, + documents: Optional[List[Document]] = None, + meta: Optional[dict] = None, + invocation_context: Optional[Dict[str, Any]] = None, + ) -> Tuple[Dict, str]: + + return self.run( + query=query, + file_paths=file_paths, + labels=labels, + documents=documents, + meta=meta, + invocation_context=invocation_context, + ) diff --git a/haystack/nodes/prompt/prompt_node.py b/haystack/nodes/prompt/prompt_node.py index 10b787785..4f17881f1 100644 --- a/haystack/nodes/prompt/prompt_node.py +++ b/haystack/nodes/prompt/prompt_node.py @@ -911,29 +911,42 @@ class PromptNode(BaseComponent): :param meta: The meta to be used for the prompt. Usually not used. :param invocation_context: The invocation context to be used for the prompt. """ - - # invocation_context is a dictionary that is passed from a pipeline node to a pipeline node and can be used - # to pass results from a pipeline node to any other downstream pipeline node. - invocation_context = invocation_context or {} - # prompt_collector is an empty list, it's passed to the PromptNode that will fill it with the rendered prompts, # so that they can be returned by `run()` as part of the pipeline's debug output. 
prompt_collector: List[str] = [] - results = self( - query=query, - labels=labels, - documents=[doc.content for doc in documents if isinstance(doc.content, str)] if documents else [], - prompt_collector=prompt_collector, - **invocation_context, - ) + invocation_context = invocation_context or {} + if query and "query" not in invocation_context.keys(): + invocation_context["query"] = query + + if file_paths and "file_paths" not in invocation_context.keys(): + invocation_context["file_paths"] = file_paths + + if labels and "labels" not in invocation_context.keys(): + invocation_context["labels"] = labels + + if documents and "documents" not in invocation_context.keys(): + invocation_context["documents"] = documents + + if meta and "meta" not in invocation_context.keys(): + invocation_context["meta"] = meta + + if "documents" in invocation_context.keys(): + for doc in invocation_context.get("documents", []): + if not isinstance(doc, str) and not isinstance(doc.content, str): + raise ValueError("PromptNode only accepts text documents.") + invocation_context["documents"] = [ + doc.content if isinstance(doc, Document) else doc for doc in invocation_context.get("documents", []) + ] + + results = self(prompt_collector=prompt_collector, **invocation_context) final_result: Dict[str, Any] = {} - if self.output_variable: - invocation_context[self.output_variable] = results - final_result[self.output_variable] = results + output_variable = self.output_variable or "results" + if output_variable: + invocation_context[output_variable] = results + final_result[output_variable] = results - final_result["results"] = results final_result["invocation_context"] = invocation_context final_result["_debug"] = {"prompts_used": prompt_collector} return final_result, "output_1" diff --git a/test/nodes/test_shaper.py b/test/nodes/test_shaper.py new file mode 100644 index 000000000..4c3cf01a7 --- /dev/null +++ b/test/nodes/test_shaper.py @@ -0,0 +1,1099 @@ +import re +import pytest + +import 
haystack +from haystack import Pipeline, Document, Answer +from haystack.nodes.other.shaper import Shaper + + +@pytest.fixture +def mock_function(monkeypatch): + monkeypatch.setattr( + haystack.nodes.other.shaper, "REGISTERED_FUNCTIONS", {"test_function": lambda a, b: ([a] * len(b),)} + ) + + +def test_basic_invocation_only_inputs(mock_function): + shaper = Shaper(func="test_function", inputs={"a": "query", "b": "documents"}, outputs=["c"]) + results, _ = shaper.run(query="test query", documents=["doesn't", "really", "matter"]) + assert results["invocation_context"]["c"] == ["test query", "test query", "test query"] + + +def test_basic_invocation_only_params(mock_function): + shaper = Shaper(func="test_function", params={"a": "A", "b": list(range(3))}, outputs=["c"]) + results, _ = shaper.run() + assert results["invocation_context"]["c"] == ["A", "A", "A"] + + +def test_basic_invocation_inputs_and_params(mock_function): + shaper = Shaper(func="test_function", inputs={"a": "query"}, params={"b": list(range(2))}, outputs=["c"]) + results, _ = shaper.run(query="test query") + assert results["invocation_context"]["c"] == ["test query", "test query"] + + +def test_basic_invocation_inputs_and_params_colliding(mock_function): + shaper = Shaper( + func="test_function", inputs={"a": "query"}, params={"a": "default value", "b": list(range(2))}, outputs=["c"] + ) + results, _ = shaper.run(query="test query") + assert results["invocation_context"]["c"] == ["test query", "test query"] + + +def test_basic_invocation_inputs_and_params_using_params_as_defaults(mock_function): + shaper = Shaper( + func="test_function", inputs={"a": "query"}, params={"a": "default", "b": list(range(2))}, outputs=["c"] + ) + results, _ = shaper.run() + assert results["invocation_context"]["c"] == ["default", "default"] + + +def test_missing_argument(mock_function): + shaper = Shaper(func="test_function", inputs={"b": "documents"}, outputs=["c"]) + with pytest.raises(ValueError, match="Shaper couldn't 
apply the function to your inputs and parameters."): + shaper.run(query="test query", documents=["doesn't", "really", "matter"]) + + +def test_excess_argument(mock_function): + shaper = Shaper( + func="test_function", inputs={"a": "query", "b": "documents", "something_extra": "query"}, outputs=["c"] + ) + with pytest.raises(ValueError, match="Shaper couldn't apply the function to your inputs and parameters."): + shaper.run(query="test query", documents=["doesn't", "really", "matter"]) + + +def test_value_not_in_invocation_context(mock_function): + shaper = Shaper(func="test_function", inputs={"a": "query", "b": "something_that_does_not_exist"}, outputs=["c"]) + with pytest.raises(ValueError, match="Shaper couldn't apply the function to your inputs and parameters."): + shaper.run(query="test query", documents=["doesn't", "really", "matter"]) + + +def test_value_only_in_invocation_context(mock_function): + shaper = Shaper(func="test_function", inputs={"a": "query", "b": "invocation_context_specific"}, outputs=["c"]) + results, _s = shaper.run( + query="test query", invocation_context={"invocation_context_specific": ["doesn't", "really", "matter"]} + ) + assert results["invocation_context"]["c"] == ["test query", "test query", "test query"] + + +def test_yaml(mock_function, tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: test_function + inputs: + a: query + params: + b: [1, 1] + outputs: + - c + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert result["invocation_context"]["c"] == ["test query", "test query"] + assert result["query"] == "test query" + assert result["documents"] == 
[Document(content="first"), Document(content="second"), Document(content="third")] + + +# +# rename +# + + +def test_rename(): + shaper = Shaper(func="rename", inputs={"value": "query"}, outputs=["questions"]) + results, _ = shaper.run(query="test query") + assert results["invocation_context"]["questions"] == "test query" + + +def test_rename_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: rename + inputs: + value: query + outputs: + - questions + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(query="test query") + assert result["invocation_context"]["query"] == "test query" + assert result["invocation_context"]["questions"] == "test query" + + +# +# value_to_list +# + + +def test_value_to_list(): + shaper = Shaper(func="value_to_list", inputs={"value": "query", "target_list": "documents"}, outputs=["questions"]) + results, _ = shaper.run(query="test query", documents=["doesn't", "really", "matter"]) + assert results["invocation_context"]["questions"] == ["test query", "test query", "test query"] + + +def test_value_to_list_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: value_to_list + inputs: + value: query + target_list: documents + outputs: + - questions + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert result["invocation_context"]["questions"] == ["test query", "test query", "test query"] + # Assert pipeline output 
is unaffected + assert result["query"] == "test query" + assert result["documents"] == [Document(content="first"), Document(content="second"), Document(content="third")] + + +# +# join_lists +# + + +def test_join_lists(): + shaper = Shaper(func="join_lists", params={"lists": [[1, 2, 3], [4, 5]]}, outputs=["list"]) + results, _ = shaper.run() + assert results["invocation_context"]["list"] == [1, 2, 3, 4, 5] + + +def test_join_lists_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: join_lists + inputs: + lists: + - documents + - file_paths + outputs: + - single_list + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(documents=["first", "second", "third"], file_paths=["file1.txt", "file2.txt"]) + assert result["invocation_context"]["single_list"] == ["first", "second", "third", "file1.txt", "file2.txt"] + + +# +# join_strings +# + + +def test_join_strings(): + shaper = Shaper( + func="join_strings", params={"strings": ["first", "second"], "delimiter": " | "}, outputs=["single_string"] + ) + results, _ = shaper.run() + assert results["invocation_context"]["single_string"] == "first | second" + + +def test_join_strings_default_delimiter(): + shaper = Shaper(func="join_strings", params={"strings": ["first", "second"]}, outputs=["single_string"]) + results, _ = shaper.run() + assert results["invocation_context"]["single_string"] == "first second" + + +def test_join_strings_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: join_strings + inputs: + strings: documents + params: + delimiter: ' - ' + outputs: + - single_string + pipelines: + - name: query + nodes: + - name: shaper + inputs: 
+ - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(documents=["first", "second", "third"]) + assert result["invocation_context"]["single_string"] == "first - second - third" + + +def test_join_strings_default_delimiter_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: join_strings + inputs: + strings: documents + outputs: + - single_string + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(documents=["first", "second", "third"]) + assert result["invocation_context"]["single_string"] == "first second third" + + +# +# join_documents +# + + +def test_join_documents(): + shaper = Shaper( + func="join_documents", inputs={"documents": "documents"}, params={"delimiter": " | "}, outputs=["documents"] + ) + results, _ = shaper.run( + documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert results["invocation_context"]["documents"] == [Document(content="first | second | third")] + + +def test_join_documents_default_delimiter(): + shaper = Shaper(func="join_documents", inputs={"documents": "documents"}, outputs=["documents"]) + results, _ = shaper.run( + documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert results["invocation_context"]["documents"] == [Document(content="first second third")] + + +def test_join_documents_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + + components: + - name: shaper + type: Shaper + params: + func: join_documents + inputs: + documents: documents + params: + delimiter: ' - ' + outputs: + - documents + + pipelines: + - name: query + nodes: + - name: shaper 
+ inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert result["invocation_context"]["documents"] == [Document(content="first - second - third")] + assert result["documents"] == [Document(content="first - second - third")] + + +def test_join_documents_default_delimiter_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: join_documents + inputs: + documents: documents + outputs: + - documents + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert result["invocation_context"]["documents"] == [Document(content="first second third")] + + +# +# strings_to_answers +# + + +def test_strings_to_answers_no_meta_no_hashkeys(): + shaper = Shaper(func="strings_to_answers", inputs={"strings": "responses"}, outputs=["answers"]) + results, _ = shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + assert results["invocation_context"]["answers"] == [ + Answer(answer="first", type="generative"), + Answer(answer="second", type="generative"), + Answer(answer="third", type="generative"), + ] + + +def test_strings_to_answers_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: strings_to_answers + params: + strings: ['a', 'b', 'c'] + outputs: + - answers + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = 
Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run() + assert result["invocation_context"]["answers"] == [ + Answer(answer="a", type="generative"), + Answer(answer="b", type="generative"), + Answer(answer="c", type="generative"), + ] + + +# +# answers_to_strings +# + + +def test_answers_to_strings(): + shaper = Shaper(func="answers_to_strings", inputs={"answers": "documents"}, outputs=["strings"]) + results, _ = shaper.run(documents=[Answer(answer="first"), Answer(answer="second"), Answer(answer="third")]) + assert results["invocation_context"]["strings"] == ["first", "second", "third"] + + +def test_answers_to_strings_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: answers_to_strings + inputs: + answers: documents + outputs: + - strings + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(documents=[Answer(answer="a"), Answer(answer="b"), Answer(answer="c")]) + assert result["invocation_context"]["strings"] == ["a", "b", "c"] + + +# +# strings_to_documents +# + + +def test_strings_to_documents_no_meta_no_hashkeys(): + shaper = Shaper(func="strings_to_documents", inputs={"strings": "responses"}, outputs=["documents"]) + results, _ = shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + assert results["invocation_context"]["documents"] == [ + Document(content="first"), + Document(content="second"), + Document(content="third"), + ] + + +def test_strings_to_documents_single_meta_no_hashkeys(): + shaper = Shaper( + func="strings_to_documents", inputs={"strings": "responses"}, params={"meta": {"a": "A"}}, outputs=["documents"] + ) + results, _ = shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + assert 
results["invocation_context"]["documents"] == [ + Document(content="first", meta={"a": "A"}), + Document(content="second", meta={"a": "A"}), + Document(content="third", meta={"a": "A"}), + ] + + +def test_strings_to_documents_wrong_number_of_meta(): + shaper = Shaper( + func="strings_to_documents", + inputs={"strings": "responses"}, + params={"meta": [{"a": "A"}]}, + outputs=["documents"], + ) + + with pytest.raises(ValueError, match="Not enough metadata dictionaries."): + shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + + +def test_strings_to_documents_many_meta_no_hashkeys(): + shaper = Shaper( + func="strings_to_documents", + inputs={"strings": "responses"}, + params={"meta": [{"a": i + 1} for i in range(3)]}, + outputs=["documents"], + ) + results, _ = shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + assert results["invocation_context"]["documents"] == [ + Document(content="first", meta={"a": 1}), + Document(content="second", meta={"a": 2}), + Document(content="third", meta={"a": 3}), + ] + + +def test_strings_to_documents_single_meta_with_hashkeys(): + shaper = Shaper( + func="strings_to_documents", + inputs={"strings": "responses"}, + params={"meta": {"a": "A"}, "id_hash_keys": ["content", "meta"]}, + outputs=["documents"], + ) + results, _ = shaper.run(invocation_context={"responses": ["first", "second", "third"]}) + assert results["invocation_context"]["documents"] == [ + Document(content="first", meta={"a": "A"}, id_hash_keys=["content", "meta"]), + Document(content="second", meta={"a": "A"}, id_hash_keys=["content", "meta"]), + Document(content="third", meta={"a": "A"}, id_hash_keys=["content", "meta"]), + ] + + +def test_strings_to_documents_no_meta_no_hashkeys_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: strings_to_documents + params: + strings: ['a', 'b', 'c'] + 
outputs: + - documents + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run() + assert result["invocation_context"]["documents"] == [ + Document(content="a"), + Document(content="b"), + Document(content="c"), + ] + + +def test_strings_to_documents_meta_and_hashkeys_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: strings_to_documents + params: + strings: ['first', 'second', 'third'] + id_hash_keys: ['content', 'meta'] + meta: + - a: 1 + - a: 2 + - a: 3 + outputs: + - documents + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run() + assert result["invocation_context"]["documents"] == [ + Document(content="first", meta={"a": 1}, id_hash_keys=["content", "meta"]), + Document(content="second", meta={"a": 2}, id_hash_keys=["content", "meta"]), + Document(content="third", meta={"a": 3}, id_hash_keys=["content", "meta"]), + ] + + +# +# documents_to_strings +# + + +def test_documents_to_strings(): + shaper = Shaper(func="documents_to_strings", inputs={"documents": "documents"}, outputs=["strings"]) + results, _ = shaper.run( + documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + assert results["invocation_context"]["strings"] == ["first", "second", "third"] + + +def test_documents_to_strings_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: shaper + type: Shaper + params: + func: documents_to_strings + inputs: + documents: documents + outputs: + - strings + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + """ + ) + pipeline = 
Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(documents=[Document(content="a"), Document(content="b"), Document(content="c")]) + assert result["invocation_context"]["strings"] == ["a", "b", "c"] + + +# +# Chaining and real-world usage +# + + +def test_chain_shapers(): + shaper_1 = Shaper( + func="join_documents", inputs={"documents": "documents"}, params={"delimiter": " - "}, outputs=["documents"] + ) + shaper_2 = Shaper( + func="value_to_list", inputs={"value": "query", "target_list": "documents"}, outputs=["questions"] + ) + + pipe = Pipeline() + pipe.add_node(shaper_1, name="shaper_1", inputs=["Query"]) + pipe.add_node(shaper_2, name="shaper_2", inputs=["shaper_1"]) + + results = pipe.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + + assert results["invocation_context"]["documents"] == [Document(content="first - second - third")] + assert results["invocation_context"]["questions"] == ["test query"] + + +def test_chain_shapers_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + + - name: shaper_1 + type: Shaper + params: + func: join_documents + inputs: + documents: documents + params: + delimiter: ' - ' + outputs: + - documents + + - name: shaper_2 + type: Shaper + params: + func: value_to_list + inputs: + value: query + target_list: documents + outputs: + - questions + + pipelines: + - name: query + nodes: + - name: shaper_1 + inputs: + - Query + - name: shaper_2 + inputs: + - shaper_1 + """ + ) + pipe = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + + results = pipe.run( + query="test query", documents=[Document(content="first"), Document(content="second"), Document(content="third")] + ) + + assert results["invocation_context"]["documents"] == [Document(content="first - second - third")] + assert results["invocation_context"]["questions"] == ["test 
query"] + + +def test_chain_shapers_yaml_2(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + + - name: shaper_1 + type: Shaper + params: + func: strings_to_documents + params: + strings: + - first + - second + - third + outputs: + - string_documents + + - name: shaper_2 + type: Shaper + params: + func: value_to_list + inputs: + target_list: string_documents + params: + value: hello + outputs: + - greetings + + - name: shaper_3 + type: Shaper + params: + func: join_strings + inputs: + strings: greetings + params: + delimiter: '. ' + outputs: + - many_greetings + + - name: expander + type: Shaper + params: + func: value_to_list + inputs: + value: many_greetings + params: + target_list: [1] + outputs: + - many_greetings + + - name: shaper_4 + type: Shaper + params: + func: strings_to_documents + inputs: + strings: many_greetings + outputs: + - documents_with_greetings + + pipelines: + - name: query + nodes: + - name: shaper_1 + inputs: + - Query + - name: shaper_2 + inputs: + - shaper_1 + - name: shaper_3 + inputs: + - shaper_2 + - name: expander + inputs: + - shaper_3 + - name: shaper_4 + inputs: + - expander + """ + ) + pipe = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + results = pipe.run() + assert results["invocation_context"]["documents_with_greetings"] == [Document(content="hello. hello. 
hello")] + + +def test_with_prompt_node(tmp_path): + + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: prompt_model + type: PromptModel + + - name: shaper + type: Shaper + params: + func: value_to_list + inputs: + value: query + target_list: documents + outputs: + - questions + + - name: prompt_node + type: PromptNode + params: + output_variable: answers + model_name_or_path: prompt_model + default_prompt_template: question-answering + + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + - name: prompt_node + inputs: + - shaper + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="What's Berlin like?", + documents=[Document("Berlin is an amazing city."), Document("Berlin is a cool city in Germany.")], + ) + assert len(result["answers"]) == 2 + assert any(word for word in ["berlin", "germany", "cool", "city", "amazing"] if word in result["answers"]) + + assert len(result["invocation_context"]) > 0 + assert len(result["invocation_context"]["questions"]) == 2 + + +def test_with_multiple_prompt_nodes(tmp_path): + + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: prompt_model + type: PromptModel + + - name: shaper + type: Shaper + params: + func: value_to_list + inputs: + value: query + target_list: documents + outputs: [questions] + + - name: renamer + type: Shaper + params: + func: rename + inputs: + value: new-questions + outputs: + - questions + + - name: prompt_node + type: PromptNode + params: + model_name_or_path: prompt_model + default_prompt_template: question-answering + + - name: prompt_node_second + type: PromptNode + params: + model_name_or_path: prompt_model + default_prompt_template: question-generation + output_variable: new-questions + + - name: prompt_node_third + type: PromptNode + params: + output_variable: answers + 
model_name_or_path: google/flan-t5-small + default_prompt_template: question-answering + + pipelines: + - name: query + nodes: + - name: shaper + inputs: + - Query + - name: prompt_node + inputs: + - shaper + - name: prompt_node_second + inputs: + - prompt_node + - name: renamer + inputs: + - prompt_node_second + - name: prompt_node_third + inputs: + - renamer + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run( + query="What's Berlin like?", + documents=[Document("Berlin is an amazing city."), Document("Berlin is a cool city in Germany.")], + ) + results = result["answers"] + assert len(results) == 2 + assert any([True for r in results if "Berlin" in r]) + + +def test_join_query_and_documents_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + + components: + - name: expander + type: Shaper + params: + func: value_to_list + inputs: + value: query + params: + target_list: [1] + outputs: + - query + + - name: joiner + type: Shaper + params: + func: join_lists + inputs: + lists: + - documents + - query + outputs: + - query + + pipelines: + - name: query + nodes: + - name: expander + inputs: + - Query + - name: joiner + inputs: + - expander + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(query="What is going on here?", documents=["first", "second", "third"]) + assert result["query"] == ["first", "second", "third", "What is going on here?"] + + +def test_join_query_and_documents_into_single_string_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: expander + type: Shaper + params: + func: value_to_list + inputs: + value: query + params: + target_list: [1] + outputs: + - query + + - name: joiner + type: Shaper + params: + func: join_lists + inputs: + lists: + - documents + - query + outputs: + - query + + 
- name: concatenator + type: Shaper + params: + func: join_strings + inputs: + strings: query + outputs: + - query + + pipelines: + - name: query + nodes: + - name: expander + inputs: + - Query + - name: joiner + inputs: + - expander + - name: concatenator + inputs: + - joiner + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(query="What is going on here?", documents=["first", "second", "third"]) + assert result["query"] == "first second third What is going on here?" + + +def test_join_query_and_documents_convert_into_documents_yaml(tmp_path): + with open(tmp_path / "tmp_config.yml", "w") as tmp_file: + tmp_file.write( + f""" + version: ignore + components: + - name: expander + type: Shaper + params: + func: value_to_list + inputs: + value: query + params: + target_list: [1] + outputs: + - query + + - name: joiner + type: Shaper + params: + func: join_lists + inputs: + lists: + - documents + - query + outputs: + - query_and_docs + + - name: converter + type: Shaper + params: + func: strings_to_documents + inputs: + strings: query_and_docs + outputs: + - query_and_docs + + pipelines: + - name: query + nodes: + - name: expander + inputs: + - Query + - name: joiner + inputs: + - expander + - name: converter + inputs: + - joiner + """ + ) + pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") + result = pipeline.run(query="What is going on here?", documents=["first", "second", "third"]) + assert result["invocation_context"]["query_and_docs"] + assert len(result["invocation_context"]["query_and_docs"]) == 4 + assert isinstance(result["invocation_context"]["query_and_docs"][0], Document)