Compare commits
1 Commits
main
...
run-many-d
Author | SHA1 | Date | |
---|---|---|---|
![]() |
4dfd270161 |
@ -1,4 +1,4 @@
|
||||
from typing import Dict, Optional, List, Tuple
|
||||
from typing import Dict, Optional, List, Tuple, Union
|
||||
from .async_configs import CrawlerRunConfig
|
||||
from .models import (
|
||||
CrawlResult,
|
||||
@ -183,7 +183,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
config: CrawlerRunConfig,
|
||||
task_id: str,
|
||||
retry_count: int = 0,
|
||||
) -> CrawlerTaskResult:
|
||||
) -> Union[CrawlerTaskResult, List[CrawlerTaskResult]]:
|
||||
start_time = time.time()
|
||||
error_message = ""
|
||||
memory_usage = peak_memory = 0.0
|
||||
@ -244,8 +244,53 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
end_memory = process.memory_info().rss / (1024 * 1024)
|
||||
memory_usage = peak_memory = end_memory - start_memory
|
||||
|
||||
# Handle rate limiting
|
||||
if self.rate_limiter and result.status_code:
|
||||
# Check if we have a container with multiple results (deep crawl result)
|
||||
if isinstance(result, list) or (hasattr(result, '_results') and len(result._results) > 1):
|
||||
# Handle deep crawling results - create a list of task results
|
||||
task_results = []
|
||||
result_list = result if isinstance(result, list) else result._results
|
||||
|
||||
for idx, single_result in enumerate(result_list):
|
||||
# Create individual task result for each crawled page
|
||||
sub_task_id = f"{task_id}_{idx}"
|
||||
single_memory = memory_usage / len(result_list) # Distribute memory usage
|
||||
|
||||
# Only update rate limiter for first result which corresponds to the original URL
|
||||
if idx == 0 and self.rate_limiter and hasattr(single_result, 'status_code') and single_result.status_code:
|
||||
if not self.rate_limiter.update_delay(url, single_result.status_code):
|
||||
error_msg = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
|
||||
if self.monitor:
|
||||
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
|
||||
|
||||
task_result = CrawlerTaskResult(
|
||||
task_id=sub_task_id,
|
||||
url=single_result.url,
|
||||
result=single_result,
|
||||
memory_usage=single_memory,
|
||||
peak_memory=single_memory,
|
||||
start_time=start_time,
|
||||
end_time=time.time(),
|
||||
error_message=single_result.error_message if not single_result.success else "",
|
||||
retry_count=retry_count
|
||||
)
|
||||
task_results.append(task_result)
|
||||
|
||||
# Update monitor with completion status based on the first/primary result
|
||||
if self.monitor:
|
||||
primary_result = result_list[0]
|
||||
if not primary_result.success:
|
||||
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
|
||||
else:
|
||||
self.monitor.update_task(
|
||||
task_id,
|
||||
status=CrawlStatus.COMPLETED,
|
||||
extra_info=f"Deep crawl: {len(result_list)} pages"
|
||||
)
|
||||
|
||||
return task_results
|
||||
|
||||
# Handle single result (original behavior)
|
||||
if self.rate_limiter and hasattr(result, 'status_code') and result.status_code:
|
||||
if not self.rate_limiter.update_delay(url, result.status_code):
|
||||
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
|
||||
if self.monitor:
|
||||
@ -291,7 +336,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
error_message=error_message,
|
||||
retry_count=retry_count
|
||||
)
|
||||
|
||||
|
||||
async def run_urls(
|
||||
self,
|
||||
urls: List[str],
|
||||
@ -356,8 +401,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
|
||||
# Process completed tasks
|
||||
for completed_task in done:
|
||||
result = await completed_task
|
||||
results.append(result)
|
||||
task_result = await completed_task
|
||||
|
||||
# Handle both single results and lists of results
|
||||
if isinstance(task_result, list):
|
||||
results.extend(task_result)
|
||||
else:
|
||||
results.append(task_result)
|
||||
|
||||
# Update active tasks list
|
||||
active_tasks = list(pending)
|
||||
@ -379,7 +429,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
memory_monitor.cancel()
|
||||
if self.monitor:
|
||||
self.monitor.stop()
|
||||
|
||||
|
||||
async def _update_queue_priorities(self):
|
||||
"""Periodically update priorities of items in the queue to prevent starvation"""
|
||||
# Skip if queue is empty
|
||||
|
@ -673,18 +673,6 @@ class AsyncWebCrawler:
|
||||
urls: List[str],
|
||||
config: Optional[CrawlerRunConfig] = None,
|
||||
dispatcher: Optional[BaseDispatcher] = None,
|
||||
# Legacy parameters maintained for backwards compatibility
|
||||
# word_count_threshold=MIN_WORD_THRESHOLD,
|
||||
# extraction_strategy: ExtractionStrategy = None,
|
||||
# chunking_strategy: ChunkingStrategy = RegexChunking(),
|
||||
# content_filter: RelevantContentFilter = None,
|
||||
# cache_mode: Optional[CacheMode] = None,
|
||||
# bypass_cache: bool = False,
|
||||
# css_selector: str = None,
|
||||
# screenshot: bool = False,
|
||||
# pdf: bool = False,
|
||||
# user_agent: str = None,
|
||||
# verbose=True,
|
||||
**kwargs
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
@ -718,20 +706,7 @@ class AsyncWebCrawler:
|
||||
print(f"Processed {result.url}: {len(result.markdown)} chars")
|
||||
"""
|
||||
config = config or CrawlerRunConfig()
|
||||
# if config is None:
|
||||
# config = CrawlerRunConfig(
|
||||
# word_count_threshold=word_count_threshold,
|
||||
# extraction_strategy=extraction_strategy,
|
||||
# chunking_strategy=chunking_strategy,
|
||||
# content_filter=content_filter,
|
||||
# cache_mode=cache_mode,
|
||||
# bypass_cache=bypass_cache,
|
||||
# css_selector=css_selector,
|
||||
# screenshot=screenshot,
|
||||
# pdf=pdf,
|
||||
# verbose=verbose,
|
||||
# **kwargs,
|
||||
# )
|
||||
|
||||
|
||||
if dispatcher is None:
|
||||
dispatcher = MemoryAdaptiveDispatcher(
|
||||
|
@ -7,6 +7,7 @@ from contextvars import ContextVar
|
||||
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
|
||||
|
||||
|
||||
|
||||
class DeepCrawlDecorator:
|
||||
"""Decorator that adds deep crawling capability to arun method."""
|
||||
deep_crawl_active = ContextVar("deep_crawl_active", default=False)
|
||||
@ -59,7 +60,8 @@ class DeepCrawlStrategy(ABC):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> List[CrawlResult]:
|
||||
# ) -> List[CrawlResult]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Batch (non-streaming) mode:
|
||||
Processes one BFS level at a time, then yields all the results.
|
||||
@ -72,7 +74,8 @@ class DeepCrawlStrategy(ABC):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> AsyncGenerator[CrawlResult, None]:
|
||||
# ) -> AsyncGenerator[CrawlResult, None]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Streaming mode:
|
||||
Processes one BFS level at a time and yields results immediately as they arrive.
|
||||
|
@ -9,7 +9,7 @@ from ..models import TraversalStats
|
||||
from .filters import FilterChain
|
||||
from .scorers import URLScorer
|
||||
from . import DeepCrawlStrategy
|
||||
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
|
||||
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
|
||||
from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
|
||||
from math import inf as infinity
|
||||
|
||||
@ -143,7 +143,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> List[CrawlResult]:
|
||||
# ) -> List[CrawlResult]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Batch (non-streaming) mode:
|
||||
Processes one BFS level at a time, then yields all the results.
|
||||
@ -191,7 +192,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> AsyncGenerator[CrawlResult, None]:
|
||||
# ) -> AsyncGenerator[CrawlResult, None]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Streaming mode:
|
||||
Processes one BFS level at a time and yields results immediately as they arrive.
|
||||
|
@ -3,7 +3,7 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
|
||||
|
||||
from ..models import CrawlResult
|
||||
from .bfs_strategy import BFSDeepCrawlStrategy # noqa
|
||||
from ..types import AsyncWebCrawler, CrawlerRunConfig
|
||||
from ..types import AsyncWebCrawler, CrawlerRunConfig, RunManyReturn
|
||||
|
||||
class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
|
||||
"""
|
||||
@ -17,7 +17,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> List[CrawlResult]:
|
||||
# ) -> List[CrawlResult]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Batch (non-streaming) DFS mode.
|
||||
Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list.
|
||||
@ -65,7 +66,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
|
||||
start_url: str,
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> AsyncGenerator[CrawlResult, None]:
|
||||
# ) -> AsyncGenerator[CrawlResult, None]:
|
||||
) -> RunManyReturn:
|
||||
"""
|
||||
Streaming DFS mode.
|
||||
Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available.
|
||||
|
Loading…
x
Reference in New Issue
Block a user