feat: reduce and focus telemetry (#4087)

* simplified telemetry and docker containers detection

* pylint

* mypy

* mypy

* Add new credentials and metadata

* remove prints

* mypy

* remove comment

* simplify input length measurement

* black

* removed old telemetry, to revert

* reintroduce env function

* reintroduce old telemetry

* fix telemetry selection

* telemetry for promptnode

* telemetry for some training methods

* telemetry for eval and distillation

* mypy & pylint

* review

* Update lg

* mypy

* improve docstrings

* pylint

* mypy

* fix test

* linting

* remove old tests

---------

Co-authored-by: agnieszka-m <amarzec13@gmail.com>
This commit is contained in:
ZanSara 2023-02-22 19:02:47 +01:00 committed by GitHub
parent 181e5474e8
commit f816efa50c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 388 additions and 182 deletions

View File

@ -25,3 +25,10 @@ from haystack.environment import set_pytorch_secure_model_loading
pd.options.display.max_colwidth = 80
set_pytorch_secure_model_loading()
import os
from haystack.telemetry_2 import send_event
send_event(event_name="Haystack imported")

View File

@ -2,16 +2,13 @@ import logging
import os
import platform
import sys
from typing import Any, Dict
from typing import Any, Dict, Optional
import torch
import transformers
from haystack import __version__
HAYSTACK_EXECUTION_CONTEXT = "HAYSTACK_EXECUTION_CONTEXT"
HAYSTACK_DOCKER_CONTAINER = "HAYSTACK_DOCKER_CONTAINER"
# Any remote API (OpenAI, Cohere etc.)
HAYSTACK_REMOTE_API_BACKOFF_SEC = "HAYSTACK_REMOTE_API_BACKOFF_SEC"
HAYSTACK_REMOTE_API_MAX_RETRIES = "HAYSTACK_REMOTE_API_MAX_RETRIES"
@ -32,10 +29,65 @@ def set_pytorch_secure_model_loading(flag_val="1"):
logger.info("TORCH_FORCE_WEIGHTS_ONLY_LOAD is already set to %s, Haystack will use the same.", os_flag_val)
def is_containerized() -> Optional[bool]:
    """
    Best-effort detection of whether the current process runs inside a container.

    Approach taken from https://www.baeldung.com/linux/is-process-running-inside-container:
    the presence of the `/.dockerenv` marker file is checked first, then the CPU scheduling
    info of PID 1 (`/proc/1/sched`) is inspected.

    :return: `True` if `/.dockerenv` exists or PID 1 is neither `systemd` nor `init`,
        `False` if PID 1 is `systemd` or `init`, `None` if the detection itself failed
        (for example because `/proc/1/sched` cannot be read).
    """
    try:
        if os.path.exists("/.dockerenv"):
            return True

        with open("/proc/1/sched", "r") as sched_file:
            pid1_header = sched_file.readline()

        # The first line names the process with PID 1: a regular init system means "not a container".
        return not pid1_header.startswith(("systemd", "init"))
    except Exception:
        logger.debug("Failed to detect if Haystack is running in a container (for telemetry purposes).")
        return None
def collect_static_system_specs() -> Dict[str, Any]:
    """
    Gathers the setup metadata that is collected once per process: Haystack, transformers,
    torch and CUDA versions, container detection, operating system details, Python version,
    and the number of CPUs and GPUs.

    :return: A flat dictionary of telemetry properties. Libraries that are not loaded
        (and CUDA, when it is not available) are reported as `False`.
    """
    loaded_modules = sys.modules
    torch_loaded = "torch" in loaded_modules
    cuda_available = torch.cuda.is_available()

    specs: Dict[str, Any] = {"libraries.haystack": __version__}
    specs["libraries.transformers"] = transformers.__version__ if "transformers" in loaded_modules else False
    specs["libraries.torch"] = torch.__version__ if torch_loaded else False
    specs["libraries.cuda"] = torch.version.cuda if torch_loaded and cuda_available else False
    specs["os.containerized"] = is_containerized()
    # FIXME review these
    specs["os.version"] = platform.release()
    specs["os.family"] = platform.system()
    specs["os.machine"] = platform.machine()
    specs["python.version"] = platform.python_version()  # FIXME verify
    specs["hardware.cpus"] = os.cpu_count()  # FIXME verify
    specs["hardware.gpus"] = torch.cuda.device_count() if cuda_available else 0  # probably ok
    return specs
def collect_dynamic_system_specs() -> Dict[str, Any]:
    """
    Collects the metadata that can change while the process is running: which of a few
    optional libraries are currently loaded, and in which version.

    :return: A dictionary mapping `libraries.<name>` to the version string of the loaded
        module, to `"unknown"` if the module is loaded but exposes no `__version__`,
        or to `False` if the module is not present in `sys.modules`.
    """

    def _loaded_version(module_name: str) -> Any:
        # Only look at what is already imported: telemetry must never trigger an import itself.
        module = sys.modules.get(module_name)
        if module is None:
            return False
        return getattr(module, "__version__", "unknown")

    return {
        "libraries.pytest": _loaded_version("pytest"),
        "libraries.ray": _loaded_version("ray"),
        # The package registers itself in sys.modules under its capitalized name.
        "libraries.ipython": _loaded_version("IPython"),
        # Report Colab's own version (the pytest entry was previously read here by mistake).
        "libraries.colab": _loaded_version("google.colab"),
    }
#
# Old telemetry
#
def get_or_create_env_meta_data() -> Dict[str, Any]:
"""
Collects meta data about the setup that is used with Haystack, such as: operating system, python version, Haystack version, transformers version, pytorch version, number of GPUs, execution environment, and the value stored in the env variable HAYSTACK_EXECUTION_CONTEXT.
"""
from haystack.telemetry import HAYSTACK_EXECUTION_CONTEXT
global env_meta_data # pylint: disable=global-statement
if not env_meta_data:
env_meta_data = {
@ -60,6 +112,8 @@ def _get_execution_environment():
Identifies the execution environment that Haystack is running in.
Options are: colab notebook, kubernetes, CPU/GPU docker container, test environment, jupyter notebook, python script
"""
from haystack.telemetry import HAYSTACK_DOCKER_CONTAINER
if os.environ.get("CI", "False").lower() == "true":
execution_env = "ci"
elif "google.colab" in sys.modules:

View File

@ -12,6 +12,7 @@ from haystack.modeling.model.adaptive_model import AdaptiveModel
from haystack.modeling.model.biadaptive_model import BiAdaptiveModel
from haystack.modeling.model.optimization import WrappedDataParallel
from haystack.utils.experiment_tracking import Tracker as tracker
from haystack.telemetry_2 import send_event
from haystack.modeling.visual import BUSH_SEP
@ -57,6 +58,7 @@ class Evaluator:
:return: all_results: A list of dictionaries, one for each prediction head. Each dictionary contains the metrics
and reports generated during evaluation.
"""
send_event("Evaluator.eval()")
model.prediction_heads[0].use_confidence_scores_for_ranking = use_confidence_scores_for_ranking
model.prediction_heads[0].use_no_answer_legacy_confidence = use_no_answer_legacy_confidence
model.eval()

View File

@ -22,6 +22,7 @@ from haystack.modeling.model.optimization import get_scheduler, WrappedDataParal
from haystack.modeling.utils import GracefulKiller
from haystack.utils.experiment_tracking import Tracker as tracker
from haystack.utils.early_stopping import EarlyStopping
from haystack.telemetry import send_event
logger = logging.getLogger(__name__)
@ -163,6 +164,7 @@ class Trainer:
:return: Returns the model after training. When you do ``early_stopping``
with a ``save_dir`` the best model is loaded and returned.
"""
send_event("Trainer.train()")
# connect the prediction heads with the right output from processor
self.model.connect_heads_with_processor(self.data_silo.processor.tasks, require_labels=True)
# Check that the tokenizer(s) fits the language model(s)

View File

@ -30,6 +30,7 @@ from haystack.modeling.utils import initialize_device_settings
from haystack.nodes.base import BaseComponent
from haystack.schema import Document
from haystack.utils.reflection import retry_with_exponential_backoff
from haystack.telemetry_2 import send_event
logger = logging.getLogger(__name__)
@ -769,6 +770,7 @@ class PromptNode(BaseComponent):
:param stop_words: Stops text generation if any one of the stop words is generated.
:param model_kwargs: Additional keyword arguments passed when loading the model specified by `model_name_or_path`.
"""
send_event("PromptNode initialized")
super().__init__()
self.prompt_templates: Dict[str, PromptTemplate] = {pt.name: pt for pt in get_predefined_prompt_templates()} # type: ignore
self.default_prompt_template: Union[str, PromptTemplate, None] = default_prompt_template
@ -827,6 +829,7 @@ class PromptNode(BaseComponent):
:param prompt_template: The name or object of the optional PromptTemplate to use.
:return: A list of strings as model responses.
"""
send_event("PromptNode.prompt()", event_properties={"template": str(prompt_template)})
results = []
# we pop the prompt_collector kwarg to avoid passing it to the model
prompt_collector: List[str] = kwargs.pop("prompt_collector", [])

View File

@ -28,6 +28,7 @@ from haystack.schema import Document, Answer, Span
from haystack.document_stores.base import BaseDocumentStore
from haystack.nodes.reader.base import BaseReader
from haystack.utils.early_stopping import EarlyStopping
from haystack.telemetry_2 import send_event
logger = logging.getLogger(__name__)
@ -434,6 +435,7 @@ class FARMReader(BaseReader):
:param max_query_length: Maximum length of the question in number of tokens.
:return: None
"""
send_event("FARMReader.train()")
return self._training_procedure(
data_dir=data_dir,
train_filename=train_filename,
@ -555,6 +557,7 @@ class FARMReader(BaseReader):
:param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
:return: None
"""
send_event("FARMReader.distil_prediction_layer_from()")
return self._training_procedure(
data_dir=data_dir,
train_filename=train_filename,
@ -677,6 +680,7 @@ class FARMReader(BaseReader):
:param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
:return: None
"""
send_event("FARMReader.distil_intermediate_layers_from()")
return self._training_procedure(
data_dir=data_dir,
train_filename=train_filename,
@ -938,7 +942,7 @@ class FARMReader(BaseReader):
"Hence, results might slightly differ from those of `Pipeline.eval()`\n."
"If you are just about starting to evaluate your model consider using `Pipeline.eval()` instead."
)
send_event("FARMReader.eval_on_file()")
if device is None:
device = self.devices[0]
else:
@ -1016,7 +1020,7 @@ class FARMReader(BaseReader):
"Hence, results might slightly differ from those of `Pipeline.eval()`\n."
"If you are just about starting to evaluate your model consider using `Pipeline.eval()` instead."
)
send_event("FARMReader.eval()")
if device is None:
device = self.devices[0]
else:

View File

@ -31,6 +31,7 @@ from haystack.nodes.retriever._losses import _TRAINING_LOSSES
from haystack.nodes.retriever._openai_encoder import _OpenAIEmbeddingEncoder
from haystack.schema import Document
from haystack.utils.reflection import retry_with_exponential_backoff
from haystack.telemetry_2 import send_event
from ._base_embedding_encoder import _BaseEmbeddingEncoder
@ -199,6 +200,7 @@ class _SentenceTransformersEmbeddingEncoder(_BaseEmbeddingEncoder):
reference the Sentence-Transformers [documentation](https://www.sbert.net/docs/training/overview.html#sentence_transformers.SentenceTransformer.fit)
for a full list of keyword arguments.
"""
send_event("SentenceTransformersEmbeddingEncoder.train()")
if train_loss not in _TRAINING_LOSSES:
raise ValueError(f"Unrecognized train_loss {train_loss}. Should be one of: {_TRAINING_LOSSES.keys()}")

View File

@ -10,6 +10,7 @@ from tqdm.auto import tqdm
from haystack.schema import Document, MultiLabel
from haystack.errors import HaystackError, PipelineError
from haystack.nodes.base import BaseComponent
from haystack.telemetry_2 import send_event
from haystack.document_stores.base import BaseDocumentStore, BaseKnowledgeGraph, FilterType
@ -153,7 +154,7 @@ class BaseRetriever(BaseComponent):
contains the keys "predictions" and "metrics".
:param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='} for basic authentication)
"""
send_event("BaseRetriever.eval()")
# Extract all questions for evaluation
filters: Dict = {"origin": [label_origin]}

View File

@ -43,6 +43,7 @@ from haystack.modeling.data_handler.dataloader import NamedDataLoader
from haystack.modeling.model.optimization import initialize_optimizer
from haystack.modeling.training.base import Trainer
from haystack.modeling.utils import initialize_device_settings
from haystack.telemetry_2 import send_event
logger = logging.getLogger(__name__)
@ -654,6 +655,7 @@ class DensePassageRetriever(DenseRetriever):
Checkpoints can be stored via setting `checkpoint_every` to a custom number of steps.
If any checkpoints are stored, a subsequent run of train() will resume training from the latest available checkpoint.
"""
send_event("DensePassageRetriever.train()")
self.processor.embed_title = embed_title
self.processor.data_dir = Path(data_dir)
self.processor.train_filename = train_filename
@ -1305,6 +1307,7 @@ class TableTextRetriever(DenseRetriever):
:param checkpoints_to_keep: The maximum number of train checkpoints to save.
:param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
"""
send_event("TableTextRetriever.train()")
if embed_meta_fields is None:
embed_meta_fields = ["page_title", "section_title", "caption"]
@ -1910,6 +1913,7 @@ class EmbeddingRetriever(DenseRetriever):
reference the Sentence-Transformers [documentation](https://www.sbert.net/docs/training/overview.html#sentence_transformers.SentenceTransformer.fit)
for a full list of keyword arguments.
"""
send_event("EmbeddingRetriever.train()")
self.embedding_encoder.train(
training_data,
learning_rate=learning_rate,

View File

@ -22,6 +22,7 @@ import tempfile
from pathlib import Path
import yaml
import mmh3
import numpy as np
import pandas as pd
import networkx as nx
@ -53,6 +54,7 @@ from haystack.nodes.retriever.base import BaseRetriever
from haystack.document_stores.base import BaseDocumentStore
from haystack.telemetry import send_event, send_custom_event, is_telemetry_enabled
from haystack.utils.experiment_tracking import MLflowTrackingHead, Tracker as tracker
from haystack.telemetry_2 import send_pipeline_run_event, send_pipeline_event, send_event as send_event_2
logger = logging.getLogger(__name__)
@ -79,6 +81,7 @@ class Pipeline:
self.last_window_run_total = 0
self.run_total = 0
self.sent_event_in_window = False
self.yaml_hash = False
@property
def root_node(self) -> Optional[str]:
@ -432,6 +435,14 @@ class Pipeline:
node={"name": name, "inputs": inputs},
instance=component,
)
# TELEMETRY: Hash the config of the pipeline without node names
# to be able to cluster later by "pipeline type"
# (is any specific pipeline configuration very popular?)
fingerprint_config = copy.copy(self.get_config())
for comp in fingerprint_config["components"]:
del comp["name"]
fingerprint = json.dumps(fingerprint_config, default=str)
self.fingerprint = "{:02x}".format(mmh3.hash128(fingerprint, signed=False))
def get_node(self, name: str) -> Optional[BaseComponent]:
"""
@ -482,6 +493,18 @@ class Pipeline:
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
send_pipeline_run_event(
pipeline=self,
event_name="Pipeline.run()",
query=query,
file_paths=file_paths,
labels=labels,
documents=documents,
meta=meta,
params=params,
debug=debug,
)
# validate the node names
self._validate_node_names_in_params(params=params)
@ -618,6 +641,18 @@ class Pipeline:
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
send_pipeline_run_event(
pipeline=self,
event_name="Pipeline.run_batch()",
queries=queries,
file_paths=file_paths,
labels=labels,
documents=documents,
meta=meta,
params=params,
debug=debug,
)
if file_paths is not None or meta is not None:
logger.info(
"It seems that an indexing Pipeline is run, so using the nodes' run method instead of run_batch."
@ -773,6 +808,17 @@ class Pipeline:
Returns a tuple containing the ncdg, map, recall and precision scores.
Each metric is represented by a dictionary containing the scores for each top_k value.
"""
send_event_2(
event_name="Pipeline.eval_beir()",
event_properties={
"dataset": dataset,
"index_pipeline": index_pipeline.yaml_hash,
"query_pipeline": query_pipeline.yaml_hash,
"num_documents": num_documents,
"top_k_values": top_k_values,
},
)
if index_params is None:
index_params = {}
if query_params is None:
@ -1211,6 +1257,8 @@ class Pipeline:
Additional information can be found here
https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
"""
send_pipeline_event(pipeline=self, event_name="Pipeline.eval()")
eval_result = EvaluationResult()
if add_isolated_node_eval:
params = {} if params is None else params.copy()
@ -1328,6 +1376,8 @@ class Pipeline:
Additional information can be found here
https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
"""
send_pipeline_event(pipeline=self, event_name="Pipeline.eval_batch()")
eval_result = EvaluationResult()
if add_isolated_node_eval:
params = {} if params is None else params.copy()
@ -1956,14 +2006,15 @@ class Pipeline:
`_` sign must be used to specify nested hierarchical properties.
:param strict_version_check: whether to fail in case of a version mismatch (throws a warning otherwise)
"""
config = read_pipeline_config_from_yaml(path)
return cls.load_from_config(
pipeline = cls.load_from_config(
pipeline_config=config,
pipeline_name=pipeline_name,
overwrite_with_env_variables=overwrite_with_env_variables,
strict_version_check=strict_version_check,
)
pipeline.yaml_hash = "{:02x}".format(mmh3.hash128(str(path), signed=False))
return pipeline
@classmethod
def load_from_config(

View File

@ -17,10 +17,12 @@ from pathlib import Path
import yaml
import posthog
from haystack.environment import HAYSTACK_EXECUTION_CONTEXT, get_or_create_env_meta_data
from haystack.environment import get_or_create_env_meta_data
posthog.api_key = "phc_F5v11iI2YHkoP6Er3cPILWSrLhY3D6UY4dEMga4eoaa"
posthog.host = "https://tm.hs.deepset.ai"
HAYSTACK_EXECUTION_CONTEXT = "HAYSTACK_EXECUTION_CONTEXT"
HAYSTACK_DOCKER_CONTAINER = "HAYSTACK_DOCKER_CONTAINER"
HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"
HAYSTACK_TELEMETRY_LOGGING_TO_FILE_ENABLED = "HAYSTACK_TELEMETRY_LOGGING_TO_FILE_ENABLED"
CONFIG_PATH = Path("~/.haystack/config.yaml").expanduser()
@ -142,6 +144,8 @@ def send_custom_event(event: str = "", payload: Optional[Dict[str, Any]] = None)
:param event: Name of the event. Use a noun and a verb, e.g., "evaluation started", "component created"
:param payload: A dictionary containing event meta data, e.g., parameter settings
"""
if os.environ.get("HAYSTACK_TELEMETRY_VERSION", "2") != "1":
return
global user_id # pylint: disable=global-statement
if payload is None:
payload = {}
@ -179,6 +183,7 @@ def send_custom_event(event: str = "", payload: Optional[Dict[str, Any]] = None)
return
except Exception as e:
print("Exception! ", e)
logger.debug("Telemetry was not able to send an event.", exc_info=e)

238
haystack/telemetry_2.py Normal file
View File

@ -0,0 +1,238 @@
import os
from typing import Any, Dict, Optional, List, Union
import uuid
import logging
from pathlib import Path
import json
import yaml
import posthog
from haystack.environment import collect_static_system_specs, collect_dynamic_system_specs
# Name of the environment variable documented as the telemetry opt-out switch.
HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"
# Name of the environment variable read by send_pipeline_run_event() to recognize the public demo.
HAYSTACK_EXECUTION_CONTEXT = "HAYSTACK_EXECUTION_CONTEXT"
# NOTE(review): not referenced by the code visible in this module - confirm it is still needed here.
HAYSTACK_DOCKER_CONTAINER = "HAYSTACK_DOCKER_CONTAINER"
# File in which the anonymous user_id is persisted between runs.
CONFIG_PATH = Path("~/.haystack/config.yaml").expanduser()
# NOTE(review): not referenced by the code visible in this module - confirm it is still needed here.
LOG_PATH = Path("~/.haystack/telemetry.log").expanduser()
# Module-level logger; telemetry failures are only ever reported at debug level.
logger = logging.getLogger(__name__)
class Telemetry:
    """
    Haystack reports anonymous usage statistics to support continuous software improvements for all its users.

    You can opt-out of sharing usage statistics by manually setting the environment
    variable `HAYSTACK_TELEMETRY_ENABLED` as described for different operating systems on the
    [documentation page](https://docs.haystack.deepset.ai/docs/telemetry#how-can-i-opt-out).

    Check out the documentation for more details: [Telemetry](https://docs.haystack.deepset.ai/docs/telemetry).
    """

    def __init__(self):
        """
        Initializes the telemetry. Loads the user_id from the config file,
        or creates a new id and saves it if the file is not found.

        It also collects system information which cannot change across the lifecycle
        of the process (for example `is_containerized()`).

        No filesystem or system-inspection failure is allowed to escape: this class is
        instantiated while Haystack is being imported, so an exception here would make
        the import itself fail.
        """
        posthog.api_key = "phc_C44vUK9R1J6HYVdfJarTEPqVAoRPJzMXzFcj8PIrJgP"
        posthog.host = "https://eu.posthog.com"

        # disable posthog logging
        for module_name in ["posthog", "backoff"]:
            module_logger = logging.getLogger(module_name)
            module_logger.setLevel(logging.CRITICAL)
            # Prevent module from sending errors to stderr when an exception is encountered during an emit() call
            module_logger.addHandler(logging.NullHandler())
            module_logger.propagate = False

        self.user_id = None

        if CONFIG_PATH.exists():
            # Load the config file
            try:
                with open(CONFIG_PATH, "r", encoding="utf-8") as config_file:
                    config = yaml.safe_load(config_file)
                # An empty file parses to None and a malformed one may not be a mapping at all.
                if isinstance(config, dict) and "user_id" in config:
                    self.user_id = config["user_id"]
            except Exception as e:
                logger.debug("Telemetry could not read the config file %s", CONFIG_PATH, exc_info=e)
        else:
            # Create the config file
            logger.info(
                "Haystack sends anonymous usage data to understand the actual usage and steer dev efforts "
                "towards features that are most meaningful to users. You can opt-out at anytime by manually "
                "setting the environment variable HAYSTACK_TELEMETRY_ENABLED as described for different "
                "operating systems in the [documentation page](https://docs.haystack.deepset.ai/docs/telemetry#how-can-i-opt-out). "
                "More information at [Telemetry](https://docs.haystack.deepset.ai/docs/telemetry)."
            )
            self.user_id = str(uuid.uuid4())
            try:
                # Creating the directory can fail just like writing the file (read-only or missing home),
                # so both steps share the same guard. The id is then only valid for this process.
                CONFIG_PATH.parents[0].mkdir(parents=True, exist_ok=True)
                with open(CONFIG_PATH, "w", encoding="utf-8") as outfile:
                    yaml.dump({"user_id": self.user_id}, outfile, default_flow_style=False)
            except Exception as e:
                logger.debug("Telemetry could not write config file to %s", CONFIG_PATH, exc_info=e)

        try:
            self.event_properties = collect_static_system_specs()
        except Exception as e:
            # Events are still sent, just without the static system metadata.
            logger.debug("Telemetry could not collect the static system specs.", exc_info=e)
            self.event_properties = {}

    def send_event(self, event_name: str, event_properties: Optional[Dict[str, Any]] = None):
        """
        Sends a telemetry event.

        Failures (while collecting the dynamic specs, serializing the properties,
        or contacting PostHog) are logged at debug level and never raised.

        :param event_name: The name of the event to show in PostHog.
        :param event_properties: Additional event metadata. These are merged with the
            system metadata collected in __init__, so take care not to overwrite them.
        """
        event_properties = event_properties or {}
        try:
            dynamic_specs = collect_dynamic_system_specs()
            posthog.capture(
                distinct_id=self.user_id,
                event=event_name,
                # loads/dumps to sort the keys
                properties=json.loads(
                    json.dumps({**self.event_properties, **dynamic_specs, **event_properties}, sort_keys=True)
                ),
            )
        except Exception as e:
            logger.debug("Telemetry couldn't make a POST request to PostHog.", exc_info=e)
def send_pipeline_run_event(  # type: ignore
    event_name: str,
    pipeline: "Pipeline",  # type: ignore
    query: Optional[str] = None,
    queries: Optional[List[str]] = None,
    file_paths: Optional[List[str]] = None,
    labels: Optional[Union["MultiLabel", List["MultiLabel"]]] = None,  # type: ignore
    documents: Optional[Union[List["Document"], List[List["Document"]]]] = None,  # type: ignore
    meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    params: Optional[dict] = None,
    debug: Optional[bool] = None,
):
    """
    Sends a telemetry event about the execution of a pipeline, if telemetry is enabled.

    For regular pipelines only a profile of the pipeline (class name, fingerprint, YAML hash,
    document store class, node classes) and the sizes of the inputs are sent, not the inputs
    themselves. When the `HAYSTACK_EXECUTION_CONTEXT` environment variable is `public_demo`,
    the query and params are sent as they are instead.

    Any exception raised while building or sending the event is logged at debug level and swallowed.

    :param event_name: The name of the event to show in PostHog.
    :param pipeline: the pipeline that is running
    :param query: the value of the `query` input of the pipeline, if any
    :param queries: the value of the `queries` input of the pipeline, if any
    :param file_paths: the value of the `file_paths` input of the pipeline, if any
    :param labels: the value of the `labels` input of the pipeline, if any
    :param documents: the value of the `documents` input of the pipeline, if any
    :param meta: the value of the `meta` input of the pipeline, if any
    :param params: the value of the `params` input of the pipeline, if any
    :param debug: the value of the `debug` input of the pipeline, if any
    """
    try:
        if telemetry:
            event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = {}

            # Check if it's the public demo
            exec_context = os.environ.get(HAYSTACK_EXECUTION_CONTEXT, "")
            if exec_context == "public_demo":
                event_properties["pipeline.is_public_demo"] = True
                event_properties["pipeline.run_parameters.query"] = query
                event_properties["pipeline.run_parameters.params"] = params
                telemetry.send_event(event_name=event_name, event_properties=event_properties)
                return

            # Collect pipeline profile
            event_properties["pipeline.classname"] = pipeline.__class__.__name__
            event_properties["pipeline.fingerprint"] = pipeline.fingerprint
            if pipeline.yaml_hash:
                event_properties["pipeline.yaml_hash"] = pipeline.yaml_hash

            # Add document store
            docstore = pipeline.get_document_store()
            if docstore:
                event_properties["pipeline.document_store"] = docstore.__class__.__name__

            # Add an entry for each node class and classify the pipeline by its root node
            for node in pipeline.graph.nodes:
                node_type = pipeline.graph.nodes.get(node)["component"].__class__.__name__
                if node_type == "RootNode":
                    event_properties["pipeline.type"] = node
                else:
                    event_properties["pipeline.nodes." + node_type] = (
                        event_properties.get("pipeline.nodes." + node_type, 0) + 1  # type: ignore
                    )

            # Inputs of the run() or run_batch() call
            if isinstance(labels, list):
                labels_len = len(labels)
            else:
                labels_len = 1 if labels else 0

            if documents and isinstance(documents, list) and isinstance(documents[0], list):
                documents_len = [len(docs) if isinstance(docs, list) else 0 for docs in documents]
            elif isinstance(documents, list):
                documents_len = [len(documents)]
            else:
                documents_len = [0]

            # Counted like labels: a list counts its items, a single dict counts as one,
            # and a missing or empty meta counts as zero instead of being reported as one.
            if isinstance(meta, list):
                meta_len = len(meta)
            else:
                meta_len = 1 if meta else 0

            event_properties["pipeline.run_parameters.queries"] = len(queries) if queries else bool(query)
            event_properties["pipeline.run_parameters.file_paths"] = len(file_paths or [])
            event_properties["pipeline.run_parameters.labels"] = labels_len
            event_properties["pipeline.run_parameters.documents"] = documents_len  # type: ignore
            event_properties["pipeline.run_parameters.meta"] = meta_len
            event_properties["pipeline.run_parameters.params"] = bool(params)
            event_properties["pipeline.run_parameters.debug"] = bool(debug)

            telemetry.send_event(event_name=event_name, event_properties=event_properties)
    except Exception as e:
        # Never let telemetry break things
        logger.debug("There was an issue sending a %s telemetry event", event_name, exc_info=e)
def send_pipeline_event(pipeline: "Pipeline", event_name: str):  # type: ignore
    """
    Reports a pipeline-related event other than a run() call, if telemetry is enabled.

    The event carries the pipeline's class name, fingerprint and YAML hash. Any failure
    is logged at debug level and swallowed.

    :param pipeline: the pipeline the event refers to
    :param event_name: The name of the event to show in PostHog.
    """
    try:
        if not telemetry:
            return
        pipeline_profile = {
            "pipeline.classname": pipeline.__class__.__name__,
            "pipeline.fingerprint": pipeline.fingerprint,
            "pipeline.yaml_hash": pipeline.yaml_hash,
        }
        telemetry.send_event(event_name=event_name, event_properties=pipeline_profile)
    except Exception as e:
        # Never let telemetry break things
        logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e)
def send_event(event_name: str, event_properties: Optional[Dict[str, Any]] = None):
    """
    Forwards a generic event to the module-level telemetry object, if telemetry is enabled.

    Any failure is logged at debug level and swallowed.

    :param event_name: The name of the event to show in PostHog.
    :param event_properties: Additional event metadata, passed on unchanged.
    """
    try:
        if not telemetry:
            return
        telemetry.send_event(event_name=event_name, event_properties=event_properties)
    except Exception as e:
        # Never let telemetry break things
        logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e)
def _serializer(obj):
"""
Small function used to build pipeline fingerprints and safely serialize any object.
"""
try:
return str(obj)
except:
return "~ non serializable object ~"
# Module-level telemetry object used by the send_* helpers; `None` means "do not send anything".
# It is only created when this telemetry version is selected (HAYSTACK_TELEMETRY_VERSION, default "2")
# and the user has not opted out through the HAYSTACK_TELEMETRY_ENABLED environment variable.
telemetry: Optional[Telemetry] = None
if (
    os.environ.get("HAYSTACK_TELEMETRY_VERSION", "2") == "2"
    and os.environ.get(HAYSTACK_TELEMETRY_ENABLED, "True").lower() not in ("false", "0")
):
    try:
        telemetry = Telemetry()
    except Exception as e:
        # This runs at import time: never let telemetry break things
        logger.debug("Telemetry could not be initialized.", exc_info=e)

View File

@ -1,10 +1,13 @@
from typing import Optional, Any, Dict, Union
from abc import ABC, abstractmethod
import logging
from pathlib import Path
from typing import Optional, Any, Dict, Union
import mlflow
from requests.exceptions import ConnectionError
from haystack import __version__
from haystack.environment import get_or_create_env_meta_data

View File

@ -12,6 +12,7 @@ import requests
from haystack.telemetry import send_tutorial_event
logger = logging.getLogger(__name__)

View File

@ -1,130 +0,0 @@
from pathlib import Path
from unittest.mock import patch, PropertyMock
import pytest
from haystack import telemetry
from haystack.errors import PipelineSchemaError
from haystack.telemetry import (
NonPrivateParameters,
send_event,
enable_writing_events_to_file,
disable_writing_events_to_file,
send_custom_event,
_delete_telemetry_file,
disable_telemetry,
enable_telemetry,
TelemetryFileType,
_write_telemetry_config,
)
@patch.object(
NonPrivateParameters, "param_names", return_value=["top_k", "model_name_or_path"], new_callable=PropertyMock
)
def test_private_params_not_tracked(mock_nonprivateparameters):
params = {"hostname": "private_hostname", "top_k": 2}
tracked_params = NonPrivateParameters.apply_filter(params)
expected_params = {"top_k": 2}
assert tracked_params == expected_params
@patch.object(
NonPrivateParameters, "param_names", return_value=["top_k", "model_name_or_path"], new_callable=PropertyMock
)
def test_non_private_params_tracked(mock_nonprivateparameters):
params = {"model_name_or_path": "test-model", "top_k": 2}
non_private_params = NonPrivateParameters.apply_filter(params)
assert non_private_params == params
@patch.object(NonPrivateParameters, "param_names", return_value=[], new_callable=PropertyMock)
def test_only_non_private_params(mock_nonprivateparameters):
non_private_params = NonPrivateParameters.apply_filter({"top_k": 2})
assert non_private_params == {}
@pytest.mark.integration
@patch("posthog.capture")
@patch.object(
NonPrivateParameters,
"param_names",
return_value=["top_k", "model_name_or_path", "add_isolated_node_eval"],
new_callable=PropertyMock,
)
# patches are applied in bottom-up order, which is why mock_nonprivateparameters is the first parameter and mock_posthog_capture is the second
def test_send_event_via_decorator(mock_nonprivateparameters, mock_posthog_capture):
class TestClass:
@send_event
def run(self, add_isolated_node_eval: bool = False):
pass
test_class = TestClass()
test_class.run(add_isolated_node_eval=True)
# todo replace [1] with .kwargs when moving from python 3.7 to 3.8 in CI
assert mock_posthog_capture.call_args[1]["event"] == "TestClass.run executed"
assert mock_posthog_capture.call_args[1]["properties"]["add_isolated_node_eval"]
@pytest.mark.integration
@patch("posthog.capture")
def test_send_event_if_custom_error_raised(mock_posthog_capture):
with pytest.raises(PipelineSchemaError):
raise PipelineSchemaError
# todo replace [1] with .kwargs when moving from python 3.7 to 3.8 in CI
assert mock_posthog_capture.call_args[1]["event"] == "PipelineSchemaError raised"
def num_lines(path: Path):
if path.is_file():
with open(path, "r") as f:
return len(f.readlines())
return 0
@pytest.mark.integration
@patch("posthog.capture")
def test_write_to_file(mock_posthog_capture, monkeypatch):
monkeypatch.setattr(telemetry, "LOG_PATH", Path("~/.haystack/telemetry_test.log").expanduser())
num_lines_before = num_lines(telemetry.LOG_PATH)
send_custom_event(event="test")
num_lines_after = num_lines(telemetry.LOG_PATH)
assert num_lines_before == num_lines_after
enable_writing_events_to_file()
num_lines_before = num_lines(telemetry.LOG_PATH)
send_custom_event(event="test")
num_lines_after = num_lines(telemetry.LOG_PATH)
assert num_lines_before + 1 == num_lines_after
disable_writing_events_to_file()
num_lines_before = num_lines(telemetry.LOG_PATH)
send_custom_event(event="test")
num_lines_after = num_lines(telemetry.LOG_PATH)
assert num_lines_before == num_lines_after
_delete_telemetry_file(TelemetryFileType.LOG_FILE)
@pytest.mark.integration
@patch("posthog.capture")
def test_disable_enable_telemetry(mock_posthog_capture, monkeypatch):
monkeypatch.setattr(telemetry, "HAYSTACK_TELEMETRY_ENABLED", "HAYSTACK_TELEMETRY_ENABLED_TEST")
monkeypatch.setattr(telemetry, "CONFIG_PATH", Path("~/.haystack/config_test.yaml").expanduser())
# config_test.yaml doesn't exist yet and won't be created automatically because the global user_id might have been set already by other tests
_write_telemetry_config()
send_custom_event(event="test")
send_custom_event(event="test")
assert mock_posthog_capture.call_count == 2, "two events should be sent"
disable_telemetry()
send_custom_event(event="test")
assert mock_posthog_capture.call_count == 3, "one additional event should be sent"
# todo replace [1] with .kwargs when moving from python 3.7 to 3.8 in CI
assert mock_posthog_capture.call_args[1]["event"] == "telemetry disabled", "a final event should be sent"
send_custom_event(event="test")
assert mock_posthog_capture.call_count == 3, "no additional event should be sent"
enable_telemetry()
send_custom_event(event="test")
assert mock_posthog_capture.call_count == 4, "one additional event should be sent"

View File

@ -2048,44 +2048,3 @@ def test_fix_to_pipeline_execution_when_join_follows_join():
res = pipeline.run(query="Alpha Beta Gamma Delta")
documents = res["documents"]
assert len(documents) == 4 # all four documents should be found
def test_send_pipeline_event():
"""
Test the event can be sent and the internal fields are correctly set
"""
pipeline = Pipeline()
pipeline.add_node(MockNode(), name="mock_node", inputs=["Query"])
with mock.patch("haystack.pipelines.base.send_custom_event") as mocked_send:
today_at_midnight = datetime.datetime.combine(datetime.datetime.now(), datetime.time.min, datetime.timezone.utc)
pipeline.send_pipeline_event()
mocked_send.assert_called_once()
assert pipeline.time_of_last_sent_event == today_at_midnight
assert pipeline.last_window_run_total == 0
def test_send_pipeline_event_unserializable_param():
"""
Test the event can be sent even when a certain component was initialized with a
non-serializable parameter, see https://github.com/deepset-ai/haystack/issues/3833
"""
class CustomNode(MockNode):
"""A mock node that can be inited passing a param"""
def __init__(self, param):
self.param = param
# create a custom node passing a parameter that can't be serialized (an empty set)
custom_node = CustomNode(param=set())
pipeline = Pipeline()
pipeline.add_node(custom_node, name="custom_node", inputs=["Query"])
with mock.patch("haystack.pipelines.base.send_custom_event") as mocked_send:
today_at_midnight = datetime.datetime.combine(datetime.datetime.now(), datetime.time.min, datetime.timezone.utc)
pipeline.send_pipeline_event()
mocked_send.assert_called_once()
assert pipeline.time_of_last_sent_event == today_at_midnight
assert pipeline.last_window_run_total == 0