feat: LinkContentFetcher - add content-type resolution, user agent switching, PDF handler (#5374)

* Add content type resolution, pdf handler, user agent switching
---------

Co-authored-by: Daria Fokina <daria.fokina@deepset.ai>
Co-authored-by: ZanSara <sara.zanzottera@deepset.ai>
Vladimir Blagojevic 2023-08-09 18:14:04 +02:00 committed by GitHub
parent 52133d3a81
commit a75b9dd4bb
5 changed files with 345 additions and 55 deletions


@@ -0,0 +1,34 @@
import os
from haystack.nodes import PromptNode, LinkContentFetcher, PromptTemplate
from haystack import Pipeline
anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
if not anthropic_key:
raise ValueError("Please set the ANTHROPIC_API_KEY environment variable")
alt_user_agents = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15"
]
retriever = LinkContentFetcher(user_agents=alt_user_agents)
pt = PromptTemplate(
"Given the content below, create a summary consisting of three sections: Objectives, "
"Implementation and Learnings/Conclusions.\n"
"Each section should have at least three bullet points. \n"
"In the content below disregard References section.\n\n: {documents}"
)
prompt_node = PromptNode(
"claude-instant-1", api_key=anthropic_key, max_length=512, default_prompt_template=pt, model_kwargs={"stream": True}
)
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])
research_papers = ["https://arxiv.org/pdf/2307.03172.pdf", "https://arxiv.org/pdf/1706.03762.pdf"]
for research_paper in research_papers:
print(f"Research paper summary: {research_paper}")
pipeline.run(research_paper)
print("\n\n\n")


@@ -1,4 +1,7 @@
import inspect
import io
import logging
from collections import defaultdict
from datetime import datetime
from http import HTTPStatus
from typing import Optional, Dict, List, Union, Callable, Any, Tuple
@@ -7,34 +10,42 @@ from urllib.parse import urlparse
import requests
from boilerpy3 import extractors
from requests import Response
from requests.exceptions import InvalidURL
from requests.exceptions import InvalidURL, HTTPError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState
from haystack import __version__
from haystack.lazy_imports import LazyImport
from haystack.nodes import PreProcessor, BaseComponent
from haystack.schema import Document, MultiLabel
logger = logging.getLogger(__name__)
with LazyImport("Run 'pip install farm-haystack[pdf]'") as fitz_import:
import fitz
def html_content_handler(response: Response, raise_on_failure: bool = False) -> Optional[str]:
def html_content_handler(response: Response) -> Optional[str]:
"""
Extracts content from the response text using the boilerpy3 extractor.
Extracts text from HTML response text using the boilerpy3 extractor.
:param response: Response object from the request.
:param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
:return: The extracted text.
"""
extractor = extractors.ArticleExtractor(raise_on_failure=raise_on_failure)
content = ""
try:
content = extractor.get_content(response.text)
except Exception as e:
if raise_on_failure:
raise e
return content
extractor = extractors.ArticleExtractor(raise_on_failure=False)
return extractor.get_content(response.text)
def pdf_content_handler(response: Response, raise_on_failure: bool = False) -> Optional[str]:
# TODO: implement this
return None
def pdf_content_handler(response: Response) -> Optional[str]:
"""
Extracts text from PDF response stream using the PyMuPDF library.
:param response: Response object from the request.
:return: The extracted text.
"""
file_path = io.BytesIO(response.content)
with fitz.open(stream=file_path, filetype="pdf") as doc:
text = "\f".join([page.get_text() for page in doc])
return text.encode("ascii", errors="ignore").decode()
class LinkContentFetcher(BaseComponent):
@@ -43,31 +54,112 @@ class LinkContentFetcher(BaseComponent):
LinkContentFetcher supports the following content types:
- HTML
- PDF
LinkContentFetcher offers a few options for customizing the content extraction process:
- content_handlers: A dictionary of content handlers to use for extracting content from a response.
- processor: PreProcessor to apply to the extracted text
- raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
One can use LinkContentFetcher as a standalone component or as part of a Pipeline. Here is an example of using
LinkContentFetcher as a standalone component:
```python
from haystack.nodes import LinkContentFetcher
from haystack.schema import Document
link_content_fetcher = LinkContentFetcher()
dl_wiki: List[Document] = link_content_fetcher.fetch(url="https://en.wikipedia.org/wiki/Deep_learning")
print(dl_wiki)
```
One can also use LinkContentFetcher as part of a Pipeline. Here is an example of using LinkContentFetcher as part
of a Pipeline:
```python
import os
from haystack.nodes import PromptNode, LinkContentFetcher, PromptTemplate
from haystack import Pipeline
anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
if not anthropic_key:
raise ValueError("Please set the ANTHROPIC_API_KEY environment variable")
retriever = LinkContentFetcher() # optionally add additional user agents
pt = PromptTemplate(
"Given the content below, create a summary consisting of three sections: Objectives, "
"Implementation and Learnings/Conclusions.\n"
"Each section should have at least three bullet points. \n"
"In the content below disregard References section.\n\n: {documents}"
)
prompt_node = PromptNode("claude-instant-1",
api_key=anthropic_key,
max_length=512,
default_prompt_template=pt,
model_kwargs={"stream": True}
)
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])
research_papers = ["https://arxiv.org/pdf/2307.03172.pdf", "https://arxiv.org/pdf/1706.03762.pdf"]
for research_paper in research_papers:
print(f"Research paper summary: {research_paper}")
pipeline.run(research_paper)
print("\n\n\n")
"""
outgoing_edges = 1
REQUEST_HEADERS = {
_USER_AGENT = f"haystack/LinkContentRetriever/{__version__}"
_REQUEST_HEADERS = {
"accept": "*/*",
"User-Agent": f"haystack/LinkContentFetcher/{__version__}",
"User-Agent": _USER_AGENT,
"Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
"referer": "https://www.google.com/",
}
def __init__(self, processor: Optional[PreProcessor] = None, raise_on_failure: Optional[bool] = False):
def __init__(
self,
content_handlers: Optional[Dict[str, Callable]] = None,
processor: Optional[PreProcessor] = None,
raise_on_failure: Optional[bool] = False,
user_agents: Optional[List[str]] = None,
retry_attempts: Optional[int] = None,
):
"""
Creates a LinkContentFetcher instance.
:param content_handlers: A dictionary of content handlers to use for extracting content from a response.
:param processor: PreProcessor to apply to the extracted text
:param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
during content extraction. If False, the error is simply logged and the program continues.
Defaults to False.
:param user_agents: A list of user agents to use when fetching content. Defaults to None.
:param retry_attempts: The number of times to retry fetching content. Defaults to 2.
"""
super().__init__()
self.processor = processor
self.raise_on_failure = raise_on_failure
self.handlers: Dict[str, Callable] = {"html": html_content_handler, "pdf": pdf_content_handler}
self.user_agents = user_agents or [LinkContentFetcher._USER_AGENT]
self.current_user_agent_idx: int = 0
self.retry_attempts = retry_attempts or 2
self.handlers: Dict[str, Callable] = defaultdict(lambda: html_content_handler)
# register default content handlers
self._register_content_handler("text/html", html_content_handler)
if fitz_import.is_successful():
self._register_content_handler("application/pdf", pdf_content_handler)
# register custom content handlers, can override default handlers
if content_handlers:
for content_type, handler in content_handlers.items():
self._register_content_handler(content_type, handler)
def fetch(self, url: str, timeout: Optional[int] = 3, doc_kwargs: Optional[dict] = None) -> List[Document]:
"""
@@ -79,7 +171,7 @@ class LinkContentFetcher(BaseComponent):
:param doc_kwargs: Optional kwargs to pass to the Document constructor.
:return: List of Document objects or an empty list if no content is extracted.
"""
if not url or not self._is_valid_url(url):
if not self._is_valid_url(url):
raise InvalidURL("Invalid or missing URL: {}".format(url))
doc_kwargs = doc_kwargs or {}
@@ -87,31 +179,30 @@
"meta": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}
}
extracted_doc.update(doc_kwargs)
response = self._get_response(url, timeout=timeout)
has_content = response.status_code == HTTPStatus.OK and response.text
response = self._get_response(url, timeout=timeout or 3)
has_content = response.status_code == HTTPStatus.OK and (response.text or response.content)
fetched_documents = []
if has_content:
handler = "html" # will handle non-HTML content types soon, add content type resolution here
if handler in self.handlers:
extracted_content = self.handlers[handler](response, self.raise_on_failure)
if extracted_content:
extracted_doc["content"] = extracted_content
logger.debug("%s handler extracted content from %s", handler, url)
else:
logger.warning("%s handler failed to extract content from %s", handler, url)
# perhaps we have a snippet from web search, if so, use it as content
snippet_text = extracted_doc.get("snippet_text", "")
if snippet_text:
extracted_doc["content"] = snippet_text
# if we get here, we have a valid response, let's try to extract content
# using the registered content handler
extracted_content: str = ""
handler: Callable = self._get_content_type_handler(response.headers.get("Content-Type", ""))
try:
extracted_content = handler(response)
except Exception as e:
if self.raise_on_failure:
raise e
logger.warning("failed to extract content from %s", response.url)
content = extracted_content or extracted_doc.get("snippet_text", "") # fallback to snippet_text
if not content:
return []
if extracted_content:
logger.debug("%s handler extracted content from %s", handler, url)
if "content" in extracted_doc:
document = Document.from_dict(extracted_doc)
extracted_doc["content"] = content
document = Document.from_dict(extracted_doc)
fetched_documents = self.processor.process(documents=[document]) if self.processor else [document]
if self.processor:
fetched_documents = self.processor.process(documents=[document])
else:
fetched_documents = [document]
return fetched_documents
def run(
@@ -126,8 +217,7 @@ class LinkContentFetcher(BaseComponent):
Fetches content from a URL specified by query parameter and converts it into a list of Document objects.
param query: The query - a URL to fetch content from.
param filters: Not used.
param top_k: Not used.
param file_paths: Not used.
param labels: Not used.
param documents: Not used.
param meta: Not used.
@@ -153,7 +243,6 @@ class LinkContentFetcher(BaseComponent):
Takes a list of queries, where each query is expected to be a URL. For each query, the method
fetches content from the specified URL and transforms it into a list of Document objects. The output is a list
of these document lists, where each individual list of Document objects corresponds to the content retrieved
from a specific query URL.
param queries: List of queries - URLs to fetch content from.
param file_paths: Not used.
@@ -177,24 +266,79 @@ class LinkContentFetcher(BaseComponent):
return {"documents": results}, "output_1"
def _get_response(self, url: str, timeout: Optional[int]) -> requests.Response:
def _register_content_handler(self, content_type: str, handler: Callable):
"""
Register a new content handler for a specific content type.
If a handler for the given content type already exists, it will be overridden.
:param content_type: The content type for which the handler should be used.
:param handler: The handler function. This function should accept a requests.Response object parameter,
and return the extracted text (or None).
"""
if not callable(handler):
raise ValueError(f"handler must be a callable, but got {type(handler).__name__}")
params = inspect.signature(handler).parameters
if len(params) != 1 or list(params.keys()) != ["response"]:
raise ValueError(f"{content_type} handler must accept 'response: requests.Response' as a single parameter")
self.handlers[content_type] = handler
def _get_response(self, url: str, timeout: Optional[int] = None) -> requests.Response:
"""
Fetches content from a URL. Returns a response object.
:param url: The URL to fetch content from.
:param timeout: The timeout in seconds.
:return: A response object.
"""
@retry(
# we want to reraise the exception if we fail after the last self.retry_attempts
# then we can catch it in the outer try/except block, see below
reraise=True,
stop=stop_after_attempt(self.retry_attempts),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=(retry_if_exception_type((HTTPError, requests.RequestException))),
# This method is invoked only after failed requests (exception raised)
after=self._switch_user_agent,
)
def _request():
# we need a request copy because we modify the headers
headers = self._REQUEST_HEADERS.copy()
headers["User-Agent"] = self.user_agents[self.current_user_agent_idx]
r = requests.get(url, headers=headers, timeout=timeout or 3)
r.raise_for_status()
return r
try:
response = requests.get(url, headers=LinkContentFetcher.REQUEST_HEADERS, timeout=timeout)
response.raise_for_status()
response = _request()
except Exception as e:
# catch all exceptions including HTTPError and RequestException
if self.raise_on_failure:
raise e
# if we don't raise on failure, log it, and return a response object
logger.warning("Couldn't retrieve content from %s", url)
response = requests.Response()
finally:
self.current_user_agent_idx = 0
return response
def _get_content_type_handler(self, content_type: str) -> Callable:
"""
Get the appropriate content handler based on the content type.
:param content_type: The content type of the response.
:return: The matching content handler callable or the default html_content_handler if no match is found.
"""
content_type_lookup: str = (content_type or "").split(";")[0]
return self.handlers[content_type_lookup]
def _switch_user_agent(self, retry_state: RetryCallState) -> None:
"""
Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents.
:param retry_state: The retry state (unused, required by tenacity).
"""
self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
def _is_valid_url(self, url: str) -> bool:
"""
Checks if a URL is valid.
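The validation in `_register_content_handler` above requires a handler to be a callable with a single parameter named `response` that returns the extracted text (or None). A minimal sketch of registering a custom handler for an extra content type; the `text/plain` type, the `plain_text_handler` name, and the URL are illustrative, not part of this commit:

```python
from typing import Optional

from requests import Response

from haystack.nodes import LinkContentFetcher


def plain_text_handler(response: Response) -> Optional[str]:
    # Handlers must accept exactly one 'response' parameter and return the extracted text.
    return response.text


fetcher = LinkContentFetcher(content_handlers={"text/plain": plain_text_handler})
# Responses whose Content-Type resolves to "text/plain" are routed to plain_text_handler;
# unrecognized content types fall back to the default html_content_handler.
docs = fetcher.fetch(url="https://www.example.com/robots.txt", timeout=3)  # illustrative URL
```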


@@ -0,0 +1,8 @@
---
enhancements:
- |
Several minor enhancements for LinkContentFetcher:
- Dynamic content handler resolution
- Custom User-Agent header (optional, minimize blocking)
- PDF support
- Register new content handlers
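A rough sketch of the user-agent rotation and retry behaviour listed above; parameter names follow the new `__init__` signature, and the second agent string is only a placeholder:

```python
from haystack.nodes import LinkContentFetcher

fetcher = LinkContentFetcher(
    user_agents=[
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
        "my-fallback-agent/1.0",  # placeholder UA, tried only after a failed request
    ],
    retry_attempts=2,
)
# On a failed request the fetcher switches to the next user agent before retrying,
# then resets to the first agent once the call completes.
docs = fetcher.fetch(url="https://en.wikipedia.org/wiki/Deep_learning")
```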


@@ -1,3 +1,4 @@
from typing import Optional
from unittest.mock import Mock, patch
import logging
import pytest
@@ -15,6 +16,7 @@ def mocked_requests():
mock_requests.get.return_value = mock_response
mock_response.status_code = 200
mock_response.text = "Sample content from webpage"
mock_response.headers = {"Content-Type": "text/html"}
yield mock_requests
@@ -33,7 +35,8 @@ def test_init():
assert r.processor is None
assert isinstance(r.handlers, dict)
assert "html" in r.handlers
assert "text/html" in r.handlers
assert "application/pdf" in r.handlers
@pytest.mark.unit
@@ -45,7 +48,56 @@ def test_init_with_preprocessor():
r = LinkContentFetcher(processor=pre_processor_mock)
assert r.processor == pre_processor_mock
assert isinstance(r.handlers, dict)
assert "html" in r.handlers
assert "text/html" in r.handlers
assert "application/pdf" in r.handlers
@pytest.mark.unit
def test_init_with_content_handlers():
"""
Checks the initialization of the LinkContentFetcher content handlers.
"""
def fake_but_valid_video_content_handler(response: Response) -> Optional[str]:
pass
r = LinkContentFetcher(content_handlers={"video/mp4": fake_but_valid_video_content_handler})
assert isinstance(r.handlers, dict)
assert "text/html" in r.handlers
assert "application/pdf" in r.handlers
assert "video/mp4" in r.handlers
@pytest.mark.unit
def test_init_with_content_handlers_override():
"""
Checks the initialization of the LinkContentFetcher content handlers but with pdf handler overridden.
"""
def new_pdf_content_handler(response: Response) -> Optional[str]:
pass
r = LinkContentFetcher(content_handlers={"application/pdf": new_pdf_content_handler})
assert isinstance(r.handlers, dict)
assert "text/html" in r.handlers
assert "application/pdf" in r.handlers
assert r.handlers["application/pdf"] == new_pdf_content_handler
@pytest.mark.unit
def test_init_with_invalid_content_handlers():
"""
Checks the initialization of the LinkContentFetcher content handlers fails with invalid content handlers.
"""
# invalid because it does not have the correct signature
def invalid_video_content_handler() -> Optional[str]:
pass
with pytest.raises(ValueError, match="handler must accept"):
LinkContentFetcher(content_handlers={"video/mp4": invalid_video_content_handler})
@pytest.mark.unit
@@ -58,7 +110,7 @@ def test_fetch(mocked_requests, mocked_article_extractor):
pre_processor_mock = Mock()
pre_processor_mock.process.return_value = [Document("Sample content from webpage")]
r = LinkContentFetcher(pre_processor_mock)
r = LinkContentFetcher(processor=pre_processor_mock)
result = r.fetch(url=url, doc_kwargs={"text": "Sample content from webpage"})
assert len(result) == 1
@@ -120,7 +172,7 @@ def test_fetch_correct_arguments(mocked_requests, mocked_article_extractor):
args, kwargs = mocked_requests.get.call_args
assert args[0] == url
assert kwargs["timeout"] == 3
assert kwargs["headers"] == r.REQUEST_HEADERS
assert kwargs["headers"] == r._REQUEST_HEADERS
# another variant
url = "https://deepset.ai"
@@ -129,7 +181,7 @@ def test_fetch_correct_arguments(mocked_requests, mocked_article_extractor):
args, kwargs = mocked_requests.get.call_args
assert args[0] == url
assert kwargs["timeout"] == 10
assert kwargs["headers"] == r.REQUEST_HEADERS
assert kwargs["headers"] == r._REQUEST_HEADERS
@pytest.mark.unit
@@ -309,6 +361,44 @@ def test_is_invalid_url():
assert not retriever._is_valid_url(url), f"Expected {url} to be invalid"
@pytest.mark.unit
def test_switch_user_agent_on_failed_request():
"""
Test that LinkContentFetcher switches user agents on failed requests
"""
url = "http://fakeurl.com"
retry_attempts = 2
lc = LinkContentFetcher(user_agents=["ua1", "ua2"], retry_attempts=retry_attempts)
with patch("haystack.nodes.retriever.link_content.requests.get") as mocked_get:
mocked_get.return_value.raise_for_status.side_effect = requests.HTTPError()
lc._get_response(url)
assert mocked_get.call_count == retry_attempts
assert mocked_get.call_args_list[0][1]["headers"]["User-Agent"] == "ua1"
assert mocked_get.call_args_list[1][1]["headers"]["User-Agent"] == "ua2"
@pytest.mark.unit
def test_valid_requests_dont_switch_agent(mocked_requests):
"""
Test that LinkContentFetcher doesn't switch user agents on valid requests
"""
lcf = LinkContentFetcher()
# Make first valid request
lcf._get_response("http://example.com")
# Make second valid request
lcf._get_response("http://example.com")
# Assert that requests.get was called twice with the same default user agents
assert mocked_requests.get.call_count == 2
assert (
mocked_requests.get.call_args_list[0][1]["headers"]["User-Agent"]
== mocked_requests.get.call_args_list[1][1]["headers"]["User-Agent"]
)
@pytest.mark.integration
def test_call_with_valid_url_on_live_web():
"""


@@ -6,6 +6,7 @@ import pytest
from haystack import Document, Pipeline
from haystack.document_stores.base import BaseDocumentStore
from haystack.nodes import WebRetriever, PromptNode
from haystack.nodes.retriever.link_content import html_content_handler
from haystack.nodes.preprocessor import PreProcessor
from haystack.nodes.retriever.web import SearchResult
from test.nodes.conftest import example_serperdev_response
@@ -27,6 +28,15 @@ def mocked_article_extractor():
yield
@pytest.fixture
def mocked_link_content_fetcher_handler_type():
with patch(
"haystack.nodes.retriever.link_content.LinkContentFetcher._get_content_type_handler",
return_value=html_content_handler,
):
yield
@pytest.mark.unit
def test_init_default_parameters():
retriever = WebRetriever(api_key="test_key")
@@ -121,7 +131,9 @@ def test_scrape_links_empty_list():
@pytest.mark.unit
def test_scrape_links_with_search_results(mocked_requests, mocked_article_extractor):
def test_scrape_links_with_search_results(
mocked_requests, mocked_article_extractor, mocked_link_content_fetcher_handler_type
):
wr = WebRetriever(api_key="fake_key")
sr1 = SearchResult("https://pagesix.com", "Some text", "0.43", "1")
@@ -136,7 +148,9 @@ def test_scrape_links_with_search_results_with_preprocessor(mocked_requests, mocked_article_extrac
@pytest.mark.unit
def test_scrape_links_with_search_results_with_preprocessor(mocked_requests, mocked_article_extractor):
def test_scrape_links_with_search_results_with_preprocessor(
mocked_requests, mocked_article_extractor, mocked_link_content_fetcher_handler_type
):
wr = WebRetriever(api_key="fake_key", mode="preprocessed_documents")
preprocessor = PreProcessor(progress_bar=False)