Mirror of https://github.com/deepset-ai/haystack.git (synced 2026-01-06 12:07:04 +00:00)
feat: Adjust LinkContentFetcher run method, use ByteStream (#5972)
This commit is contained in:
parent c102b152dc
commit 6a50123b9f
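
The diff below touches the component module (haystack/preview/components/fetchers/link_content.py), its release note, and its tests: LinkContentFetcher.run() now accepts a list of URLs and returns ByteStream objects instead of a single Document. A minimal usage sketch of the new interface, pieced together from the tests in this diff (the URL is a placeholder):

from haystack.preview.components.fetchers.link_content import LinkContentFetcher

fetcher = LinkContentFetcher()
# run() now takes a list of URLs and returns the fetched content as ByteStreams
result = fetcher.run(urls=["https://www.example.com"])
for stream in result["streams"]:
    # each stream carries its source URL and content type in its metadata
    print(stream.metadata["url"], stream.metadata["content_type"])
    print(len(stream.data))  # the raw fetched bytes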
@@ -1,17 +1,16 @@
-import io
 import logging
 from collections import defaultdict
-from datetime import datetime
-from typing import Optional, Dict, List, Callable, Any, IO
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Callable, Dict, List, Optional, Tuple
 
 import requests
 from requests import Response
 from requests.exceptions import HTTPError
-from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState
-from haystack.preview import component, default_from_dict, default_to_dict
+from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 
 from haystack import __version__
-from haystack.preview import Document
+from haystack.preview import component, default_from_dict, default_to_dict
+from haystack.preview.dataclasses import ByteStream
 
 logger = logging.getLogger(__name__)
 
@@ -26,26 +25,27 @@ REQUEST_HEADERS = {
 }
 
 
-def text_content_handler(response: Response) -> Dict[str, str]:
+def text_content_handler(response: Response) -> ByteStream:
     """
     :param response: Response object from the request.
     :return: The extracted text.
     """
-    return {"text": response.text}
+    return ByteStream.from_string(response.text)
 
 
-def binary_content_handler(response: Response) -> Dict[str, IO[bytes]]:
+def binary_content_handler(response: Response) -> ByteStream:
     """
     :param response: Response object from the request.
     :return: The extracted binary file-like object.
     """
-    return {"blob": io.BytesIO(response.content)}
+    return ByteStream(data=response.content)
 
 
 @component
 class LinkContentFetcher:
     """
-    LinkContentFetcher fetches content from a URL link and converts it to a Document object.
+    LinkContentFetcher is a component for fetching and extracting content from URLs. It supports handling various
+    content types, retries on failures, and automatic user-agent rotation for failed web requests.
     """
 
     def __init__(
@@ -56,15 +56,13 @@ class LinkContentFetcher:
         timeout: int = 3,
     ):
         """
-        Creates a LinkContentFetcher instance.
+        Initializes a LinkContentFetcher instance.
 
-        :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
-            during content extraction. If False, the error is simply logged and the program continues.
-            Defaults to False.
-        :param user_agents: A list of user agents to use when fetching content. Defaults to None, in which case a
-            default user agent is used.
-        :param retry_attempts: The number of times to retry fetching content. Defaults to 2.
-        :param timeout: The timeout in seconds for the request. Defaults to 3.
+        :param raise_on_failure: If True, raises an exception on failure when fetching a single URL.
+            For multiple URLs, errors are logged and successful fetches are returned. Default is True.
+        :param user_agents: A list of user agents for fetching content. If None, a default user agent is used.
+        :param retry_attempts: Number of retry attempts for fetching content. Default is 2.
+        :param timeout: Timeout in seconds for the request. Default is 3.
         """
         self.raise_on_failure = raise_on_failure
         self.user_agents = user_agents or [DEFAULT_USER_AGENT]
@@ -73,7 +71,7 @@ class LinkContentFetcher:
         self.timeout = timeout
 
         # register default content handlers that extract data from the response
-        self.handlers: Dict[str, Callable[[Response], Dict[str, Any]]] = defaultdict(lambda: text_content_handler)
+        self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: text_content_handler)
         self.handlers["text/html"] = text_content_handler
         self.handlers["text/plain"] = text_content_handler
         self.handlers["application/pdf"] = binary_content_handler
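
The handlers registry above maps a response content type to a callable of type Callable[[Response], ByteStream], with text_content_handler as the fallback for unrecognized types. Under that contract, additional content types could hypothetically be routed to the existing handlers after construction; "image/png" here is an illustrative example, not something this commit registers:

from haystack.preview.components.fetchers.link_content import LinkContentFetcher, binary_content_handler

fetcher = LinkContentFetcher()
# hypothetical: treat PNG responses as binary payloads as well;
# any callable taking a Response and returning a ByteStream would do
fetcher.handlers["image/png"] = binary_content_handler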
@@ -116,37 +114,96 @@ class LinkContentFetcher:
         """
         return default_from_dict(cls, data)
 
-    @component.output_types(documents=Optional[Document])
-    def run(self, url: str):
+    @component.output_types(streams=List[ByteStream])
+    def run(self, urls: List[str]):
         """
-        Fetches content from a URL and converts it to a Document objects. If no content is extracted,
-        an empty Document object is returned (if raise_on_failure is False).
+        Fetches content from a list of URLs and returns a list of extracted content streams.
+        Each content stream is a ByteStream object containing the extracted content as binary data.
+        The content type of each stream is stored in the metadata of the ByteStream object under
+        the key "content_type". The URL of the fetched content is stored under the key "url".
 
-        :param url: URL to fetch content from.
-        :param timeout: Timeout in seconds for the request.
-        :return: List of Document objects or an empty list if no content is extracted.
+        :param urls: A list of URLs to fetch content from.
+        :return: A list of ByteStream objects representing the extracted content.
+
+        :raises: If the provided list of URLs contains only a single URL, and `raise_on_failure` is set to True,
+            an exception will be raised in case of an error during content retrieval. In all other scenarios, any
+            retrieval errors are logged, and a list of successfully retrieved ByteStream objects is returned.
         """
-        document_data: Dict[str, Any] = {"metadata": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}}
+        streams: List[ByteStream] = []
+        if not urls:
+            return {"streams": streams}
+
+        # don't use multithreading if there's only one URL
+        if len(urls) == 1:
+            stream_metadata, stream = self.fetch(urls[0])
+            stream.metadata.update(stream_metadata)
+            streams.append(stream)
+        else:
+            with ThreadPoolExecutor() as executor:
+                results = executor.map(self._fetch_with_exception_suppression, urls)
+
+                for stream_metadata, stream in results:  # type: ignore
+                    if stream_metadata is not None and stream is not None:
+                        stream.metadata.update(stream_metadata)
+                        streams.append(stream)
+
+        return {"streams": streams}
+
+    def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]:
+        """
+        Fetches content from a URL and returns it as a ByteStream.
+
+        :param url: The URL to fetch content from.
+        :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
+            ByteStream metadata contains the URL and the content type of the fetched content.
+            The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf").
+            The ByteStream object contains the fetched content as binary data.
+
+        :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will
+            raise an exception. Otherwise, all fetching errors are logged, and an empty ByteStream is returned.
+
+        """
+        content_type: str = "text/html"
+        stream: ByteStream = ByteStream(data=b"")
         try:
             response = self._get_response(url)
             content_type = self._get_content_type(response)
-            document_data["mime_type"] = content_type
             handler: Callable = self.handlers[content_type]
-            document_data.update(handler(response))
-            return {"document": Document(**document_data)}
-
+            stream = handler(response)
         except Exception as e:
             if self.raise_on_failure:
                 raise e
-            logger.debug("Couldn't retrieve content from %s", url)
-            return {"document": None}
+            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
+            logger.debug("Couldn't retrieve content from %s due to %s", url, str(e))
 
         finally:
             self.current_user_agent_idx = 0
 
+        return {"content_type": content_type, "url": url}, stream
+
+    def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
+        """
+        Fetches content from a URL and returns it as a ByteStream.
+
+        If `raise_on_failure` is set to True, this method will wrap the fetch method and catch any exceptions.
+        Otherwise, it will simply call the fetch method.
+        :param url: The URL to fetch content from.
+        :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
+
+        """
+        if self.raise_on_failure:
+            try:
+                return self.fetch(url)
+            except Exception as e:
+                logger.warning("Error fetching %s: %s", url, str(e))
+                return {"content_type": "Unknown", "url": url}, None
+        else:
+            return self.fetch(url)
+
     def _get_content_type(self, response: Response):
         """
         Get the content type of the response.
 
         :param response: The response object.
         :return: The content type of the response.
         """
@@ -157,6 +214,7 @@ class LinkContentFetcher:
         """
         Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents.
+        Used by tenacity to retry the requests with a different user agent.
 
         :param retry_state: The retry state (unused, required by tenacity).
         """
         self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
 
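Taken together, run(), fetch(), and _fetch_with_exception_suppression() above give the component two error-handling modes under the default raise_on_failure=True: a single failing URL raises, while in a multi-URL batch the failure is logged and only successful fetches are returned. A sketch of both paths, reusing the placeholder hosts from the tests in this diff:

fetcher = LinkContentFetcher()  # raise_on_failure=True by default

# single URL: fetch() is called directly, so the exception propagates
try:
    fetcher.run(urls=["https://non_existent_website_dot.com/"])
except Exception as exc:
    print(f"single-URL fetch raised: {exc}")

# multiple URLs: failures are suppressed and logged, and only the
# successfully fetched content comes back as streams
result = fetcher.run(urls=["https://non_existent_website_dot.com/", "https://www.google.com/"])
print(len(result["streams"]))  # 1: only the reachable URL produced a stream
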
@@ -1,5 +1,5 @@
 ---
 preview:
   - |
-    Adds LinkContentFetcher component to Haystack 2.0. LinkContentFetcher fetches content from a given URL and
-    converts it into a Document object, which can then be used within the Haystack 2.0 pipeline.
+    Introduced the LinkContentFetcher in Haystack 2.0. This component fetches content from specified
+    URLs and converts them into ByteStream objects for further processing in Haystack pipelines.
@@ -1,7 +1,7 @@
-import io
 from unittest.mock import patch, Mock
 
 import pytest
+import requests
 
 from haystack.preview.components.fetchers.link_content import (
     LinkContentFetcher,
@@ -99,27 +99,29 @@ class TestLinkContentFetcher:
 
     @pytest.mark.unit
     def test_run_text(self):
+        correct_response = b"Example test response"
         with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
             mock_run.get.return_value = Mock(
                 status_code=200, text="Example test response", headers={"Content-Type": "text/plain"}
             )
             fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document.text == "Example test response"
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == correct_response
+            assert first_stream.metadata["content_type"] == "text/plain"
 
     @pytest.mark.unit
     def test_run_html(self):
+        correct_response = b"<h1>Example test response</h1>"
         with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
             mock_run.get.return_value = Mock(
                 status_code=200, text="<h1>Example test response</h1>", headers={"Content-Type": "text/html"}
             )
             fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document.text == "<h1>Example test response</h1>"
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == correct_response
+            assert first_stream.metadata["content_type"] == "text/html"
 
     @pytest.mark.unit
     def test_run_binary(self, test_files_path):
@@ -129,42 +131,104 @@ class TestLinkContentFetcher:
                 status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"}
             )
             fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            # casting to list to make the blobs comparable
-            assert list(document.blob) == list(io.BytesIO(file_bytes))
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == file_bytes
+            assert first_stream.metadata["content_type"] == "application/pdf"
 
     @pytest.mark.unit
     def test_run_bad_status_code(self):
+        empty_byte_stream = b""
         fetcher = LinkContentFetcher(raise_on_failure=False)
         mock_response = Mock(status_code=403)
         with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
             mock_run.get.return_value = mock_response
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document is None
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+
+        # empty byte stream is returned because raise_on_failure is False
+        assert len(streams) == 1
+        first_stream = streams[0]
+        assert first_stream.data == empty_byte_stream
+        assert first_stream.metadata["content_type"] == "text/html"
 
     @pytest.mark.integration
     def test_link_content_fetcher_html(self):
         fetcher = LinkContentFetcher()
-        document = fetcher.run(HTML_URL)["document"]
-        assert document.mime_type == "text/html"
-        assert "Introduction to Haystack" in document.text
-        assert document.metadata["url"] == HTML_URL
+        streams = fetcher.run([HTML_URL])["streams"]
+        first_stream = streams[0]
+        assert "Haystack" in first_stream.data.decode("utf-8")
+        assert first_stream.metadata["content_type"] == "text/html"
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == HTML_URL
 
     @pytest.mark.integration
     def test_link_content_fetcher_text(self):
         fetcher = LinkContentFetcher()
-        document = fetcher.run(TEXT_URL)["document"]
-        assert document.mime_type == "text/plain"
-        assert "Haystack" in document.text
-        assert document.metadata["url"] == TEXT_URL
+        streams = fetcher.run([TEXT_URL])["streams"]
+        first_stream = streams[0]
+        assert "Haystack" in first_stream.data.decode("utf-8")
+        assert first_stream.metadata["content_type"] == "text/plain"
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == TEXT_URL
 
     @pytest.mark.integration
     def test_link_content_fetcher_pdf(self):
         fetcher = LinkContentFetcher()
-        document = fetcher.run(PDF_URL)["document"]
-        assert document.mime_type == "application/octet-stream"  # FIXME Should be "application/pdf"?
-        assert document.text is None
-        assert document.blob is not None
-        assert document.metadata["url"] == PDF_URL
+        streams = fetcher.run([PDF_URL])["streams"]
+        assert len(streams) == 1
+        first_stream = streams[0]
+        assert first_stream.metadata["content_type"] in ("application/octet-stream", "application/pdf")
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == PDF_URL
+
+    @pytest.mark.integration
+    def test_link_content_fetcher_multiple_different_content_types(self):
+        """
+        This test is to ensure that the fetcher can handle a list of URLs that contain different content types.
+        """
+        fetcher = LinkContentFetcher()
+        streams = fetcher.run([PDF_URL, HTML_URL])["streams"]
+        assert len(streams) == 2
+        for stream in streams:
+            assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream")
+            if stream.metadata["content_type"] == "text/html":
+                assert "Haystack" in stream.data.decode("utf-8")
+            elif stream.metadata["content_type"] == "application/pdf":
+                assert len(stream.data) > 0
+
+    @pytest.mark.integration
+    def test_link_content_fetcher_multiple_html_streams(self):
+        """
+        This test is to ensure that the fetcher can handle a list of URLs that contain different content types,
+        and that we have two html streams.
+        """
+
+        fetcher = LinkContentFetcher()
+        streams = fetcher.run([PDF_URL, HTML_URL, "https://google.com"])["streams"]
+        assert len(streams) == 3
+        for stream in streams:
+            assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream")
+            if stream.metadata["content_type"] == "text/html":
+                assert "Haystack" in stream.data.decode("utf-8") or "Google" in stream.data.decode("utf-8")
+            elif stream.metadata["content_type"] == "application/pdf":
+                assert len(stream.data) > 0
+
+    @pytest.mark.integration
+    def test_mix_of_good_and_failed_requests(self):
+        """
+        This test is to ensure that the fetcher can handle a list of URLs that contain URLs that fail to be
+        fetched. In such a case, the fetcher should return the content of the URLs that were successfully
+        fetched and not raise an exception.
+        """
+        fetcher = LinkContentFetcher()
+        result = fetcher.run(["https://non_existent_website_dot.com/", "https://www.google.com/"])
+        assert len(result["streams"]) == 1
+        first_stream = result["streams"][0]
+        assert first_stream.metadata["content_type"] == "text/html"
+
+    @pytest.mark.integration
+    def test_bad_request_exception_raised(self):
+        """
+        This test is to ensure that the fetcher raises an exception when a single bad request is made and it
+        is configured to do so.
+        """
+        fetcher = LinkContentFetcher()
+        with pytest.raises(requests.exceptions.ConnectionError):
+            fetcher.run(["https://non_existent_website_dot.com/"])
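
Because the fetcher now emits raw ByteStream objects rather than Documents, deciding what to do with the bytes is left to downstream code. One plausible consumption pattern, assuming nothing beyond the ByteStream fields exercised in the tests above (data plus the "content_type" and "url" metadata keys), is to dispatch on the content type; the output filename scheme is illustrative:

fetcher = LinkContentFetcher()
streams = fetcher.run(urls=["https://www.example.com"])["streams"]

for i, stream in enumerate(streams):
    if stream.metadata["content_type"].startswith("text/"):
        # textual payloads (text/html, text/plain) can be decoded directly
        print(stream.data.decode("utf-8"))
    else:
        # binary payloads such as application/pdf are written out as-is
        with open(f"download_{i}.bin", "wb") as f:
            f.write(stream.data)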