feat: LinkContentFetcher - replace requests with httpx, add async and http/2 (#9034)

* LinkContentFetcher - replace requests with httpx, add async and http/2

* Update haystack/components/fetchers/link_content.py

Co-authored-by: Julian Risch <julian.risch@deepset.ai>

* Update haystack/components/fetchers/link_content.py

Co-authored-by: Julian Risch <julian.risch@deepset.ai>

* PR feedback

* Merge sync and async

---------

Co-authored-by: Julian Risch <julian.risch@deepset.ai>
Vladimir Blagojevic 2025-03-26 07:55:08 -06:00 committed by GitHub
parent e64db61973
commit 13941d8bd9
4 changed files with 391 additions and 42 deletions
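Before the per-file diffs, a minimal usage sketch of what this commit adds (the URL is a placeholder; `http2=True` requires `pip install httpx[http2]`):

```python
import asyncio

from haystack.components.fetchers import LinkContentFetcher

# Same synchronous call shape as before the requests -> httpx swap
fetcher = LinkContentFetcher(http2=True, client_kwargs={"timeout": 10.0})
streams = fetcher.run(urls=["https://www.example.com"])["streams"]

# New: asynchronous fetching over a shared httpx.AsyncClient
async def main():
    result = await fetcher.run_async(urls=["https://www.example.com"])
    return result["streams"]

streams_async = asyncio.run(main())
```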

View File

@@ -2,20 +2,24 @@
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch
from typing import Callable, Dict, List, Optional, Tuple
from typing import Callable, Dict, List, Optional, Tuple, cast
import requests
from requests import Response
from requests.exceptions import HTTPError
import httpx
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from haystack import component, logging
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.version import __version__
# HTTP/2 support via lazy import
with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
import h2  # noqa: F401  # probe that the optional 'h2' package is present; httpx handles the rest
logger = logging.getLogger(__name__)
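A brief sketch of how the lazy import gates HTTP/2 (assuming haystack's `LazyImport` defers the `ImportError` until `check()` is called):

```python
from haystack.lazy_imports import LazyImport

with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
    import h2  # noqa: F401  # only probes that the optional package is importable

# If 'h2' is missing, check() re-raises the deferred ImportError with the
# install hint; __init__ below catches it and falls back to HTTP/1.1.
h2_import.check()
```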
@@ -29,22 +33,22 @@ REQUEST_HEADERS = {
}
def _text_content_handler(response: Response) -> ByteStream:
def _text_content_handler(response: httpx.Response) -> ByteStream:
"""
Handles text content.
:param response: Response object from the request.
:return: The extracted text.
:returns: The extracted text.
"""
return ByteStream.from_string(response.text)
def _binary_content_handler(response: Response) -> ByteStream:
def _binary_content_handler(response: httpx.Response) -> ByteStream:
"""
Handles binary content.
:param response: Response object from the request.
:return: The extracted binary file-like object.
:returns: The extracted binary file-like object.
"""
return ByteStream(data=response.content)
@@ -72,14 +76,30 @@ class LinkContentFetcher:
assert streams[0].meta == {'content_type': 'text/html', 'url': 'https://www.google.com'}
assert streams[0].data
```
For async usage:
```python
import asyncio
from haystack.components.fetchers import LinkContentFetcher
async def fetch_async():
fetcher = LinkContentFetcher()
result = await fetcher.run_async(urls=["https://www.google.com"])
return result["streams"]
streams = asyncio.run(fetch_async())
```
"""
def __init__(
def __init__( # pylint: disable=too-many-positional-arguments
self,
raise_on_failure: bool = True,
user_agents: Optional[List[str]] = None,
retry_attempts: int = 2,
timeout: int = 3,
http2: bool = False,
client_kwargs: Optional[Dict] = None,
):
"""
Initializes the component.
@@ -90,15 +110,46 @@ class LinkContentFetcher:
for fetching content. If `None`, a default user agent is used.
:param retry_attempts: The number of times to retry to fetch the URL's content.
:param timeout: Timeout in seconds for the request.
:param http2: Whether to enable HTTP/2 support for requests. Defaults to False.
Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
:param client_kwargs: Additional keyword arguments to pass to the httpx client.
If `None`, default values are used.
"""
self.raise_on_failure = raise_on_failure
self.user_agents = user_agents or [DEFAULT_USER_AGENT]
self.current_user_agent_idx: int = 0
self.retry_attempts = retry_attempts
self.timeout = timeout
self.http2 = http2
self.client_kwargs = client_kwargs or {}
# Configure default client settings
self.client_kwargs.setdefault("timeout", timeout)
self.client_kwargs.setdefault("follow_redirects", True)
# Create httpx clients
client_kwargs = {**self.client_kwargs}
# Optional HTTP/2 support
if http2:
try:
h2_import.check()
client_kwargs["http2"] = True
except ImportError:
logger.warning(
"HTTP/2 support requested but 'h2' package is not installed. "
"Falling back to HTTP/1.1. Install with `pip install httpx[http2]` to enable HTTP/2 support."
)
self.http2 = False # Update the setting to match actual capability
# Initialize synchronous client
self._client = httpx.Client(**client_kwargs)
# Initialize asynchronous client
self._async_client = httpx.AsyncClient(**client_kwargs)
# register default content handlers that extract data from the response
self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
self.handlers: Dict[str, Callable[[httpx.Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
self.handlers["text/*"] = _text_content_handler
self.handlers["text/html"] = _binary_content_handler
self.handlers["application/json"] = _text_content_handler
@@ -111,7 +162,7 @@ class LinkContentFetcher:
reraise=True,
stop=stop_after_attempt(self.retry_attempts),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=(retry_if_exception_type((HTTPError, requests.RequestException))),
retry=(retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError))),
# This method is invoked only after failed requests (exception raised)
after=self._switch_user_agent,
)
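For reference, the retry predicate's exception swap maps the old requests types onto their httpx counterparts:

```python
# requests -> httpx exceptions used by the tenacity retry predicate:
#   requests.exceptions.HTTPError -> httpx.HTTPStatusError (raised by raise_for_status())
#   requests.RequestException     -> httpx.RequestError (base of ConnectError,
#                                     ConnectTimeout, ReadTimeout, and friends)
```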
@@ -119,12 +170,32 @@
# we need to copy because we modify the headers
headers = REQUEST_HEADERS.copy()
headers["User-Agent"] = self.user_agents[self.current_user_agent_idx]
response = requests.get(url, headers=headers, timeout=timeout or 3)
response = self._client.get(url, headers=headers)
response.raise_for_status()
return response
self._get_response: Callable = get_response
def __del__(self):
"""
Clean up resources when the component is deleted.
Closes both the synchronous and asynchronous HTTP clients to prevent
resource leaks.
"""
try:
# Close the synchronous client if it exists
if hasattr(self, "_client"):
self._client.close()
# Close the asynchronous client if it exists. We can't await in __del__,
# and httpx.AsyncClient has no synchronous close(), so schedule aclose()
# on the event loop as a best effort; any failure is suppressed below.
if hasattr(self, "_async_client"):
asyncio.get_event_loop().create_task(self._async_client.aclose())
except Exception:
# Suppress any exceptions during cleanup
pass
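Relying on `__del__` for the async client is best-effort only; callers who manage an event loop can close the clients explicitly. A sketch (`_client` and `_async_client` are private attributes, so this is illustrative, not a stable API):

```python
import asyncio
from haystack.components.fetchers import LinkContentFetcher

async def fetch_and_close():
    fetcher = LinkContentFetcher()
    try:
        return (await fetcher.run_async(urls=["https://www.example.com"]))["streams"]
    finally:
        await fetcher._async_client.aclose()  # private attribute; illustrative only
        fetcher._client.close()

streams = asyncio.run(fetch_and_close())
```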
@component.output_types(streams=List[ByteStream])
def run(self, urls: List[str]):
"""
@@ -136,7 +207,7 @@
the key "content_type". The URL of the fetched content is stored under the key "url".
:param urls: A list of URLs to fetch content from.
:return: `ByteStream` objects representing the extracted content.
:returns: `ByteStream` objects representing the extracted content.
:raises Exception: If the provided list of URLs contains only a single URL, and `raise_on_failure` is set to
`True`, an exception will be raised in case of an error during content retrieval.
@@ -165,12 +236,57 @@
return {"streams": streams}
@component.output_types(streams=List[ByteStream])
async def run_async(self, urls: List[str]):
"""
Asynchronously fetches content from a list of URLs and returns a list of extracted content streams.
This is the asynchronous version of the `run` method with the same parameters and return values.
:param urls: A list of URLs to fetch content from.
:returns: `ByteStream` objects representing the extracted content.
"""
streams: List[ByteStream] = []
if not urls:
return {"streams": streams}
# Create tasks for all URLs using _fetch_async directly
tasks = [self._fetch_async(url, self._async_client) for url in urls]
# Only capture exceptions when we have multiple URLs or raise_on_failure=False
# This ensures errors propagate appropriately for single URLs with raise_on_failure=True
return_exceptions = not (len(urls) == 1 and self.raise_on_failure)
results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)
# Process results
for i, result in enumerate(results):
# Handle exception results (only happens when return_exceptions=True)
if isinstance(result, Exception):
logger.warning("Error fetching {url}: {error}", url=urls[i], error=str(result))
# Add an empty result for failed URLs when raise_on_failure=False
if not self.raise_on_failure:
streams.append(ByteStream(data=b"", meta={"content_type": "Unknown", "url": urls[i]}))
continue
# Process successful results. After the `continue` above, result cannot
# be an exception, so cast it to the tuple type returned by _fetch_async
# to satisfy mypy.
stream_metadata, stream = cast(Tuple[Optional[Dict[str, str]], Optional[ByteStream]], result)
if stream_metadata is not None and stream is not None:
stream.meta.update(stream_metadata)
stream.mime_type = stream.meta.get("content_type", None)
streams.append(stream)
return {"streams": streams}
def _fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]:
"""
Fetches content from a URL and returns it as a ByteStream.
:param url: The URL to fetch content from.
:return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
:returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
ByteStream metadata contains the URL and the content type of the fetched content.
The content type is a string indicating the type of content fetched (for example, "text/html",
"application/pdf"). The ByteStream object contains the fetched content as binary data.
@@ -197,6 +313,38 @@
return {"content_type": content_type, "url": url}, stream
async def _fetch_async(
self, url: str, client: httpx.AsyncClient
) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
"""
Asynchronously fetches content from a URL and returns it as a ByteStream.
:param url: The URL to fetch content from.
:param client: The async httpx client to use for making requests.
:returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
"""
content_type: str = "text/html"
stream: Optional[ByteStream] = None
metadata: Optional[Dict[str, str]] = None
try:
response = await self._get_response_async(url, client)
content_type = self._get_content_type(response)
handler: Callable = self._resolve_handler(content_type)
stream = handler(response)
metadata = {"content_type": content_type, "url": url}
except Exception as e:
if self.raise_on_failure:
raise e
logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
# Create an empty ByteStream for failed requests when raise_on_failure is False
stream = ByteStream(data=b"")
metadata = {"content_type": content_type, "url": url}
finally:
self.current_user_agent_idx = 0
return metadata, stream
def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
"""
Fetches content from a URL and returns it as a ByteStream.
@@ -204,7 +352,7 @@
If `raise_on_failure` is set to True, this method will wrap the fetch() method and catch any exceptions.
Otherwise, it will simply call the fetch() method.
:param url: The URL to fetch content from.
:return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
:returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
"""
if self.raise_on_failure:
@@ -216,17 +364,52 @@
else:
return self._fetch(url)
def _get_content_type(self, response: Response):
async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
"""
Asynchronously gets a response from a URL with retry logic.
:param url: The URL to fetch.
:param client: The async httpx client to use for making requests.
:returns: The httpx Response object.
"""
attempt = 0
last_exception = None
while attempt <= self.retry_attempts:
try:
headers = REQUEST_HEADERS.copy()
headers["User-Agent"] = self.user_agents[self.current_user_agent_idx]
response = await client.get(url, headers=headers)
response.raise_for_status()
return response
except (httpx.HTTPStatusError, httpx.RequestError) as e:
last_exception = e
attempt += 1
if attempt <= self.retry_attempts:
self._switch_user_agent(None) # Switch user agent for next retry
# Wait before retry using exponential backoff
await asyncio.sleep(min(2 * 2 ** (attempt - 1), 10))
else:
break
# If we've exhausted all retries, raise the last exception
if last_exception:
raise last_exception
# This should never happen, but just in case
raise httpx.RequestError("Failed to get response after retries", request=None)
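The manual backoff is meant to mirror the sync path's `wait_exponential(multiplier=1, min=2, max=10)`; the schedule it produces:

```python
# min(2 * 2 ** (attempt - 1), 10) for attempt = 1, 2, 3, 4, ...
waits = [min(2 * 2 ** (attempt - 1), 10) for attempt in range(1, 5)]
assert waits == [2, 4, 8, 10]  # seconds: 2, 4, 8, then capped at 10
```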
def _get_content_type(self, response: httpx.Response):
"""
Get the content type of the response.
:param response: The response object.
:return: The content type of the response.
:returns: The content type of the response.
"""
content_type = response.headers.get("Content-Type", "")
return content_type.split(";")[0]
def _resolve_handler(self, content_type: str) -> Callable[[Response], ByteStream]:
def _resolve_handler(self, content_type: str) -> Callable[[httpx.Response], ByteStream]:
"""
Resolves the handler for the given content type.
@@ -249,7 +432,7 @@
# default handler
return self.handlers["text/plain"]
def _switch_user_agent(self, retry_state: RetryCallState) -> None:
def _switch_user_agent(self, retry_state: Optional[RetryCallState] = None) -> None:
"""
Switches the User-Agent for this LinkContentFetcher to the next one in the list of user agents.

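One detail worth noting for `_resolve_handler` (its body is elided above): judging by the `fnmatch` import, lookup falls back through wildcard patterns before the `defaultdict` text handler, so concrete types resolve to their wildcard entries:

```python
from fnmatch import fnmatch

# Wildcard entries such as "text/*" or "audio/*" catch any concrete subtype
assert fnmatch("audio/mpeg", "audio/*")
assert fnmatch("text/plain", "text/*")
assert not fnmatch("application/pdf", "text/*")
```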
View File

@@ -132,6 +132,9 @@ extra-dependencies = [
# ComponentTool
"docstring-parser",
# needed in link content fetcher tests
"httpx[http2]",
# Azure Utils
"azure-identity",

View File

@@ -0,0 +1,4 @@
---
features:
- |
Introduced asynchronous functionality and HTTP/2 support in the LinkContentFetcher component, improving fetching throughput (concurrent async requests) and transport efficiency (HTTP/2).

View File

@@ -4,7 +4,7 @@
from unittest.mock import patch, Mock
import pytest
import requests
import httpx
from haystack.components.fetchers.link_content import (
LinkContentFetcher,
@@ -20,31 +23,34 @@ PDF_URL = "https://raw.githubusercontent.com/deepset-ai/haystack/b5987a6d8d0714e
@pytest.fixture
def mock_get_link_text_content():
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
status_code=200, text="Example test response", headers={"Content-Type": "text/plain"}
)
yield mock_run
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_response = Mock(status_code=200, text="Example test response", headers={"Content-Type": "text/plain"})
mock_get.return_value = mock_response
yield mock_get
@pytest.fixture
def mock_get_link_content(test_files_path):
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_response = Mock(
status_code=200,
content=open(test_files_path / "pdf" / "sample_pdf_1.pdf", "rb").read(),
headers={"Content-Type": "application/pdf"},
)
yield mock_run
mock_get.return_value = mock_response
yield mock_get
class TestLinkContentFetcher:
def test_init(self):
"""Test initialization with default parameters"""
fetcher = LinkContentFetcher()
assert fetcher.raise_on_failure is True
assert fetcher.user_agents == [DEFAULT_USER_AGENT]
assert fetcher.retry_attempts == 2
assert fetcher.timeout == 3
assert fetcher.http2 is False
assert isinstance(fetcher.client_kwargs, dict)
assert fetcher.handlers == {
"text/*": _text_content_handler,
"text/html": _binary_content_handler,
@@ -55,20 +58,33 @@ class TestLinkContentFetcher:
"video/*": _binary_content_handler,
}
assert hasattr(fetcher, "_get_response")
assert hasattr(fetcher, "_client")
assert isinstance(fetcher._client, httpx.Client)
def test_init_with_params(self):
fetcher = LinkContentFetcher(raise_on_failure=False, user_agents=["test"], retry_attempts=1, timeout=2)
"""Test initialization with custom parameters"""
fetcher = LinkContentFetcher(
raise_on_failure=False,
user_agents=["test"],
retry_attempts=1,
timeout=2,
http2=True,
client_kwargs={"verify": False},
)
assert fetcher.raise_on_failure is False
assert fetcher.user_agents == ["test"]
assert fetcher.retry_attempts == 1
assert fetcher.timeout == 2
assert fetcher.http2 is True
assert "verify" in fetcher.client_kwargs
assert fetcher.client_kwargs["verify"] is False
def test_run_text(self):
"""Test fetching text content"""
correct_response = b"Example test response"
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
status_code=200, text="Example test response", headers={"Content-Type": "text/plain"}
)
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_response = Mock(status_code=200, text="Example test response", headers={"Content-Type": "text/plain"})
mock_get.return_value = mock_response
fetcher = LinkContentFetcher()
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
first_stream = streams[0]
@@ -77,11 +93,13 @@ class TestLinkContentFetcher:
assert first_stream.mime_type == "text/plain"
def test_run_html(self):
"""Test fetching HTML content"""
correct_response = b"<h1>Example test response</h1>"
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_response = Mock(
status_code=200, content=b"<h1>Example test response</h1>", headers={"Content-Type": "text/html"}
)
mock_get.return_value = mock_response
fetcher = LinkContentFetcher()
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
first_stream = streams[0]
@@ -90,11 +108,11 @@ class TestLinkContentFetcher:
assert first_stream.mime_type == "text/html"
def test_run_binary(self, test_files_path):
"""Test fetching binary content"""
file_bytes = open(test_files_path / "pdf" / "sample_pdf_1.pdf", "rb").read()
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"}
)
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_response = Mock(status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"})
mock_get.return_value = mock_response
fetcher = LinkContentFetcher()
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
first_stream = streams[0]
@@ -103,11 +121,16 @@ class TestLinkContentFetcher:
assert first_stream.mime_type == "application/pdf"
def test_run_bad_status_code(self):
"""Test behavior when a request results in an error status code"""
empty_byte_stream = b""
fetcher = LinkContentFetcher(raise_on_failure=False)
mock_response = Mock(status_code=403)
with patch("haystack.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = mock_response
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"403 Client Error", request=Mock(), response=mock_response
)
with patch("haystack.components.fetchers.link_content.httpx.Client.get") as mock_get:
mock_get.return_value = mock_response
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
# empty byte stream is returned because raise_on_failure is False
@@ -119,6 +142,9 @@ class TestLinkContentFetcher:
@pytest.mark.integration
def test_link_content_fetcher_html(self):
"""
Test fetching HTML content from a real URL.
"""
fetcher = LinkContentFetcher()
streams = fetcher.run([HTML_URL])["streams"]
first_stream = streams[0]
@@ -129,6 +155,9 @@ class TestLinkContentFetcher:
@pytest.mark.integration
def test_link_content_fetcher_text(self):
"""
Test fetching text content from a real URL.
"""
fetcher = LinkContentFetcher()
streams = fetcher.run([TEXT_URL])["streams"]
first_stream = streams[0]
@@ -139,6 +168,9 @@ class TestLinkContentFetcher:
@pytest.mark.integration
def test_link_content_fetcher_pdf(self):
"""
Test fetching PDF content from a real URL.
"""
fetcher = LinkContentFetcher()
streams = fetcher.run([PDF_URL])["streams"]
assert len(streams) == 1
@@ -204,14 +236,141 @@ class TestLinkContentFetcher:
do so.
"""
fetcher = LinkContentFetcher()
with pytest.raises(requests.exceptions.ConnectionError):
with pytest.raises((httpx.ConnectError, httpx.ConnectTimeout)):
fetcher.run(["https://non_existent_website_dot.com/"])
@pytest.mark.integration
def test_link_content_fetcher_audio(self):
"""
Test fetching audio content from a real URL.
"""
fetcher = LinkContentFetcher()
streams = fetcher.run(["https://download.samplelib.com/mp3/sample-3s.mp3"])["streams"]
first_stream = streams[0]
assert first_stream.meta["content_type"] == "audio/mpeg"
assert first_stream.mime_type == "audio/mpeg"
assert len(first_stream.data) > 0
class TestLinkContentFetcherAsync:
@pytest.mark.asyncio
async def test_run_async(self):
"""Test basic async fetching with a mocked response"""
with patch("haystack.components.fetchers.link_content.httpx.AsyncClient.get") as mock_get:
mock_response = Mock(status_code=200, text="Example test response", headers={"Content-Type": "text/plain"})
mock_get.return_value = mock_response
fetcher = LinkContentFetcher()
streams = (await fetcher.run_async(urls=["https://www.example.com"]))["streams"]
first_stream = streams[0]
assert first_stream.data == b"Example test response"
assert first_stream.meta["content_type"] == "text/plain"
assert first_stream.mime_type == "text/plain"
@pytest.mark.asyncio
async def test_run_async_multiple(self):
"""Test async fetching of multiple URLs with mocked responses"""
with patch("haystack.components.fetchers.link_content.httpx.AsyncClient.get") as mock_get:
mock_response = Mock(status_code=200, text="Example test response", headers={"Content-Type": "text/plain"})
mock_get.return_value = mock_response
fetcher = LinkContentFetcher()
streams = (await fetcher.run_async(urls=["https://www.example1.com", "https://www.example2.com"]))[
"streams"
]
assert len(streams) == 2
for stream in streams:
assert stream.data == b"Example test response"
assert stream.meta["content_type"] == "text/plain"
assert stream.mime_type == "text/plain"
@pytest.mark.asyncio
async def test_run_async_empty_urls(self):
"""Test async fetching with empty URL list"""
fetcher = LinkContentFetcher()
streams = (await fetcher.run_async(urls=[]))["streams"]
assert len(streams) == 0
@pytest.mark.asyncio
async def test_run_async_error_handling(self):
"""Test error handling for async fetching"""
with patch("haystack.components.fetchers.link_content.httpx.AsyncClient.get") as mock_get:
mock_response = Mock(status_code=404)
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"404 Not Found", request=Mock(), response=mock_response
)
mock_get.return_value = mock_response
# With raise_on_failure=False
fetcher = LinkContentFetcher(raise_on_failure=False)
streams = (await fetcher.run_async(urls=["https://www.example.com"]))["streams"]
assert len(streams) == 1 # Returns an empty stream
# With raise_on_failure=True
fetcher = LinkContentFetcher(raise_on_failure=True)
with pytest.raises(httpx.HTTPStatusError):
await fetcher.run_async(urls=["https://www.example.com"])
@pytest.mark.asyncio
async def test_run_async_user_agent_rotation(self):
"""Test user agent rotation in async fetching"""
with patch("haystack.components.fetchers.link_content.httpx.AsyncClient.get") as mock_get:
# First call raises an error to trigger user agent rotation
first_response = Mock(status_code=403)
first_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"403 Forbidden", request=Mock(), response=first_response
)
# Second call succeeds
second_response = Mock(status_code=200, text="Success", headers={"Content-Type": "text/plain"})
# Use side_effect to return different responses on consecutive calls
mock_get.side_effect = [first_response, second_response]
# Create fetcher with custom user agents
fetcher = LinkContentFetcher(user_agents=["agent1", "agent2"], retry_attempts=1)
# Should succeed on the second attempt with the second user agent
streams = (await fetcher.run_async(urls=["https://www.example.com"]))["streams"]
assert len(streams) == 1
assert streams[0].data == b"Success"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_run_async_integration(self):
"""Test async fetching with real HTTP requests"""
fetcher = LinkContentFetcher()
streams = (await fetcher.run_async([HTML_URL]))["streams"]
first_stream = streams[0]
assert "Haystack" in first_stream.data.decode("utf-8")
assert first_stream.meta["content_type"] == "text/html"
assert first_stream.mime_type == "text/html"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_run_async_multiple_integration(self):
"""Test async fetching of multiple URLs with real HTTP requests"""
fetcher = LinkContentFetcher()
streams = (await fetcher.run_async([HTML_URL, TEXT_URL]))["streams"]
assert len(streams) == 2
for stream in streams:
assert "Haystack" in stream.data.decode("utf-8")
if stream.meta["url"] == HTML_URL:
assert stream.meta["content_type"] == "text/html"
assert stream.mime_type == "text/html"
elif stream.meta["url"] == TEXT_URL:
assert stream.meta["content_type"] == "text/plain"
assert stream.mime_type == "text/plain"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_run_async_with_client_kwargs(self):
"""Test async fetching with custom client kwargs"""
fetcher = LinkContentFetcher(client_kwargs={"follow_redirects": True, "timeout": 10.0})
streams = (await fetcher.run_async([HTML_URL]))["streams"]
assert len(streams) == 1
assert "Haystack" in streams[0].data.decode("utf-8")