From a75b9dd4bb257dfeb59e68127dce80324155a992 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Wed, 9 Aug 2023 18:14:04 +0200 Subject: [PATCH] feat: LinkContentFetcher - add content-type resolution, user agent switching, PDF handler (#5374) * Add content type resolution, pdf handler, user agent switching --------- Co-authored-by: Daria Fokina Co-authored-by: ZanSara --- examples/arxiv_paper_summary.py | 34 +++ haystack/nodes/retriever/link_content.py | 240 ++++++++++++++---- ...link-content-fetcher-512d039e3c7684f1.yaml | 8 + test/nodes/test_link_content_fetcher.py | 100 +++++++- test/nodes/test_web_retriever.py | 18 +- 5 files changed, 345 insertions(+), 55 deletions(-) create mode 100644 examples/arxiv_paper_summary.py create mode 100644 releasenotes/notes/improve-link-content-fetcher-512d039e3c7684f1.yaml diff --git a/examples/arxiv_paper_summary.py b/examples/arxiv_paper_summary.py new file mode 100644 index 000000000..ca92d2f1c --- /dev/null +++ b/examples/arxiv_paper_summary.py @@ -0,0 +1,34 @@ +import os +from haystack.nodes import PromptNode, LinkContentFetcher, PromptTemplate +from haystack import Pipeline + +anthropic_key = os.environ.get("ANTHROPIC_API_KEY") +if not anthropic_key: + raise ValueError("Please set the ANTHROPIC_API_KEY environment variable") + +alt_user_agents = [ + "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15" +] + +retriever = LinkContentFetcher(user_agents=alt_user_agents) +pt = PromptTemplate( + "Given the content below, create a summary consisting of three sections: Objectives, " + "Implementation and Learnings/Conclusions.\n" + "Each section should have at least three bullet points. \n" + "In the content below disregard References section.\n\n: {documents}" +) + +prompt_node = PromptNode( + "claude-instant-1", api_key=anthropic_key, max_length=512, default_prompt_template=pt, model_kwargs={"stream": True} +) + +pipeline = Pipeline() +pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"]) +pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"]) + +research_papers = ["https://arxiv.org/pdf/2307.03172.pdf", "https://arxiv.org/pdf/1706.03762.pdf"] + +for research_paper in research_papers: + print(f"Research paper summary: {research_paper}") + pipeline.run(research_paper) + print("\n\n\n") diff --git a/haystack/nodes/retriever/link_content.py b/haystack/nodes/retriever/link_content.py index 8edd94f2e..fa984254b 100644 --- a/haystack/nodes/retriever/link_content.py +++ b/haystack/nodes/retriever/link_content.py @@ -1,4 +1,7 @@ +import inspect +import io import logging +from collections import defaultdict from datetime import datetime from http import HTTPStatus from typing import Optional, Dict, List, Union, Callable, Any, Tuple @@ -7,34 +10,42 @@ from urllib.parse import urlparse import requests from boilerpy3 import extractors from requests import Response -from requests.exceptions import InvalidURL +from requests.exceptions import InvalidURL, HTTPError +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState from haystack import __version__ +from haystack.lazy_imports import LazyImport from haystack.nodes import PreProcessor, BaseComponent from haystack.schema import Document, MultiLabel logger = logging.getLogger(__name__) +with LazyImport("Run 'pip install farm-haystack[pdf]'") as fitz_import: + import fitz -def html_content_handler(response: Response, raise_on_failure: bool = False) -> 
Optional[str]: + +def html_content_handler(response: Response) -> Optional[str]: """ - Extracts content from the response text using the boilerpy3 extractor. + Extracts text from HTML response text using the boilerpy3 extractor. :param response: Response object from the request. - :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs + :return: The extracted text. """ - extractor = extractors.ArticleExtractor(raise_on_failure=raise_on_failure) - content = "" - try: - content = extractor.get_content(response.text) - except Exception as e: - if raise_on_failure: - raise e - return content + extractor = extractors.ArticleExtractor(raise_on_failure=False) + return extractor.get_content(response.text) -def pdf_content_handler(response: Response, raise_on_failure: bool = False) -> Optional[str]: - # TODO: implement this - return None +def pdf_content_handler(response: Response) -> Optional[str]: + """ + Extracts text from PDF response stream using the PyMuPDF library. + + :param response: Response object from the request. + :return: The extracted text. + """ + file_path = io.BytesIO(response.content) + with fitz.open(stream=file_path, filetype="pdf") as doc: + text = "\f".join([page.get_text() for page in doc]) + + return text.encode("ascii", errors="ignore").decode() class LinkContentFetcher(BaseComponent): @@ -43,31 +54,112 @@ class LinkContentFetcher(BaseComponent): LinkContentFetcher supports the following content types: - HTML + - PDF + LinkContentFetcher offers a few options for customizing the content extraction process: + - content_handlers: A dictionary of content handlers to use for extracting content from a response. + - processor: PreProcessor to apply to the extracted text + - raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs + + One can use LinkContentFetcher as a standalone component or as part of a Pipeline. Here is an example of using + LinkContentFetcher as a standalone component: + + ```python + from haystack.nodes import LinkContentFetcher + from haystack.schema import Document + + link_content_fetcher = LinkContentFetcher() + dl_wiki: List[Document] = link_content_fetcher.fetch(url="https://en.wikipedia.org/wiki/Deep_learning") + print(dl_wiki) + ``` + + One can also use LinkContentFetcher as part of a Pipeline. Here is an example of using LinkContentFetcher as part + of a Pipeline: + + ```python + import os + from haystack.nodes import PromptNode, LinkContentFetcher, PromptTemplate + from haystack import Pipeline + + anthropic_key = os.environ.get("ANTHROPIC_API_KEY") + if not anthropic_key: + raise ValueError("Please set the ANTHROPIC_API_KEY environment variable") + + + retriever = LinkContentFetcher() # optionally add additional user agents + pt = PromptTemplate( + "Given the content below, create a summary consisting of three sections: Objectives, " + "Implementation and Learnings/Conclusions.\n" + "Each section should have at least three bullet points. 
\n" + "In the content below disregard References section.\n\n: {documents}" + ) + + prompt_node = PromptNode("claude-instant-1", + api_key=anthropic_key, + max_length=512, + default_prompt_template=pt, + model_kwargs={"stream": True} + ) + + pipeline = Pipeline() + pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"]) + pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"]) + + research_papers = ["https://arxiv.org/pdf/2307.03172.pdf", "https://arxiv.org/pdf/1706.03762.pdf"] + + for research_paper in research_papers: + print(f"Research paper summary: {research_paper}") + pipeline.run(research_paper) + print("\n\n\n") """ outgoing_edges = 1 - REQUEST_HEADERS = { + _USER_AGENT = f"haystack/LinkContentRetriever/{__version__}" + + _REQUEST_HEADERS = { "accept": "*/*", - "User-Agent": f"haystack/LinkContentFetcher/{__version__}", + "User-Agent": _USER_AGENT, "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7", "referer": "https://www.google.com/", } - def __init__(self, processor: Optional[PreProcessor] = None, raise_on_failure: Optional[bool] = False): + def __init__( + self, + content_handlers: Optional[Dict[str, Callable]] = None, + processor: Optional[PreProcessor] = None, + raise_on_failure: Optional[bool] = False, + user_agents: Optional[List[str]] = None, + retry_attempts: Optional[int] = None, + ): """ Creates a LinkContentFetcher instance. + :param content_handlers: A dictionary of content handlers to use for extracting content from a response. :param processor: PreProcessor to apply to the extracted text :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs during content extraction. If False, the error is simply logged and the program continues. Defaults to False. + :param user_agents: A list of user agents to use when fetching content. Defaults to None. + :param retry_attempts: The number of times to retry fetching content. Defaults to 2. """ super().__init__() self.processor = processor self.raise_on_failure = raise_on_failure - self.handlers: Dict[str, Callable] = {"html": html_content_handler, "pdf": pdf_content_handler} + self.user_agents = user_agents or [LinkContentFetcher._USER_AGENT] + self.current_user_agent_idx: int = 0 + self.retry_attempts = retry_attempts or 2 + self.handlers: Dict[str, Callable] = defaultdict(lambda: html_content_handler) + + # register default content handlers + self._register_content_handler("text/html", html_content_handler) + if fitz_import.is_successful(): + self._register_content_handler("application/pdf", pdf_content_handler) + + # register custom content handlers, can override default handlers + if content_handlers: + for content_type, handler in content_handlers.items(): + self._register_content_handler(content_type, handler) def fetch(self, url: str, timeout: Optional[int] = 3, doc_kwargs: Optional[dict] = None) -> List[Document]: """ @@ -79,7 +171,7 @@ class LinkContentFetcher(BaseComponent): :param doc_kwargs: Optional kwargs to pass to the Document constructor. :return: List of Document objects or an empty list if no content is extracted. 
""" - if not url or not self._is_valid_url(url): + if not self._is_valid_url(url): raise InvalidURL("Invalid or missing URL: {}".format(url)) doc_kwargs = doc_kwargs or {} @@ -87,31 +179,30 @@ class LinkContentFetcher(BaseComponent): "meta": {"url": url, "timestamp": int(datetime.utcnow().timestamp())} } extracted_doc.update(doc_kwargs) - - response = self._get_response(url, timeout=timeout) - has_content = response.status_code == HTTPStatus.OK and response.text + response = self._get_response(url, timeout=timeout or 3) + has_content = response.status_code == HTTPStatus.OK and (response.text or response.content) fetched_documents = [] if has_content: - handler = "html" # will handle non-HTML content types soon, add content type resolution here - if handler in self.handlers: - extracted_content = self.handlers[handler](response, self.raise_on_failure) - if extracted_content: - extracted_doc["content"] = extracted_content - logger.debug("%s handler extracted content from %s", handler, url) - else: - logger.warning("%s handler failed to extract content from %s", handler, url) - # perhaps we have a snippet from web search, if so, use it as content - snippet_text = extracted_doc.get("snippet_text", "") - if snippet_text: - extracted_doc["content"] = snippet_text + # if we get here, we have a valid response, let's try to extract content + # using the registered content handler + extracted_content: str = "" + handler: Callable = self._get_content_type_handler(response.headers.get("Content-Type", "")) + try: + extracted_content = handler(response) + except Exception as e: + if self.raise_on_failure: + raise e + logger.warning("failed to extract content from %s", response.url) + content = extracted_content or extracted_doc.get("snippet_text", "") # fallback to snippet_text + if not content: + return [] + if extracted_content: + logger.debug("%s handler extracted content from %s", handler, url) - if "content" in extracted_doc: - document = Document.from_dict(extracted_doc) + extracted_doc["content"] = content + document = Document.from_dict(extracted_doc) + fetched_documents = self.processor.process(documents=[document]) if self.processor else [document] - if self.processor: - fetched_documents = self.processor.process(documents=[document]) - else: - fetched_documents = [document] return fetched_documents def run( @@ -126,8 +217,7 @@ class LinkContentFetcher(BaseComponent): Fetches content from a URL specified by query parameter and converts it into a list of Document objects. param query: The query - a URL to fetch content from. - param filters: Not used. - param top_k: Not used. + param file_paths: Not used. param labels: Not used. param documents: Not used. param meta: Not used. @@ -153,7 +243,6 @@ class LinkContentFetcher(BaseComponent): Takes a list of queries, where each query is expected to be a URL. For each query, the method fetches content from the specified URL and transforms it into a list of Document objects. The output is a list of these document lists, where each individual list of Document objects corresponds to the content retrieved - from a specific query URL. param queries: List of queries - URLs to fetch content from. param file_paths: Not used. @@ -177,24 +266,79 @@ class LinkContentFetcher(BaseComponent): return {"documents": results}, "output_1" - def _get_response(self, url: str, timeout: Optional[int]) -> requests.Response: + def _register_content_handler(self, content_type: str, handler: Callable): + """ + Register a new content handler for a specific content type. 
+ If a handler for the given content type already exists, it will be overridden. + + :param content_type: The content type for which the handler should be used. + :param handler: The handler function. This function should accept a requests.Response object parameter, + and return the extracted text (or None). + """ + if not callable(handler): + raise ValueError(f"handler must be a callable, but got {type(handler).__name__}") + + params = inspect.signature(handler).parameters + if len(params) != 1 or list(params.keys()) != ["response"]: + raise ValueError(f"{content_type} handler must accept 'response: requests.Response' as a single parameter") + + self.handlers[content_type] = handler + + def _get_response(self, url: str, timeout: Optional[int] = None) -> requests.Response: """ Fetches content from a URL. Returns a response object. :param url: The URL to fetch content from. :param timeout: The timeout in seconds. :return: A response object. """ + + @retry( + # we want to reraise the exception if we fail after the last self.retry_attempts + # then we can catch it in the outer try/except block, see below + reraise=True, + stop=stop_after_attempt(self.retry_attempts), + wait=wait_exponential(multiplier=1, min=2, max=10), + retry=(retry_if_exception_type((HTTPError, requests.RequestException))), + # This method is invoked only after failed requests (exception raised) + after=self._switch_user_agent, + ) + def _request(): + # we need a request copy because we modify the headers + headers = self._REQUEST_HEADERS.copy() + headers["User-Agent"] = self.user_agents[self.current_user_agent_idx] + r = requests.get(url, headers=headers, timeout=timeout or 3) + r.raise_for_status() + return r + try: - response = requests.get(url, headers=LinkContentFetcher.REQUEST_HEADERS, timeout=timeout) - response.raise_for_status() + response = _request() except Exception as e: + # catch all exceptions including HTTPError and RequestException if self.raise_on_failure: raise e - + # if we don't raise on failure, log it, and return a response object logger.warning("Couldn't retrieve content from %s", url) response = requests.Response() + finally: + self.current_user_agent_idx = 0 return response + def _get_content_type_handler(self, content_type: str) -> Callable: + """ + Get the appropriate content handler based on the content type. + :param content_type: The content type of the response. + :return: The matching content handler callable or the default html_content_handler if no match is found. + """ + content_type_lookup: str = (content_type or "").split(";")[0] + return self.handlers[content_type_lookup] + + def _switch_user_agent(self, retry_state: RetryCallState) -> None: + """ + Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents. + :param retry_state: The retry state (unused, required by tenacity). + """ + self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents) + def _is_valid_url(self, url: str) -> bool: """ Checks if a URL is valid. 
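A custom handler registered through the new `content_handlers` argument must accept a single parameter named `response`; `_register_content_handler` rejects any other signature. The sketch below is illustrative only and not part of this patch — the handler name, content type, and URL are hypothetical:

```python
from typing import Optional

from requests import Response

from haystack.nodes import LinkContentFetcher


def plain_text_handler(response: Response) -> Optional[str]:
    # Hypothetical handler: for text/plain responses the body already is the extracted text.
    return response.text


# Register the handler for a content type the defaults do not cover.
fetcher = LinkContentFetcher(content_handlers={"text/plain": plain_text_handler})
docs = fetcher.fetch(url="https://example.com/notes.txt", timeout=10)  # hypothetical URL
```

Content types without a registered handler still fall back to `html_content_handler` via the `defaultdict`, so a custom handler only changes behaviour for the exact MIME type it is registered under.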
diff --git a/releasenotes/notes/improve-link-content-fetcher-512d039e3c7684f1.yaml b/releasenotes/notes/improve-link-content-fetcher-512d039e3c7684f1.yaml new file mode 100644 index 000000000..afb84eae2 --- /dev/null +++ b/releasenotes/notes/improve-link-content-fetcher-512d039e3c7684f1.yaml @@ -0,0 +1,8 @@ +--- +enhancements: + - | + Several minor enhancements for LinkContentFetcher: + - Dynamic content handler resolution + - Custom User-Agent header (optional, minimize blocking) + - PDF support + - Register new content handlers diff --git a/test/nodes/test_link_content_fetcher.py b/test/nodes/test_link_content_fetcher.py index 9a3d4f648..fd0767af8 100644 --- a/test/nodes/test_link_content_fetcher.py +++ b/test/nodes/test_link_content_fetcher.py @@ -1,3 +1,4 @@ +from typing import Optional from unittest.mock import Mock, patch import logging import pytest @@ -15,6 +16,7 @@ def mocked_requests(): mock_requests.get.return_value = mock_response mock_response.status_code = 200 mock_response.text = "Sample content from webpage" + mock_response.headers = {"Content-Type": "text/html"} yield mock_requests @@ -33,7 +35,8 @@ def test_init(): assert r.processor is None assert isinstance(r.handlers, dict) - assert "html" in r.handlers + assert "text/html" in r.handlers + assert "application/pdf" in r.handlers @pytest.mark.unit @@ -45,7 +48,56 @@ def test_init_with_preprocessor(): r = LinkContentFetcher(processor=pre_processor_mock) assert r.processor == pre_processor_mock assert isinstance(r.handlers, dict) - assert "html" in r.handlers + assert "text/html" in r.handlers + assert "application/pdf" in r.handlers + + +@pytest.mark.unit +def test_init_with_content_handlers(): + """ + Checks the initialization of the LinkContentFetcher content handlers. + """ + + def fake_but_valid_video_content_handler(response: Response) -> Optional[str]: + pass + + r = LinkContentFetcher(content_handlers={"video/mp4": fake_but_valid_video_content_handler}) + + assert isinstance(r.handlers, dict) + assert "text/html" in r.handlers + assert "application/pdf" in r.handlers + assert "video/mp4" in r.handlers + + +@pytest.mark.unit +def test_init_with_content_handlers_override(): + """ + Checks the initialization of the LinkContentFetcher content handlers but with pdf handler overridden. + """ + + def new_pdf_content_handler(response: Response) -> Optional[str]: + pass + + r = LinkContentFetcher(content_handlers={"application/pdf": new_pdf_content_handler}) + + assert isinstance(r.handlers, dict) + assert "text/html" in r.handlers + assert "application/pdf" in r.handlers + assert r.handlers["application/pdf"] == new_pdf_content_handler + + +@pytest.mark.unit +def test_init_with_invalid_content_handlers(): + """ + Checks the initialization of the LinkContentFetcher content handlers fails with invalid content handlers. 
+ """ + + # invalid because it does not have the correct signature + def invalid_video_content_handler() -> Optional[str]: + pass + + with pytest.raises(ValueError, match="handler must accept"): + LinkContentFetcher(content_handlers={"video/mp4": invalid_video_content_handler}) @pytest.mark.unit @@ -58,7 +110,7 @@ def test_fetch(mocked_requests, mocked_article_extractor): pre_processor_mock = Mock() pre_processor_mock.process.return_value = [Document("Sample content from webpage")] - r = LinkContentFetcher(pre_processor_mock) + r = LinkContentFetcher(processor=pre_processor_mock) result = r.fetch(url=url, doc_kwargs={"text": "Sample content from webpage"}) assert len(result) == 1 @@ -120,7 +172,7 @@ def test_fetch_correct_arguments(mocked_requests, mocked_article_extractor): args, kwargs = mocked_requests.get.call_args assert args[0] == url assert kwargs["timeout"] == 3 - assert kwargs["headers"] == r.REQUEST_HEADERS + assert kwargs["headers"] == r._REQUEST_HEADERS # another variant url = "https://deepset.ai" @@ -129,7 +181,7 @@ def test_fetch_correct_arguments(mocked_requests, mocked_article_extractor): args, kwargs = mocked_requests.get.call_args assert args[0] == url assert kwargs["timeout"] == 10 - assert kwargs["headers"] == r.REQUEST_HEADERS + assert kwargs["headers"] == r._REQUEST_HEADERS @pytest.mark.unit @@ -309,6 +361,44 @@ def test_is_invalid_url(): assert not retriever._is_valid_url(url), f"Expected {url} to be invalid" +@pytest.mark.unit +def test_switch_user_agent_on_failed_request(): + """ + Test that LinkContentFetcher switches user agents on failed requests + """ + url = "http://fakeurl.com" + retry_attempts = 2 + lc = LinkContentFetcher(user_agents=["ua1", "ua2"], retry_attempts=retry_attempts) + with patch("haystack.nodes.retriever.link_content.requests.get") as mocked_get: + mocked_get.return_value.raise_for_status.side_effect = requests.HTTPError() + lc._get_response(url) + + assert mocked_get.call_count == retry_attempts + assert mocked_get.call_args_list[0][1]["headers"]["User-Agent"] == "ua1" + assert mocked_get.call_args_list[1][1]["headers"]["User-Agent"] == "ua2" + + +@pytest.mark.unit +def test_valid_requests_dont_switch_agent(mocked_requests): + """ + Test that LinkContentFetcher doesn't switch user agents on valid requests + """ + lcf = LinkContentFetcher() + + # Make first valid request + lcf._get_response("http://example.com") + + # Make second valid request + lcf._get_response("http://example.com") + + # Assert that requests.get was called twice with the same default user agents + assert mocked_requests.get.call_count == 2 + assert ( + mocked_requests.get.call_args_list[0][1]["headers"]["User-Agent"] + == mocked_requests.get.call_args_list[1][1]["headers"]["User-Agent"] + ) + + @pytest.mark.integration def test_call_with_valid_url_on_live_web(): """ diff --git a/test/nodes/test_web_retriever.py b/test/nodes/test_web_retriever.py index 99abfc211..73961b33e 100644 --- a/test/nodes/test_web_retriever.py +++ b/test/nodes/test_web_retriever.py @@ -6,6 +6,7 @@ import pytest from haystack import Document, Pipeline from haystack.document_stores.base import BaseDocumentStore from haystack.nodes import WebRetriever, PromptNode +from haystack.nodes.retriever.link_content import html_content_handler from haystack.nodes.preprocessor import PreProcessor from haystack.nodes.retriever.web import SearchResult from test.nodes.conftest import example_serperdev_response @@ -27,6 +28,15 @@ def mocked_article_extractor(): yield +@pytest.fixture +def 
mocked_link_content_fetcher_handler_type(): + with patch( + "haystack.nodes.retriever.link_content.LinkContentFetcher._get_content_type_handler", + return_value=html_content_handler, + ): + yield + + @pytest.mark.unit def test_init_default_parameters(): retriever = WebRetriever(api_key="test_key") @@ -121,7 +131,9 @@ def test_scrape_links_empty_list(): @pytest.mark.unit -def test_scrape_links_with_search_results(mocked_requests, mocked_article_extractor): +def test_scrape_links_with_search_results( + mocked_requests, mocked_article_extractor, mocked_link_content_fetcher_handler_type +): wr = WebRetriever(api_key="fake_key") sr1 = SearchResult("https://pagesix.com", "Some text", "0.43", "1") @@ -136,7 +148,9 @@ def test_scrape_links_with_search_results(mocked_requests, mocked_article_extrac @pytest.mark.unit -def test_scrape_links_with_search_results_with_preprocessor(mocked_requests, mocked_article_extractor): +def test_scrape_links_with_search_results_with_preprocessor( + mocked_requests, mocked_article_extractor, mocked_link_content_fetcher_handler_type +): wr = WebRetriever(api_key="fake_key", mode="preprocessed_documents") preprocessor = PreProcessor(progress_bar=False)
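The user-agent rotation covered by `test_switch_user_agent_on_failed_request` above is meant to be driven from application code roughly as follows. This is an illustrative sketch, not part of the patch: the Safari string is the one from `examples/arxiv_paper_summary.py`, the URL is one of the arXiv papers from that example, and PDF extraction needs the optional PyMuPDF dependency (`pip install farm-haystack[pdf]`):

```python
from haystack.nodes import LinkContentFetcher

# `user_agents` replaces the built-in default rather than extending it, so the
# default agent is listed explicitly here. On a failed request the fetcher makes
# up to `retry_attempts` attempts, switching to the next agent in the list after
# every failure and resetting to the first agent once the request loop finishes.
fetcher = LinkContentFetcher(
    user_agents=[
        LinkContentFetcher._USER_AGENT,  # the default haystack agent, tried first
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/16.5 Safari/605.1.15",  # browser-like fallback
    ],
    retry_attempts=2,
)

docs = fetcher.fetch(url="https://arxiv.org/pdf/1706.03762.pdf", timeout=10)
if docs:
    print(docs[0].content[:500])
```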