test: Add scripts to send benchmark results to datadog (#5432)

* Add config files

* log benchmarks to stdout

* Add top-k and batch size to configs

* Add batch size to configs

* fix: don't download files if they already exist

* Add batch size to configs

* refine script

* Remove configs using 1m docs

* update run script

* update run script

* update run script

* datadog integration

* remove out folder

* gitignore benchmarks output

* test: send benchmarks to datadog

* remove commented-out lines in script

* feat: take branch/tag argument for benchmark setup script

* fix: run.sh should ignore errors

* Remove changes unrelated to datadog

* Apply black

* Update test/benchmarks/utils.py

Co-authored-by: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com>

* PR feedback

* Account for reader benchmarks not doing indexing

* Change key of reader metrics

* Apply PR feedback

* Remove whitespace

---------

Co-authored-by: rjanjua <rohan.janjua@gmail.com>
Co-authored-by: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com>
bogdankostic 2023-08-03 10:09:00 +02:00 committed by GitHub
parent a26859f065
commit 56cea8cbbd
6 changed files with 332 additions and 6 deletions


@@ -0,0 +1,153 @@
from enum import Enum
from itertools import chain
from time import time
from typing import Dict, List, Optional

import datadog
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

import logging

LOGGER = logging.getLogger(__name__)


class Tag(Enum):
    @classmethod
    def values(cls):
        return [e.value for e in cls]


class NoneTag(Tag):
    none = "none_none_none_none-1234"  # should not match any other tag


class DatasetSizeTags(Tag):
    size_100k = "dataset_size:100k"


class ReaderModelTags(Tag):
    debertabase = "reader:debertabase"
    debertalarge = "reader:debertalarge"
    tinyroberta = "reader:tinyroberta"


class RetrieverModelTags(Tag):
    bm25 = "retriever:bm25"
    minilm = "retriever:minilm"
    mpnetbase = "retriever:mpnetbase"


class DocumentStoreModelTags(Tag):
    opensearch = "documentstore:opensearch"
    elasticsearch = "documentstore:elasticsearch"
    weaviate = "documentstore:weaviate"


class BenchmarkType(Tag):
    retriever = "benchmark_type:retriever"
    retriever_reader = "benchmark_type:retriever_reader"
    reader = "benchmark_type:reader"


class CustomDatadogMetric:
    name: str
    timestamp: float
    value: float
    tags: List[Tag]

    def __init__(self, name: str, value: float, tags: Optional[List[Tag]] = None) -> None:
        self.timestamp = time()
        self.name = name
        self.value = value
        self.tags = self.validate_tags(tags) if tags is not None else []

    def validate_tags(self, tags: List[Tag]) -> List[Tag]:
        valid_tags: List[Tag] = []
        for tag in tags:
            if isinstance(
                tag, (DatasetSizeTags, ReaderModelTags, RetrieverModelTags, DocumentStoreModelTags, BenchmarkType)
            ):
                valid_tags.append(tag)
            elif tag != NoneTag.none:
                # Log invalid tags as errors
                LOGGER.error(f"Tag is not a valid dataset or environment tag: tag={tag}")
        return valid_tags


class IndexingDocsPerSecond(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.indexing.docs_per_second"
        super().__init__(name=name, value=value, tags=tags)


class QueryingExactMatchMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.exact_match"
        super().__init__(name=name, value=value, tags=tags)


class QueryingF1Metric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.f1_score"
        super().__init__(name=name, value=value, tags=tags)


class QueryingRecallMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.recall"
        super().__init__(name=name, value=value, tags=tags)


class QueryingMapMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.map"
        super().__init__(name=name, value=value, tags=tags)


class QueryingSecondsPerQueryMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.seconds_per_query"
        super().__init__(name=name, value=value, tags=tags)


class MetricsAPI:
    def __init__(self, datadog_api_key: str, datadog_host: str):
        self.datadog_api_key = datadog_api_key
        self.datadog_host = datadog_host

    @retry(retry=retry_if_exception_type(ConnectionError), wait=wait_fixed(5), stop=stop_after_attempt(3), reraise=True)
    def send_custom_dd_metric(self, metric: CustomDatadogMetric) -> dict:
        datadog.initialize(api_key=self.datadog_api_key, host_name=self.datadog_host)
        tags: List[str] = list(map(lambda t: str(t.value), metric.tags))
        post_metric_response: Dict = datadog.api.Metric.send(
            metric=metric.name, points=[metric.timestamp, metric.value], tags=tags
        )
        if post_metric_response.get("status") != "ok":
            LOGGER.error(
                f"Could not send custom metric. Retrying. metric_name={metric.name}, metric_value={metric.value}, "
                f"status={post_metric_response.get('status')}, error={post_metric_response.get('errors')}, "
                f"{post_metric_response}"
            )
            raise ConnectionError(f"Could not send custom metric. {post_metric_response}")
        else:
            LOGGER.info(
                f"Sent custom metric. metric_name={metric.name}, metric_value={metric.value}, "
                f"status={post_metric_response.get('status')}"
            )
        return post_metric_response

    def send_custom_dd_metrics(self, metrics: List[CustomDatadogMetric]) -> List[Dict]:
        responses = []
        for metric in metrics:
            try:
                response = self.send_custom_dd_metric(metric)
                responses.append(response)
            except ConnectionError as e:
                LOGGER.error(
                    f"Could not send custom metric even after retrying. "
                    f"metric_name={metric.name}, metric_value={metric.value}"
                )
        return responses
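
For orientation, a minimal usage sketch of the classes above; the API key, host, and metric value are placeholders rather than values from this commit:

    from metric_handler import (
        DatasetSizeTags,
        DocumentStoreModelTags,
        MetricsAPI,
        QueryingF1Metric,
        RetrieverModelTags,
    )

    # Compose a metric with the tag enums defined above; tags outside the known
    # enums are dropped (and logged) by CustomDatadogMetric.validate_tags.
    metric = QueryingF1Metric(
        value=0.78,  # placeholder benchmark result
        tags=[DatasetSizeTags.size_100k, RetrieverModelTags.bm25, DocumentStoreModelTags.elasticsearch],
    )

    # The send is retried on ConnectionError (up to 3 attempts, 5 seconds apart).
    api = MetricsAPI(datadog_api_key="<DD_API_KEY>", datadog_host="<DD_HOST>")
    api.send_custom_dd_metric(metric)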


@@ -0,0 +1 @@
datadog==0.45.0


@@ -0,0 +1,165 @@
import argparse
import os
import json
from typing import Dict

from metric_handler import (
    ReaderModelTags,
    NoneTag,
    RetrieverModelTags,
    DocumentStoreModelTags,
    BenchmarkType,
    LOGGER,
    DatasetSizeTags,
    IndexingDocsPerSecond,
    QueryingExactMatchMetric,
    QueryingF1Metric,
    QueryingRecallMetric,
    QueryingSecondsPerQueryMetric,
    QueryingMapMetric,
    MetricsAPI,
    Tag,
)


def parse_benchmark_files(folder_path: str) -> Dict:
    metrics = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".json"):
            file_path = os.path.join(folder_path, filename)
            with open(file_path, "r") as file:
                data = json.load(file)
                indexing_metrics = data.get("indexing", {})
                querying_metrics = data.get("querying")
                config = data.get("config")
                if indexing_metrics.get("error") is None and querying_metrics.get("error") is None:
                    metrics[filename.split(".json")[0]] = {
                        "indexing": indexing_metrics,
                        "querying": querying_metrics,
                        "config": config,
                    }
    return metrics


def get_reader_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "Reader":
            model = comp["params"]["model_name_or_path"]
            if model == "deepset/tinyroberta-squad2":
                return ReaderModelTags.tinyroberta
            if model == "deepset/deberta-v3-base-squad2":
                return ReaderModelTags.debertabase
            if model == "deepset/deberta-v3-large-squad2":
                return ReaderModelTags.debertalarge
    return NoneTag.none


def get_retriever_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "Retriever":
            if comp["type"] == "BM25Retriever":
                return RetrieverModelTags.bm25
            model = comp["params"]["embedding_model"]
            if "minilm" in model:
                return RetrieverModelTags.minilm
            if "mpnet-base" in model:
                return RetrieverModelTags.mpnetbase
    return NoneTag.none


def get_documentstore_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "DocumentStore":
            if comp["type"] == "ElasticsearchDocumentStore":
                return DocumentStoreModelTags.elasticsearch
            if comp["type"] == "WeaviateDocumentStore":
                return DocumentStoreModelTags.weaviate
            if comp["type"] == "OpenSearchDocumentStore":
                return DocumentStoreModelTags.opensearch
    return NoneTag.none


def get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag):
    if reader_tag != NoneTag.none and retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
        return BenchmarkType.retriever_reader
    elif retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
        return BenchmarkType.retriever
    elif reader_tag != NoneTag.none and retriever_tag == NoneTag.none:
        return BenchmarkType.reader

    LOGGER.warn(
        f"Did not find benchmark_type for the combination of tags, retriever={retriever_tag}, reader={reader_tag}, "
        f"document_store={document_store_tag}"
    )
    return NoneTag.none


def collect_metrics_from_json_files(folder_path):
    benchmark_metrics = parse_benchmark_files(folder_path)

    metrics_to_send_to_dd = []
    for benchmark_name, metrics in benchmark_metrics.items():
        indexing_metrics = metrics["indexing"]
        querying_metrics = metrics["querying"]
        config = metrics["config"]

        docs_per_second = indexing_metrics.get("docs_per_second")
        exact_match = querying_metrics.get("exact_match")
        f1_score = querying_metrics.get("f1")
        recall = querying_metrics.get("recall")
        seconds_per_query = querying_metrics.get("seconds_per_query")
        map_query = querying_metrics.get("map")

        size_tag = DatasetSizeTags.size_100k
        reader_tag = get_reader_tag(config)
        retriever_tag = get_retriever_tag(config)
        document_store_tag = get_documentstore_tag(config)
        benchmark_type_tag = get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag)
        tags = [size_tag, reader_tag, retriever_tag, document_store_tag, benchmark_type_tag]

        if docs_per_second:
            metrics_to_send_to_dd.append(IndexingDocsPerSecond(docs_per_second, tags))
        if exact_match or exact_match == 0:
            metrics_to_send_to_dd.append(QueryingExactMatchMetric(exact_match, tags))
        if f1_score or f1_score == 0:
            metrics_to_send_to_dd.append(QueryingF1Metric(f1_score, tags))
        if recall or recall == 0:
            metrics_to_send_to_dd.append(QueryingRecallMetric(recall, tags))
        if seconds_per_query:
            metrics_to_send_to_dd.append(QueryingSecondsPerQueryMetric(seconds_per_query, tags))
        if map_query or map_query == 0:
            metrics_to_send_to_dd.append(QueryingMapMetric(map_query, tags))

    return metrics_to_send_to_dd


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("folder_path", type=str, help="Path to the folder with benchmark results")
    parser.add_argument("datadog_api_key", type=str, help="Datadog API key")
    parser.add_argument("datadog_api_host", type=str, help="Datadog API host")
    args = parser.parse_args()

    folder_path = args.folder_path
    datadog_api_key = args.datadog_api_key
    datadog_api_host = args.datadog_api_host

    metrics_to_send_to_dd = collect_metrics_from_json_files(folder_path)
    api = MetricsAPI(datadog_api_key=datadog_api_key, datadog_host=datadog_api_host)
    api.send_custom_dd_metrics(metrics_to_send_to_dd)
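
To make the expected input concrete: each JSON results file read by parse_benchmark_files roughly follows the shape sketched below (all values here are invented), and the argparse setup implies an invocation like the one in the closing comment (the script's filename is not shown in this view):

    # Illustrative shape of one benchmark results file; every value is made up.
    example_result = {
        "indexing": {"docs_per_second": 350.0, "error": None},
        "querying": {
            "exact_match": 0.61,
            "f1": 0.74,
            "recall": 0.88,
            "map": 0.70,
            "seconds_per_query": 0.42,
            "error": None,
        },
        "config": {
            "components": [
                {"name": "DocumentStore", "type": "ElasticsearchDocumentStore", "params": {}},
                {"name": "Retriever", "type": "BM25Retriever", "params": {}},
            ]
        },
    }

    # Invocation implied by the positional arguments above (script name assumed):
    #   python send_metrics.py <results_folder> <DATADOG_API_KEY> <DATADOG_API_HOST>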


@@ -34,7 +34,7 @@ def benchmark_reader(pipeline: Pipeline, labels_file: Path) -> Dict:
    reader_type, reader_model, reader_top_k = get_reader_config(pipeline)
    results = {
        "reader": {
            "querying": {
                "exact_match": metrics["exact_match"],
                "f1": metrics["f1"],
                "n_queries": len(eval_labels),


@@ -4,7 +4,6 @@ import argparse
import json
from haystack import Pipeline
from haystack.nodes import BaseRetriever, BaseReader
from haystack.pipelines.config import read_pipeline_config_from_yaml
from utils import prepare_environment, contains_reader, contains_retriever
@@ -72,7 +71,6 @@ if __name__ == "__main__":
    config_file = Path(args.config)
    output_file = f"{config_file.stem}_results.json" if args.output is None else args.output
    results = run_benchmark(config_file)
    with open(output_file, "w") as f:
        json.dump(results, f, indent=2)


@@ -24,7 +24,6 @@ def prepare_environment(pipeline_config: Dict, benchmark_config: Dict):
    # Download data if specified in benchmark config
    if "data_url" in benchmark_config:
        download_from_url(url=benchmark_config["data_url"], target_dir="data/")

    n_docs = 0
    if "documents_directory" in benchmark_config:
        documents_dir = Path(benchmark_config["documents_directory"])
@@ -56,18 +55,28 @@ def launch_document_store(document_store: str, n_docs: int = 0):
        launch_weaviate(sleep=30, delete_existing=True)


def download_from_url(url: str, target_dir: Union[str, Path]):
def file_previously_downloaded(url_path: Path, target_dir: Union[str, Path]) -> bool:
    if ".tar" in url_path.suffixes:
        return Path(target_dir, url_path.parent).exists()
    return Path(target_dir, url_path.name).exists()


def download_from_url(url: str, target_dir: Union[str, Path]) -> None:
    """
    Download from a URL to a local file.
    :param url: URL
    :param target_dir: Local directory where the URL content will be saved.
    """
    url_path = Path(url)
    if file_previously_downloaded(url_path, target_dir):
        logger.info(f"Skipping download of {url}, as a previous copy exists")
        return

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    url_path = Path(url)
    logger.info("Downloading %s to %s", url_path.name, target_dir)
    with tempfile.NamedTemporaryFile() as temp_file:
        http_get(url=url, temp_file=temp_file)
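
A short usage sketch of the updated helper, assuming this diff is the utils module imported by the run script above and that the truncated remainder of download_from_url stores the file under target_dir; the URL is a placeholder:

    # Hypothetical call; the URL and target directory are placeholders.
    from utils import download_from_url  # assumed module name, matching the run script's import

    # The first call creates data/ if needed and downloads the file; the second
    # call returns early because file_previously_downloaded() finds the existing
    # copy and logs "Skipping download of ..., as a previous copy exists".
    download_from_url(url="https://example.com/benchmark_docs.jsonl", target_dir="data/")
    download_from_url(url="https://example.com/benchmark_docs.jsonl", target_dir="data/")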