Add URL reference for tests, plus some mining and cleanup scripts

Jake Poznanski 2025-03-18 22:35:44 +00:00
parent 3c22cf3430
commit ad82e5526f
4 changed files with 310 additions and 584 deletions

View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python
import argparse
import glob
import json
import os
import sys
from collections import defaultdict
def get_rejected_tests(dataset_jsonl):
"""
Parse dataset.jsonl to identify rejected tests.
Returns:
- rejected_tests: Set of test IDs that were marked as rejected
- pdf_tests: Dict mapping PDF filenames to sets of test IDs
- test_pdf_map: Dict mapping test IDs to their PDF filenames
"""
rejected_tests = set()
pdf_tests = defaultdict(set)
test_pdf_map = {}
try:
with open(dataset_jsonl, "r") as f:
for line in f:
if not line.strip():
continue
try:
test = json.loads(line)
test_id = test.get("id")
pdf_name = test.get("pdf")
# Store the test in our mapping
if test_id and pdf_name:
pdf_tests[pdf_name].add(test_id)
test_pdf_map[test_id] = pdf_name
# Check if the test is marked as rejected
if test.get("checked", None) == "rejected":
rejected_tests.add(test_id)
except json.JSONDecodeError:
print(f"Warning: Could not parse line: {line}")
continue
except FileNotFoundError:
print(f"Error: Dataset file {dataset_jsonl} not found.")
sys.exit(1)
return rejected_tests, pdf_tests, test_pdf_map
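For reference, a dataset.jsonl record is expected to carry at least the fields read above (id, pdf, checked); a hypothetical example line, with invented values:

{"id": "doc01_pg1_table_00", "pdf": "doc01_pg1.pdf", "type": "table", "checked": "rejected"}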
def update_dataset(dataset_jsonl, rejected_tests, dry_run=True):
"""
Create a new dataset.jsonl without the rejected tests.
"""
temp_file = dataset_jsonl + ".temp"
removed_count = 0
try:
with open(dataset_jsonl, "r") as source, open(temp_file, "w") as target:
for line in source:
if not line.strip():
continue
try:
test = json.loads(line)
test_id = test.get("id")
if test_id in rejected_tests:
removed_count += 1
else:
target.write(line)
except json.JSONDecodeError:
continue
except FileNotFoundError:
print(f"Error: Dataset file {dataset_jsonl} not found.")
sys.exit(1)
if not dry_run:
os.replace(temp_file, dataset_jsonl)
else:
os.remove(temp_file)
return removed_count
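The write-to-a-temp-file-then-os.replace pattern above swaps the dataset in a single step (os.replace is atomic when both paths are on the same filesystem), so a crash mid-write never leaves a half-truncated dataset.jsonl. A minimal standalone sketch of the same pattern:

import os

def rewrite_atomically(path: str, lines: list) -> None:
    tmp = path + ".temp"
    with open(tmp, "w") as f:  # write the complete new contents first
        f.writelines(lines)
    os.replace(tmp, path)  # then atomically swap it into place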
def find_orphaned_pdfs(pdf_dir, pdf_tests, rejected_tests):
"""
Find PDF files that have all their tests rejected.
"""
orphaned_pdfs = []
for pdf_name, tests in pdf_tests.items():
# Check if all tests for this PDF are in the rejected list
if tests and all(test_id in rejected_tests for test_id in tests):
pdf_path = os.path.join(pdf_dir, pdf_name)
if os.path.exists(pdf_path):
orphaned_pdfs.append(pdf_path)
return orphaned_pdfs
def find_unreferenced_pdfs(pdf_dir, pdf_tests):
"""
Find PDF files in the pdf_dir that are not referenced by any test.
"""
unreferenced_pdfs = []
# List all PDFs in the directory (recursively)
for pdf_path in glob.glob(os.path.join(pdf_dir, "**", "*.pdf"), recursive=True):
# Get the relative path of the PDF from pdf_dir
pdf_name = os.path.relpath(pdf_path, pdf_dir)
if pdf_name not in pdf_tests:
unreferenced_pdfs.append(pdf_path)
return unreferenced_pdfs
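One subtlety worth noting: the comparison key is the path relative to pdf_dir, so the "pdf" field in the dataset must use that same relative form. A quick illustration with a hypothetical layout:

import os
# A test must reference "batch1/doc.pdf", not "doc.pdf" or an absolute path,
# for this file to count as referenced.
os.path.relpath("/data/pdfs/batch1/doc.pdf", "/data/pdfs")  # -> "batch1/doc.pdf"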
def main():
parser = argparse.ArgumentParser(description="Delete rejected tests from dataset and orphaned/unreferenced PDFs")
parser.add_argument("--data_dir", type=str, required=True, help="Directory containing dataset.jsonl files and the pdfs/ folder")
parser.add_argument("--force", action="store_true", help="Perform actual deletion without confirmation")
args = parser.parse_args()
data_dir = args.data_dir
dry_run = not args.force
# Verify pdfs directory exists
pdf_dir = os.path.join(data_dir, "pdfs")
if not os.path.exists(pdf_dir):
print(f"Error: pdfs/ directory not found in {data_dir}")
sys.exit(1)
# Find all JSONL dataset files in the data_dir
dataset_files = glob.glob(os.path.join(data_dir, "*.jsonl"))
if not dataset_files:
print("No JSONL dataset files found.")
sys.exit(0)
# Global aggregation over all dataset files
global_rejected_tests = set()
global_pdf_tests = defaultdict(set)
global_test_pdf_map = {}
for dataset_file in dataset_files:
rejected_tests, pdf_tests, test_pdf_map = get_rejected_tests(dataset_file)
global_rejected_tests |= rejected_tests
for pdf_name, test_ids in pdf_tests.items():
global_pdf_tests[pdf_name].update(test_ids)
global_test_pdf_map.update(test_pdf_map)
total_tests = sum(len(test_ids) for test_ids in global_pdf_tests.values())
# Compute orphaned and unreferenced PDFs using global mapping
orphaned_pdfs = find_orphaned_pdfs(pdf_dir, global_pdf_tests, global_rejected_tests)
unreferenced_pdfs = find_unreferenced_pdfs(pdf_dir, global_pdf_tests)
# Print summary (global)
print("\n===== DELETION SUMMARY =====")
print(f"Mode: {'DRY RUN (no changes will be made)' if dry_run else 'FORCE (changes will be applied)'}")
print(f"Total tests: {total_tests}")
print(f"Tests marked as rejected: {len(global_rejected_tests)}")
print(f"PDF files with all tests rejected: {len(orphaned_pdfs)}")
print(f"PDF files not referenced by any tests: {len(unreferenced_pdfs)}")
if global_rejected_tests:
print("\nRejected tests:")
for test_id in sorted(global_rejected_tests):
print(f" - {test_id} (from {global_test_pdf_map.get(test_id, 'unknown')})")
if orphaned_pdfs:
print("\nPDF files to be deleted (all tests rejected):")
for pdf_path in sorted(orphaned_pdfs):
print(f" - {os.path.basename(pdf_path)}")
if unreferenced_pdfs:
print("\nPDF files to be deleted (unreferenced by any tests):")
for pdf_path in sorted(unreferenced_pdfs):
print(f" - {os.path.basename(pdf_path)}")
# If dry run, exit here
if dry_run and (global_rejected_tests or orphaned_pdfs or unreferenced_pdfs):
print("\nThis is a dry run. No changes have been made.")
print("To perform the actual deletion, run the script with the --force flag.")
return
# Confirm before deletion if there are items to delete
if global_rejected_tests or orphaned_pdfs or unreferenced_pdfs:
confirm = input("\nDo you want to proceed with deletion? (y/N): ")
if confirm.lower() not in ("y", "yes"):
print("Deletion cancelled.")
return
# Update each dataset file by removing rejected tests
for dataset_file in dataset_files:
removed_count = update_dataset(dataset_file, global_rejected_tests, dry_run=False)
print(f"Removed {removed_count} rejected tests from {os.path.basename(dataset_file)}")
# Delete orphaned PDFs
for pdf_path in orphaned_pdfs:
try:
os.remove(pdf_path)
print(f"Deleted orphaned PDF: {os.path.basename(pdf_path)}")
except OSError as e:
print(f"Error deleting {os.path.basename(pdf_path)}: {e}")
# Delete unreferenced PDFs
for pdf_path in unreferenced_pdfs:
try:
os.remove(pdf_path)
print(f"Deleted unreferenced PDF: {os.path.basename(pdf_path)}")
except OSError as e:
print(f"Error deleting {os.path.basename(pdf_path)}: {e}")
print("\nDeletion completed successfully.")
else:
print("\nNo rejected tests, orphaned PDFs, or unreferenced PDFs found. Nothing to delete.")
if __name__ == "__main__":
main()
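Assuming this script is saved as, say, cleanup_rejected.py (the filename is not visible in this view), a typical workflow would be a dry run first, then a forced pass:

python cleanup_rejected.py --data_dir ./bench_data          # report what would be deleted
python cleanup_rejected.py --data_dir ./bench_data --force  # prompt, then actually delete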

View File

@@ -11,13 +11,15 @@ This script:
5. Extracts the page from the PDF and saves it to an output folder
Usage:
-python mine_tables.py --input_list path/to/s3_paths.txt --output_dir path/to/output --api_key your_gemini_api_key
+python mine_tables.py --input_list path/to/s3_paths.txt --output_dir path/to/output --api_key your_gemini_api_key [--parallel 4]
"""
import argparse
import base64
+import concurrent.futures
import os
import random
+import threading
from typing import Dict, List, Optional, Tuple
import boto3
@@ -26,12 +28,15 @@ import pypdf
from bs4 import BeautifulSoup
from google import genai
from google.genai import types
from tqdm import tqdm
from olmocr.bench.tests import TableTest, save_tests
from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.filter import PdfFilter
+# Create a thread-safe lock for writing to the output file
+file_lock = threading.Lock()
+tests_lock = threading.Lock()
def download_pdf_from_s3(s3_path: str, local_path: str) -> bool:
"""
@@ -141,7 +146,8 @@ def detect_tables(pdf_path: str, page_num: int, api_key: str) -> Optional[Tuple[
text=(
"Analyze the document attached and output it in markdown format. "
"Output equations as Latex escaped with $$. "
"Output tables in valid HTML format that preserves the structure and content exactly. "
"Output tables in HTML format that preserves the structure and content exactly, do not use <br> tags. "
"Instead of the markdown table format, be sure to output tables in HTML, even though the rest of the document is styled in markdown. "
"Output figures with just a simple markdown image placeholder."
)
),
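For context, this instruction string is one part of a google-genai generate_content request that pairs the rendered page image with the prompt. A rough, self-contained sketch of such a call (model name, file names, and surrounding setup are assumptions, not taken from this diff):

from google import genai
from google.genai import types

client = genai.Client(api_key="YOUR_GEMINI_API_KEY")  # hypothetical key
png_bytes = open("page.png", "rb").read()  # a rendered PDF page (hypothetical file)

response = client.models.generate_content(
    model="gemini-2.0-flash",  # assumed model; the script's actual choice is not shown here
    contents=[
        types.Content(
            role="user",
            parts=[
                types.Part.from_bytes(data=png_bytes, mime_type="image/png"),
                types.Part(text="Analyze the document attached and output it in markdown format. ..."),
            ],
        )
    ],
)
print(response.text)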
@@ -292,7 +298,7 @@ def generate_table_tests(tables: List[np.ndarray], pdf_image: str, api_key: str,
return tests
-def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str, tests: List[TableTest]) -> None:
+def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str) -> List[TableTest]:
"""
Process a single PDF from S3.
@@ -301,21 +307,30 @@ def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str, test
temp_dir: Directory for temporary files
output_dir: Directory for output files
api_key: Gemini API key
-tests: List to append tests to
+Returns:
+List[TableTest]: List of generated table tests
"""
+# Create a thread-specific temp directory to avoid conflicts
+thread_id = threading.get_ident()
+thread_temp_dir = os.path.join(temp_dir, f"thread_{thread_id}")
+os.makedirs(thread_temp_dir, exist_ok=True)
# Extract filename from S3 path
pdf_filename = os.path.basename(s3_path)
-local_pdf_path = os.path.join(temp_dir, pdf_filename)
+local_pdf_path = os.path.join(thread_temp_dir, pdf_filename)
# Download PDF from S3
if not download_pdf_from_s3(s3_path, local_pdf_path):
-return
+return []
pdf_filter = PdfFilter()
if pdf_filter.filter_out_pdf(local_pdf_path):
print(f"Filtering out {pdf_filename}")
-return
+if os.path.exists(local_pdf_path):
+os.remove(local_pdf_path)
+return []
try:
# Read the PDF to get the number of pages
@@ -324,11 +339,13 @@ def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str, test
if num_pages == 0:
print(f"PDF {pdf_filename} has no pages")
-return
+return []
all_pages = list(range(len(reader.pages)))
random.shuffle(all_pages)
+local_tests = []
for page_num in all_pages:
# Detect tables and obtain the rendered image for this page
result = detect_tables(local_pdf_path, page_num, api_key)
@@ -348,7 +365,8 @@ def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str, test
# Extract the page and save to output dir
pdf_basename = os.path.splitext(pdf_filename)[0]
output_pdf_path = os.path.join(output_dir, "pdfs", f"{pdf_basename}_pg{page_num+1}.pdf")
-extract_page_from_pdf(local_pdf_path, output_pdf_path, page_num)
+with file_lock: # Use lock when writing to shared output directory
+extract_page_from_pdf(local_pdf_path, output_pdf_path, page_num)
# Create table tests
for i, test_data in enumerate(table_tests_data):
@@ -366,18 +384,69 @@ def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str, test
top_heading=test_data.get("top_heading", None),
left_heading=test_data.get("left_heading", None),
)
-tests.append(test)
+local_tests.append(test)
print(f"Processed {pdf_filename} page {page_num+1}, found {len(tables)} tables, created {len(table_tests_data)} tests")
-return # Process only one page per PDF
+break # Process only one page per PDF
+return local_tests
except Exception as e:
print(f"Error processing {pdf_filename}: {str(e)}")
+return []
finally:
# Cleanup
if os.path.exists(local_pdf_path):
os.remove(local_pdf_path)
+def process_pdfs_parallel(s3_paths: List[str], temp_dir: str, output_dir: str, api_key: str, max_tests: int, num_workers: int):
+"""
+Process PDFs in parallel using a thread pool.
+Args:
+s3_paths: List of S3 paths to PDFs
+temp_dir: Directory for temporary files
+output_dir: Directory for output files
+api_key: Gemini API key
+max_tests: Maximum number of tests to generate
+num_workers: Number of parallel workers to use
+"""
+# Create shared resources
+all_tests = []
+output_file = os.path.join(output_dir, "table_tests.jsonl")
+# Create a ThreadPoolExecutor
+with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
+# Submit tasks and track futures
+futures = {executor.submit(process_pdf, s3_path, temp_dir, output_dir, api_key): s3_path for s3_path in s3_paths}
+# Process results as they complete
+for future in concurrent.futures.as_completed(futures):
+s3_path = futures[future]
+try:
+# Get the tests produced by this worker
+new_tests = future.result()
+# If we got new tests, add them to our collection
+if new_tests:
+all_tests.extend(new_tests)
+save_tests(all_tests, output_file)
+print(f"Added {len(new_tests)} tests from {os.path.basename(s3_path)}, total: {len(all_tests)}")
+# Check if we've reached the maximum number of tests
+if len(all_tests) >= max_tests:
+print(f"Reached maximum number of tests ({max_tests}), stopping")
+# Cancel any pending futures
+for f in futures:
+if not f.done():
+f.cancel()
+break
+except Exception as e:
+print(f"Task for {os.path.basename(s3_path)} generated an exception: {e}")
def main():
parser = argparse.ArgumentParser(description="Extract tables from PDF documents and create table tests")
parser.add_argument("--input_list", required=True, help="Path to a file containing S3 paths to PDFs")
@@ -385,6 +454,7 @@ def main():
parser.add_argument("--api_key", help="Gemini API key (if not provided, will use GEMINI_API_KEY environment variable)")
parser.add_argument("--temp_dir", default="/tmp/mine_tables", help="Directory for temporary files")
parser.add_argument("--max_tests", type=int, default=100, help="Maximum number of tests to generate")
parser.add_argument("--parallel", type=int, default=1, help="Number of parallel threads to use")
args = parser.parse_args()
# Get API key
@@ -399,19 +469,16 @@ def main():
with open(args.input_list, "r") as f:
s3_paths = [line.strip() for line in f if line.strip()]
+random.shuffle(s3_paths)
print(f"Found {len(s3_paths)} PDF paths in input list")
-tests = []
-for s3_path in tqdm(s3_paths, desc="Processing PDFs"):
-process_pdf(s3_path, args.temp_dir, args.output_dir, api_key, tests)
-if tests:
-save_tests(tests, os.path.join(args.output_dir, "table_tests.jsonl"))
+# Determine number of workers to use
+num_workers = max(1, min(args.parallel, len(s3_paths)))
+print(f"Processing PDFs using {num_workers} parallel workers")
-if len(tests) >= args.max_tests:
-print(f"Reached maximum number of tests ({args.max_tests}), stopping")
-break
-print(f"Saved {len(tests)} table tests to {os.path.join(args.output_dir, 'table_tests.jsonl')}")
+# Process PDFs in parallel
+process_pdfs_parallel(s3_paths, args.temp_dir, args.output_dir, api_key, args.max_tests, num_workers)
if __name__ == "__main__":

View File

@@ -70,6 +70,7 @@ class BasePDFTest:
type: str
max_diffs: int = 0
checked: Optional[TestChecked] = None
+url: Optional[str] = None
def __post_init__(self):
if not self.pdf:
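With this field in place, each test can point back at the source document it was mined from. A hypothetical record shape after the change (field names beyond those shown in the hunk are assumptions about the full dataclass):

{"pdf": "doc01_pg1.pdf", "id": "doc01_pg1_present_00", "type": "present", "max_diffs": 0, "checked": null, "url": "https://example.com/source/doc01.pdf"}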

View File

@@ -1,562 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
import requests
from olmocr.data.renderpdf import render_pdf_to_base64png
def parse_rules_file(file_path):
"""Parse the rules file and organize rules by PDF."""
pdf_rules = defaultdict(list)
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
rule = json.loads(line)
# Add checked field if it doesn't exist
if "checked" not in rule:
rule["checked"] = None
if "pdf" in rule:
pdf_rules[rule["pdf"]].append(rule)
except json.JSONDecodeError:
print(f"Warning: Could not parse line as JSON: {line}")
return pdf_rules
def get_rule_html(rule, rule_index):
"""Generate HTML representation for a rule with interactive elements."""
rule_type = rule.get("type", "unknown")
rule_id = f"rule-{rule_index}"
# Determine status button class based on 'checked' value
checked_status = rule.get("checked")
thumbs_up_class = "active" if checked_status == "verified" else ""
thumbs_down_class = "active" if checked_status == "rejected" else ""
# Create thumbs up/down buttons
status_button = f"""
<div class="status-control">
<button class="status-button thumbs-up {thumbs_up_class}"
data-rule-id="{rule_id}"
data-action="verified"
onclick="toggleStatus(this)"></button>
<button class="status-button thumbs-down {thumbs_down_class}"
data-rule-id="{rule_id}"
data-action="rejected"
onclick="toggleStatus(this)"></button>
</div>
"""
# Create HTML based on rule type
if rule_type == "present":
return f"""
<tr class="rule-row present-rule" data-rule-id="{rule_id}" data-rule-index="{rule_index}">
<td>{status_button}</td>
<td><span class="rule-type present">PRESENT</span></td>
<td>
<div class="editable-text"
contenteditable="true"
data-rule-id="{rule_id}"
data-field="text"
onblur="updateRuleText(this)">{rule.get('text', '')}</div>
</td>
<td>Threshold: {rule.get('threshold', 'N/A')}</td>
</tr>
"""
elif rule_type == "absent":
return f"""
<tr class="rule-row absent-rule" data-rule-id="{rule_id}" data-rule-index="{rule_index}">
<td>{status_button}</td>
<td><span class="rule-type absent">ABSENT</span></td>
<td>
<div class="editable-text"
contenteditable="true"
data-rule-id="{rule_id}"
data-field="text"
onblur="updateRuleText(this)">{rule.get('text', '')}</div>
</td>
<td>Threshold: {rule.get('threshold', 'N/A')}</td>
</tr>
"""
elif rule_type == "order":
return f"""
<tr class="rule-row order-rule" data-rule-id="{rule_id}" data-rule-index="{rule_index}">
<td>{status_button}</td>
<td><span class="rule-type order">ORDER</span></td>
<td>
<p><strong>Before:</strong>
<span class="editable-text"
contenteditable="true"
data-rule-id="{rule_id}"
data-field="before"
onblur="updateRuleText(this)">{rule.get('before', '')}</span>
</p>
<p><strong>After:</strong>
<span class="editable-text"
contenteditable="true"
data-rule-id="{rule_id}"
data-field="after"
onblur="updateRuleText(this)">{rule.get('after', '')}</span>
</p>
</td>
<td>Threshold: {rule.get('threshold', 'N/A')}</td>
</tr>
"""
else:
return f"""
<tr class="rule-row unknown-rule" data-rule-id="{rule_id}" data-rule-index="{rule_index}">
<td>{status_button}</td>
<td><span class="rule-type unknown">UNKNOWN</span></td>
<td>Unknown rule type: {rule_type}</td>
<td></td>
</tr>
"""
def generate_html(pdf_rules, rules_file_path):
"""Generate the HTML page with PDF renderings and interactive rules."""
# Limit to 10 unique PDFs
pdf_names = list(pdf_rules.keys())[:10]
# Prepare rules data for JavaScript
all_rules = []
for pdf_name in pdf_names:
all_rules.extend(pdf_rules[pdf_name])
rules_json = json.dumps(all_rules)
html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Interactive PDF Rules Visualizer</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1920px;
margin: 0 auto;
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
.pdf-container {
background-color: white;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
margin-bottom: 30px;
overflow: hidden;
}
.pdf-header {
background-color: #4a6fa5;
color: white;
padding: 15px;
font-size: 18px;
font-weight: bold;
}
.pdf-content {
display: flex;
flex-direction: row;
padding: 20px;
}
@media (max-width: 1200px) {
.pdf-content {
flex-direction: column;
}
}
.pdf-image {
flex: 0 0 50%;
max-width: 800px;
text-align: center;
padding-right: 20px;
}
.pdf-image img {
max-width: 100%;
height: auto;
border: 1px solid #ddd;
}
.rules-container {
flex: 1;
overflow: auto;
}
.rules-table {
width: 100%;
border-collapse: collapse;
}
.rules-table th {
background-color: #4a6fa5;
color: white;
padding: 10px;
text-align: left;
}
.rules-table td {
padding: 10px;
border-bottom: 1px solid #ddd;
vertical-align: top;
}
.rule-type {
display: inline-block;
padding: 5px 10px;
border-radius: 4px;
color: white;
font-weight: bold;
}
.present {
background-color: #28a745;
}
.absent {
background-color: #dc3545;
}
.order {
background-color: #fd7e14;
}
.unknown {
background-color: #6c757d;
}
.rule-row:hover {
background-color: #f8f9fa;
}
/* New styles for interactive elements */
.editable-text {
min-height: 20px;
padding: 5px;
border-radius: 4px;
border: 1px solid transparent;
transition: border-color 0.2s;
}
.editable-text:hover {
border-color: #ccc;
background-color: #f8f9fa;
}
.editable-text:focus {
outline: none;
border-color: #4a6fa5;
background-color: #fff;
}
.status-control {
display: flex;
justify-content: center;
align-items: center;
gap: 8px;
}
.status-button {
width: 36px;
height: 36px;
border-radius: 4px;
border: 1px solid #ccc;
background-color: #f8f9fa;
cursor: pointer;
transition: all 0.2s;
display: flex;
justify-content: center;
align-items: center;
}
.status-button:hover {
border-color: #999;
background-color: #e9ecef;
}
.thumbs-up:before {
content: "👍";
font-size: 18px;
opacity: 0.5;
}
.thumbs-down:before {
content: "👎";
font-size: 18px;
opacity: 0.5;
}
.thumbs-up.active {
background-color: #28a745;
border-color: #28a745;
}
.thumbs-up.active:before {
opacity: 1;
color: white;
}
.thumbs-down.active {
background-color: #dc3545;
border-color: #dc3545;
}
.thumbs-down.active:before {
opacity: 1;
color: white;
}
</style>
</head>
<body>
<div class="container">
<h1>Interactive PDF Rules Visualizer</h1>
"""
# Global rule index for unique IDs
rule_index = 0
for pdf_name in pdf_names:
rules = pdf_rules[pdf_name]
# Render the PDF (first page only) from the /pdfs folder
try:
pdf_path = os.path.join(os.path.dirname(rules_file_path), "pdfs", pdf_name)
base64_img = render_pdf_to_base64png(pdf_path, 0)
img_html = f'<img src="data:image/png;base64,{base64_img}" alt="{pdf_name}">'
except Exception as e:
img_html = f'<div class="error">Error rendering PDF: {str(e)}</div>'
html += f"""
<div class="pdf-container">
<div class="pdf-header">{pdf_name}</div>
<div class="pdf-content">
<div class="pdf-image">
{img_html}
</div>
<div class="rules-container">
<table class="rules-table">
<thead>
<tr>
<th>Status</th>
<th>Type</th>
<th>Content</th>
<th>Parameters</th>
</tr>
</thead>
<tbody>
"""
for rule in rules:
html += get_rule_html(rule, rule_index)
rule_index += 1
html += """
</tbody>
</table>
</div>
</div>
</div>
"""
# Add JavaScript to manage interactivity and datastore integration
html += f"""
</div>
<script>
// Store all rules data (initially injected from the JSON file)
let rulesData = {rules_json};
// Function to toggle status button
function toggleStatus(button) {{
const ruleRow = button.closest('.rule-row');
const ruleIndex = parseInt(ruleRow.dataset.ruleIndex);
const action = button.dataset.action;
const currentState = rulesData[ruleIndex].checked;
const newState = (currentState === action) ? null : action;
rulesData[ruleIndex].checked = newState;
// Update UI for status buttons
const buttons = ruleRow.querySelectorAll('.status-button');
buttons.forEach(btn => {{
if (btn.dataset.action === newState) {{
btn.classList.add('active');
}} else {{
btn.classList.remove('active');
}}
}});
// Upload updated data to datastore
uploadRulesData();
outputJSON();
}}
// Function to update rule text
function updateRuleText(element) {{
const ruleRow = element.closest('.rule-row');
const ruleIndex = parseInt(ruleRow.dataset.ruleIndex);
const field = element.dataset.field;
const newText = element.innerText.trim();
// Update the rules data
rulesData[ruleIndex][field] = newText;
// Upload updated data to datastore
uploadRulesData();
outputJSON();
}}
// Function to output JSONL to console
function outputJSON() {{
console.clear();
console.log("Updated JSONL:");
rulesData.forEach(rule => {{
console.log(JSON.stringify(rule));
}});
}}
// Function to upload rulesData to datastore using putDatastore
async function uploadRulesData() {{
try {{
await putDatastore(rulesData);
console.log("Datastore updated successfully");
}} catch (error) {{
console.error("Failed to update datastore", error);
}}
}}
// Function to update UI from rulesData (used after fetching datastore state)
function updateUIFromRulesData() {{
document.querySelectorAll('.rule-row').forEach(ruleRow => {{
const ruleIndex = parseInt(ruleRow.dataset.ruleIndex);
const rule = rulesData[ruleIndex];
// Update status buttons
const buttons = ruleRow.querySelectorAll('.status-button');
buttons.forEach(btn => {{
if (btn.dataset.action === rule.checked) {{
btn.classList.add('active');
}} else {{
btn.classList.remove('active');
}}
}});
// Update editable text fields
ruleRow.querySelectorAll('.editable-text').forEach(div => {{
const field = div.dataset.field;
if (rule[field] !== undefined) {{
div.innerText = rule[field];
}}
}});
}});
}}
// On page load, fetch data from the datastore and update UI accordingly
document.addEventListener('DOMContentLoaded', async function() {{
try {{
const datastoreState = await fetchDatastore();
if (datastoreState.length) {{
rulesData = datastoreState;
updateUIFromRulesData();
outputJSON();
}}
}} catch (error) {{
console.error("Error fetching datastore", error);
}}
}});
</script>
</body>
</html>
"""
return html
def get_page_datastore(html: str):
"""
Fetch the JSON datastore from the presigned URL.
Returns the parsed JSON, or None if the URL is missing or the fetch fails.
"""
match = re.search(r"const presignedGetUrl = \"(.*?)\";", html)
if not match:
return None
presigned_url = match.group(1)
try:
# Clean up the presigned URL (sometimes the signature may need re-encoding)
url_parts = urlsplit(presigned_url)
query_params = parse_qs(url_parts.query)
encoded_query = urlencode(query_params, doseq=True)
cleaned_url = urlunsplit((url_parts.scheme, url_parts.netloc, url_parts.path, encoded_query, url_parts.fragment))
resp = requests.get(cleaned_url)
resp.raise_for_status()
return resp.json()
except Exception as e:
print(f"Error fetching datastore from {presigned_url}: {e}")
return None
def main():
parser = argparse.ArgumentParser(description="Generate an interactive HTML visualization of PDF rules.")
parser.add_argument("rules_file", help="Path to the rules file (JSON lines format)")
parser.add_argument("-o", "--output", help="Output HTML file path", default="interactive_pdf_rules.html")
args = parser.parse_args()
if not os.path.exists(args.rules_file):
print(f"Error: Rules file not found: {args.rules_file}")
sys.exit(1)
if os.path.exists(args.output):
print(f"Output file {args.output} already exists, attempting to reload it's datastore")
with open(args.output, "r") as df:
datastore = get_page_datastore(df.read())
if datastore is None:
print(f"Datastore for {args.output} is empty, please run tinyhost and verify your rules and then rerun the script")
sys.exit(1)
print(f"Loaded {len(datastore)} entries from datastore, updating {args.rules_file}")
with open(args.rules_file, "w") as of:
for rule in datastore:
of.write(json.dumps(rule) + "\n")
return
pdf_rules = parse_rules_file(args.rules_file)
html = generate_html(pdf_rules, args.rules_file)
with open(args.output, "w") as f:
f.write(html)
print(f"Interactive HTML visualization created: {args.output}")
if __name__ == "__main__":
main()