haystack/ui/utils.py
Fabrice Depaulis 77d52ad215
Rely api healthcheck on status code rather than json decoding (#1871)
* Rely api healthcheck on status code rather than json decoding

* Install UI dependencies on the Linux and Windows CI

Co-authored-by: Fabrice Depaulis <fabrice.depaulis@orange.com>
Co-authored-by: ZanSara <sarazanzo94@gmail.com>
2021-12-10 18:05:23 +01:00

123 lines
3.9 KiB
Python

from typing import List, Dict, Any, Tuple
import os
import logging
import requests
from time import sleep
from uuid import uuid4
import streamlit as st
# Base URL of the REST API; taken from the API_ENDPOINT environment variable
# when it is set, otherwise the local default below is used.
API_ENDPOINT = os.environ.get("API_ENDPOINT", "http://localhost:8000")

# Endpoint paths appended to API_ENDPOINT by the functions in this module.
STATUS = "initialized"  # polled by haystack_is_ready()
HS_VERSION = "hs_version"  # read by haystack_version()
DOC_REQUEST = "query"  # target of query()
DOC_FEEDBACK = "feedback"  # target of send_feedback()
DOC_UPLOAD = "file-upload"  # target of upload_doc()
def haystack_is_ready():
    """
    Tell whether the REST API answers on its status endpoint.

    Used to show the "Haystack is loading..." message.

    Returns True when a GET on the status endpoint completes with a status
    code below 400, and False otherwise. Any exception raised by the request
    is logged and followed by a one-second pause before returning False.
    """
    status_url = f"{API_ENDPOINT}/{STATUS}"
    try:
        api_is_up = requests.get(status_url).status_code < 400
    except Exception as err:
        logging.exception(err)
        sleep(1)  # To avoid spamming a non-existing endpoint at startup
        return False
    return api_is_up
@st.cache
def haystack_version():
    """
    Fetch the Haystack version string from the REST API.

    Issues a GET with a 0.1 second timeout and returns the "hs_version"
    entry of the decoded JSON body. The result is cached by Streamlit.
    """
    version_url = f"{API_ENDPOINT}/{HS_VERSION}"
    reply = requests.get(version_url, timeout=0.1)
    payload = reply.json()
    return payload["hs_version"]
def query(query, filters=None, top_k_reader=5, top_k_retriever=5) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Send a query to the REST API and parse the answer.
    Returns both a ready-to-use representation of the results and the raw JSON.

    :param query: the query text, sent under the "query" key of the request body.
    :param filters: filters forwarded in the request params. Defaults to an empty dict.
    :param top_k_reader: value sent as the "top_k" of the "Reader" params.
    :param top_k_retriever: value sent as the "top_k" of the "Retriever" params.
    :return: a tuple `(results, response)` where `results` holds one dict per returned
             answer and `response` is the decoded JSON body.
    :raises Exception: if the status code is >= 400 (503 excepted), or if the decoded
                       body contains an "errors" entry.
    """
    # A fresh dict per call: a `{}` default would be one object shared by every call.
    if filters is None:
        filters = {}

    url = f"{API_ENDPOINT}/{DOC_REQUEST}"
    params = {"filters": filters, "Retriever": {"top_k": top_k_retriever}, "Reader": {"top_k": top_k_reader}}
    req = {"query": query, "params": params}
    response_raw = requests.post(url, json=req)

    # 503 is deliberately let through: its body is decoded below and any
    # "errors" entry it carries is raised from there.
    # NOTE(review): presumably the API describes the overload in the JSON body - confirm.
    if response_raw.status_code >= 400 and response_raw.status_code != 503:
        raise Exception(f"{vars(response_raw)}")

    response = response_raw.json()
    if "errors" in response:
        raise Exception(", ".join(response["errors"]))

    # Format response
    results = []
    for answer in response["answers"]:
        relevance = round(answer["score"] * 100, 2)
        if answer.get("answer", None):
            # First document of the response whose id matches the answer's document_id.
            # An IndexError is raised if the response contains no such document.
            document = [doc for doc in response["documents"] if doc["id"] == answer["document_id"]][0]
            results.append(
                {
                    "context": "..." + answer["context"] + "...",
                    "answer": answer.get("answer", None),
                    "source": answer["meta"]["name"],
                    "relevance": relevance,
                    "document": document,
                    "offset_start_in_doc": answer["offsets_in_document"][0]["start"],
                    "_raw": answer,
                }
            )
        else:
            # Entries without answer text are kept as placeholders carrying only
            # the score and the raw payload.
            results.append(
                {
                    "context": None,
                    "answer": None,
                    "document": None,
                    "relevance": relevance,
                    "_raw": answer,
                }
            )
    return results, response
def send_feedback(query, answer_obj, is_correct_answer, is_correct_document, document) -> None:
    """
    Send a feedback (label) to the REST API

    :param query: the query the feedback refers to.
    :param answer_obj: the answer payload, sent under the "answer" key.
    :param is_correct_answer: sent as "is_correct_answer".
    :param is_correct_document: sent as "is_correct_document".
    :param document: the document payload, sent under the "document" key.
    :raises ValueError: if the API replies with a status code >= 400.
    """
    url = f"{API_ENDPOINT}/{DOC_FEEDBACK}"
    req = {
        "id": str(uuid4()),  # every feedback gets a fresh random id
        "query": query,
        "document": document,
        "is_correct_answer": is_correct_answer,
        "is_correct_document": is_correct_document,
        "origin": "user-feedback",
        "answer": answer_obj,
    }
    response_raw = requests.post(url, json=req)
    if response_raw.status_code >= 400:
        # An error body is not guaranteed to be JSON (e.g. an HTML page from a proxy).
        # Fall back to the raw text so a decoding failure cannot hide the status code.
        try:
            details = response_raw.json()
        except ValueError:
            details = response_raw.text
        raise ValueError(f"An error was returned [code {response_raw.status_code}]: {details}")
def upload_doc(file):
    """
    Upload a file to the REST API.

    Posts `file` as a multipart "files" field to the file-upload endpoint and
    returns the decoded JSON body of the reply.
    """
    upload_url = f"{API_ENDPOINT}/{DOC_UPLOAD}"
    multipart_fields = [("files", file)]
    reply = requests.post(upload_url, files=multipart_fields)
    return reply.json()
def get_backlink(result) -> Tuple[str, str]:
    """
    Extract the (url, title) pair from a result's document metadata.

    Returns `(None, None)` unless `result["document"]` is a non-empty dict whose
    "meta" entry is a non-empty dict holding truthy "url" and "title" values.
    """
    nothing = (None, None)

    document = result.get("document", None)
    if not document or not isinstance(document, dict):
        return nothing

    meta = document.get("meta", None)
    if not meta or not isinstance(meta, dict):
        return nothing

    url = meta.get("url", None)
    title = meta.get("title", None)
    if url and title:
        return url, title
    return nothing