diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md
index 7fe239cf9..70be12c9b 100644
--- a/docs/_src/api/api/pipelines.md
+++ b/docs/_src/api/api/pipelines.md
@@ -317,6 +317,7 @@ Set the component for a node in the Pipeline.
 #### Pipeline.run

 ```python
+@pipeline_invocation_counter
 def run(query: Optional[str] = None,
         file_paths: Optional[List[str]] = None,
         labels: Optional[MultiLabel] = None,
@@ -348,6 +349,7 @@ the Nodes received and the output they generated. You can then find all debug in
 #### Pipeline.run\_batch

 ```python
+@pipeline_invocation_counter
 def run_batch(queries: List[str] = None,
               file_paths: Optional[List[str]] = None,
               labels: Optional[Union[MultiLabel, List[MultiLabel]]] = None,
@@ -997,6 +999,26 @@ You can select between:
 :param wrong_examples_fields: A list of fields to include in the worst samples.
 :param max_characters_per_field: The maximum number of characters to include in the worst samples report (per field).

+
+#### Pipeline.get\_type
+
+```python
+def get_type() -> str
+```
+
+Returns the type of the pipeline.
+
+
+
+#### Pipeline.uptime
+
+```python
+def uptime() -> timedelta
+```
+
+Returns the uptime of the pipeline as a timedelta.
+
 ## \_HaystackBeirRetrieverAdapter

@@ -1403,6 +1425,20 @@ Return the document store object used in the current pipeline.

 Instance of DocumentStore or None

+
+#### BaseStandardPipeline.get\_type
+
+```python
+def get_type() -> str
+```
+
+Return the type of the pipeline.
+
+**Returns**:
+
+Type of the pipeline
+
 #### BaseStandardPipeline.eval

diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py
index 651a840cc..8453b16bc 100644
--- a/haystack/pipelines/base.py
+++ b/haystack/pipelines/base.py
@@ -1,9 +1,14 @@
 # pylint: disable=too-many-public-methods
 from __future__ import annotations
+
+import datetime
+from datetime import timedelta
 from functools import partial
+from hashlib import sha1
 from typing import Dict, List, Optional, Any, Set, Tuple, Union
+
 try:
     from typing import Literal
 except ImportError:
@@ -40,12 +45,13 @@ from haystack.pipelines.config import (
 )
 from haystack.pipelines.utils import generate_code, print_eval_report
 from haystack.utils import DeepsetCloud, calculate_context_similarity
+from haystack.utils.reflection import pipeline_invocation_counter
 from haystack.schema import Answer, EvaluationResult, MultiLabel, Document, Span
 from haystack.errors import HaystackError, PipelineError, PipelineConfigError
 from haystack.nodes.base import BaseComponent, RootNode
 from haystack.nodes.retriever.base import BaseRetriever
 from haystack.document_stores.base import BaseDocumentStore
-from haystack.telemetry import send_event
+from haystack.telemetry import send_event, send_custom_event
 from haystack.utils.experiment_tracking import MLflowTrackingHead, Tracker as tracker

@@ -66,6 +72,9 @@ class Pipeline:

     def __init__(self):
         self.graph = DiGraph()
+        self.init_time = datetime.datetime.now(datetime.timezone.utc)
+        self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
+        self.telemetry_update_interval = datetime.timedelta(hours=24)

     @property
     def root_node(self) -> Optional[str]:
@@ -437,6 +446,7 @@ class Pipeline:
     def _run_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
         return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)

+    @pipeline_invocation_counter
     def run(  # type: ignore
         self,
         query: Optional[str] = None,
@@ -557,8 +567,11 @@ class Pipeline:
                     i = 0
                 else:
                     i += 1  # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
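+        # telemetry is throttled: should_send_telemetry() returns True at most once
+        # per telemetry_update_interval (24 hours by default)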
+        if self.should_send_telemetry():
+            self.send_telemetry()
         return node_output

+    @pipeline_invocation_counter
     def run_batch(  # type: ignore
         self,
         queries: List[str] = None,
@@ -706,7 +719,8 @@ class Pipeline:
                     i = 0
                 else:
                     i += 1  # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
-
+        if self.should_send_telemetry():
+            self.send_telemetry()
         return node_output

     @classmethod
@@ -2159,6 +2173,58 @@ class Pipeline:
             max_characters_per_field=max_characters_per_field,
         )

+    def get_type(self) -> str:
+        """
+        Returns the type of the pipeline.
+        """
+        # values of the dict are functions evaluating whether components of this pipeline match the pipeline type
+        # specified by dict keys
+        pipeline_types = {
+            "GenerativeQAPipeline": lambda x: {"Generator", "Retriever"} <= set(x.keys()),
+            "FAQPipeline": lambda x: {"Docs2Answers"} <= set(x.keys()),
+            "ExtractiveQAPipeline": lambda x: {"Reader", "Retriever"} <= set(x.keys()),
+            "SearchSummarizationPipeline": lambda x: {"Retriever", "Summarizer"} <= set(x.keys()),
+            "TranslationWrapperPipeline": lambda x: {"InputTranslator", "OutputTranslator"} <= set(x.keys()),
+            "RetrieverQuestionGenerationPipeline": lambda x: {"Retriever", "QuestionGenerator"} <= set(x.keys()),
+            "QuestionAnswerGenerationPipeline": lambda x: {"QuestionGenerator", "Reader"} <= set(x.keys()),
+            "DocumentSearchPipeline": lambda x: {"Retriever"} <= set(x.keys()),
+            "QuestionGenerationPipeline": lambda x: {"QuestionGenerator"} <= set(x.keys()),
+            "MostSimilarDocumentsPipeline": lambda x: len(x.values()) == 1
+            and isinstance(list(x.values())[0], BaseDocumentStore),
+        }
+        retrievers = [type(comp).__name__ for comp in self.components.values() if isinstance(comp, BaseRetriever)]
+        doc_stores = [type(comp).__name__ for comp in self.components.values() if isinstance(comp, BaseDocumentStore)]
+
+        pipeline_type = next(
+            (p_type for p_type, eval_f in pipeline_types.items() if eval_f(self.components)), "Unknown pipeline"
+        )
+        retrievers_used = retrievers if retrievers else "None"
+        doc_stores_used = doc_stores if doc_stores else "None"
+        return f"{pipeline_type} (retriever: {retrievers_used}, doc_store: {doc_stores_used})"
+
+    def uptime(self) -> timedelta:
+        """
+        Returns the uptime of the pipeline as a timedelta.
+        """
+        return datetime.datetime.now(datetime.timezone.utc) - self.init_time
+
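+    # the fingerprint identifies a pipeline *configuration*, not a user: it is the
+    # SHA-1 digest of the JSON-serialized pipeline config with sorted keys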
+    def send_telemetry(self):
+        fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
+        send_custom_event(
+            "pipeline",
+            payload={
+                "fingerprint": fingerprint,
+                "type": self.get_type(),
+                "uptime": int(self.uptime().total_seconds()),
+                "run_total": self.run.counter + self.run_batch.counter,
+            },
+        )
+        self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
+
+    def should_send_telemetry(self):
+        now = datetime.datetime.now(datetime.timezone.utc)
+        return now - self.last_telemetry_update > self.telemetry_update_interval
+

 class _HaystackBeirRetrieverAdapter:
     def __init__(self, index_pipeline: Pipeline, query_pipeline: Pipeline, index_params: dict, query_params: dict):
diff --git a/haystack/pipelines/standard_pipelines.py b/haystack/pipelines/standard_pipelines.py
index e96181e69..81d7a5453 100644
--- a/haystack/pipelines/standard_pipelines.py
+++ b/haystack/pipelines/standard_pipelines.py
@@ -158,6 +158,14 @@ class BaseStandardPipeline(ABC):
         """
         return self.pipeline.get_document_store()

+    def get_type(self) -> str:
+        """
+        Return the type of the pipeline.
+
+        :return: Type of the pipeline
+        """
+        return self.pipeline.get_type()
+
     def eval(
         self,
         labels: List[MultiLabel],
@@ -646,7 +654,7 @@ class RetrieverQuestionGenerationPipeline(BaseStandardPipeline):
     def __init__(self, retriever: BaseRetriever, question_generator: QuestionGenerator):
         self.pipeline = Pipeline()
         self.pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
-        self.pipeline.add_node(component=question_generator, name="Question Generator", inputs=["Retriever"])
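+        # the node name drops the space ("QuestionGenerator") so that
+        # Pipeline.get_type() can classify this pipeline by its component names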
+        self.pipeline.add_node(component=question_generator, name="QuestionGenerator", inputs=["Retriever"])

     def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
         output = self.pipeline.run(query=query, params=params, debug=debug)
diff --git a/haystack/telemetry.py b/haystack/telemetry.py
index 27173ae0e..01d2e82d7 100644
--- a/haystack/telemetry.py
+++ b/haystack/telemetry.py
@@ -283,7 +283,15 @@ def _delete_telemetry_file(file_type_to_delete: TelemetryFileType):

 class NonPrivateParameters:
-    param_names: List[str] = ["top_k", "model_name_or_path", "add_isolated_node_eval"]
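+    # fingerprint, type, uptime, and run_total are the payload fields emitted by
+    # Pipeline.send_telemetry(); listing them here lets them pass the non-private
+    # parameter filter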
+    param_names: List[str] = [
+        "top_k",
+        "model_name_or_path",
+        "add_isolated_node_eval",
+        "fingerprint",
+        "type",
+        "uptime",
+        "run_total",
+    ]

     @classmethod
     def apply_filter(cls, param_dicts: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/haystack/utils/reflection.py b/haystack/utils/reflection.py
index f012a0f21..991f6a0a2 100644
--- a/haystack/utils/reflection.py
+++ b/haystack/utils/reflection.py
@@ -1,4 +1,5 @@
 import inspect
+import functools
 from typing import Any, Dict, Tuple, Callable

@@ -10,3 +11,25 @@ def args_to_kwargs(args: Tuple, func: Callable) -> Dict[str, Any]:
     arg_names = arg_names[1 : 1 + len(args)]
     args_as_kwargs = {arg_name: arg for arg, arg_name in zip(args, arg_names)}
     return args_as_kwargs
+
+
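+# Counts the logical invocations served by the decorated pipeline method: one per
+# query/document for batch calls, otherwise one per call. The count is stored on
+# the wrapper function itself, so it is a process-wide total, not a per-instance one.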
+def pipeline_invocation_counter(func):
+    @functools.wraps(func)
+    def wrapper_invocation_counter(*args, **kwargs):
+        # single query
+        this_invocation_count = 1
+        # were named arguments used?
+        if "queries" in kwargs:
+            this_invocation_count = len(kwargs["queries"]) if kwargs["queries"] else 1
+        elif "documents" in kwargs:
+            this_invocation_count = len(kwargs["documents"]) if kwargs["documents"] else 1
+        else:
+            # positional arguments used? args[0] is the bound instance (self), so inspect args[1]
+            if len(args) > 1 and isinstance(args[1], list):
+                this_invocation_count = len(args[1])
+
+        wrapper_invocation_counter.counter += this_invocation_count
+        return func(*args, **kwargs)
+
+    wrapper_invocation_counter.counter = 0
+    return wrapper_invocation_counter
diff --git a/test/conftest.py b/test/conftest.py
index 5648fc261..4180cfccb 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -22,6 +22,7 @@ import psutil
 import pytest
 import requests

+from haystack import Answer
 from haystack.nodes.base import BaseComponent

 try:
@@ -49,7 +50,14 @@ except (ImportError, ModuleNotFoundError) as ie:

 from haystack.document_stores import BaseDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore

-from haystack.nodes import BaseReader, BaseRetriever, OpenAIAnswerGenerator
+from haystack.nodes import (
+    BaseReader,
+    BaseRetriever,
+    OpenAIAnswerGenerator,
+    BaseGenerator,
+    BaseSummarizer,
+    BaseTranslator,
+)
 from haystack.nodes.answer_generator.transformers import Seq2SeqGenerator
 from haystack.nodes.answer_generator.transformers import RAGenerator
 from haystack.nodes.ranker import SentenceTransformersRanker
@@ -263,6 +271,43 @@ class MockRetriever(BaseRetriever):
         pass


+class MockSeq2SegGenerator(BaseGenerator):
+    def predict(self, query: str, documents: List[Document], top_k: Optional[int]) -> Dict:
+        pass
+
+
+class MockSummarizer(BaseSummarizer):
+    def predict_batch(
+        self,
+        documents: Union[List[Document], List[List[Document]]],
+        generate_single_summary: Optional[bool] = None,
+        batch_size: Optional[int] = None,
+    ) -> Union[List[Document], List[List[Document]]]:
+        pass
+
+    def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]:
+        pass
+
+
+class MockTranslator(BaseTranslator):
+    def translate(
+        self,
+        results: List[Dict[str, Any]] = None,
+        query: Optional[str] = None,
+        documents: Optional[Union[List[Document], List[Answer], List[str], List[Dict[str, Any]]]] = None,
+        dict_key: Optional[str] = None,
+    ) -> Union[str, List[Document], List[Answer], List[str], List[Dict[str, Any]]]:
+        pass
+
+    def translate_batch(
+        self,
+        queries: Optional[List[str]] = None,
+        documents: Optional[Union[List[Document], List[Answer], List[List[Document]], List[List[Answer]]]] = None,
+        batch_size: Optional[int] = None,
+    ) -> List[Union[str, List[Document], List[Answer], List[str], List[Dict[str, Any]]]]:
+        pass
+
+
 class MockDenseRetriever(MockRetriever):
     def __init__(self, document_store: BaseDocumentStore, embedding_dim: int = 768):
         self.embedding_dim = embedding_dim
@@ -275,6 +320,14 @@ class MockDenseRetriever(MockRetriever):
         return [np.random.rand(self.embedding_dim)] * len(docs)


+class MockQuestionGenerator(QuestionGenerator):
+    def __init__(self):
+        pass
+
+    def predict(self, query: str, documents: List[Document], top_k: Optional[int]) -> Dict:
+        pass
+
+
 class MockReader(BaseReader):
     outgoing_edges = 1

diff --git a/test/pipelines/test_eval.py b/test/pipelines/test_eval.py
index 4df42e74d..a6ae067d7 100644
--- a/test/pipelines/test_eval.py
+++ b/test/pipelines/test_eval.py
@@ -1094,7 +1094,7 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
     metrics = eval_result.calculate_metrics(document_scope="document_id")

     assert "Retriever" in eval_result
-    assert "Question Generator" in eval_result
+    assert "QuestionGenerator" in eval_result
     assert len(eval_result) == 2

     assert metrics["Retriever"]["mrr"] == 0.5
@@ -1104,12 +1104,12 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
     assert metrics["Retriever"]["precision"] == 0.1
     assert metrics["Retriever"]["ndcg"] == 0.5

-    assert metrics["Question Generator"]["mrr"] == 0.5
-    assert metrics["Question Generator"]["map"] == 0.5
-    assert metrics["Question Generator"]["recall_multi_hit"] == 0.5
-    assert metrics["Question Generator"]["recall_single_hit"] == 0.5
-    assert metrics["Question Generator"]["precision"] == 0.1
-    assert metrics["Question Generator"]["ndcg"] == 0.5
+    assert metrics["QuestionGenerator"]["mrr"] == 0.5
+    assert metrics["QuestionGenerator"]["map"] == 0.5
+    assert metrics["QuestionGenerator"]["recall_multi_hit"] == 0.5
+    assert metrics["QuestionGenerator"]["recall_single_hit"] == 0.5
+    assert metrics["QuestionGenerator"]["precision"] == 0.1
+    assert metrics["QuestionGenerator"]["ndcg"] == 0.5


 @pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
diff --git a/test/pipelines/test_eval_batch.py b/test/pipelines/test_eval_batch.py
index f5fe5ea38..a21dfff2f 100644
--- a/test/pipelines/test_eval_batch.py
+++ b/test/pipelines/test_eval_batch.py
@@ -836,7 +836,7 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
     metrics = eval_result.calculate_metrics(document_scope="document_id")

     assert "Retriever" in eval_result
-    assert "Question Generator" in eval_result
+    assert "QuestionGenerator" in eval_result
     assert len(eval_result) == 2

     assert metrics["Retriever"]["mrr"] == 0.5
@@ -846,12 +846,12 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
     assert metrics["Retriever"]["precision"] == 0.1
     assert metrics["Retriever"]["ndcg"] == 0.5

-    assert metrics["Question Generator"]["mrr"] == 0.5
-    assert metrics["Question Generator"]["map"] == 0.5
-    assert metrics["Question Generator"]["recall_multi_hit"] == 0.5
-    assert metrics["Question Generator"]["recall_single_hit"] == 0.5
-    assert metrics["Question Generator"]["precision"] == 0.1
-    assert metrics["Question Generator"]["ndcg"] == 0.5
+    assert metrics["QuestionGenerator"]["mrr"] == 0.5
+    assert metrics["QuestionGenerator"]["map"] == 0.5
+    assert metrics["QuestionGenerator"]["recall_multi_hit"] == 0.5
+    assert metrics["QuestionGenerator"]["recall_single_hit"] == 0.5
+    assert metrics["QuestionGenerator"]["precision"] == 0.1
+    assert metrics["QuestionGenerator"]["ndcg"] == 0.5


 # Commented out because of the following issue https://github.com/deepset-ai/haystack/issues/2962
diff --git a/test/pipelines/test_pipeline.py b/test/pipelines/test_pipeline.py
index c044a118c..d9027a424 100644
--- a/test/pipelines/test_pipeline.py
+++ b/test/pipelines/test_pipeline.py
@@ -6,7 +6,6 @@ import json
 import platform
 import sys
 from typing import Tuple
-from pyparsing import original_text_for

 import pytest
 from requests import PreparedRequest
@@ -22,11 +21,24 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
 from haystack.nodes.other.join_docs import JoinDocuments
 from haystack.nodes.base import BaseComponent
 from haystack.nodes.retriever.sparse import BM25Retriever
-from haystack.pipelines import Pipeline, RootNode
+from haystack.pipelines import (
+    Pipeline,
+    RootNode,
+    GenerativeQAPipeline,
+    FAQPipeline,
+    ExtractiveQAPipeline,
+    SearchSummarizationPipeline,
+    TranslationWrapperPipeline,
+    RetrieverQuestionGenerationPipeline,
+    QuestionAnswerGenerationPipeline,
+    DocumentSearchPipeline,
+    QuestionGenerationPipeline,
+    MostSimilarDocumentsPipeline,
+)
 from haystack.pipelines.config import validate_config_strings, get_component_definitions
 from haystack.pipelines.utils import generate_code
 from haystack.errors import PipelineConfigError
-from haystack.nodes import PreProcessor, TextConverter
+from haystack.nodes import PreProcessor, TextConverter, QuestionGenerator
 from haystack.utils.deepsetcloud import DeepsetCloudError
 from haystack import Document, Answer
 from haystack.nodes.other.route_documents import RouteDocuments
@@ -39,9 +51,14 @@ from ..conftest import (
     DC_TEST_INDEX,
     SAMPLES_PATH,
     MockDocumentStore,
+    MockSeq2SegGenerator,
     MockRetriever,
     MockNode,
     deepset_cloud_fixture,
+    MockReader,
+    MockSummarizer,
+    MockTranslator,
+    MockQuestionGenerator,
 )

 logger = logging.getLogger(__name__)
@@ -765,6 +782,41 @@ def test_validate_pipeline_config_recursive_config(reduce_windows_recursion_limi
     validate_config_strings(pipeline_config)


+def test_pipeline_classify_type():
+
+    pipe = GenerativeQAPipeline(generator=MockSeq2SegGenerator(), retriever=MockRetriever())
+    assert pipe.get_type().startswith("GenerativeQAPipeline")
+
+    pipe = FAQPipeline(retriever=MockRetriever())
+    assert pipe.get_type().startswith("FAQPipeline")
+
+    pipe = ExtractiveQAPipeline(reader=MockReader(), retriever=MockRetriever())
+    assert pipe.get_type().startswith("ExtractiveQAPipeline")
+
+    search_pipe = SearchSummarizationPipeline(summarizer=MockSummarizer(), retriever=MockRetriever())
+    assert search_pipe.get_type().startswith("SearchSummarizationPipeline")
+
+    pipe = RetrieverQuestionGenerationPipeline(retriever=MockRetriever(), question_generator=MockQuestionGenerator())
+    assert pipe.get_type().startswith("RetrieverQuestionGenerationPipeline")
+
+    qag_pipe = QuestionAnswerGenerationPipeline(question_generator=MockQuestionGenerator(), reader=MockReader())
+    assert qag_pipe.get_type().startswith("QuestionAnswerGenerationPipeline")
+
+    pipe = DocumentSearchPipeline(retriever=MockRetriever())
+    assert pipe.get_type().startswith("DocumentSearchPipeline")
+
+    pipe = QuestionGenerationPipeline(question_generator=MockQuestionGenerator())
+    assert pipe.get_type().startswith("QuestionGenerationPipeline")
+
+    pipe = TranslationWrapperPipeline(
+        input_translator=MockTranslator(), output_translator=MockTranslator(), pipeline=qag_pipe
+    )
+    assert pipe.get_type().startswith("TranslationWrapperPipeline")
+
+    # pipe = MostSimilarDocumentsPipeline(document_store=MockDocumentStore())
+    # assert pipe.get_type().startswith("MostSimilarDocumentsPipeline")
+
+
 @pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
 @responses.activate
 def test_load_from_deepset_cloud_query():
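Usage sketch (illustrative, not part of the patch): the snippet below shows how the new introspection APIs in this diff fit together. `my_retriever` is a placeholder for any retriever instance you have already constructed; everything else is defined in the diff above.

```python
from haystack.pipelines import DocumentSearchPipeline

pipe = DocumentSearchPipeline(retriever=my_retriever)  # my_retriever is a placeholder

# get_type() classifies the pipeline from its component (node) names, e.g.
# "DocumentSearchPipeline (retriever: ['BM25Retriever'], doc_store: ['InMemoryDocumentStore'])"
print(pipe.get_type())

pipe.run(query="Who is the father of Arya Stark?")

# uptime() is the time elapsed since Pipeline.__init__; run.counter is the
# process-wide invocation total maintained by @pipeline_invocation_counter
print(pipe.pipeline.uptime())
print(pipe.pipeline.run.counter)
```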