Compare commits

...

1 Commits

Author SHA1 Message Date
UncleCode
4dfd270161 fix: #855
feat(deep-crawling): enhance dispatcher to handle multi-page crawl results

Modify MemoryAdaptiveDispatcher to properly handle results from deep crawling operations:
- Add support for processing multiple results from deep crawling
- Implement memory usage distribution across multiple results
- Update task monitoring for deep crawling scenarios
- Modify return types in deep crawling strategies to use RunManyReturn
2025-03-24 22:54:53 +08:00
5 changed files with 74 additions and 42 deletions

View File

@ -1,4 +1,4 @@
from typing import Dict, Optional, List, Tuple
from typing import Dict, Optional, List, Tuple, Union
from .async_configs import CrawlerRunConfig
from .models import (
CrawlResult,
@ -183,7 +183,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
config: CrawlerRunConfig,
task_id: str,
retry_count: int = 0,
) -> CrawlerTaskResult:
) -> Union[CrawlerTaskResult, List[CrawlerTaskResult]]:
start_time = time.time()
error_message = ""
memory_usage = peak_memory = 0.0
@ -244,8 +244,53 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
# Handle rate limiting
if self.rate_limiter and result.status_code:
# Check if we have a container with multiple results (deep crawl result)
if isinstance(result, list) or (hasattr(result, '_results') and len(result._results) > 1):
# Handle deep crawling results - create a list of task results
task_results = []
result_list = result if isinstance(result, list) else result._results
for idx, single_result in enumerate(result_list):
# Create individual task result for each crawled page
sub_task_id = f"{task_id}_{idx}"
single_memory = memory_usage / len(result_list) # Distribute memory usage
# Only update rate limiter for first result which corresponds to the original URL
if idx == 0 and self.rate_limiter and hasattr(single_result, 'status_code') and single_result.status_code:
if not self.rate_limiter.update_delay(url, single_result.status_code):
error_msg = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
task_result = CrawlerTaskResult(
task_id=sub_task_id,
url=single_result.url,
result=single_result,
memory_usage=single_memory,
peak_memory=single_memory,
start_time=start_time,
end_time=time.time(),
error_message=single_result.error_message if not single_result.success else "",
retry_count=retry_count
)
task_results.append(task_result)
# Update monitor with completion status based on the first/primary result
if self.monitor:
primary_result = result_list[0]
if not primary_result.success:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
else:
self.monitor.update_task(
task_id,
status=CrawlStatus.COMPLETED,
extra_info=f"Deep crawl: {len(result_list)} pages"
)
return task_results
# Handle single result (original behavior)
if self.rate_limiter and hasattr(result, 'status_code') and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
@ -291,7 +336,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
error_message=error_message,
retry_count=retry_count
)
async def run_urls(
self,
urls: List[str],
@ -356,8 +401,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
# Process completed tasks
for completed_task in done:
result = await completed_task
results.append(result)
task_result = await completed_task
# Handle both single results and lists of results
if isinstance(task_result, list):
results.extend(task_result)
else:
results.append(task_result)
# Update active tasks list
active_tasks = list(pending)
@ -379,7 +429,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
memory_monitor.cancel()
if self.monitor:
self.monitor.stop()
async def _update_queue_priorities(self):
"""Periodically update priorities of items in the queue to prevent starvation"""
# Skip if queue is empty

View File

@ -673,18 +673,6 @@ class AsyncWebCrawler:
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
# word_count_threshold=MIN_WORD_THRESHOLD,
# extraction_strategy: ExtractionStrategy = None,
# chunking_strategy: ChunkingStrategy = RegexChunking(),
# content_filter: RelevantContentFilter = None,
# cache_mode: Optional[CacheMode] = None,
# bypass_cache: bool = False,
# css_selector: str = None,
# screenshot: bool = False,
# pdf: bool = False,
# user_agent: str = None,
# verbose=True,
**kwargs
) -> RunManyReturn:
"""
@ -718,20 +706,7 @@ class AsyncWebCrawler:
print(f"Processed {result.url}: {len(result.markdown)} chars")
"""
config = config or CrawlerRunConfig()
# if config is None:
# config = CrawlerRunConfig(
# word_count_threshold=word_count_threshold,
# extraction_strategy=extraction_strategy,
# chunking_strategy=chunking_strategy,
# content_filter=content_filter,
# cache_mode=cache_mode,
# bypass_cache=bypass_cache,
# css_selector=css_selector,
# screenshot=screenshot,
# pdf=pdf,
# verbose=verbose,
# **kwargs,
# )
if dispatcher is None:
dispatcher = MemoryAdaptiveDispatcher(

View File

@ -7,6 +7,7 @@ from contextvars import ContextVar
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
class DeepCrawlDecorator:
"""Decorator that adds deep crawling capability to arun method."""
deep_crawl_active = ContextVar("deep_crawl_active", default=False)
@ -59,7 +60,8 @@ class DeepCrawlStrategy(ABC):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> List[CrawlResult]:
# ) -> List[CrawlResult]:
) -> RunManyReturn:
"""
Batch (non-streaming) mode:
Processes one BFS level at a time, then yields all the results.
@ -72,7 +74,8 @@ class DeepCrawlStrategy(ABC):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> AsyncGenerator[CrawlResult, None]:
# ) -> AsyncGenerator[CrawlResult, None]:
) -> RunManyReturn:
"""
Streaming mode:
Processes one BFS level at a time and yields results immediately as they arrive.

View File

@ -9,7 +9,7 @@ from ..models import TraversalStats
from .filters import FilterChain
from .scorers import URLScorer
from . import DeepCrawlStrategy
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
from math import inf as infinity
@ -143,7 +143,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> List[CrawlResult]:
# ) -> List[CrawlResult]:
) -> RunManyReturn:
"""
Batch (non-streaming) mode:
Processes one BFS level at a time, then yields all the results.
@ -191,7 +192,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> AsyncGenerator[CrawlResult, None]:
# ) -> AsyncGenerator[CrawlResult, None]:
) -> RunManyReturn:
"""
Streaming mode:
Processes one BFS level at a time and yields results immediately as they arrive.

View File

@ -3,7 +3,7 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
from ..models import CrawlResult
from .bfs_strategy import BFSDeepCrawlStrategy # noqa
from ..types import AsyncWebCrawler, CrawlerRunConfig
from ..types import AsyncWebCrawler, CrawlerRunConfig, RunManyReturn
class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
"""
@ -17,7 +17,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> List[CrawlResult]:
# ) -> List[CrawlResult]:
) -> RunManyReturn:
"""
Batch (non-streaming) DFS mode.
Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list.
@ -65,7 +66,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
start_url: str,
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
) -> AsyncGenerator[CrawlResult, None]:
# ) -> AsyncGenerator[CrawlResult, None]:
) -> RunManyReturn:
"""
Streaming DFS mode.
Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available.