Remove specialised Pipeline (#5584)

* Remove Pipeline

* Add release notes

* Enhance imports

* Update release note

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>

* Remove Pipeline tests

---------

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
This commit is contained in:
Silvano Cerza 2023-08-18 17:48:13 +02:00 committed by GitHub
parent 72e0a588db
commit 4ef813fc8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 4 additions and 420 deletions

View File

@ -1,3 +1,2 @@
from canals.component import component
from canals import component, Pipeline
from haystack.preview.dataclasses import *
from haystack.preview.pipeline import Pipeline, PipelineError, NoSuchDocumentStoreError, load_pipelines, save_pipelines

View File

@ -1,115 +0,0 @@
from typing import List, Dict, Any, Optional, Callable
from pathlib import Path
from canals.pipeline import (
Pipeline as CanalsPipeline,
PipelineError,
load_pipelines as load_canals_pipelines,
save_pipelines as save_canals_pipelines,
)
from haystack.preview.document_stores.protocols import DocumentStore
from haystack.preview.document_stores.mixins import DocumentStoreAwareMixin
class NotADocumentStoreError(PipelineError):
    """Raised when an object not marked as a DocumentStore is registered on a pipeline as one."""
class NoSuchDocumentStoreError(PipelineError):
    """Raised when a DocumentStore name is requested that was never added to the pipeline."""
class Pipeline(CanalsPipeline):
    """
    Haystack Pipeline is a thin wrapper over Canals' Pipelines to add support for DocumentStores.

    DocumentStores are registered by name with `add_document_store()`. Components that inherit
    from DocumentStoreAwareMixin receive one of the registered stores when they are added with
    `add_component(..., document_store=<name>)`.
    """

    def __init__(self):
        super().__init__()
        # Registry of the DocumentStores known to this pipeline, keyed by their name.
        self._document_stores: Dict[str, DocumentStore] = {}

    def add_document_store(self, name: str, document_store: DocumentStore) -> None:
        """
        Make a DocumentStore available to all nodes of this pipeline.

        Registering a store under a name that is already in use overwrites the registry entry.

        :param name: the name of the DocumentStore.
        :param document_store: the DocumentStore object.
        :returns: None
        :raises NotADocumentStoreError: if the object's `__haystack_document_store__` attribute
            is missing or falsy, i.e. it was not decorated with @document_store.
        """
        if not getattr(document_store, "__haystack_document_store__", False):
            raise NotADocumentStoreError(
                f"'{type(document_store).__name__}' is not decorated with @document_store, "
                "so it can't be added to the pipeline with Pipeline.add_document_store()."
            )
        self._document_stores[name] = document_store

    def list_document_stores(self) -> List[str]:
        """
        Returns the names of all the DocumentStores that are attached to this Pipeline.

        :returns: a list with the names of the DocumentStores attached to this Pipeline,
            in the order in which the names were first added.
        """
        return list(self._document_stores)

    def get_document_store(self, name: str) -> DocumentStore:
        """
        Returns the DocumentStore associated with the given name.

        :param name: the name of the DocumentStore
        :returns: the DocumentStore
        :raises NoSuchDocumentStoreError: if no DocumentStore was added under this name.
        """
        try:
            return self._document_stores[name]
        except KeyError as e:
            raise NoSuchDocumentStoreError(f"No DocumentStore named '{name}' was added to this pipeline.") from e

    def add_component(self, name: str, instance: Any, document_store: Optional[str] = None) -> None:
        """
        Make this component available to the pipeline. Components are not connected to anything by default:
        use `Pipeline.connect()` to connect components together.

        Component names must be unique. Component instances can be reused if needed, except for
        instances that are already connected to a DocumentStore: those are rejected.

        If `document_store` is not None, the pipeline will also connect this component to the requested DocumentStore.
        Note that only components that inherit from DocumentStoreAwareMixin can be connected to DocumentStores.

        :param name: the name of the component.
        :param instance: the component instance.
        :param document_store: the name of the DocumentStore this component needs access to, if any.
        :raises ValueError: if:
            - a component with the same name already exists
            - a component requiring a DocumentStore didn't receive it
            - a component that didn't expect a DocumentStore received it
            - a component that is already connected to a DocumentStore is added again
        :raises PipelineValidationError: if the given instance is not a component
        :raises NoSuchDocumentStoreError: if the given DocumentStore name is not known to the pipeline
        """
        if isinstance(instance, DocumentStoreAwareMixin):
            if not document_store:
                raise ValueError(f"Component '{name}' needs a DocumentStore.")

            if document_store not in self._document_stores:
                raise NoSuchDocumentStoreError(
                    f"DocumentStore named '{document_store}' not found. "
                    f"Add it with 'pipeline.add_document_store('{document_store}', <the DocumentStore instance>)'."
                )

            # An instance that already holds a store cannot be wired a second time.
            if instance.document_store:
                raise ValueError("Reusing components with DocumentStores is not supported. Create a separate instance.")

            # NOTE(review): the instance is modified before the parent registers it; if
            # super().add_component() raises, these two assignments are not rolled back.
            instance.document_store = self._document_stores[document_store]
            instance._document_store_name = document_store

        elif document_store:
            raise ValueError(f"Component '{name}' doesn't support DocumentStores.")

        super().add_component(name, instance)
def load_pipelines(path: Path, _reader: Optional[Callable[..., Any]] = None):
    """
    Delegates to Canals' `load_pipelines`, forwarding both arguments by keyword.

    :param path: the path handed over to Canals.
    :param _reader: optional callable handed over to Canals untouched.
    :returns: whatever Canals' `load_pipelines` returns.
    """
    loaded = load_canals_pipelines(path=path, _reader=_reader)
    return loaded
def save_pipelines(pipelines: Dict[str, Pipeline], path: Path, _writer: Optional[Callable[..., Any]] = None):
    """
    Delegates to Canals' `save_pipelines`, forwarding all arguments by keyword.

    :param pipelines: the pipelines to hand over to Canals, keyed by name.
    :param path: the path handed over to Canals.
    :param _writer: optional callable handed over to Canals untouched.
    """
    forwarded = {"pipelines": pipelines, "path": path, "_writer": _writer}
    save_canals_pipelines(**forwarded)

View File

@ -0,0 +1,3 @@
---
preview:
- Remove the Pipeline specialisation that added DocumentStore support; `Pipeline` is now imported directly from Canals.

View File

@ -1,303 +0,0 @@
from typing import Any, Optional, Dict, List
import pytest
from haystack.preview import Pipeline, component, Document
from haystack.preview.document_stores import document_store
from haystack.preview.pipeline import NotADocumentStoreError, NoSuchDocumentStoreError
from haystack.preview.document_stores import DocumentStoreAwareMixin, DuplicatePolicy, DocumentStore
# Note: we're using a real class instead of a mock because mocks don't play too well with protocols.
@document_store
class MockStore:
    """Minimal do-nothing store used by the tests in this module."""

    def count_documents(self) -> int:
        """Always reports zero documents."""
        return 0

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """Ignores the filters and returns a new empty list."""
        return []

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> None:
        """Accepts the documents and does nothing with them."""
        return None

    def delete_documents(self, document_ids: List[str]) -> None:
        """Accepts the ids and does nothing with them."""
        return None
@pytest.mark.unit
def test_add_store():
    """Stores added with add_document_store() land in the pipeline's registry under their names."""
    pipeline = Pipeline()
    stores = {"first_store": MockStore(), "second_store": MockStore()}
    for store_name, store in stores.items():
        pipeline.add_document_store(name=store_name, document_store=store)

    for store_name, store in stores.items():
        assert pipeline._document_stores.get(store_name) == store
@pytest.mark.unit
def test_add_store_wrong_object():
    """Registering a plain string as a DocumentStore raises NotADocumentStoreError."""
    pipeline = Pipeline()
    not_a_store = "I'm surely not a DocumentStore object!"
    expected_message = "'str' is not decorated with @document_store,"
    with pytest.raises(NotADocumentStoreError, match=expected_message):
        pipeline.add_document_store(name="document_store", document_store=not_a_store)
@pytest.mark.unit
def test_list_stores():
    """list_document_stores() returns the registered names in the order they were added."""
    pipeline = Pipeline()
    for store_name in ("first_store", "second_store"):
        pipeline.add_document_store(name=store_name, document_store=MockStore())

    listed = pipeline.list_document_stores()
    assert listed == ["first_store", "second_store"]
@pytest.mark.unit
def test_get_store():
    """get_document_store() hands back the store that was registered under each name."""
    pipeline = Pipeline()
    stores = {"first_store": MockStore(), "second_store": MockStore()}
    for store_name, store in stores.items():
        pipeline.add_document_store(name=store_name, document_store=store)

    for store_name, store in stores.items():
        assert pipeline.get_document_store(store_name) == store
@pytest.mark.unit
def test_get_store_wrong_name():
    """Asking for an unregistered name raises NoSuchDocumentStoreError, before and after a store is added."""
    pipeline = Pipeline()

    # Nothing registered yet: every lookup fails.
    with pytest.raises(NoSuchDocumentStoreError):
        pipeline.get_document_store("first_store")

    store = MockStore()
    pipeline.add_document_store(name="first_store", document_store=store)
    assert pipeline.get_document_store("first_store") == store

    # Names that were never registered keep failing.
    with pytest.raises(NoSuchDocumentStoreError):
        pipeline.get_document_store("third_store")
@pytest.mark.unit
def test_add_component_store_aware_component_receives_one_docstore():
    """A store-aware component added with a store name gets that store and can still be run."""

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [DocumentStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()
    aware_component = MockComponent()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)
    pipeline.add_component("component", aware_component, document_store="first_store")

    assert aware_component.document_store == first
    assert aware_component._document_store_name == "first_store"

    result = pipeline.run(data={"component": {"value": 1}})
    assert result == {"component": {"value": 1}}
@pytest.mark.unit
def test_add_component_store_aware_component_receives_no_docstore():
    """Adding a store-aware component without naming a store raises ValueError."""

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [DocumentStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)

    expected_message = "Component 'component' needs a DocumentStore."
    with pytest.raises(ValueError, match=expected_message):
        pipeline.add_component("component", MockComponent())
@pytest.mark.unit
def test_non_store_aware_component_receives_one_docstore():
    """Naming a store for a component that does not inherit from the mixin raises ValueError."""

    @component
    class MockComponent:
        supported_document_stores = [DocumentStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)

    expected_message = "Component 'component' doesn't support DocumentStores."
    with pytest.raises(ValueError, match=expected_message):
        pipeline.add_component("component", MockComponent(), document_store="first_store")
@pytest.mark.unit
def test_add_component_store_aware_component_receives_wrong_docstore_name():
    """Naming a store that was never registered raises NoSuchDocumentStoreError."""

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [DocumentStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)

    expected_message = "DocumentStore named 'wrong_store' not found."
    with pytest.raises(NoSuchDocumentStoreError, match=expected_message):
        pipeline.add_component("component", MockComponent(), document_store="wrong_store")
@pytest.mark.unit
def test_add_component_store_aware_component_receives_correct_docstore_type():
    """A component that lists MockStore as supported is wired to the MockStore it asks for."""

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [MockStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()
    aware_component = MockComponent()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)
    pipeline.add_component("component", aware_component, document_store="second_store")

    assert aware_component.document_store == second
    assert aware_component._document_store_name == "second_store"
@pytest.mark.unit
def test_add_component_store_aware_component_is_reused():
    """Re-adding an instance that already holds a store raises ValueError and leaves its wiring unchanged."""

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [MockStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    first, second = MockStore(), MockStore()
    reused = MockComponent()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)
    pipeline.add_component("component", reused, document_store="second_store")

    # Same store first, then a different one: both attempts must be rejected.
    for store_name in ("second_store", "first_store"):
        with pytest.raises(ValueError, match="Reusing components with DocumentStores is not supported"):
            pipeline.add_component("component2", reused, document_store=store_name)

    assert reused.document_store == second
    assert reused._document_store_name == "second_store"
@pytest.mark.unit
def test_add_component_store_aware_component_receives_subclass_of_correct_docstore_type():
    """A component that supports MockStore can also be wired to an instance of a MockStore subclass."""

    class MockStoreSubclass(MockStore):
        ...

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [MockStore]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    subclass_store = MockStoreSubclass()
    plain_store = MockStore()
    first_component = MockComponent()
    second_component = MockComponent()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=subclass_store)
    pipeline.add_document_store(name="second_store", document_store=plain_store)

    pipeline.add_component("component", first_component, document_store="first_store")
    assert first_component.document_store == subclass_store
    assert first_component._document_store_name == "first_store"

    pipeline.add_component("component2", second_component, document_store="second_store")
    assert second_component._document_store_name == "second_store"
@pytest.mark.unit
def test_add_component_store_aware_component_does_not_check_supported_stores():
    """Instantiating a component whose supported store type is an arbitrary class does not raise."""

    class SomethingElse:
        ...

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [SomethingElse]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    # No assertion on purpose: the test passes as long as construction succeeds.
    MockComponent()
@pytest.mark.unit
def test_add_component_store_aware_component_receives_wrong_docstore_type():
    """Wiring a component to a store whose type it does not list as supported raises ValueError."""
    first, second = MockStore(), MockStore()

    # Same methods as MockStore, but a separate, undecorated class.
    class MockStore2:
        def count_documents(self) -> int:
            return 0

        def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
            return []

        def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> None:
            return None

        def delete_documents(self, document_ids: List[str]) -> None:
            return None

    @component
    class MockComponent(DocumentStoreAwareMixin):
        supported_document_stores = [MockStore2]

        @component.output_types(value=int)
        def run(self, value: int):
            return {"value": value}

    picky_component = MockComponent()

    pipeline = Pipeline()
    pipeline.add_document_store(name="first_store", document_store=first)
    pipeline.add_document_store(name="second_store", document_store=second)

    with pytest.raises(ValueError, match="is not compatible with this component"):
        pipeline.add_component("component", picky_component, document_store="second_store")