Better context manager and cleanup of old browser instances

This commit is contained in:
Jake Poznanski 2025-09-21 23:38:09 +00:00
parent 7e786c79c5
commit 780bc7d934

View File

@ -19,6 +19,7 @@ import re
import sqlite3
import threading
import unittest
from contextlib import contextmanager
from dataclasses import dataclass
from typing import List, Optional
@ -125,10 +126,6 @@ equation_cache = EquationCache()
# --- End SQLite Cache Implementation ---
# Thread-local storage for Playwright and browser instances
_thread_local = threading.local()
@dataclass
class BoundingBox:
x: float
@ -158,21 +155,63 @@ def get_equation_hash(equation, bg_color="white", text_color="black", font_size=
return hashlib.sha1(params_str.encode("utf-8")).hexdigest()
def init_browser():
# Thread-local storage for browser contexts
_thread_local = threading.local()
@contextmanager
def browser_context():
"""
Initialize the Playwright and browser instance for the current thread if not already done.
Context manager for Playwright browser instances.
Returns a browser context that can be used for rendering equations.
Automatically handles initialization and cleanup.
Cleans up and recreates the browser every 100 uses to prevent memory leaks.
"""
# Initialize usage counter if not present
if not hasattr(_thread_local, "usage_count"):
_thread_local.usage_count = 0
# Check if we already have a browser for this thread
if not hasattr(_thread_local, "playwright"):
_thread_local.playwright = sync_playwright().start()
_thread_local.browser = _thread_local.playwright.chromium.launch()
_thread_local.usage_count = 0
# Increment usage counter
_thread_local.usage_count += 1
def get_browser():
"""
Return the browser instance for the current thread.
"""
init_browser()
return _thread_local.browser
# Check if we need to clean up the browser (every 100 uses)
if _thread_local.usage_count > 100:
# Clean up the old browser completely
print("Cleanup up old playwright instance to prevent memory leaks...")
try:
_thread_local.browser.close()
except Exception:
pass # Ignore errors during cleanup
try:
_thread_local.playwright.stop()
except Exception:
pass # Ignore errors during cleanup
# Remove the attributes to force re-initialization
delattr(_thread_local, "playwright")
delattr(_thread_local, "browser")
# Re-initialize with a fresh browser
_thread_local.playwright = sync_playwright().start()
_thread_local.browser = _thread_local.playwright.chromium.launch()
_thread_local.usage_count = 1 # Reset to 1 since we just used it
# Create a new context for this operation
context = _thread_local.browser.new_context(viewport={"width": 800, "height": 400})
try:
yield context
finally:
# Clean up the context after use
context.close()
def render_equation(
@ -207,155 +246,154 @@ def render_equation(
if not os.path.exists(katex_css_path) or not os.path.exists(katex_js_path):
raise FileNotFoundError(f"KaTeX files not found. Please ensure katex.min.css and katex.min.js are in {script_dir}")
# Get the browser instance for the current thread.
browser = get_browser()
# Use the browser context manager
with browser_context() as context:
# Create a new page.
page = context.new_page()
# Create a new page.
page = browser.new_page(viewport={"width": 800, "height": 400})
# Basic HTML structure for rendering.
page_html = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
background-color: {bg_color};
color: {text_color};
}}
#equation-container {{
padding: 0;
font-size: {font_size}px;
}}
</style>
</head>
<body>
<div id="equation-container"></div>
</body>
</html>
"""
page.set_content(page_html)
page.add_style_tag(path=katex_css_path)
page.add_script_tag(path=katex_js_path)
page.wait_for_load_state("networkidle")
katex_loaded = page.evaluate("typeof katex !== 'undefined'")
if not katex_loaded:
page.close()
raise RuntimeError("KaTeX library failed to load. Check your katex.min.js file.")
try:
error_message = page.evaluate(
f"""
() => {{
try {{
katex.render({escaped_equation}, document.getElementById("equation-container"), {{
displayMode: true,
throwOnError: true
}});
return null;
}} catch (error) {{
console.error("KaTeX error:", error.message);
return error.message;
}}
}}
# Basic HTML structure for rendering.
page_html = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
background-color: {bg_color};
color: {text_color};
}}
#equation-container {{
padding: 0;
font-size: {font_size}px;
}}
</style>
</head>
<body>
<div id="equation-container"></div>
</body>
</html>
"""
)
except PlaywrightError as ex:
print(escaped_equation)
error_message = str(ex)
page.close()
raise
page.set_content(page_html)
page.add_style_tag(path=katex_css_path)
page.add_script_tag(path=katex_js_path)
page.wait_for_load_state("networkidle")
if error_message:
print(f"Error rendering equation: '{equation}'")
print(error_message)
# Cache the error result so we don't retry it next time.
rendered_eq = RenderedEquation(mathml=error_message, spans=[], error=error_message)
if use_cache:
equation_cache.save(eq_hash, rendered_eq)
page.close()
return rendered_eq
katex_loaded = page.evaluate("typeof katex !== 'undefined'")
if not katex_loaded:
page.close()
raise RuntimeError("KaTeX library failed to load. Check your katex.min.js file.")
page.wait_for_selector(".katex", state="attached")
try:
error_message = page.evaluate(
f"""
() => {{
try {{
katex.render({escaped_equation}, document.getElementById("equation-container"), {{
displayMode: true,
throwOnError: true
}});
return null;
}} catch (error) {{
console.error("KaTeX error:", error.message);
return error.message;
}}
}}
"""
)
except PlaywrightError as ex:
print(escaped_equation)
error_message = str(ex)
page.close()
raise
if debug_dom:
katex_dom_html = page.evaluate(
if error_message:
print(f"Error rendering equation: '{equation}'")
print(error_message)
# Cache the error result so we don't retry it next time.
rendered_eq = RenderedEquation(mathml=error_message, spans=[], error=error_message)
if use_cache:
equation_cache.save(eq_hash, rendered_eq)
page.close()
return rendered_eq
page.wait_for_selector(".katex", state="attached")
if debug_dom:
katex_dom_html = page.evaluate(
"""
() => {
return document.getElementById("equation-container").innerHTML;
}
"""
)
print("\n===== KaTeX DOM HTML =====")
print(katex_dom_html)
# Extract inner-most spans with non-whitespace text.
spans_info = page.evaluate(
"""
() => {
return document.getElementById("equation-container").innerHTML;
const spans = Array.from(document.querySelectorAll('span'));
const list = [];
spans.forEach(span => {
if (span.children.length === 0 && /\\S/.test(span.textContent)) {
const rect = span.getBoundingClientRect();
list.push({
text: span.textContent.trim(),
boundingBox: {
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height
}
});
}
});
return list;
}
"""
)
print("\n===== KaTeX DOM HTML =====")
print(katex_dom_html)
# Extract inner-most spans with non-whitespace text.
spans_info = page.evaluate(
if debug_dom:
print("\n===== Extracted Span Information =====")
print(spans_info)
# Extract MathML output (if available) from the KaTeX output.
mathml = page.evaluate(
"""
() => {
const mathElem = document.querySelector('.katex-mathml math');
return mathElem ? mathElem.outerHTML : "";
}
"""
() => {
const spans = Array.from(document.querySelectorAll('span'));
const list = [];
spans.forEach(span => {
if (span.children.length === 0 && /\\S/.test(span.textContent)) {
const rect = span.getBoundingClientRect();
list.push({
text: span.textContent.trim(),
boundingBox: {
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height
}
});
}
});
return list;
}
"""
)
)
if debug_dom:
print("\n===== Extracted Span Information =====")
print(spans_info)
page.close()
# Extract MathML output (if available) from the KaTeX output.
mathml = page.evaluate(
"""
() => {
const mathElem = document.querySelector('.katex-mathml math');
return mathElem ? mathElem.outerHTML : "";
}
"""
)
rendered_eq = RenderedEquation(
mathml=mathml,
spans=[
SpanInfo(
text=s["text"],
bounding_box=BoundingBox(
x=s["boundingBox"]["x"],
y=s["boundingBox"]["y"],
width=s["boundingBox"]["width"],
height=s["boundingBox"]["height"],
),
)
for s in spans_info
],
)
page.close()
rendered_eq = RenderedEquation(
mathml=mathml,
spans=[
SpanInfo(
text=s["text"],
bounding_box=BoundingBox(
x=s["boundingBox"]["x"],
y=s["boundingBox"]["y"],
width=s["boundingBox"]["width"],
height=s["boundingBox"]["height"],
),
)
for s in spans_info
],
)
# Save the successfully rendered equation to the SQLite cache.
if use_cache:
equation_cache.save(eq_hash, rendered_eq)
return rendered_eq
# Save the successfully rendered equation to the SQLite cache.
if use_cache:
equation_cache.save(eq_hash, rendered_eq)
return rendered_eq
def compare_rendered_equations(reference: RenderedEquation, hypothesis: RenderedEquation) -> bool: