diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 7dbbff393..88888ec3f 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -361,7 +361,7 @@ Reader from multiple Retrievers, or re-ranking of candidate documents. #### add\_node ```python -def add_node(component, name: str, inputs: List[str]) +def add_node(component: BaseComponent, name: str, inputs: List[str]) ``` Add a new node to the pipeline. diff --git a/haystack/nodes/base.py b/haystack/nodes/base.py index a647163d0..a208f5c97 100644 --- a/haystack/nodes/base.py +++ b/haystack/nodes/base.py @@ -41,10 +41,7 @@ def exportable_to_yaml(init_func): # Store all the named input parameters in self._component_config for k, v in kwargs.items(): - if isinstance(v, BaseComponent): - self._component_config["params"][k] = v._component_config - elif v is not None: - self._component_config["params"][k] = v + self._component_config["params"][k] = v return wrapper_exportable_to_yaml @@ -55,7 +52,6 @@ class BaseComponent(ABC): """ outgoing_edges: int - name: Optional[str] = None _subclasses: dict = {} _component_config: dict = {} @@ -74,6 +70,36 @@ class BaseComponent(ABC): # Enables generic load() for all specific component implementations. cls._subclasses[cls.__name__] = cls + @property + def name(self) -> Optional[str]: + return self._component_config.get("name", None) + + @name.setter + def name(self, value: str): + self._component_config["name"] = value + + @property + def utilized_components(self) -> List[BaseComponent]: + if "params" not in self._component_config: + return list() + return [param for param in self._component_config["params"].values() if isinstance(param, BaseComponent)] + + @property + def type(self) -> str: + return self._component_config["type"] + + def get_params(self, return_defaults: bool = False) -> Dict[str, Any]: + component_signature = self._get_signature() + params: Dict[str, Any] = {} + for key, value in self._component_config["params"].items(): + if value != component_signature[key].default or return_defaults: + params[key] = value + if return_defaults: + for key, param in component_signature.items(): + if key not in params: + params[key] = param.default + return params + @classmethod def get_subclass(cls, component_type: str): if component_type not in cls._subclasses.keys(): @@ -207,3 +233,13 @@ class BaseComponent(ABC): output["params"] = params return output, stream + + @classmethod + def _get_signature(cls) -> Dict[str, inspect.Parameter]: + component_classes = inspect.getmro(cls) + component_signature: Dict[str, inspect.Parameter] = { + param_key: parameter + for class_ in component_classes + for param_key, parameter in inspect.signature(class_).parameters.items() + } + return component_signature diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index c10e3e9a2..29e16e7a0 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -1,6 +1,6 @@ from __future__ import annotations from os import pipe -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Set import copy import json @@ -467,14 +467,14 @@ class Pipeline(BasePipeline): self.root_node = None @property - def components(self): + def components(self) -> Dict[str, BaseComponent]: return { name: attributes["component"] for name, attributes in self.graph.nodes.items() if not isinstance(attributes["component"], RootNode) } - def add_node(self, component, name: str, inputs: List[str]): + def add_node(self, component: BaseComponent, name: str, inputs: List[str]): """ Add a new node to the pipeline. @@ -500,6 +500,9 @@ class Pipeline(BasePipeline): f"Root node '{root_node}' is invalid. Available options are {valid_root_nodes}." ) component.name = name + component_names = self._get_all_component_names() + component_names.add(name) + self._set_sub_component_names(component, component_names=component_names) self.graph.add_node(name, component=component, inputs=inputs) if len(self.graph.nodes) == 2: # first node added; connect with Root @@ -1159,69 +1162,81 @@ class Pipeline(BasePipeline): :param return_defaults: whether to output parameters that have the default values. """ pipeline_name = ROOT_NODE_TO_PIPELINE_NAME[self.root_node.lower()] - pipelines: dict = {pipeline_name: {"name": pipeline_name, "nodes": []}} + pipeline_definitions: Dict[str, Dict] = {pipeline_name: {"name": pipeline_name, "nodes": []}} - components = {} - for node in self.graph.nodes: - if node == self.root_node: + component_definitions: Dict[str, Dict] = {} + for node_name, node_attributes in self.graph.nodes.items(): + if node_name == self.root_node: continue - component_instance = self.graph.nodes.get(node)["component"] - component_type = component_instance._component_config["type"] - component_params = component_instance._component_config["params"] - components[node] = {"name": node, "type": component_type, "params": {}} + component: BaseComponent = node_attributes["component"] + if node_name != component.name: + raise PipelineError(f"Component name '{component.name}' does not match node name '{node_name}'.") - component_parent_classes = inspect.getmro(type(component_instance)) - component_signature: dict = {} - for component_parent in component_parent_classes: - component_signature = {**component_signature, **inspect.signature(component_parent).parameters} - - for param_key, param_value in component_params.items(): - # A parameter for a Component could be another Component. For instance, a Retriever has - # the DocumentStore as a parameter. - # Component configs must be a dict with a "type" key. The "type" keys distinguishes between - # other parameters like "custom_mapping" that are dicts. - # This currently only checks for the case single-level nesting case, wherein, "a Component has another - # Component as a parameter". For deeper nesting cases, this function should be made recursive. - if isinstance(param_value, dict) and "type" in param_value.keys(): # the parameter is a Component - sub_component = param_value - sub_component_type_name = sub_component["type"] - sub_component_signature = inspect.signature( - BaseComponent._subclasses[sub_component_type_name] - ).parameters - sub_component_params = { - k: v - for k, v in sub_component["params"].items() - if sub_component_signature[k].default != v or return_defaults is True - } - - sub_component_name = self._generate_component_name( - type_name=sub_component_type_name, params=sub_component_params, existing_components=components - ) - components[sub_component_name] = { - "name": sub_component_name, - "type": sub_component_type_name, - "params": sub_component_params, - } - components[node]["params"][param_key] = sub_component_name - else: - if component_signature[param_key].default != param_value or return_defaults is True: - components[node]["params"][param_key] = param_value + self._add_component_to_definitions( + component=component, component_definitions=component_definitions, return_defaults=return_defaults + ) # create the Pipeline definition with how the Component are connected - pipelines[pipeline_name]["nodes"].append({"name": node, "inputs": list(self.graph.predecessors(node))}) + pipeline_definitions[pipeline_name]["nodes"].append( + {"name": node_name, "inputs": list(self.graph.predecessors(node_name))} + ) config = { - "components": list(components.values()), - "pipelines": list(pipelines.values()), + "components": list(component_definitions.values()), + "pipelines": list(pipeline_definitions.values()), "version": __version__, } return config - def _generate_component_name(self, type_name: str, params: Dict[str, Any], existing_components: Dict[str, Any]): + def _add_component_to_definitions( + self, component: BaseComponent, component_definitions: Dict[str, Dict], return_defaults: bool = False + ): + """ + Add the definition of the component and all its dependencies (components too) to the component_definitions dict. + This is used to collect all component definitions within Pipeline.get_config() + """ + if component.name is None: + raise PipelineError(f"Component with config '{component._component_config}' does not have a name.") + + component_params: Dict[str, Any] = component.get_params(return_defaults) + # handling of subcomponents: add to definitions and substitute by reference + for param_key, param_value in component_params.items(): + if isinstance(param_value, BaseComponent): + sub_component = param_value + self._add_component_to_definitions(sub_component, component_definitions, return_defaults) + component_params[param_key] = sub_component.name + + component_definitions[component.name] = { + "name": component.name, + "type": component.type, + "params": component_params, + } + + def _get_all_component_names(self, components_to_search: Optional[List[BaseComponent]] = None) -> Set[str]: + component_names = set() + if components_to_search is None: + components_to_search = list(self.components.values()) + for component in components_to_search: + if component.name is not None: + component_names.add(component.name) + sub_component_names = self._get_all_component_names(component.utilized_components) + component_names.update(sub_component_names) + return component_names + + def _set_sub_component_names(self, component: BaseComponent, component_names: Set[str]): + for sub_component in component.utilized_components: + if sub_component.name is None: + sub_component.name = self._generate_component_name( + type_name=sub_component.type, existing_component_names=component_names + ) + component_names.add(sub_component.name) + self._set_sub_component_names(sub_component, component_names=component_names) + + def _generate_component_name(self, type_name: str, existing_component_names: Set[str]) -> str: component_name: str = type_name # add number if there are multiple distinct ones of the same type - while component_name in existing_components and params != existing_components[component_name]["params"]: + while component_name in existing_component_names: occupied_num = 1 if len(component_name) > len(type_name): occupied_num = int(component_name[len(type_name) + 1 :]) diff --git a/haystack/pipelines/config.py b/haystack/pipelines/config.py index bba20f30f..6e8c77626 100644 --- a/haystack/pipelines/config.py +++ b/haystack/pipelines/config.py @@ -113,12 +113,6 @@ def build_component_dependency_graph( :param component_definitions: the definition of the pipeline components (e.g. use get_component_definitions() to obtain it) """ graph = DiGraph() - for node in pipeline_definition["nodes"]: - node_name = node["name"] - graph.add_node(node_name) - for input in node["inputs"]: - if input in component_definitions: - graph.add_edge(input, node_name) for component_name, component_definition in component_definitions.items(): params = component_definition.get("params", {}) referenced_components: List[str] = list() @@ -129,6 +123,17 @@ def build_component_dependency_graph( referenced_components.append(param_value) for referenced_component in referenced_components: graph.add_edge(referenced_component, component_name) + for node in pipeline_definition["nodes"]: + node_name = node["name"] + graph.add_node(node_name) + for input in node["inputs"]: + if input in component_definitions: + # Special case for (actually permitted) cyclic dependencies between two components: + # e.g. DensePassageRetriever depends on ElasticsearchDocumentStore. + # In indexing pipelines ElasticsearchDocumentStore depends on DensePassageRetriever's output. + # But this second dependency is looser, so we neglect it. + if not graph.has_edge(node_name, input): + graph.add_edge(input, node_name) return graph diff --git a/test/test_pipeline.py b/test/test_pipeline.py index ce7bdcade..30b36cafe 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -194,6 +194,137 @@ def test_get_config_creates_two_different_dependent_components_of_same_type(): assert expected_component in config["components"] +def test_get_config_reuses_same_dependent_components(): + child = ChildComponent() + parent = ParentComponent(dependent=child) + pipeline = Pipeline() + pipeline.add_node(component=parent, name="parent", inputs=["Query"]) + pipeline.add_node(component=child, name="child", inputs=["parent"]) + config = pipeline.get_config() + + expected_pipelines = [ + {"name": "query", "nodes": [{"name": "parent", "inputs": ["Query"]}, {"name": "child", "inputs": ["parent"]}]} + ] + expected_components = [ + {"name": "parent", "type": "ParentComponent", "params": {"dependent": "child"}}, + {"name": "child", "type": "ChildComponent", "params": {}}, + ] + + config = pipeline.get_config() + for expected_pipeline in expected_pipelines: + assert expected_pipeline in config["pipelines"] + for expected_component in expected_components: + assert expected_component in config["components"] + + +def test_get_config_creates_different_components_if_instances_differ(): + child_a = ChildComponent() + child_b = ChildComponent() + child_c = ChildComponent() + parent = ParentComponent(dependent=child_a) + parent2 = ParentComponent(dependent=child_b) + p_ensemble = Pipeline() + p_ensemble.add_node(component=parent, name="ParentA", inputs=["Query"]) + p_ensemble.add_node(component=parent2, name="ParentB", inputs=["Query"]) + p_ensemble.add_node(component=child_c, name="Child", inputs=["Query"]) + + expected_components = [ + {"name": "ParentA", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}}, + {"name": "ChildComponent", "type": "ChildComponent", "params": {}}, + {"name": "ParentB", "type": "ParentComponent", "params": {"dependent": "ChildComponent_2"}}, + {"name": "ChildComponent_2", "type": "ChildComponent", "params": {}}, + {"name": "Child", "type": "ChildComponent", "params": {}}, + ] + + expected_pipelines = [ + { + "name": "query", + "nodes": [ + {"name": "ParentA", "inputs": ["Query"]}, + {"name": "ParentB", "inputs": ["Query"]}, + {"name": "Child", "inputs": ["Query"]}, + ], + } + ] + + config = p_ensemble.get_config() + for expected_pipeline in expected_pipelines: + assert expected_pipeline in config["pipelines"] + for expected_component in expected_components: + assert expected_component in config["components"] + + +def test_get_config_reuses_same_unnamed_dependent_components(): + child = ChildComponent() + parent = ParentComponent(dependent=child) + parent2 = ParentComponent(dependent=child) + p_ensemble = Pipeline() + p_ensemble.add_node(component=parent, name="ParentA", inputs=["Query"]) + p_ensemble.add_node(component=parent2, name="ParentB", inputs=["Query"]) + + expected_components = [ + {"name": "ParentA", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}}, + {"name": "ChildComponent", "type": "ChildComponent", "params": {}}, + {"name": "ParentB", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}}, + ] + + expected_pipelines = [ + {"name": "query", "nodes": [{"name": "ParentA", "inputs": ["Query"]}, {"name": "ParentB", "inputs": ["Query"]}]} + ] + + config = p_ensemble.get_config() + for expected_pipeline in expected_pipelines: + assert expected_pipeline in config["pipelines"] + for expected_component in expected_components: + assert expected_component in config["components"] + + +def test_get_config_multi_level_dependencies(): + child = ChildComponent() + intermediate = ParentComponent(dependent=child) + parent = ParentComponent(dependent=intermediate) + p_ensemble = Pipeline() + p_ensemble.add_node(component=parent, name="Parent", inputs=["Query"]) + + expected_components = [ + {"name": "Parent", "type": "ParentComponent", "params": {"dependent": "ParentComponent"}}, + {"name": "ChildComponent", "type": "ChildComponent", "params": {}}, + {"name": "ParentComponent", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}}, + ] + + expected_pipelines = [{"name": "query", "nodes": [{"name": "Parent", "inputs": ["Query"]}]}] + + config = p_ensemble.get_config() + for expected_pipeline in expected_pipelines: + assert expected_pipeline in config["pipelines"] + for expected_component in expected_components: + assert expected_component in config["components"] + + +def test_get_config_multi_level_dependencies_of_same_type(): + child = ChildComponent() + second_intermediate = ParentComponent(dependent=child) + intermediate = ParentComponent(dependent=second_intermediate) + parent = ParentComponent(dependent=intermediate) + p_ensemble = Pipeline() + p_ensemble.add_node(component=parent, name="ParentComponent", inputs=["Query"]) + + expected_components = [ + {"name": "ParentComponent_3", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}}, + {"name": "ParentComponent_2", "type": "ParentComponent", "params": {"dependent": "ParentComponent_3"}}, + {"name": "ParentComponent", "type": "ParentComponent", "params": {"dependent": "ParentComponent_2"}}, + {"name": "ChildComponent", "type": "ChildComponent", "params": {}}, + ] + + expected_pipelines = [{"name": "query", "nodes": [{"name": "ParentComponent", "inputs": ["Query"]}]}] + + config = p_ensemble.get_config() + for expected_pipeline in expected_pipelines: + assert expected_pipeline in config["pipelines"] + for expected_component in expected_components: + assert expected_component in config["components"] + + def test_get_config_component_with_superclass_arguments(): class CustomBaseDocumentStore(MockDocumentStore): def __init__(self, base_parameter: str): @@ -389,6 +520,31 @@ def test_generate_code_is_component_order_invariant(): assert code == expected_code +def test_generate_code_can_handle_weak_cyclic_pipelines(): + config = { + "version": "unstable", + "components": [ + {"name": "parent", "type": "ParentComponent", "params": {"dependent": "child"}}, + {"name": "child", "type": "ChildComponent", "params": {}}, + ], + "pipelines": [ + { + "name": "query", + "nodes": [{"name": "parent", "inputs": ["Query"]}, {"name": "child", "inputs": ["parent"]}], + } + ], + } + code = generate_code(pipeline_config=config, generate_imports=False) + assert code == ( + "child = ChildComponent()\n" + "parent = ParentComponent(dependent=child)\n" + "\n" + "pipeline = Pipeline()\n" + 'pipeline.add_node(component=parent, name="parent", inputs=["Query"])\n' + 'pipeline.add_node(component=child, name="child", inputs=["parent"])' + ) + + @pytest.mark.parametrize("input", ["\btest", " test", "#test", "+test", "\ttest", "\ntest", "test()"]) def test_validate_user_input_invalid(input): with pytest.raises(PipelineConfigError, match="is not a valid variable name or value"):