Classify pipeline's type based on its components (#3132)

* Add pipeline get_type method

* Add pipeline uptime

* Add pipeline telemetry event sending

* Send pipeline telemetry once a day (at most)

* Add pipeline invocation counter, change invocation counter logic

* Update allowed telemetry parameters - allow pipeline parameters

* PR review: add unit test
Vladimir Blagojevic 2022-09-21 14:53:42 +02:00 committed by GitHub
parent 89247b804c
commit 938e6fda5b
9 changed files with 268 additions and 22 deletions
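A minimal usage sketch of the features above (illustrative only, not part of this diff; `reader` and `retriever` stand in for concrete nodes such as `FARMReader` and `BM25Retriever`):
```python
from haystack.pipelines import ExtractiveQAPipeline

pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)

pipe.run(query="Who wrote Faust?")   # each run()/run_batch() call updates the invocation counter
print(pipe.get_type())               # e.g. "ExtractiveQAPipeline (retriever: ['BM25Retriever'], doc_store: None)"
print(pipe.pipeline.uptime())        # timedelta since the underlying Pipeline was created

# After each run, the pipeline checks whether 24 hours have passed since the last telemetry
# event; if so, it sends the config fingerprint, pipeline type, uptime, and run_total.
```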

View File

@ -317,6 +317,7 @@ Set the component for a node in the Pipeline.
#### Pipeline.run
```python
@pipeline_invocation_counter
def run(query: Optional[str] = None,
file_paths: Optional[List[str]] = None,
labels: Optional[MultiLabel] = None,
@ -348,6 +349,7 @@ the Nodes received and the output they generated. You can then find all debug in
#### Pipeline.run\_batch
```python
@pipeline_invocation_counter
def run_batch(queries: List[str] = None,
file_paths: Optional[List[str]] = None,
labels: Optional[Union[MultiLabel, List[MultiLabel]]] = None,
@ -997,6 +999,26 @@ You can select between:
:param wrong_examples_fields: A list of fields to include in the worst samples.
:param max_characters_per_field: The maximum number of characters to include in the worst samples report (per field).
<a id="base.Pipeline.get_type"></a>
#### Pipeline.get\_type
```python
def get_type() -> str
```
Returns the type of the pipeline.
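For example (an illustrative sketch; `retriever` and `reader` stand in for concrete nodes such as `BM25Retriever` and `FARMReader`, and the exact string depends on the components in your graph):
```python
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipeline.add_node(component=reader, name="Reader", inputs=["Retriever"])

pipeline.get_type()
# "ExtractiveQAPipeline (retriever: ['BM25Retriever'], doc_store: None)"
```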
<a id="base.Pipeline.uptime"></a>
#### Pipeline.uptime
```python
def uptime() -> timedelta
```
Returns the uptime of the pipeline as a timedelta.
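For example, for a pipeline created roughly two minutes ago:
```python
pipeline.uptime()
# e.g. datetime.timedelta(seconds=130)
```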
<a id="base._HaystackBeirRetrieverAdapter"></a>
## \_HaystackBeirRetrieverAdapter
@ -1403,6 +1425,20 @@ Return the document store object used in the current pipeline.
Instance of DocumentStore or None
<a id="standard_pipelines.BaseStandardPipeline.get_type"></a>
#### BaseStandardPipeline.get\_type
```python
def get_type() -> str
```
Return the type of the pipeline.
**Returns**:
Type of the pipeline
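For example (illustrative; the call simply delegates to the wrapped `Pipeline`, and `retriever` stands in for a concrete node such as `EmbeddingRetriever`):
```python
pipe = FAQPipeline(retriever=retriever)
pipe.get_type()
# "FAQPipeline (retriever: ['EmbeddingRetriever'], doc_store: None)"
```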
<a id="standard_pipelines.BaseStandardPipeline.eval"></a>
#### BaseStandardPipeline.eval

View File

@ -1,9 +1,14 @@
# pylint: disable=too-many-public-methods
from __future__ import annotations
import datetime
from datetime import timedelta
from functools import partial
from hashlib import sha1
from typing import Dict, List, Optional, Any, Set, Tuple, Union
try:
from typing import Literal
except ImportError:
@ -40,12 +45,13 @@ from haystack.pipelines.config import (
)
from haystack.pipelines.utils import generate_code, print_eval_report
from haystack.utils import DeepsetCloud, calculate_context_similarity
from haystack.utils.reflection import pipeline_invocation_counter
from haystack.schema import Answer, EvaluationResult, MultiLabel, Document, Span
from haystack.errors import HaystackError, PipelineError, PipelineConfigError
from haystack.nodes.base import BaseComponent, RootNode
from haystack.nodes.retriever.base import BaseRetriever
from haystack.document_stores.base import BaseDocumentStore
from haystack.telemetry import send_event
from haystack.telemetry import send_event, send_custom_event
from haystack.utils.experiment_tracking import MLflowTrackingHead, Tracker as tracker
@ -66,6 +72,9 @@ class Pipeline:
def __init__(self):
self.graph = DiGraph()
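# record the creation time and the last telemetry send time; pipeline telemetry is throttled to one event per update interval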
self.init_time = datetime.datetime.now(datetime.timezone.utc)
self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
self.telemetry_update_interval = datetime.timedelta(hours=24)
@property
def root_node(self) -> Optional[str]:
@ -437,6 +446,7 @@ class Pipeline:
def _run_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
@pipeline_invocation_counter
def run( # type: ignore
self,
query: Optional[str] = None,
@ -557,8 +567,11 @@ class Pipeline:
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
if self.should_send_telemetry():
self.send_telemetry()
return node_output
@pipeline_invocation_counter
def run_batch( # type: ignore
self,
queries: List[str] = None,
@ -706,7 +719,8 @@ class Pipeline:
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
if self.should_send_telemetry():
self.send_telemetry()
return node_output
@classmethod
@ -2159,6 +2173,58 @@ class Pipeline:
max_characters_per_field=max_characters_per_field,
)
def get_type(self) -> str:
"""
Returns the type of the pipeline.
"""
# each value of the dict is a predicate that checks whether this pipeline's components match the
# pipeline type named by the corresponding key
pipeline_types = {
"GenerativeQAPipeline": lambda x: {"Generator", "Retriever"} <= set(x.keys()),
"FAQPipeline": lambda x: {"Docs2Answers"} <= set(x.keys()),
"ExtractiveQAPipeline": lambda x: {"Reader", "Retriever"} <= set(x.keys()),
"SearchSummarizationPipeline": lambda x: {"Retriever", "Summarizer"} <= set(x.keys()),
"TranslationWrapperPipeline": lambda x: {"InputTranslator", "OutputTranslator"} <= set(x.keys()),
"RetrieverQuestionGenerationPipeline": lambda x: {"Retriever", "QuestionGenerator"} <= set(x.keys()),
"QuestionAnswerGenerationPipeline": lambda x: {"QuestionGenerator", "Reader"} <= set(x.keys()),
"DocumentSearchPipeline": lambda x: {"Retriever"} <= set(x.keys()),
"QuestionGenerationPipeline": lambda x: {"QuestionGenerator"} <= set(x.keys()),
"MostSimilarDocumentsPipeline": lambda x: len(x.values()) == 1
and isinstance(list(x.values())[0], BaseDocumentStore),
}
retrievers = [type(comp).__name__ for comp in self.components.values() if isinstance(comp, BaseRetriever)]
doc_stores = [type(comp).__name__ for comp in self.components.values() if isinstance(comp, BaseDocumentStore)]
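# pick the first pipeline type whose predicate matches this pipeline's components, otherwise fall back to "Unknown pipeline"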
pipeline_type = next(
(p_type for p_type, eval_f in pipeline_types.items() if eval_f(self.components)), "Unknown pipeline"
)
retrievers_used = retrievers if retrievers else "None"
doc_stores_used = doc_stores if doc_stores else "None"
return f"{pipeline_type} (retriever: {retrievers_used}, doc_store: {doc_stores_used})"
def uptime(self) -> timedelta:
"""
Returns the uptime of the pipeline as a timedelta.
"""
return datetime.datetime.now(datetime.timezone.utc) - self.init_time
def send_telemetry(self):
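# fingerprint the pipeline by hashing its config (sha1 of the sorted JSON representation)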
fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
send_custom_event(
"pipeline",
payload={
"fingerprint": fingerprint,
"type": self.get_type(),
"uptime": int(self.uptime().total_seconds()),
"run_total": self.run.counter + self.run_batch.counter,
},
)
self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
def should_send_telemetry(self):
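# throttle: telemetry is sent at most once per telemetry_update_interval (24 hours)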
now = datetime.datetime.now(datetime.timezone.utc)
return now - self.last_telemetry_update > self.telemetry_update_interval
class _HaystackBeirRetrieverAdapter:
def __init__(self, index_pipeline: Pipeline, query_pipeline: Pipeline, index_params: dict, query_params: dict):

View File

@ -158,6 +158,14 @@ class BaseStandardPipeline(ABC):
"""
return self.pipeline.get_document_store()
def get_type(self) -> str:
"""
Return the type of the pipeline.
:return: Type of the pipeline
"""
return self.pipeline.get_type()
def eval(
self,
labels: List[MultiLabel],
@ -646,7 +654,7 @@ class RetrieverQuestionGenerationPipeline(BaseStandardPipeline):
def __init__(self, retriever: BaseRetriever, question_generator: QuestionGenerator):
self.pipeline = Pipeline()
self.pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
self.pipeline.add_node(component=question_generator, name="Question Generator", inputs=["Retriever"])
self.pipeline.add_node(component=question_generator, name="QuestionGenerator", inputs=["Retriever"])
def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
output = self.pipeline.run(query=query, params=params, debug=debug)

View File

@ -283,7 +283,15 @@ def _delete_telemetry_file(file_type_to_delete: TelemetryFileType):
class NonPrivateParameters:
param_names: List[str] = ["top_k", "model_name_or_path", "add_isolated_node_eval"]
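# allow-list of parameter names that may be included in telemetry events, now including the pipeline-level fields (fingerprint, type, uptime, run_total)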
param_names: List[str] = [
"top_k",
"model_name_or_path",
"add_isolated_node_eval",
"fingerprint",
"type",
"uptime",
"run_total",
]
@classmethod
def apply_filter(cls, param_dicts: Dict[str, Any]) -> Dict[str, Any]:

View File

@ -1,4 +1,5 @@
import inspect
import functools
from typing import Any, Dict, Tuple, Callable
@ -10,3 +11,25 @@ def args_to_kwargs(args: Tuple, func: Callable) -> Dict[str, Any]:
arg_names = arg_names[1 : 1 + len(args)]
args_as_kwargs = {arg_name: arg for arg, arg_name in zip(args, arg_names)}
return args_as_kwargs
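# Counts how many queries/documents each call to the decorated run/run_batch method processes;
# the running total is exposed on the wrapper as `.counter` (e.g. `pipeline.run.counter`).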
def pipeline_invocation_counter(func):
@functools.wraps(func)
def wrapper_invocation_counter(*args, **kwargs):
# single query
this_invocation_count = 1
# were named arguments used?
if "queries" in kwargs:
this_invocation_count = len(kwargs["queries"]) if kwargs["queries"] else 1
elif "documents" in kwargs:
this_invocation_count = len(kwargs["documents"]) if kwargs["documents"] else 1
else:
# positional call: args[0] is the pipeline instance (self), so infer the count from the first real positional argument
if len(args) > 1 and args[1] and isinstance(args[1], list):
this_invocation_count = len(args[1])
wrapper_invocation_counter.counter += this_invocation_count
return func(*args, **kwargs)
wrapper_invocation_counter.counter = 0
return wrapper_invocation_counter

View File

@ -22,6 +22,7 @@ import psutil
import pytest
import requests
from haystack import Answer
from haystack.nodes.base import BaseComponent
try:
@ -49,7 +50,14 @@ except (ImportError, ModuleNotFoundError) as ie:
from haystack.document_stores import BaseDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore
from haystack.nodes import BaseReader, BaseRetriever, OpenAIAnswerGenerator
from haystack.nodes import (
BaseReader,
BaseRetriever,
OpenAIAnswerGenerator,
BaseGenerator,
BaseSummarizer,
BaseTranslator,
)
from haystack.nodes.answer_generator.transformers import Seq2SeqGenerator
from haystack.nodes.answer_generator.transformers import RAGenerator
from haystack.nodes.ranker import SentenceTransformersRanker
@ -263,6 +271,43 @@ class MockRetriever(BaseRetriever):
pass
class MockSeq2SegGenerator(BaseGenerator):
def predict(self, query: str, documents: List[Document], top_k: Optional[int]) -> Dict:
pass
class MockSummarizer(BaseSummarizer):
def predict_batch(
self,
documents: Union[List[Document], List[List[Document]]],
generate_single_summary: Optional[bool] = None,
batch_size: Optional[int] = None,
) -> Union[List[Document], List[List[Document]]]:
pass
def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]:
pass
class MockTranslator(BaseTranslator):
def translate(
self,
results: List[Dict[str, Any]] = None,
query: Optional[str] = None,
documents: Optional[Union[List[Document], List[Answer], List[str], List[Dict[str, Any]]]] = None,
dict_key: Optional[str] = None,
) -> Union[str, List[Document], List[Answer], List[str], List[Dict[str, Any]]]:
pass
def translate_batch(
self,
queries: Optional[List[str]] = None,
documents: Optional[Union[List[Document], List[Answer], List[List[Document]], List[List[Answer]]]] = None,
batch_size: Optional[int] = None,
) -> List[Union[str, List[Document], List[Answer], List[str], List[Dict[str, Any]]]]:
pass
class MockDenseRetriever(MockRetriever):
def __init__(self, document_store: BaseDocumentStore, embedding_dim: int = 768):
self.embedding_dim = embedding_dim
@ -275,6 +320,14 @@ class MockDenseRetriever(MockRetriever):
return [np.random.rand(self.embedding_dim)] * len(docs)
class MockQuestionGenerator(QuestionGenerator):
def __init__(self):
pass
def predict(self, query: str, documents: List[Document], top_k: Optional[int]) -> Dict:
pass
class MockReader(BaseReader):
outgoing_edges = 1

View File

@ -1094,7 +1094,7 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
metrics = eval_result.calculate_metrics(document_scope="document_id")
assert "Retriever" in eval_result
assert "Question Generator" in eval_result
assert "QuestionGenerator" in eval_result
assert len(eval_result) == 2
assert metrics["Retriever"]["mrr"] == 0.5
@ -1104,12 +1104,12 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
assert metrics["Retriever"]["precision"] == 0.1
assert metrics["Retriever"]["ndcg"] == 0.5
assert metrics["Question Generator"]["mrr"] == 0.5
assert metrics["Question Generator"]["map"] == 0.5
assert metrics["Question Generator"]["recall_multi_hit"] == 0.5
assert metrics["Question Generator"]["recall_single_hit"] == 0.5
assert metrics["Question Generator"]["precision"] == 0.1
assert metrics["Question Generator"]["ndcg"] == 0.5
assert metrics["QuestionGenerator"]["mrr"] == 0.5
assert metrics["QuestionGenerator"]["map"] == 0.5
assert metrics["QuestionGenerator"]["recall_multi_hit"] == 0.5
assert metrics["QuestionGenerator"]["recall_single_hit"] == 0.5
assert metrics["QuestionGenerator"]["precision"] == 0.1
assert metrics["QuestionGenerator"]["ndcg"] == 0.5
@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)

View File

@ -836,7 +836,7 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
metrics = eval_result.calculate_metrics(document_scope="document_id")
assert "Retriever" in eval_result
assert "Question Generator" in eval_result
assert "QuestionGenerator" in eval_result
assert len(eval_result) == 2
assert metrics["Retriever"]["mrr"] == 0.5
@ -846,12 +846,12 @@ def test_question_generation_eval(retriever_with_docs, question_generator):
assert metrics["Retriever"]["precision"] == 0.1
assert metrics["Retriever"]["ndcg"] == 0.5
assert metrics["Question Generator"]["mrr"] == 0.5
assert metrics["Question Generator"]["map"] == 0.5
assert metrics["Question Generator"]["recall_multi_hit"] == 0.5
assert metrics["Question Generator"]["recall_single_hit"] == 0.5
assert metrics["Question Generator"]["precision"] == 0.1
assert metrics["Question Generator"]["ndcg"] == 0.5
assert metrics["QuestionGenerator"]["mrr"] == 0.5
assert metrics["QuestionGenerator"]["map"] == 0.5
assert metrics["QuestionGenerator"]["recall_multi_hit"] == 0.5
assert metrics["QuestionGenerator"]["recall_single_hit"] == 0.5
assert metrics["QuestionGenerator"]["precision"] == 0.1
assert metrics["QuestionGenerator"]["ndcg"] == 0.5
# Commented out because of the following issue https://github.com/deepset-ai/haystack/issues/2962

View File

@ -6,7 +6,6 @@ import json
import platform
import sys
from typing import Tuple
from pyparsing import original_text_for
import pytest
from requests import PreparedRequest
@ -22,11 +21,24 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.nodes.other.join_docs import JoinDocuments
from haystack.nodes.base import BaseComponent
from haystack.nodes.retriever.sparse import BM25Retriever
from haystack.pipelines import Pipeline, RootNode
from haystack.pipelines import (
Pipeline,
RootNode,
GenerativeQAPipeline,
FAQPipeline,
ExtractiveQAPipeline,
SearchSummarizationPipeline,
TranslationWrapperPipeline,
RetrieverQuestionGenerationPipeline,
QuestionAnswerGenerationPipeline,
DocumentSearchPipeline,
QuestionGenerationPipeline,
MostSimilarDocumentsPipeline,
)
from haystack.pipelines.config import validate_config_strings, get_component_definitions
from haystack.pipelines.utils import generate_code
from haystack.errors import PipelineConfigError
from haystack.nodes import PreProcessor, TextConverter
from haystack.nodes import PreProcessor, TextConverter, QuestionGenerator
from haystack.utils.deepsetcloud import DeepsetCloudError
from haystack import Document, Answer
from haystack.nodes.other.route_documents import RouteDocuments
@ -39,9 +51,14 @@ from ..conftest import (
DC_TEST_INDEX,
SAMPLES_PATH,
MockDocumentStore,
MockSeq2SegGenerator,
MockRetriever,
MockNode,
deepset_cloud_fixture,
MockReader,
MockSummarizer,
MockTranslator,
MockQuestionGenerator,
)
logger = logging.getLogger(__name__)
@ -765,6 +782,41 @@ def test_validate_pipeline_config_recursive_config(reduce_windows_recursion_limi
validate_config_strings(pipeline_config)
def test_pipeline_classify_type():
pipe = GenerativeQAPipeline(generator=MockSeq2SegGenerator(), retriever=MockRetriever())
assert pipe.get_type().startswith("GenerativeQAPipeline")
pipe = FAQPipeline(retriever=MockRetriever())
assert pipe.get_type().startswith("FAQPipeline")
pipe = ExtractiveQAPipeline(reader=MockReader(), retriever=MockRetriever())
assert pipe.get_type().startswith("ExtractiveQAPipeline")
search_pipe = SearchSummarizationPipeline(summarizer=MockSummarizer(), retriever=MockRetriever())
assert search_pipe.get_type().startswith("SearchSummarizationPipeline")
pipe = RetrieverQuestionGenerationPipeline(retriever=MockRetriever(), question_generator=MockQuestionGenerator())
assert pipe.get_type().startswith("RetrieverQuestionGenerationPipeline")
qag_pipe = QuestionAnswerGenerationPipeline(question_generator=MockQuestionGenerator(), reader=MockReader())
assert qag_pipe.get_type().startswith("QuestionAnswerGenerationPipeline")
pipe = DocumentSearchPipeline(retriever=MockRetriever())
assert pipe.get_type().startswith("DocumentSearchPipeline")
pipe = QuestionGenerationPipeline(question_generator=MockQuestionGenerator())
assert pipe.get_type().startswith("QuestionGenerationPipeline")
pipe = TranslationWrapperPipeline(
input_translator=MockTranslator(), output_translator=MockTranslator(), pipeline=qag_pipe
)
assert pipe.get_type().startswith("TranslationWrapperPipeline")
# pipe = MostSimilarDocumentsPipeline(document_store=MockDocumentStore())
# assert pipe.get_type().startswith("MostSimilarDocumentsPipeline")
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_load_from_deepset_cloud_query():