Fix dependency graph for indexing pipelines during codegen (#2311)

* fix dependency graph for indexing pipelines

* Update Documentation & Code Style

* add test and fix get_config for existing components

* Update Documentation & Code Style

* fix mypy finding

* refactored Pipeline.get_config

* Update Documentation & Code Style

* split to_code test into get_config test and generate_code test

* fix child component handling in get_config()

* Update Documentation & Code Style

* fix get_params

* make get_config fully recursive

* add multi level dependency test

* Update Documentation & Code Style

* add some review feedback

* fix multiple dependent components of same type

* fix mypy finding

* rename dependencies to utilized_components

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
tstadel 2022-03-17 22:03:39 +01:00 committed by GitHub
parent b8a3c930f0
commit 8f7dd13eb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 276 additions and 64 deletions

View File

@@ -361,7 +361,7 @@ Reader from multiple Retrievers, or re-ranking of candidate documents.
#### add\_node
```python
def add_node(component, name: str, inputs: List[str])
def add_node(component: BaseComponent, name: str, inputs: List[str])
```
Add a new node to the pipeline.

View File

@@ -41,10 +41,7 @@ def exportable_to_yaml(init_func):
# Store all the named input parameters in self._component_config
for k, v in kwargs.items():
if isinstance(v, BaseComponent):
self._component_config["params"][k] = v._component_config
elif v is not None:
self._component_config["params"][k] = v
self._component_config["params"][k] = v
return wrapper_exportable_to_yaml
@@ -55,7 +52,6 @@ class BaseComponent(ABC):
"""
outgoing_edges: int
name: Optional[str] = None
_subclasses: dict = {}
_component_config: dict = {}
@@ -74,6 +70,36 @@ class BaseComponent(ABC):
# Enables generic load() for all specific component implementations.
cls._subclasses[cls.__name__] = cls
@property
def name(self) -> Optional[str]:
    # The component's name is kept in _component_config (not a plain attribute),
    # so it is part of the serializable config; None when no name was assigned yet.
    return self._component_config.get("name", None)
@name.setter
def name(self, value: str):
    # Store the name in the config dict so it round-trips through get_config()/YAML.
    self._component_config["name"] = value
@property
def utilized_components(self) -> List[BaseComponent]:
    """Sub-components passed to this component as init parameters (e.g. a DocumentStore)."""
    init_params = self._component_config.get("params", {})
    return [value for value in init_params.values() if isinstance(value, BaseComponent)]
@property
def type(self) -> str:
    # Type identifier (the class name) recorded in the component config.
    return self._component_config["type"]
def get_params(self, return_defaults: bool = False) -> Dict[str, Any]:
    """
    Return the init parameters of this component.

    Only parameters that differ from their signature default are included,
    unless return_defaults is True, in which case defaults are added for
    every parameter not explicitly set.
    """
    signature = self._get_signature()
    selected: Dict[str, Any] = {
        key: value
        for key, value in self._component_config["params"].items()
        if return_defaults or value != signature[key].default
    }
    if return_defaults:
        # Fill in defaults for any parameter the caller never set explicitly.
        for key, parameter in signature.items():
            selected.setdefault(key, parameter.default)
    return selected
@classmethod
def get_subclass(cls, component_type: str):
if component_type not in cls._subclasses.keys():
@@ -207,3 +233,13 @@ class BaseComponent(ABC):
output["params"] = params
return output, stream
@classmethod
def _get_signature(cls) -> Dict[str, inspect.Parameter]:
    """
    Merge the init signatures of cls and all classes in its MRO into a single
    parameter mapping. For parameter names shared across classes, entries from
    classes later in the MRO overwrite earlier ones.
    """
    merged: Dict[str, inspect.Parameter] = {}
    for klass in inspect.getmro(cls):
        merged.update(inspect.signature(klass).parameters)
    return merged

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from os import pipe
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Set
import copy
import json
@@ -467,14 +467,14 @@ class Pipeline(BasePipeline):
self.root_node = None
@property
def components(self):
def components(self) -> Dict[str, BaseComponent]:
return {
name: attributes["component"]
for name, attributes in self.graph.nodes.items()
if not isinstance(attributes["component"], RootNode)
}
def add_node(self, component, name: str, inputs: List[str]):
def add_node(self, component: BaseComponent, name: str, inputs: List[str]):
"""
Add a new node to the pipeline.
@@ -500,6 +500,9 @@
f"Root node '{root_node}' is invalid. Available options are {valid_root_nodes}."
)
component.name = name
component_names = self._get_all_component_names()
component_names.add(name)
self._set_sub_component_names(component, component_names=component_names)
self.graph.add_node(name, component=component, inputs=inputs)
if len(self.graph.nodes) == 2: # first node added; connect with Root
@@ -1159,69 +1162,81 @@ class Pipeline(BasePipeline):
:param return_defaults: whether to output parameters that have the default values.
"""
pipeline_name = ROOT_NODE_TO_PIPELINE_NAME[self.root_node.lower()]
pipelines: dict = {pipeline_name: {"name": pipeline_name, "nodes": []}}
pipeline_definitions: Dict[str, Dict] = {pipeline_name: {"name": pipeline_name, "nodes": []}}
components = {}
for node in self.graph.nodes:
if node == self.root_node:
component_definitions: Dict[str, Dict] = {}
for node_name, node_attributes in self.graph.nodes.items():
if node_name == self.root_node:
continue
component_instance = self.graph.nodes.get(node)["component"]
component_type = component_instance._component_config["type"]
component_params = component_instance._component_config["params"]
components[node] = {"name": node, "type": component_type, "params": {}}
component: BaseComponent = node_attributes["component"]
if node_name != component.name:
raise PipelineError(f"Component name '{component.name}' does not match node name '{node_name}'.")
component_parent_classes = inspect.getmro(type(component_instance))
component_signature: dict = {}
for component_parent in component_parent_classes:
component_signature = {**component_signature, **inspect.signature(component_parent).parameters}
for param_key, param_value in component_params.items():
# A parameter for a Component could be another Component. For instance, a Retriever has
# the DocumentStore as a parameter.
# Component configs must be a dict with a "type" key. The "type" keys distinguishes between
# other parameters like "custom_mapping" that are dicts.
# This currently only checks for the case single-level nesting case, wherein, "a Component has another
# Component as a parameter". For deeper nesting cases, this function should be made recursive.
if isinstance(param_value, dict) and "type" in param_value.keys(): # the parameter is a Component
sub_component = param_value
sub_component_type_name = sub_component["type"]
sub_component_signature = inspect.signature(
BaseComponent._subclasses[sub_component_type_name]
).parameters
sub_component_params = {
k: v
for k, v in sub_component["params"].items()
if sub_component_signature[k].default != v or return_defaults is True
}
sub_component_name = self._generate_component_name(
type_name=sub_component_type_name, params=sub_component_params, existing_components=components
)
components[sub_component_name] = {
"name": sub_component_name,
"type": sub_component_type_name,
"params": sub_component_params,
}
components[node]["params"][param_key] = sub_component_name
else:
if component_signature[param_key].default != param_value or return_defaults is True:
components[node]["params"][param_key] = param_value
self._add_component_to_definitions(
component=component, component_definitions=component_definitions, return_defaults=return_defaults
)
# create the Pipeline definition with how the Component are connected
pipelines[pipeline_name]["nodes"].append({"name": node, "inputs": list(self.graph.predecessors(node))})
pipeline_definitions[pipeline_name]["nodes"].append(
{"name": node_name, "inputs": list(self.graph.predecessors(node_name))}
)
config = {
"components": list(components.values()),
"pipelines": list(pipelines.values()),
"components": list(component_definitions.values()),
"pipelines": list(pipeline_definitions.values()),
"version": __version__,
}
return config
def _generate_component_name(self, type_name: str, params: Dict[str, Any], existing_components: Dict[str, Any]):
def _add_component_to_definitions(
    self, component: BaseComponent, component_definitions: Dict[str, Dict], return_defaults: bool = False
):
    """
    Recursively add the given component's definition — and the definitions of every
    sub-component it depends on — to component_definitions.
    Used by Pipeline.get_config() to collect all component definitions.
    """
    if component.name is None:
        raise PipelineError(f"Component with config '{component._component_config}' does not have a name.")
    params: Dict[str, Any] = component.get_params(return_defaults)
    # Depth-first: define each sub-component before this one references it,
    # then replace the instance with a by-name reference in the params.
    for key in list(params):
        value = params[key]
        if isinstance(value, BaseComponent):
            self._add_component_to_definitions(value, component_definitions, return_defaults)
            params[key] = value.name
    component_definitions[component.name] = {
        "name": component.name,
        "type": component.type,
        "params": params,
    }
def _get_all_component_names(self, components_to_search: Optional[List[BaseComponent]] = None) -> Set[str]:
    """
    Collect the names of all pipeline components, descending recursively into
    each component's utilized (sub-)components. Unnamed components are skipped.
    """
    if components_to_search is None:
        components_to_search = list(self.components.values())
    names: Set[str] = set()
    for candidate in components_to_search:
        if candidate.name is not None:
            names.add(candidate.name)
        names.update(self._get_all_component_names(candidate.utilized_components))
    return names
def _set_sub_component_names(self, component: BaseComponent, component_names: Set[str]):
    """
    Recursively assign a unique, type-derived name to every still-unnamed
    sub-component, registering each name in component_names as it is taken.
    """
    for child in component.utilized_components:
        if child.name is None:
            child.name = self._generate_component_name(
                type_name=child.type, existing_component_names=component_names
            )
        component_names.add(child.name)
        self._set_sub_component_names(child, component_names=component_names)
def _generate_component_name(self, type_name: str, existing_component_names: Set[str]) -> str:
component_name: str = type_name
# add number if there are multiple distinct ones of the same type
while component_name in existing_components and params != existing_components[component_name]["params"]:
while component_name in existing_component_names:
occupied_num = 1
if len(component_name) > len(type_name):
occupied_num = int(component_name[len(type_name) + 1 :])

View File

@@ -113,12 +113,6 @@ def build_component_dependency_graph(
:param component_definitions: the definition of the pipeline components (e.g. use get_component_definitions() to obtain it)
"""
graph = DiGraph()
for node in pipeline_definition["nodes"]:
node_name = node["name"]
graph.add_node(node_name)
for input in node["inputs"]:
if input in component_definitions:
graph.add_edge(input, node_name)
for component_name, component_definition in component_definitions.items():
params = component_definition.get("params", {})
referenced_components: List[str] = list()
@@ -129,6 +123,17 @@
referenced_components.append(param_value)
for referenced_component in referenced_components:
graph.add_edge(referenced_component, component_name)
for node in pipeline_definition["nodes"]:
node_name = node["name"]
graph.add_node(node_name)
for input in node["inputs"]:
if input in component_definitions:
# Special case for (actually permitted) cyclic dependencies between two components:
# e.g. DensePassageRetriever depends on ElasticsearchDocumentStore.
# In indexing pipelines ElasticsearchDocumentStore depends on DensePassageRetriever's output.
# But this second dependency is looser, so we neglect it.
if not graph.has_edge(node_name, input):
graph.add_edge(input, node_name)
return graph

View File

@@ -194,6 +194,137 @@ def test_get_config_creates_two_different_dependent_components_of_same_type():
assert expected_component in config["components"]
def test_get_config_reuses_same_dependent_components():
    """A sub-component that is also an explicit pipeline node must appear only once in the config."""
    child = ChildComponent()
    parent = ParentComponent(dependent=child)
    pipeline = Pipeline()
    pipeline.add_node(component=parent, name="parent", inputs=["Query"])
    pipeline.add_node(component=child, name="child", inputs=["parent"])

    expected_pipelines = [
        {"name": "query", "nodes": [{"name": "parent", "inputs": ["Query"]}, {"name": "child", "inputs": ["parent"]}]}
    ]
    expected_components = [
        {"name": "parent", "type": "ParentComponent", "params": {"dependent": "child"}},
        {"name": "child", "type": "ChildComponent", "params": {}},
    ]
    # Fix: get_config() was previously called twice (once before the expectations
    # were even built); a single call is sufficient.
    config = pipeline.get_config()
    for expected_pipeline in expected_pipelines:
        assert expected_pipeline in config["pipelines"]
    for expected_component in expected_components:
        assert expected_component in config["components"]
def test_get_config_creates_different_components_if_instances_differ():
    """Distinct instances of the same component type get distinct config entries."""
    child_a = ChildComponent()
    child_b = ChildComponent()
    child_c = ChildComponent()
    parent = ParentComponent(dependent=child_a)
    parent2 = ParentComponent(dependent=child_b)
    p_ensemble = Pipeline()
    p_ensemble.add_node(component=parent, name="ParentA", inputs=["Query"])
    p_ensemble.add_node(component=parent2, name="ParentB", inputs=["Query"])
    p_ensemble.add_node(component=child_c, name="Child", inputs=["Query"])

    config = p_ensemble.get_config()

    expected_components = [
        {"name": "ParentA", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}},
        {"name": "ChildComponent", "type": "ChildComponent", "params": {}},
        {"name": "ParentB", "type": "ParentComponent", "params": {"dependent": "ChildComponent_2"}},
        {"name": "ChildComponent_2", "type": "ChildComponent", "params": {}},
        {"name": "Child", "type": "ChildComponent", "params": {}},
    ]
    expected_pipelines = [
        {
            "name": "query",
            "nodes": [
                {"name": "ParentA", "inputs": ["Query"]},
                {"name": "ParentB", "inputs": ["Query"]},
                {"name": "Child", "inputs": ["Query"]},
            ],
        }
    ]
    for expected in expected_pipelines:
        assert expected in config["pipelines"]
    for expected in expected_components:
        assert expected in config["components"]
def test_get_config_reuses_same_unnamed_dependent_components():
    """Two parents sharing one unnamed child instance must reference a single child definition."""
    child = ChildComponent()
    parent = ParentComponent(dependent=child)
    parent2 = ParentComponent(dependent=child)
    p_ensemble = Pipeline()
    p_ensemble.add_node(component=parent, name="ParentA", inputs=["Query"])
    p_ensemble.add_node(component=parent2, name="ParentB", inputs=["Query"])

    config = p_ensemble.get_config()

    expected_components = [
        {"name": "ParentA", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}},
        {"name": "ChildComponent", "type": "ChildComponent", "params": {}},
        {"name": "ParentB", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}},
    ]
    expected_pipelines = [
        {"name": "query", "nodes": [{"name": "ParentA", "inputs": ["Query"]}, {"name": "ParentB", "inputs": ["Query"]}]}
    ]
    for expected in expected_pipelines:
        assert expected in config["pipelines"]
    for expected in expected_components:
        assert expected in config["components"]
def test_get_config_multi_level_dependencies():
    """Nested sub-components (parent -> intermediate -> child) are resolved recursively."""
    child = ChildComponent()
    intermediate = ParentComponent(dependent=child)
    parent = ParentComponent(dependent=intermediate)
    p_ensemble = Pipeline()
    p_ensemble.add_node(component=parent, name="Parent", inputs=["Query"])

    config = p_ensemble.get_config()

    expected_components = [
        {"name": "Parent", "type": "ParentComponent", "params": {"dependent": "ParentComponent"}},
        {"name": "ChildComponent", "type": "ChildComponent", "params": {}},
        {"name": "ParentComponent", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}},
    ]
    expected_pipelines = [{"name": "query", "nodes": [{"name": "Parent", "inputs": ["Query"]}]}]
    for expected in expected_pipelines:
        assert expected in config["pipelines"]
    for expected in expected_components:
        assert expected in config["components"]
def test_get_config_multi_level_dependencies_of_same_type():
    """Nested sub-components of the same type get numbered suffixes to stay unique."""
    child = ChildComponent()
    second_intermediate = ParentComponent(dependent=child)
    intermediate = ParentComponent(dependent=second_intermediate)
    parent = ParentComponent(dependent=intermediate)
    p_ensemble = Pipeline()
    p_ensemble.add_node(component=parent, name="ParentComponent", inputs=["Query"])

    config = p_ensemble.get_config()

    expected_components = [
        {"name": "ParentComponent_3", "type": "ParentComponent", "params": {"dependent": "ChildComponent"}},
        {"name": "ParentComponent_2", "type": "ParentComponent", "params": {"dependent": "ParentComponent_3"}},
        {"name": "ParentComponent", "type": "ParentComponent", "params": {"dependent": "ParentComponent_2"}},
        {"name": "ChildComponent", "type": "ChildComponent", "params": {}},
    ]
    expected_pipelines = [{"name": "query", "nodes": [{"name": "ParentComponent", "inputs": ["Query"]}]}]
    for expected in expected_pipelines:
        assert expected in config["pipelines"]
    for expected in expected_components:
        assert expected in config["components"]
def test_get_config_component_with_superclass_arguments():
class CustomBaseDocumentStore(MockDocumentStore):
def __init__(self, base_parameter: str):
@@ -389,6 +520,31 @@ def test_generate_code_is_component_order_invariant():
assert code == expected_code
def test_generate_code_can_handle_weak_cyclic_pipelines():
    """Codegen must tolerate the permitted 'weak' cycle: child depends on parent's output,
    while parent lists child as an init dependency."""
    config = {
        "version": "unstable",
        "components": [
            {"name": "parent", "type": "ParentComponent", "params": {"dependent": "child"}},
            {"name": "child", "type": "ChildComponent", "params": {}},
        ],
        "pipelines": [
            {
                "name": "query",
                "nodes": [{"name": "parent", "inputs": ["Query"]}, {"name": "child", "inputs": ["parent"]}],
            }
        ],
    }

    code = generate_code(pipeline_config=config, generate_imports=False)

    expected_lines = [
        "child = ChildComponent()",
        "parent = ParentComponent(dependent=child)",
        "",
        "pipeline = Pipeline()",
        'pipeline.add_node(component=parent, name="parent", inputs=["Query"])',
        'pipeline.add_node(component=child, name="child", inputs=["parent"])',
    ]
    assert code == "\n".join(expected_lines)
@pytest.mark.parametrize("input", ["\btest", " test", "#test", "+test", "\ttest", "\ntest", "test()"])
def test_validate_user_input_invalid(input):
with pytest.raises(PipelineConfigError, match="is not a valid variable name or value"):