feat(browser): add standalone CDP browser launch and lxml extraction strategy

Add new features to enhance browser automation and HTML extraction:
- Add CDP browser launch capability with customizable ports and profiles
- Implement JsonLxmlExtractionStrategy for faster HTML parsing
- Add CLI command 'crwl cdp' for launching standalone CDP browsers
- Support connecting to external CDP browsers via URL
- Optimize selector caching and context-sensitive queries

BREAKING CHANGE: LLMConfig import path changed from crawl4ai.types to crawl4ai
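Migrating is a one-line import swap; a minimal sketch, mirroring the doc and example updates in the diffs below:

```python
# Before: from crawl4ai.types import LLMConfig
# After: LLMConfig is exported from the package root
from crawl4ai import LLMConfig

llm_config = LLMConfig(provider="gemini/gemini-1.5-pro", api_token="env:GEMINI_API_KEY")
```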
UncleCode 2025-03-07 20:55:56 +08:00
parent f78c46446b
commit a68cbb232b
22 changed files with 745 additions and 29 deletions

View File

@@ -23,6 +23,7 @@ from .extraction_strategy import (
CosineStrategy,
JsonCssExtractionStrategy,
JsonXPathExtractionStrategy,
JsonLxmlExtractionStrategy
)
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
@@ -103,6 +104,7 @@ __all__ = [
"CosineStrategy",
"JsonCssExtractionStrategy",
"JsonXPathExtractionStrategy",
"JsonLxmlExtractionStrategy",
"ChunkingStrategy",
"RegexChunking",
"DefaultMarkdownGenerator",

View File

@@ -434,8 +434,9 @@ class BrowserManager:
self.playwright = await async_playwright().start()
if self.config.use_managed_browser:
cdp_url = await self.managed_browser.start()
if self.config.cdp_url or self.config.use_managed_browser:
self.config.use_managed_browser = True
cdp_url = await self.managed_browser.start() if not self.config.cdp_url else self.config.cdp_url
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
contexts = self.browser.contexts
if contexts:
@@ -790,7 +791,10 @@ class BrowserManager:
# If using a managed browser, just grab the shared default_context
if self.config.use_managed_browser:
context = self.default_context
page = await context.new_page()
pages = context.pages
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
if not page:
page = await context.new_page()
else:
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)
@@ -840,6 +844,9 @@
async def close(self):
"""Close all browser resources and clean up."""
if self.config.cdp_url:
return
if self.config.sleep_on_close:
await asyncio.sleep(0.5)
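Taken together, the hunks above let a crawler attach to an external CDP endpoint in `start()` and skip tearing down a browser it does not own in `close()`. A hedged usage sketch, assuming `BrowserConfig` accepts the `cdp_url` field this code reads and that a browser is already listening on the default port (for example one started with `crwl cdp`):

```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig

async def main():
    # Attach to an already-running browser instead of launching a new one.
    # Port 9222 is the `crwl cdp` default; adjust if you passed -P/--port.
    browser_config = BrowserConfig(cdp_url="http://localhost:9222")
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun("https://example.com")
        print(result.success, len(result.html))

asyncio.run(main())
```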

View File

@@ -342,7 +342,11 @@ class BrowserProfiler:
# Check if path exists and is a valid profile
if not os.path.isdir(profile_path):
return None
# Check if profile_name itself is a full path
if os.path.isabs(profile_name):
profile_path = profile_name
else:
return None
# Look for profile indicators
is_profile = (
@@ -541,4 +545,225 @@
break
else:
self.logger.error(f"Invalid choice. Please enter a number between 1 and {exit_option}.", tag="MENU")
self.logger.error(f"Invalid choice. Please enter a number between 1 and {exit_option}.", tag="MENU")
async def launch_standalone_browser(self,
browser_type: str = "chromium",
user_data_dir: Optional[str] = None,
debugging_port: int = 9222,
headless: bool = False) -> Optional[str]:
"""
Launch a standalone browser with CDP debugging enabled and keep it running
until the user presses 'q'. Returns and displays the CDP URL.
Args:
browser_type (str): Type of browser to launch ('chromium' or 'firefox')
user_data_dir (str, optional): Path to user profile directory
debugging_port (int): Port to use for CDP debugging
headless (bool): Whether to run in headless mode
Returns:
str: CDP URL for the browser, or None if launch failed
Example:
```python
profiler = BrowserProfiler()
cdp_url = await profiler.launch_standalone_browser(
user_data_dir="/path/to/profile",
debugging_port=9222
)
# Use cdp_url to connect to the browser
```
"""
# Use the provided directory if specified, otherwise create a temporary directory
if user_data_dir:
# Directory is provided directly, ensure it exists
profile_path = user_data_dir
os.makedirs(profile_path, exist_ok=True)
else:
# Create a temporary profile directory
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
profile_name = f"temp_{timestamp}_{uuid.uuid4().hex[:6]}"
profile_path = os.path.join(self.profiles_dir, profile_name)
os.makedirs(profile_path, exist_ok=True)
# Print initial information
border = f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}"
self.logger.info(f"\n{border}", tag="CDP")
self.logger.info(f"Launching standalone browser with CDP debugging", tag="CDP")
self.logger.info(f"Browser type: {Fore.GREEN}{browser_type}{Style.RESET_ALL}", tag="CDP")
self.logger.info(f"Profile path: {Fore.YELLOW}{profile_path}{Style.RESET_ALL}", tag="CDP")
self.logger.info(f"Debugging port: {Fore.CYAN}{debugging_port}{Style.RESET_ALL}", tag="CDP")
self.logger.info(f"Headless mode: {Fore.CYAN}{headless}{Style.RESET_ALL}", tag="CDP")
# Create managed browser instance
managed_browser = ManagedBrowser(
browser_type=browser_type,
user_data_dir=profile_path,
headless=headless,
logger=self.logger,
debugging_port=debugging_port
)
# Set up signal handlers to ensure cleanup on interrupt
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGTERM)
# Define cleanup handler for signals
async def cleanup_handler(sig, frame):
self.logger.warning("\nCleaning up browser process...", tag="CDP")
await managed_browser.cleanup()
# Restore original signal handlers
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if sig == signal.SIGINT:
self.logger.error("Browser terminated by user.", tag="CDP")
sys.exit(1)
# Set signal handlers
def sigint_handler(sig, frame):
asyncio.create_task(cleanup_handler(sig, frame))
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
# Event to signal when user wants to exit
user_done_event = asyncio.Event()
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
# First output the prompt
self.logger.info(f"{Fore.CYAN}Press '{Fore.WHITE}q{Fore.CYAN}' to stop the browser and exit...{Style.RESET_ALL}", tag="CDP")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info(f"{Fore.GREEN}Closing browser...{Style.RESET_ALL}", tag="CDP")
user_done_event.set()
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
# Function to retrieve and display CDP JSON config
async def get_cdp_json(port):
import aiohttp
cdp_url = f"http://localhost:{port}"
json_url = f"{cdp_url}/json/version"
try:
async with aiohttp.ClientSession() as session:
# Try multiple times in case the browser is still starting up
for _ in range(10):
try:
async with session.get(json_url) as response:
if response.status == 200:
data = await response.json()
return cdp_url, data
except Exception:
pass
await asyncio.sleep(0.5)
return cdp_url, None
except Exception as e:
self.logger.error(f"Error fetching CDP JSON: {str(e)}", tag="CDP")
return cdp_url, None
cdp_url = None
config_json = None
try:
# Start the browser
await managed_browser.start()
# Check if browser started successfully
browser_process = managed_browser.browser_process
if not browser_process:
self.logger.error("Failed to start browser process.", tag="CDP")
return None
self.logger.info(f"Browser launched successfully. Retrieving CDP information...", tag="CDP")
# Get CDP URL and JSON config
cdp_url, config_json = await get_cdp_json(debugging_port)
if cdp_url:
self.logger.success(f"CDP URL: {Fore.GREEN}{cdp_url}{Style.RESET_ALL}", tag="CDP")
if config_json:
# Display relevant CDP information
self.logger.info(f"Browser: {Fore.CYAN}{config_json.get('Browser', 'Unknown')}{Style.RESET_ALL}", tag="CDP")
self.logger.info(f"Protocol Version: {config_json.get('Protocol-Version', 'Unknown')}", tag="CDP")
if 'webSocketDebuggerUrl' in config_json:
self.logger.info(f"WebSocket URL: {Fore.GREEN}{config_json['webSocketDebuggerUrl']}{Style.RESET_ALL}", tag="CDP")
else:
self.logger.warning("Could not retrieve CDP configuration JSON", tag="CDP")
else:
self.logger.error(f"Failed to get CDP URL on port {debugging_port}", tag="CDP")
await managed_browser.cleanup()
return None
# Start listening for keyboard input
listener_task = asyncio.create_task(listen_for_quit_command())
# Wait for the user to press 'q' or for the browser process to exit naturally
while not user_done_event.is_set() and browser_process.poll() is None:
await asyncio.sleep(0.5)
# Cancel the listener task if it's still running
if not listener_task.done():
listener_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass
# If the browser is still running and the user pressed 'q', terminate it
if browser_process.poll() is None and user_done_event.is_set():
self.logger.info("Terminating browser process...", tag="CDP")
await managed_browser.cleanup()
self.logger.success(f"Browser closed.", tag="CDP")
except Exception as e:
self.logger.error(f"Error launching standalone browser: {str(e)}", tag="CDP")
await managed_browser.cleanup()
return None
finally:
# Restore original signal handlers
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
# Make sure browser is fully cleaned up
await managed_browser.cleanup()
# Return the CDP URL
return cdp_url

View File

@@ -1,5 +1,6 @@
import click
import os
import sys
import time
import humanize
@@ -198,7 +199,24 @@ def show_examples():
# 2. Then use that profile to crawl the authenticated site:
crwl https://site-requiring-login.com/dashboard -p my-profile-name
5 Sample Config Files:
5 CDP Mode for Browser Automation:
# Launch browser with CDP debugging on default port 9222
crwl cdp
# Use a specific profile and custom port
crwl cdp -p my-profile -P 9223
# Launch headless browser with CDP enabled
crwl cdp --headless
# Launch in incognito mode (ignores profile)
crwl cdp --incognito
# Use the CDP URL with other tools (Puppeteer, Playwright, etc.)
# The URL will be displayed in the terminal when the browser starts
6 Sample Config Files:
browser.yml:
headless: true
@@ -256,7 +274,7 @@ llm_schema.json:
}
}
6 Advanced Usage:
7 Advanced Usage:
# Combine configs with direct parameters
crwl https://example.com -B browser.yml -b "headless=false,viewport_width=1920"
@@ -282,7 +300,7 @@ llm_schema.json:
For more documentation visit: https://github.com/unclecode/crawl4ai
7 Q&A with LLM:
8 Q&A with LLM:
# Ask a question about the content
crwl https://example.com -q "What is the main topic discussed?"
@@ -310,7 +328,7 @@ For more documentation visit: https://github.com/unclecode/crawl4ai
See full list of providers: https://docs.litellm.ai/docs/providers
8 Profile Management:
9 Profile Management:
# Launch interactive profile manager
crwl profiles
@@ -549,11 +567,89 @@ async def manage_profiles():
# Add a separator between operations
console.print("\n")
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def cli():
"""Crawl4AI CLI - Web content extraction and browser profile management tool"""
pass
@cli.command("cdp")
@click.option("--user-data-dir", "-d", help="Directory to use for browser data (will be created if it doesn't exist)")
@click.option("--port", "-P", type=int, default=9222, help="Debugging port (default: 9222)")
@click.option("--browser-type", "-b", type=click.Choice(["chromium", "firefox"]), default="chromium",
help="Browser type (default: chromium)")
@click.option("--headless", is_flag=True, help="Run browser in headless mode")
@click.option("--incognito", is_flag=True, help="Run in incognito/private mode (ignores user-data-dir)")
def cdp_cmd(user_data_dir: Optional[str], port: int, browser_type: str, headless: bool, incognito: bool):
"""Launch a standalone browser with CDP debugging enabled
This command launches a browser with Chrome DevTools Protocol (CDP) debugging enabled,
prints the CDP URL, and keeps the browser running until you press 'q'.
The CDP URL can be used for various automation and debugging tasks.
Examples:
# Launch Chromium with CDP on default port 9222
crwl cdp
# Use a specific directory for browser data and custom port
crwl cdp --user-data-dir ~/browser-data --port 9223
# Launch in headless mode
crwl cdp --headless
# Launch in incognito mode (ignores user-data-dir)
crwl cdp --incognito
"""
profiler = BrowserProfiler()
try:
# Handle data directory
data_dir = None
if not incognito and user_data_dir:
# Expand user path (~/something)
expanded_path = os.path.expanduser(user_data_dir)
# Create directory if it doesn't exist
if not os.path.exists(expanded_path):
console.print(f"[yellow]Directory '{expanded_path}' doesn't exist. Creating it.[/yellow]")
os.makedirs(expanded_path, exist_ok=True)
data_dir = expanded_path
# Print launch info
console.print(Panel(
f"[cyan]Launching browser with CDP debugging[/cyan]\n\n"
f"Browser type: [green]{browser_type}[/green]\n"
f"Debugging port: [yellow]{port}[/yellow]\n"
f"User data directory: [cyan]{data_dir or 'Temporary directory'}[/cyan]\n"
f"Headless: [cyan]{'Yes' if headless else 'No'}[/cyan]\n"
f"Incognito: [cyan]{'Yes' if incognito else 'No'}[/cyan]\n\n"
f"[yellow]Press 'q' to quit when done[/yellow]",
title="CDP Browser",
border_style="cyan"
))
# Run the browser
cdp_url = anyio.run(
profiler.launch_standalone_browser,
browser_type,
data_dir,
port,
headless
)
if not cdp_url:
console.print("[red]Failed to launch browser or get CDP URL[/red]")
sys.exit(1)
except Exception as e:
console.print(f"[red]Error launching CDP browser: {str(e)}[/red]")
sys.exit(1)
@cli.command("crawl")
@click.argument("url", required=True)
@click.option("--browser-config", "-B", type=click.Path(exists=True), help="Browser config file (YAML/JSON)")
@@ -737,6 +833,7 @@ def default(url: str, example: bool, browser_config: str, crawler_config: str, f
Other commands:
crwl profiles - Manage browser profiles for identity-based crawling
crwl crawl - Crawl a website with advanced options
crwl cdp - Launch browser with CDP debugging enabled
crwl examples - Show more usage examples
"""

View File

@@ -1168,7 +1168,8 @@ class JsonCssExtractionStrategy(JsonElementExtractionStrategy):
super().__init__(schema, **kwargs)
def _parse_html(self, html_content: str):
return BeautifulSoup(html_content, "html.parser")
# return BeautifulSoup(html_content, "html.parser")
return BeautifulSoup(html_content, "lxml")
def _get_base_elements(self, parsed_html, selector: str):
return parsed_html.select(selector)
@@ -1187,6 +1188,373 @@ class JsonCssExtractionStrategy(JsonElementExtractionStrategy):
def _get_element_attribute(self, element, attribute: str):
return element.get(attribute)
class JsonLxmlExtractionStrategy(JsonElementExtractionStrategy):
def __init__(self, schema: Dict[str, Any], **kwargs):
kwargs["input_format"] = "html"
super().__init__(schema, **kwargs)
self._selector_cache = {}
self._xpath_cache = {}
self._result_cache = {}
# Control selector optimization strategy
self.use_caching = kwargs.get("use_caching", True)
self.optimize_common_patterns = kwargs.get("optimize_common_patterns", True)
# Load lxml dependencies once
from lxml import etree, html
from lxml.cssselect import CSSSelector
self.etree = etree
self.html_parser = html
self.CSSSelector = CSSSelector
def _parse_html(self, html_content: str):
"""Parse HTML content with error recovery"""
try:
parser = self.etree.HTMLParser(recover=True, remove_blank_text=True)
return self.etree.fromstring(html_content, parser)
except Exception as e:
if self.verbose:
print(f"Error parsing HTML, falling back to alternative method: {e}")
try:
return self.html_parser.fromstring(html_content)
except Exception as e2:
if self.verbose:
print(f"Critical error parsing HTML: {e2}")
# Create minimal document as fallback
return self.etree.Element("html")
def _optimize_selector(self, selector_str):
"""Optimize common selector patterns for better performance"""
if not self.optimize_common_patterns:
return selector_str
# Handle td:nth-child(N) pattern which is very common in table scraping
import re
if re.search(r'td:nth-child\(\d+\)', selector_str):
return selector_str # Already handled specially in _apply_selector
# Split complex selectors into parts for optimization
parts = selector_str.split()
if len(parts) <= 1:
return selector_str
# For very long selectors, consider using just the last specific part
if len(parts) > 3 and any(p.startswith('.') or p.startswith('#') for p in parts):
specific_parts = [p for p in parts if p.startswith('.') or p.startswith('#')]
if specific_parts:
return specific_parts[-1] # Use most specific class/id selector
return selector_str
def _create_selector_function(self, selector_str):
"""Create a selector function that handles all edge cases"""
original_selector = selector_str
# Try to optimize the selector if appropriate
if self.optimize_common_patterns:
selector_str = self._optimize_selector(selector_str)
try:
# Attempt to compile the CSS selector
compiled = self.CSSSelector(selector_str)
xpath = compiled.path
# Store XPath for later use
self._xpath_cache[selector_str] = xpath
# Create the wrapper function that implements the selection strategy
def selector_func(element, context_sensitive=True):
cache_key = None
# Use result caching if enabled
if self.use_caching:
# Create a cache key based on element and selector
element_id = element.get('id', '') or str(hash(element))
cache_key = f"{element_id}::{selector_str}"
if cache_key in self._result_cache:
return self._result_cache[cache_key]
results = []
try:
# Strategy 1: Direct CSS selector application (fastest)
results = compiled(element)
# If that fails and we need context sensitivity
if not results and context_sensitive:
# Strategy 2: Try XPath with context adjustment
context_xpath = self._make_context_sensitive_xpath(xpath, element)
if context_xpath:
results = element.xpath(context_xpath)
# Strategy 3: Handle special case - nth-child
if not results and 'nth-child' in original_selector:
results = self._handle_nth_child_selector(element, original_selector)
# Strategy 4: Direct descendant search for class/ID selectors
if not results:
results = self._fallback_class_id_search(element, original_selector)
# Strategy 5: Last resort - tag name search for the final part
if not results:
parts = original_selector.split()
if parts:
last_part = parts[-1]
# Extract tag name from the selector
tag_match = re.match(r'^(\w+)', last_part)
if tag_match:
tag_name = tag_match.group(1)
results = element.xpath(f".//{tag_name}")
# Cache results if caching is enabled
if self.use_caching and cache_key:
self._result_cache[cache_key] = results
except Exception as e:
if self.verbose:
print(f"Error applying selector '{selector_str}': {e}")
return results
return selector_func
except Exception as e:
if self.verbose:
print(f"Error compiling selector '{selector_str}': {e}")
# Fallback function for invalid selectors
return lambda element, context_sensitive=True: []
def _make_context_sensitive_xpath(self, xpath, element):
"""Convert absolute XPath to context-sensitive XPath"""
try:
# If it starts with descendant-or-self, it's already context-sensitive
if xpath.startswith('descendant-or-self::'):
return xpath
# Remove leading slash if present
if xpath.startswith('/'):
context_xpath = f".{xpath}"
else:
context_xpath = f".//{xpath}"
# Validate the XPath by trying it
try:
element.xpath(context_xpath)
return context_xpath
except:
# If that fails, try a simpler descendant search
return f".//{xpath.split('/')[-1]}"
except:
return None
def _handle_nth_child_selector(self, element, selector_str):
"""Special handling for nth-child selectors in tables"""
import re
results = []
try:
# Extract the column number from td:nth-child(N)
match = re.search(r'td:nth-child\((\d+)\)', selector_str)
if match:
col_num = match.group(1)
# Check if there's content after the nth-child part
remaining_selector = selector_str.split(f"td:nth-child({col_num})", 1)[-1].strip()
if remaining_selector:
# If there's a specific element we're looking for after the column
# Extract any tag names from the remaining selector
tag_match = re.search(r'(\w+)', remaining_selector)
tag_name = tag_match.group(1) if tag_match else '*'
results = element.xpath(f".//td[{col_num}]//{tag_name}")
else:
# Just get the column cell
results = element.xpath(f".//td[{col_num}]")
except Exception as e:
if self.verbose:
print(f"Error handling nth-child selector: {e}")
return results
def _fallback_class_id_search(self, element, selector_str):
"""Fallback to search by class or ID"""
results = []
try:
# Extract class selectors (.classname)
import re
class_matches = re.findall(r'\.([a-zA-Z0-9_-]+)', selector_str)
# Extract ID selectors (#idname)
id_matches = re.findall(r'#([a-zA-Z0-9_-]+)', selector_str)
# Try each class
for class_name in class_matches:
class_results = element.xpath(f".//*[contains(@class, '{class_name}')]")
results.extend(class_results)
# Try each ID (usually more specific)
for id_name in id_matches:
id_results = element.xpath(f".//*[@id='{id_name}']")
results.extend(id_results)
except Exception as e:
if self.verbose:
print(f"Error in fallback class/id search: {e}")
return results
def _get_selector(self, selector_str):
"""Get or create a selector function with caching"""
if selector_str not in self._selector_cache:
self._selector_cache[selector_str] = self._create_selector_function(selector_str)
return self._selector_cache[selector_str]
def _get_base_elements(self, parsed_html, selector: str):
"""Get all base elements using the selector"""
selector_func = self._get_selector(selector)
# For base elements, we don't need context sensitivity
return selector_func(parsed_html, context_sensitive=False)
def _get_elements(self, element, selector: str):
"""Get child elements using the selector with context sensitivity"""
selector_func = self._get_selector(selector)
return selector_func(element, context_sensitive=True)
def _get_element_text(self, element) -> str:
"""Extract normalized text from element"""
try:
# Get all text nodes and normalize
text = " ".join(t.strip() for t in element.xpath(".//text()") if t.strip())
return text
except Exception as e:
if self.verbose:
print(f"Error extracting text: {e}")
# Fallback
try:
return element.text_content().strip()
except:
return ""
def _get_element_html(self, element) -> str:
"""Get HTML string representation of element"""
try:
return self.etree.tostring(element, encoding='unicode', method='html')
except Exception as e:
if self.verbose:
print(f"Error serializing HTML: {e}")
return ""
def _get_element_attribute(self, element, attribute: str):
"""Get attribute value safely"""
try:
return element.get(attribute)
except Exception as e:
if self.verbose:
print(f"Error getting attribute '{attribute}': {e}")
return None
def _clear_caches(self):
"""Clear caches to free memory"""
if self.use_caching:
self._result_cache.clear()
class JsonLxmlExtractionStrategy_naive(JsonElementExtractionStrategy):
def __init__(self, schema: Dict[str, Any], **kwargs):
kwargs["input_format"] = "html" # Force HTML input
super().__init__(schema, **kwargs)
self._selector_cache = {}
def _parse_html(self, html_content: str):
from lxml import etree
parser = etree.HTMLParser(recover=True)
return etree.fromstring(html_content, parser)
def _get_selector(self, selector_str):
"""Get a selector function that works within the context of an element"""
if selector_str not in self._selector_cache:
from lxml.cssselect import CSSSelector
try:
# Store both the compiled selector and its xpath translation
compiled = CSSSelector(selector_str)
# Create a function that will apply this selector appropriately
def select_func(element):
try:
# First attempt: direct CSS selector application
results = compiled(element)
if results:
return results
# Second attempt: contextual XPath selection
# Convert the root-based XPath to a context-based XPath
xpath = compiled.path
# If the XPath already starts with descendant-or-self, handle it specially
if xpath.startswith('descendant-or-self::'):
context_xpath = xpath
else:
# For normal XPath expressions, make them relative to current context
context_xpath = f"./{xpath.lstrip('/')}"
results = element.xpath(context_xpath)
if results:
return results
# Final fallback: simple descendant search for common patterns
if 'nth-child' in selector_str:
# Handle td:nth-child(N) pattern
import re
match = re.search(r'td:nth-child\((\d+)\)', selector_str)
if match:
col_num = match.group(1)
sub_selector = selector_str.split(')', 1)[-1].strip()
if sub_selector:
return element.xpath(f".//td[{col_num}]//{sub_selector}")
else:
return element.xpath(f".//td[{col_num}]")
# Last resort: try each part of the selector separately
parts = selector_str.split()
if len(parts) > 1 and parts[-1]:
return element.xpath(f".//{parts[-1]}")
return []
except Exception as e:
if self.verbose:
print(f"Error applying selector '{selector_str}': {e}")
return []
self._selector_cache[selector_str] = select_func
except Exception as e:
if self.verbose:
print(f"Error compiling selector '{selector_str}': {e}")
# Fallback function for invalid selectors
def fallback_func(element):
return []
self._selector_cache[selector_str] = fallback_func
return self._selector_cache[selector_str]
def _get_base_elements(self, parsed_html, selector: str):
selector_func = self._get_selector(selector)
return selector_func(parsed_html)
def _get_elements(self, element, selector: str):
selector_func = self._get_selector(selector)
return selector_func(element)
def _get_element_text(self, element) -> str:
return "".join(element.xpath(".//text()")).strip()
def _get_element_html(self, element) -> str:
from lxml import etree
return etree.tostring(element, encoding='unicode')
def _get_element_attribute(self, element, attribute: str):
return element.get(attribute)
class JsonXPathExtractionStrategy(JsonElementExtractionStrategy):
"""

View File

@@ -11,7 +11,7 @@ import asyncio
import os
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.extraction_strategy import (
LLMExtractionStrategy,
JsonCssExtractionStrategy,

View File

@@ -1,4 +1,4 @@
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai import AsyncWebCrawler, LLMExtractionStrategy
import asyncio
import os

View File

@@ -1,7 +1,7 @@
import os
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.content_filter_strategy import LLMContentFilter
async def test_llm_filter():

View File

@@ -1,6 +1,6 @@
import os, sys
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
sys.path.append(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

View File

@@ -1,6 +1,6 @@
import os, sys
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
# append parent directory to system path
sys.path.append(

View File

@@ -1,6 +1,6 @@
import os
import time
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.web_crawler import WebCrawler
from crawl4ai.chunking_strategy import *
from crawl4ai.extraction_strategy import *

View File

@@ -17,7 +17,7 @@ from crawl4ai.configs import ProxyConfig
from crawl4ai import RoundRobinProxyStrategy
from crawl4ai.content_filter_strategy import LLMContentFilter
from crawl4ai import DefaultMarkdownGenerator
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.processors.pdf import PDFCrawlerStrategy, PDFContentScrapingStrategy
from pprint import pprint

View File

@@ -131,7 +131,7 @@ OverlappingWindowChunking(
```python
from pydantic import BaseModel
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
# Define schema
class Article(BaseModel):
@@ -198,7 +198,7 @@ result = await crawler.arun(
```python
from crawl4ai.chunking_strategy import OverlappingWindowChunking
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
# Create chunking strategy
chunker = OverlappingWindowChunking(

View File

@@ -305,7 +305,7 @@ asyncio.run(main())
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import LLMContentFilter
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
import asyncio
llm_config = LLMConfig(provider="gemini/gemini-1.5-pro", api_token="env:GEMINI_API_KEY")
@@ -335,7 +335,7 @@ asyncio.run(main())
```python
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
llm_config = LLMConfig(provider="gemini/gemini-1.5-pro", api_token="env:GEMINI_API_KEY")
@@ -401,7 +401,7 @@ print(schema)
experimentation between different LLM configurations.
```python
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

View File

@@ -128,7 +128,7 @@ Crawl4AI can also extract structured data (JSON) using CSS or XPath selectors. B
```python
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
# Generate a schema (one-time cost)
html = "<div class='product'><h2>Gaming Laptop</h2><span class='price'>$999.99</span></div>"

View File

@@ -415,7 +415,7 @@ The schema generator is available as a static method on both `JsonCssExtractionS
```python
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
# Sample HTML with product information
html = """

View File

@@ -1,7 +1,7 @@
import os
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.content_filter_strategy import LLMContentFilter
async def test_llm_filter():

View File

@@ -7,7 +7,7 @@ import json
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.chunking_strategy import RegexChunking
from crawl4ai.extraction_strategy import LLMExtractionStrategy

View File

@@ -0,0 +1,17 @@
from crawl4ai.browser_profiler import BrowserProfiler
import asyncio
if __name__ == "__main__":
# Test launching a standalone browser
async def test_standalone_browser():
profiler = BrowserProfiler()
cdp_url = await profiler.launch_standalone_browser(
browser_type="chromium",
user_data_dir="~/.crawl4ai/browser_profile/test-browser-data",
debugging_port=9222,
headless=False
)
print(f"CDP URL: {cdp_url}")
asyncio.run(test_standalone_browser())

View File

@@ -7,7 +7,7 @@ from crawl4ai import (
BrowserConfig, CrawlerRunConfig, DefaultMarkdownGenerator,
PruningContentFilter, JsonCssExtractionStrategy, LLMContentFilter, CacheMode
)
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.docker_client import Crawl4aiDockerClient
class Crawl4AiTester:

View File

@@ -2,7 +2,7 @@ import inspect
from typing import Any, Dict
from enum import Enum
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
def to_serializable_dict(obj: Any) -> Dict:
"""

View File

@@ -1,5 +1,5 @@
import unittest, os
from crawl4ai.types import LLMConfig
from crawl4ai import LLMConfig
from crawl4ai.web_crawler import WebCrawler
from crawl4ai.chunking_strategy import (
RegexChunking,