Compare commits: main...run-many-d (1 commit)

Commit: 4dfd270161
@@ -1,4 +1,4 @@
-from typing import Dict, Optional, List, Tuple
+from typing import Dict, Optional, List, Tuple, Union
 from .async_configs import CrawlerRunConfig
 from .models import (
     CrawlResult,
@@ -183,7 +183,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
         config: CrawlerRunConfig,
         task_id: str,
         retry_count: int = 0,
-    ) -> CrawlerTaskResult:
+    ) -> Union[CrawlerTaskResult, List[CrawlerTaskResult]]:
         start_time = time.time()
         error_message = ""
         memory_usage = peak_memory = 0.0
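With the widened annotation above, the dispatcher's per-URL crawl method may now hand back either a single CrawlerTaskResult or a list of them (for deep crawls). A minimal, hypothetical helper, not part of this change, shows how calling code can normalize the Union into a flat list:

from typing import List, TypeVar, Union

T = TypeVar("T")

def as_list(result: Union[T, List[T]]) -> List[T]:
    # Flatten Union[CrawlerTaskResult, List[CrawlerTaskResult]] into a plain list.
    return result if isinstance(result, list) else [result]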
@@ -244,8 +244,53 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
             end_memory = process.memory_info().rss / (1024 * 1024)
             memory_usage = peak_memory = end_memory - start_memory

-            # Handle rate limiting
-            if self.rate_limiter and result.status_code:
+            # Check if we have a container with multiple results (deep crawl result)
+            if isinstance(result, list) or (hasattr(result, '_results') and len(result._results) > 1):
+                # Handle deep crawling results - create a list of task results
+                task_results = []
+                result_list = result if isinstance(result, list) else result._results
+
+                for idx, single_result in enumerate(result_list):
+                    # Create individual task result for each crawled page
+                    sub_task_id = f"{task_id}_{idx}"
+                    single_memory = memory_usage / len(result_list)  # Distribute memory usage
+
+                    # Only update rate limiter for first result which corresponds to the original URL
+                    if idx == 0 and self.rate_limiter and hasattr(single_result, 'status_code') and single_result.status_code:
+                        if not self.rate_limiter.update_delay(url, single_result.status_code):
+                            error_msg = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
+                            if self.monitor:
+                                self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
+
+                    task_result = CrawlerTaskResult(
+                        task_id=sub_task_id,
+                        url=single_result.url,
+                        result=single_result,
+                        memory_usage=single_memory,
+                        peak_memory=single_memory,
+                        start_time=start_time,
+                        end_time=time.time(),
+                        error_message=single_result.error_message if not single_result.success else "",
+                        retry_count=retry_count
+                    )
+                    task_results.append(task_result)
+
+                # Update monitor with completion status based on the first/primary result
+                if self.monitor:
+                    primary_result = result_list[0]
+                    if not primary_result.success:
+                        self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
+                    else:
+                        self.monitor.update_task(
+                            task_id,
+                            status=CrawlStatus.COMPLETED,
+                            extra_info=f"Deep crawl: {len(result_list)} pages"
+                        )
+
+                return task_results
+
+            # Handle single result (original behavior)
+            if self.rate_limiter and hasattr(result, 'status_code') and result.status_code:
                 if not self.rate_limiter.update_delay(url, result.status_code):
                     error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
                     if self.monitor:
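The new branch above splits a multi-result deep-crawl container into one CrawlerTaskResult per crawled page, deriving sub-task IDs from the parent task ID and distributing the measured memory evenly. A rough, self-contained sketch of that bookkeeping, with plain dicts standing in for CrawlerTaskResult and an illustrative helper name:

def split_deep_crawl_results(task_id: str, memory_usage: float, pages: list) -> list:
    # One entry per crawled page; memory is split evenly across pages, as in the diff above.
    per_page_memory = memory_usage / len(pages)
    return [
        {"task_id": f"{task_id}_{idx}", "url": page["url"], "memory_usage": per_page_memory}
        for idx, page in enumerate(pages)
    ]

# Example: a deep crawl that visited three pages under one dispatcher task.
print(split_deep_crawl_results("task_42", 30.0, [
    {"url": "https://example.com"},
    {"url": "https://example.com/a"},
    {"url": "https://example.com/b"},
]))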
@@ -356,8 +401,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):

             # Process completed tasks
             for completed_task in done:
-                result = await completed_task
-                results.append(result)
+                task_result = await completed_task
+
+                # Handle both single results and lists of results
+                if isinstance(task_result, list):
+                    results.extend(task_result)
+                else:
+                    results.append(task_result)

             # Update active tasks list
             active_tasks = list(pending)
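Because a completed dispatcher task can now resolve to either a single task result or a list of them, the aggregation loop keeps `results` flat. A minimal sketch of that loop in isolation, assuming the surrounding code follows the usual asyncio.wait / FIRST_COMPLETED pattern suggested by the `done`, `pending`, and `active_tasks` names (the worker here is a stand-in, not crawler code):

import asyncio

async def worker(i: int):
    await asyncio.sleep(0.01 * i)
    # Even-numbered workers return a list, mimicking deep-crawl tasks.
    return [f"{i}a", f"{i}b"] if i % 2 == 0 else f"{i}"

async def main():
    active_tasks = [asyncio.create_task(worker(i)) for i in range(4)]
    results = []
    while active_tasks:
        done, pending = await asyncio.wait(active_tasks, return_when=asyncio.FIRST_COMPLETED)
        for completed_task in done:
            task_result = await completed_task
            if isinstance(task_result, list):
                results.extend(task_result)   # deep crawl produced several results
            else:
                results.append(task_result)   # single-page crawl
        active_tasks = list(pending)
    print(results)

asyncio.run(main())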
@@ -673,18 +673,6 @@ class AsyncWebCrawler:
         urls: List[str],
         config: Optional[CrawlerRunConfig] = None,
         dispatcher: Optional[BaseDispatcher] = None,
-        # Legacy parameters maintained for backwards compatibility
-        # word_count_threshold=MIN_WORD_THRESHOLD,
-        # extraction_strategy: ExtractionStrategy = None,
-        # chunking_strategy: ChunkingStrategy = RegexChunking(),
-        # content_filter: RelevantContentFilter = None,
-        # cache_mode: Optional[CacheMode] = None,
-        # bypass_cache: bool = False,
-        # css_selector: str = None,
-        # screenshot: bool = False,
-        # pdf: bool = False,
-        # user_agent: str = None,
-        # verbose=True,
         **kwargs
     ) -> RunManyReturn:
         """
@@ -718,20 +706,7 @@ class AsyncWebCrawler:
                print(f"Processed {result.url}: {len(result.markdown)} chars")
         """
         config = config or CrawlerRunConfig()
-        # if config is None:
-        #     config = CrawlerRunConfig(
-        #         word_count_threshold=word_count_threshold,
-        #         extraction_strategy=extraction_strategy,
-        #         chunking_strategy=chunking_strategy,
-        #         content_filter=content_filter,
-        #         cache_mode=cache_mode,
-        #         bypass_cache=bypass_cache,
-        #         css_selector=css_selector,
-        #         screenshot=screenshot,
-        #         pdf=pdf,
-        #         verbose=verbose,
-        #         **kwargs,
-        #     )
+

         if dispatcher is None:
             dispatcher = MemoryAdaptiveDispatcher(
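The commented-out legacy parameters are dropped from the arun_many signature: configuration goes through CrawlerRunConfig, and a MemoryAdaptiveDispatcher is created when no dispatcher is passed. A hedged usage sketch based on the docstring excerpt above, assuming the usual crawl4ai entry points and non-streaming mode returning a flat list of results:

import asyncio

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def main():
    # Explicit config object instead of the removed legacy keyword arguments.
    config = CrawlerRunConfig()

    async with AsyncWebCrawler() as crawler:
        # dispatcher is omitted, so a MemoryAdaptiveDispatcher is created internally.
        results = await crawler.arun_many(
            ["https://example.com", "https://example.org"],
            config=config,
        )
        for result in results:
            print(f"Processed {result.url}: {len(result.markdown)} chars")

asyncio.run(main())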
@@ -7,6 +7,7 @@ from contextvars import ContextVar
 from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn


+
 class DeepCrawlDecorator:
     """Decorator that adds deep crawling capability to arun method."""
     deep_crawl_active = ContextVar("deep_crawl_active", default=False)
@@ -59,7 +60,8 @@ class DeepCrawlStrategy(ABC):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) mode:
         Processes one BFS level at a time, then yields all the results.
@@ -72,7 +74,8 @@ class DeepCrawlStrategy(ABC):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming mode:
         Processes one BFS level at a time and yields results immediately as they arrive.
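Both abstract entry points of DeepCrawlStrategy now advertise RunManyReturn instead of the concrete List[CrawlResult] and AsyncGenerator[CrawlResult, None] annotations they replace, so callers must be prepared for either shape. A minimal sketch of a shape-agnostic consumer, assuming RunManyReturn is effectively a union of a result list and an async generator of results, as the replaced annotations suggest:

import inspect

async def collect_results(run_many_return) -> list:
    # Streaming mode: drain the async generator as results arrive.
    if inspect.isasyncgen(run_many_return):
        return [result async for result in run_many_return]
    # Batch mode: the strategy already returned a list of results.
    return list(run_many_return)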
@@ -9,7 +9,7 @@ from ..models import TraversalStats
 from .filters import FilterChain
 from .scorers import URLScorer
 from . import DeepCrawlStrategy
-from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
+from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
 from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
 from math import inf as infinity

@@ -143,7 +143,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) mode:
         Processes one BFS level at a time, then yields all the results.
@@ -191,7 +192,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming mode:
         Processes one BFS level at a time and yields results immediately as they arrive.
@@ -3,7 +3,7 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple

 from ..models import CrawlResult
 from .bfs_strategy import BFSDeepCrawlStrategy  # noqa
-from ..types import AsyncWebCrawler, CrawlerRunConfig
+from ..types import AsyncWebCrawler, CrawlerRunConfig, RunManyReturn

 class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
     """
@@ -17,7 +17,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) DFS mode.
         Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list.
@@ -65,7 +66,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming DFS mode.
         Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available.
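The DFS strategy mirrors the BFS changes; per its docstrings, the traversal difference is using a stack (LIFO) instead of a queue (FIFO) for the URL frontier. An illustrative, crawler-independent sketch of that distinction; the function name and the get_links callback are hypothetical:

from collections import deque
from typing import Callable, Iterable, List

def traverse(start_url: str, get_links: Callable[[str], Iterable[str]],
             max_depth: int = 2, dfs: bool = False) -> List[str]:
    # BFS pops from the left of the frontier (queue); DFS pops from the right (stack).
    frontier = deque([(start_url, 0)])
    seen = {start_url}
    order = []
    while frontier:
        url, depth = frontier.pop() if dfs else frontier.popleft()
        order.append(url)
        if depth >= max_depth:
            continue
        for link in get_links(url):
            if link not in seen:
                seen.add(link)
                frontier.append((link, depth + 1))
    return order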