import argparse
import base64
import csv
import datetime
import json
import os
import random
import re
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import boto3
import requests
import tinyhost
from tqdm import tqdm
from olmocr.data.renderpdf import render_pdf_to_base64webp
from olmocr.s3_utils import get_s3_bytes, parse_s3_path
def parse_args():
    """Define the command-line interface and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding the positional ``workspace`` value
        and every option below. ``--prolific_code`` is mandatory; all other
        options either have a default or are ``None`` when omitted.
    """
    cli = argparse.ArgumentParser(description="Scan OLMO OCR workspace results and create visual samples")

    # One (flags, add_argument keyword arguments) pair per argument, listed in
    # the order they are registered on the parser.
    argument_specs = (
        (("workspace",), {"help": "OLMO OCR workspace path (s3://bucket/workspace)"}),
        (("--pages_per_output",), {"type": int, "default": 30, "help": "Number of pages per output file"}),
        (("--repeats",), {"type": int, "default": 1, "help": "Number of output files to generate"}),
        (("--pdf_profile",), {"help": "AWS profile for accessing PDFs"}),
        (("--output_dir",), {"default": "dolma_samples", "help": "Directory to save output HTML files"}),
        (("--max_workers",), {"type": int, "default": 4, "help": "Maximum number of worker threads"}),
        (
            ("--db_path",),
            {
                "default": "~/s2pdf_url_data/d65142df-6588-4b68-a12c-d468b3761189.csv.db",
                "help": "Path to the SQLite database containing PDF hash to URL mapping",
            },
        ),
        (
            ("--prolific_code",),
            {
                "required": True,
                "help": "Fixed completion code to use for all outputs",
            },
        ),
        (
            ("--prolific_csv",),
            {
                "default": "prolific_codes.csv",
                "help": "Path to save the file with tinyhost links (one URL per line)",
            },
        ),
        (
            ("--read_results",),
            {
                "help": "Path to a CSV file containing previously generated tinyhost links to extract annotations",
            },
        ),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)

    return cli.parse_args()
# Fixed prolific code is now passed in as a command line argument
def obfuscate_code(code):
    """Lightly hide the Prolific code so it is not plainly readable in source.

    The text is base64-encoded and the resulting characters are reversed.
    This is obfuscation only, not encryption.
    """
    b64_text = base64.b64encode(code.encode()).decode()
    return "".join(reversed(b64_text))
def deobfuscate_code(obfuscated_code):
    """Invert ``obfuscate_code``: reverse the text, then base64-decode it.

    The original docstring notes that the real decoding is done in
    JavaScript; this is the Python counterpart.

    Args:
        obfuscated_code: Reversed base64 text.

    Returns:
        The decoded string, or the sentinel ``"ERROR_DECODING"`` when the
        input is not valid base64 or does not decode to UTF-8 text.
    """
    reversed_encoded = obfuscated_code[::-1]
    try:
        return base64.b64decode(reversed_encoded).decode()
    except ValueError:
        # binascii.Error (malformed base64) and UnicodeDecodeError (payload is
        # not UTF-8) are both ValueError subclasses. Catching only these keeps
        # KeyboardInterrupt/SystemExit from being swallowed as a bare
        # ``except:`` would.
        return "ERROR_DECODING"
def parse_pdf_hash(pretty_pdf_path: str) -> Optional[str]:
    """Extract the PDF hash from an ``s3://ai2-s2-pdfs/<4 hex>/<hex>.pdf`` path.

    Returns the four-character directory name joined with the file stem, or
    ``None`` when the path does not start with that layout.
    """
    found = re.match(r"s3://ai2-s2-pdfs/([a-f0-9]{4})/([a-f0-9]+)\.pdf", pretty_pdf_path)
    if found is None:
        return None
    leading, remainder = found.groups()
    return leading + remainder
def get_original_url(pdf_hash: str, db_path: str) -> Optional[str]:
    """Look up the original URL for a PDF hash in the SQLite database.

    Args:
        pdf_hash: Hash to look up in the ``pdf_mapping`` table.
        db_path: Path to the SQLite file; a leading ``~`` is expanded.

    Returns:
        The ``uri`` column of the first matching row, or ``None`` when the
        hash is empty, the database file does not exist, no row matches, or
        the lookup fails (the failure is printed rather than raised).
    """
    if not pdf_hash:
        return None
    try:
        sqlite_db_path = os.path.expanduser(db_path)
        if not os.path.exists(sqlite_db_path):
            print(f"SQLite database not found at {sqlite_db_path}")
            return None
        conn = sqlite3.connect(sqlite_db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT uri FROM pdf_mapping WHERE pdf_hash = ?", (pdf_hash,))
            result = cursor.fetchone()
        finally:
            # Close even when the query raises (e.g. missing table); otherwise
            # the connection would leak on every failed lookup.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        # Best-effort lookup: a broken database must not abort page rendering.
        print(f"Error looking up URL for PDF hash {pdf_hash}: {e}")
        return None
def list_result_files(s3_client, workspace_path):
    """Collect the ``.json`` / ``.jsonl`` objects under ``<workspace>/results/``.

    Listing stops after the first page that brings the total above 1000
    files, so the returned list of ``s3://`` URLs may not be exhaustive.
    """
    bucket, prefix = parse_s3_path(workspace_path)
    results_prefix = os.path.join(prefix, "results").rstrip("/") + "/"

    found = []
    listing = s3_client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=results_prefix)
    for page in listing:
        if "Contents" in page:
            for obj in page["Contents"]:
                key = obj["Key"]
                if key.endswith((".jsonl", ".json")):
                    found.append(f"s3://{bucket}/{key}")
        # Enough candidates to sample from; do not walk the whole prefix.
        if len(found) > 1000:
            break
    return found
def get_random_pages(s3_client, result_files, count=30):
    """Sample up to *count* random pages from the workspace result files.

    Each attempt picks a random result file, a random non-blank line of that
    file (one JSON document per line) and a random page span of that
    document. At most ``count * 3`` attempts are made, so fewer than *count*
    pages may be returned when files or documents are unusable.

    Args:
        s3_client: Client passed through to ``get_s3_bytes``.
        result_files: ``s3://`` paths of the result files to sample from.
        count: Number of pages wanted.

    Returns:
        A list of ``(pdf_path, page_num, page_text, result_file)`` tuples.
    """
    random_pages = []

    # Loop-invariant: with nothing to sample from there is no point retrying.
    if not result_files:
        print("No result files found!")
        print(f"Found {len(random_pages)} random pages from Dolma documents")
        return random_pages

    attempts = 0
    max_attempts = count * 3  # Allow extra attempts to handle potential failures
    while len(random_pages) < count and attempts < max_attempts:
        attempts += 1
        result_file = random.choice(result_files)
        try:
            content = get_s3_bytes(s3_client, result_file)
            # str.split never returns an empty list, so blank lines are
            # filtered explicitly; otherwise an empty file (or a blank line)
            # would be handed to json.loads and reported as an error.
            lines = [line for line in content.decode("utf-8").split("\n") if line.strip()]
            if not lines:
                continue
            # Each line holds one complete document.
            doc = json.loads(random.choice(lines))
            page = _pick_random_page(doc, result_file)
            if page is not None:
                random_pages.append(page)
        except Exception as e:
            # Best-effort sampling: report the problem and try another file.
            print(f"Error processing {result_file}: {e}")
            continue

    print(f"Found {len(random_pages)} random pages from Dolma documents")
    return random_pages


def _pick_random_page(doc, result_file):
    """Pick one random page of a parsed document, or return None if it is unusable."""
    # A Dolma document has "text", "metadata", and "attributes" fields
    if "text" not in doc or "metadata" not in doc or "attributes" not in doc:
        print(f"Document in {result_file} is not a valid Dolma document")
        return None

    pdf_path = doc["metadata"].get("Source-File")
    if not pdf_path:
        return None

    page_spans = doc["attributes"].get("pdf_page_numbers", [])
    if not page_spans:
        return None

    # Page spans are [start_pos, end_pos, page_num]
    page_span = random.choice(page_spans)
    if len(page_span) < 3:
        return None
    start_pos, end_pos, page_num = page_span[0], page_span[1], page_span[2]
    page_text = doc["text"][start_pos:end_pos].strip()
    return (pdf_path, page_num, page_text, result_file)
def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
    """Return a presigned ``get_object`` URL for an S3 path.

    ``expiration`` is passed as ``ExpiresIn``. Any failure is printed and
    reported to the caller as ``None`` instead of being raised.
    """
    try:
        bucket, key = parse_s3_path(pdf_path)
        return s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expiration,
        )
    except Exception as err:
        print(f"Error creating presigned URL for {pdf_path}: {err}")
        return None
def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path, db_path, prolific_code, resolution=2048):
"""Create an HTML file with rendered PDF pages."""
# Overview: builds one document as a string (html_content) and writes it to
# output_path. The document starts with the annotator instructions plus the
# generation time, workspace_path and the number of pages; each entry of
# random_pages, a (pdf_path, page_num, page_text, result_file) tuple, is then
# processed in turn. db_path is forwarded to get_original_url() and
# resolution to render_pdf_to_base64webp().
# NOTE(review): only the text content of the templates below is visible in
# this view. original_url, display_path, presigned_url, base64_image and
# active_class have no visible use here - presumably the full templates
# interpolate them; confirm against the complete file.
# Obfuscate the provided Prolific code
obfuscated_code = obfuscate_code(prolific_code)
# Get current date and time for the report
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
html_content = f"""
OLMO OCR Samples
Task Instructions
Your task is to review {len(random_pages)} document pages and determine whether they contain any Personally Identifiable Information (PII). Carefully but efficiently inspect each page and select the appropriate response. You do not need to read every word - quickly scan the page and look for any obvious PII. The time expected to complete this task is 10-15 minutes.
How to Annotate
The page you are currently annotating will be highlighted with a blue outline and a set of response buttons will be displayed directly below it.
Yes PII - Select this if you find any information on the page that qualifies as PII. A text box will appear below - briefly describe the kind of PII you encountered (e.g., full name, social security number, etc.) then press the Enter key.
No PII - Select this if the page does not contain any PII.
I cannot read this - Select this if you are unable to read the page for any reason (e.g., written in a language other than English, heavily redacted text, etc.)
Disturbing content - Select this if the page contains disturbing or graphic content.
You may edit your annotations any time before submitting. To do so, press the green Edit button directly above the page.
After completing all the document pages on this screen, you will receive a Prolific completion code.
What Counts as PII?
Names: Full names, first names, last names, nicknames, maiden names, aliases
Addresses: Street addresses, postal codes, cities, states, countries
Personal Attributes: Date of birth, place of birth, gender, race, religion
Online Identifiers: IP addresses, login IDs, usernames, passwords, API keys, URLs
Location Information: Geolocations, specific coordinates
Employment Information: Job titles, workplace names, employment history
Education Information: School names, degrees, transcripts
Medical Information: Health records, diagnoses
Company Names: If they are tied to an individual's identity (e.g., a person's personal business)
What NOT to Mark as PII
Author names, researcher names, citations, or references from published research papers should NOT be marked as PII. These names are part of the normal publication process and are not considered private or sensitive information for the purposes of this task.
Only mark information as PII if it relates to private, sensitive, or personal details about an individual outside the context of the publication.
Generated On
{current_time}
Workspace
{workspace_path}
Sample Size
{len(random_pages)} pages
"""
# One iteration per sampled page; tqdm only adds a progress bar.
for i, (pdf_path, page_num, page_text, result_file) in enumerate(tqdm(random_pages, desc="Rendering pages")):
# Get original URL from PDF hash
pdf_hash = parse_pdf_hash(pdf_path)
original_url = get_original_url(pdf_hash, db_path) if pdf_hash else None
# Create a truncated path for display
# (paths longer than 60 characters keep their last 57, prefixed with "...")
display_path = pdf_path
if len(display_path) > 60:
display_path = "..." + display_path[-57:]
# Generate presigned URL
presigned_url = create_presigned_url(pdf_s3_client, pdf_path)
try:
# Download PDF to temp file
bucket, key = parse_s3_path(pdf_path)
# delete=False: the file is not removed automatically when it is closed;
# it is unlinked explicitly further down.
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
pdf_data = pdf_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
temp_file.write(pdf_data)
temp_file_path = temp_file.name
# Render PDF to base64 webp
base64_image = render_pdf_to_base64webp(temp_file_path, page_num, resolution)
# Add CSS class for the first annotation interface to be active by default
active_class = " active" if i == 0 else ""
# Add to HTML with the annotation interface
html_content += f"""
"""
# Clean up temp file
# NOTE(review): this runs only when the steps above succeeded; if an exception
# is raised after the temp file was created, control appears to jump to the
# except branch below and the file is left on disk - confirm and consider a
# finally clause.
os.unlink(temp_file_path)
except Exception as e:
# Add CSS class for the first annotation interface to be active by default
active_class = " active" if i == 0 else ""
# NOTE(review): the error-page template and the start of the closing section
# seem to be missing here; the "+ obfuscated_code" concatenation and the ")"
# below have no visible opening expression in this view. Treat this tail as
# incomplete and check it against the complete file before editing.
html_content += f"""
Thank you! All annotations are complete.
Your Prolific completion code is: Loading...
"""
+ obfuscated_code
+ """
"""
)
# Write the assembled document to output_path.
with open(output_path, "w") as f:
f.write(html_content)
print(f"Created HTML output at {output_path}")
def generate_sample_set(args, i, s3_client, pdf_s3_client, result_files):
    """Produce sample set number ``i + 1`` and return the path of its HTML file.

    Pages are sampled with ``get_random_pages`` and rendered with
    ``create_html_output``, using the fixed Prolific code from the
    command-line arguments.
    """
    set_number = i + 1
    output_filename = Path(args.output_dir) / f"dolma_samples_{set_number}.html"
    print(f"\nGenerating sample set {set_number} of {args.repeats}")

    sampled_pages = get_random_pages(s3_client, result_files, args.pages_per_output)
    create_html_output(
        sampled_pages,
        pdf_s3_client,
        output_filename,
        args.workspace,
        args.db_path,
        args.prolific_code,
    )
    return output_filename
def extract_datastore_url(html_content: str) -> Optional[str]:
    """Return the URL assigned to ``const presignedGetUrl = "..."`` in the page.

    Only the first occurrence is considered; ``None`` is returned when the
    declaration is not present.
    """
    found = re.search(r'const\s+presignedGetUrl\s*=\s*"([^"]+)"', html_content)
    return found.group(1) if found else None
def fetch_annotations(tinyhost_link: str, timeout: float = 30.0) -> Tuple[Dict[str, Any], str]:
    """Fetch and parse annotations from a tinyhost link.

    The page at *tinyhost_link* is downloaded, the datastore URL embedded in
    it is extracted, and the JSON stored at that URL is returned.

    Args:
        tinyhost_link: URL of a previously generated sample page.
        timeout: Seconds to wait on each HTTP request. Without a timeout
            ``requests`` can block indefinitely on an unresponsive server.

    Returns:
        ``(annotations, tinyhost_link)``. ``annotations`` is an empty dict
        when the page has no datastore URL or the datastore cannot be
        fetched or parsed.

    Raises:
        requests.RequestException: If the page itself cannot be fetched
            (connection error, timeout, or HTTP error status).
    """
    print(f"Fetching annotations from {tinyhost_link}")
    response = requests.get(tinyhost_link, timeout=timeout)
    response.raise_for_status()
    html_content = response.text

    datastore_url = extract_datastore_url(html_content)
    if not datastore_url:
        print(f"Could not find datastore URL in {tinyhost_link}")
        return {}, tinyhost_link

    print(f"Found datastore URL: {datastore_url}")
    try:
        datastore_response = requests.get(datastore_url, timeout=timeout)
        datastore_response.raise_for_status()
        annotations = datastore_response.json()
        return annotations, tinyhost_link
    except Exception as e:
        # Best effort: a missing or unreadable datastore is reported as "no
        # annotations" so the remaining links can still be processed.
        print(f"Error fetching datastore from {datastore_url}: {e}")
        return {}, tinyhost_link
def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
    """Group page annotations into buckets keyed by feedback type.

    Each input element is ``(annotations, link)`` where ``annotations`` maps a
    page id to its annotation. Pages whose annotation is empty, lacks
    ``feedbackOption``, or carries an unrecognised option are filed under
    ``no_annotation``.
    """
    # (feedbackOption value, result bucket); compared with == one by one.
    option_buckets = (
        ("yes-pii", "yes_pii"),
        ("no-pii", "no_pii"),
        ("cannot-read", "cannot_read"),
        ("disturbing", "disturbing"),
    )
    grouped = {
        "yes_pii": [],
        "no_pii": [],
        "cannot_read": [],
        "disturbing": [],
        "no_annotation": [],
    }

    for page_annotations, source_link in annotations_by_link:
        for page_id, entry in page_annotations.items():
            if entry and "feedbackOption" in entry:
                chosen = entry["feedbackOption"]
                bucket = next((name for option, name in option_buckets if chosen == option), "no_annotation")
                grouped[bucket].append(
                    {
                        "page_id": page_id,
                        "link": source_link,
                        "pdf_path": entry.get("pdfPath", "Unknown"),
                        "description": entry.get("piiDescription", ""),
                    }
                )
            else:
                # No usable feedback: record the page without a description.
                pdf_path = entry.get("pdfPath", "Unknown") if entry else "Unknown"
                grouped["no_annotation"].append({"page_id": page_id, "link": source_link, "pdf_path": pdf_path})

    return grouped
def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
    """Print a summary report of annotations.

    Prints the total page count, the count and percentage of each category,
    and a detailed listing of every page flagged as containing PII.

    Args:
        annotation_results: Mapping with the keys ``yes_pii``, ``no_pii``,
            ``cannot_read``, ``disturbing`` and ``no_annotation``, each
            holding a list of result items.
    """
    total_pages = sum(len(items) for items in annotation_results.values())

    def percent(count):
        # With no pages at all (e.g. every link failed to load) report 0.0%
        # instead of raising ZeroDivisionError.
        return count / total_pages * 100 if total_pages else 0.0

    print("\n" + "=" * 80)
    print(f"ANNOTATION REPORT - Total Pages: {total_pages}")
    print("=" * 80)

    # Summary statistics, one line per category.
    print("\nSummary:")
    summary_rows = (
        ("Pages with PII", "yes_pii"),
        ("Pages without PII", "no_pii"),
        ("Unreadable pages", "cannot_read"),
        ("Pages with disturbing content", "disturbing"),
        ("Pages without annotation", "no_annotation"),
    )
    for label, key in summary_rows:
        count = len(annotation_results[key])
        print(f" {label}: {count} ({percent(count):.1f}%)")

    # Detailed listing of pages with PII.
    if annotation_results["yes_pii"]:
        print("\nDetailed Report - Pages with PII:")
        print("-" * 80)
        for i, item in enumerate(annotation_results["yes_pii"], 1):
            print(f"{i}. PDF: {item['pdf_path']}")
            print(f" Page ID: {item['page_id']}")
            print(f" Link: {item['link']}#{item['page_id']}")
            print(f" Description: {item['description']}")
            print("-" * 80)

    print("\nReport complete.")
def read_and_process_results(args):
    """Read previously generated tinyhost links and report their annotations.

    ``args.read_results`` names a text file with one tinyhost link per line.
    Every link is fetched, the annotations are categorised, a summary is
    printed and a detailed ``annotation_report.csv`` is written into
    ``args.output_dir``.

    Errors are printed rather than raised: a failing link is skipped, and any
    other failure aborts the report with an error message.
    """
    try:
        # Read the links, ignoring blank lines.
        with open(args.read_results, "r") as f:
            links = [line.strip() for line in f if line.strip()]
        if not links:
            print(f"No tinyhost links found in {args.read_results}")
            return
        print(f"Found {len(links)} tinyhost links in {args.read_results}")

        # Fetch annotations link by link; one bad link must not stop the rest.
        annotations_by_link = []
        for link in tqdm(links, desc="Fetching annotations"):
            try:
                annotations, link_url = fetch_annotations(link)
                annotations_by_link.append((annotations, link_url))
            except Exception as e:
                print(f"Error processing {link}: {e}")

        annotation_results = process_annotations(annotations_by_link)
        print_annotation_report(annotation_results)

        # main() returns before creating the output directory when reading
        # results, so create it here; otherwise the report cannot be written.
        output_dir = Path(args.output_dir)
        output_dir.mkdir(exist_ok=True, parents=True)
        output_file = output_dir / "annotation_report.csv"
        print(f"\nSaving detailed report to {output_file}")
        # Explicit UTF-8 so free-text descriptions do not depend on the
        # platform's default encoding.
        with open(output_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Category", "PDF Path", "Page ID", "Link", "Description"])
            for category, items in annotation_results.items():
                for item in items:
                    writer.writerow([category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}", item.get("description", "")])
        print(f"Report saved to {output_file}")
    except Exception as e:
        print(f"Error processing results: {e}")
def main():
    """Entry point: either report on earlier results or generate new samples.

    With ``--read_results`` only the annotation report is produced. Otherwise
    ``--repeats`` sample sets are generated (in a thread pool when more than
    one is requested), each HTML file is uploaded with tinyhost, and the
    resulting links are written one per line to ``--prolific_csv``.
    """
    args = parse_args()

    # Report-only mode: nothing is generated or uploaded.
    if args.read_results:
        read_and_process_results(args)
        return

    # Workspace client, plus a separate client for the PDFs when a profile is given.
    s3_client = boto3.client("s3")
    if args.pdf_profile:
        pdf_s3_client = boto3.Session(profile_name=args.pdf_profile).client("s3")
    else:
        pdf_s3_client = s3_client

    Path(args.output_dir).mkdir(exist_ok=True, parents=True)

    print(f"Listing result files in {args.workspace}/results...")
    result_files = list_result_files(s3_client, args.workspace)
    print(f"Found {len(result_files)} result files")

    output_files = []
    if args.repeats > 1:
        worker_count = min(args.max_workers, args.repeats)
        print(f"Using ThreadPoolExecutor with {worker_count} workers")
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            pending = [
                executor.submit(generate_sample_set, args, index, s3_client, pdf_s3_client, result_files)
                for index in range(args.repeats)
            ]
            # Collect in submission order; a failed set is reported and skipped.
            for job in pending:
                try:
                    finished_file = job.result()
                    output_files.append(finished_file)
                    print(f"Completed generation of {finished_file}")
                except Exception as e:
                    print(f"Error generating sample set: {e}")
    else:
        # A single set does not need a thread pool.
        output_files.append(generate_sample_set(args, 0, s3_client, pdf_s3_client, result_files))

    # Upload every generated file and remember its public link.
    print("Generated all files, uploading tinyhost links now")
    links = []
    for html_file in output_files:
        uploaded_link = tinyhost.tinyhost([str(html_file)])[0]
        links.append(uploaded_link)
        print(uploaded_link)

    # Plain text file: one tinyhost link per line.
    csv_path = args.prolific_csv
    print(f"Writing tinyhost links to {csv_path}")
    with open(csv_path, "w", newline="") as csvfile:
        csvfile.writelines(f"{uploaded_link}\n" for uploaded_link in links)
    print(f"Tinyhost links written to {csv_path}")
# Script entry point: call main() only when this file is executed directly.
if __name__ == "__main__":
main()