haystack/e2e/pipelines/test_dense_doc_search.py
2025-04-04 10:50:05 +02:00

80 lines
4.1 KiB
Python

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import json
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument, TextFileToDocument
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.routers import FileTypeRouter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
def test_dense_doc_search_pipeline(tmp_path, samples_path):
# Create the indexing pipeline
indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
instance=FileTypeRouter(mime_types=["text/plain", "application/pdf"]), name="file_type_router"
)
indexing_pipeline.add_component(instance=TextFileToDocument(), name="text_file_converter")
indexing_pipeline.add_component(instance=PyPDFToDocument(), name="pdf_file_converter")
indexing_pipeline.add_component(instance=DocumentJoiner(), name="joiner")
indexing_pipeline.add_component(instance=DocumentCleaner(), name="cleaner")
indexing_pipeline.add_component(
instance=DocumentSplitter(split_by="period", split_length=250, split_overlap=30), name="splitter"
)
indexing_pipeline.add_component(
instance=SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"), name="embedder"
)
indexing_pipeline.add_component(instance=DocumentWriter(document_store=InMemoryDocumentStore()), name="writer")
indexing_pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
indexing_pipeline.connect("file_type_router.application/pdf", "pdf_file_converter.sources")
indexing_pipeline.connect("text_file_converter.documents", "joiner.documents")
indexing_pipeline.connect("pdf_file_converter.documents", "joiner.documents")
indexing_pipeline.connect("joiner.documents", "cleaner.documents")
indexing_pipeline.connect("cleaner.documents", "splitter.documents")
indexing_pipeline.connect("splitter.documents", "embedder.documents")
indexing_pipeline.connect("embedder.documents", "writer.documents")
# Serialize the indexing pipeline to YAML.
with open(tmp_path / "test_dense_doc_search_indexing_pipeline.yaml", "w") as f:
indexing_pipeline.dump(f)
# Load the indexing pipeline back
with open(tmp_path / "test_dense_doc_search_indexing_pipeline.yaml", "r") as f:
indexing_pipeline = Pipeline.load(f)
indexing_result = indexing_pipeline.run({"file_type_router": {"sources": list(samples_path.iterdir())}})
filled_document_store = indexing_pipeline.get_component("writer").document_store
assert indexing_result["writer"]["documents_written"] == 2
assert filled_document_store.count_documents() == 2
# Create the querying pipeline
query_pipeline = Pipeline()
query_pipeline.add_component(
instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"), name="text_embedder"
)
query_pipeline.add_component(
instance=InMemoryEmbeddingRetriever(document_store=filled_document_store, top_k=20), name="embedding_retriever"
)
query_pipeline.connect("text_embedder", "embedding_retriever")
querying_result = query_pipeline.run({"text_embedder": {"text": "Who lives in Rome?"}})
assert querying_result["embedding_retriever"]["documents"][0].content == "My name is Giorgio and I live in Rome."
# Serialize the querying pipeline to JSON
with open(tmp_path / "test_dense_doc_search_query_pipeline.json", "w") as f:
print(json.dumps(query_pipeline.to_dict(), indent=4))
json.dump(query_pipeline.to_dict(), f)
# Load the querying pipeline back
with open(tmp_path / "test_dense_doc_search_query_pipeline.json", "r") as f:
query_pipeline = Pipeline.from_dict(json.load(f))