import argparse
import base64
import csv
import datetime
import json
import os
import random
import re
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import boto3
import requests
import tinyhost
from tqdm import tqdm
from olmocr.data.renderpdf import render_pdf_to_base64webp
from olmocr.s3_utils import get_s3_bytes, parse_s3_path
def parse_args():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with ``workspace`` (positional) plus the optional
        settings ``pages_per_output``, ``repeats``, ``pdf_profile``,
        ``output_dir``, ``max_workers``, ``db_path``, ``prolific_code``
        (required), ``prolific_csv`` and ``read_results``.
    """
    cli = argparse.ArgumentParser(description="Scan OLMO OCR workspace results and create visual samples")
    cli.add_argument("workspace", help="OLMO OCR workspace path (s3://bucket/workspace)")

    # Optional flags as (flag, add_argument keyword arguments) pairs. They are
    # registered in list order, which is also the order shown by --help.
    option_specs = [
        ("--pages_per_output", {"type": int, "default": 30, "help": "Number of pages per output file"}),
        ("--repeats", {"type": int, "default": 1, "help": "Number of output files to generate"}),
        ("--pdf_profile", {"help": "AWS profile for accessing PDFs"}),
        ("--output_dir", {"default": "dolma_samples", "help": "Directory to save output HTML files"}),
        ("--max_workers", {"type": int, "default": 4, "help": "Maximum number of worker threads"}),
        (
            "--db_path",
            {
                "default": "~/s2pdf_url_data/d65142df-6588-4b68-a12c-d468b3761189.csv.db",
                "help": "Path to the SQLite database containing PDF hash to URL mapping",
            },
        ),
        (
            "--prolific_code",
            {
                "required": True,
                "help": "Fixed completion code to use for all outputs",
            },
        ),
        (
            "--prolific_csv",
            {
                "default": "prolific_codes.csv",
                "help": "Path to save the file with tinyhost links (one URL per line)",
            },
        ),
        (
            "--read_results",
            {
                "help": "Path to a CSV file containing previously generated tinyhost links to extract annotations",
            },
        ),
    ]
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
# The Prolific completion code is supplied via the --prolific_code command-line argument.
def obfuscate_code(code):
    """Lightly disguise the Prolific code so it is not readable at a glance in page source.

    The code is base64-encoded and the resulting text is reversed. This is
    obfuscation only, not encryption.
    """
    b64_text = base64.b64encode(code.encode()).decode()
    # Emit the base64 characters back to front.
    return "".join(reversed(b64_text))
def deobfuscate_code(obfuscated_code):
    """Reverse :func:`obfuscate_code`: un-reverse the text, then base64-decode it.

    This is the Python counterpart of the decoding that is done in JavaScript.

    Args:
        obfuscated_code: Reversed base64 text.

    Returns:
        The decoded string, or the literal ``"ERROR_DECODING"`` when the input
        is not valid base64 or does not decode to UTF-8 text.
    """
    # Reverse and decode from base64
    reversed_encoded = obfuscated_code[::-1]
    try:
        return base64.b64decode(reversed_encoded).decode()
    except ValueError:
        # binascii.Error (bad base64) and UnicodeDecodeError (bad UTF-8) are
        # both ValueError subclasses. Catching only these lets
        # KeyboardInterrupt/SystemExit propagate, unlike a bare ``except``.
        return "ERROR_DECODING"
def parse_pdf_hash(pretty_pdf_path: str) -> Optional[str]:
    """Extract the PDF hash from an ``s3://ai2-s2-pdfs/<4 hex>/<hex>.pdf`` path.

    Returns the two hex segments joined together, or ``None`` when the path
    does not start with that layout.
    """
    found = re.match(r"s3://ai2-s2-pdfs/([a-f0-9]{4})/([a-f0-9]+)\.pdf", pretty_pdf_path)
    if found is None:
        return None
    shard, remainder = found.group(1), found.group(2)
    return shard + remainder
def get_original_url(pdf_hash: str, db_path: str) -> Optional[str]:
    """Look up the original URL for a PDF hash in the SQLite database.

    Args:
        pdf_hash: Hash to look up in the ``pdf_mapping`` table. An empty or
            ``None`` value returns ``None`` without touching the database.
        db_path: Path to the SQLite file; ``~`` is expanded.

    Returns:
        The ``uri`` column of the first matching row, or ``None`` when the
        database file is missing, no row matches, or the lookup fails.
        Failures are printed rather than raised (best-effort lookup).
    """
    if not pdf_hash:
        return None
    try:
        sqlite_db_path = os.path.expanduser(db_path)
        if not os.path.exists(sqlite_db_path):
            print(f"SQLite database not found at {sqlite_db_path}")
            return None
        conn = sqlite3.connect(sqlite_db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT uri FROM pdf_mapping WHERE pdf_hash = ?", (pdf_hash,))
            result = cursor.fetchone()
        finally:
            # Close even when the query raises (e.g. missing table) so the
            # connection and its file handle are not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        print(f"Error looking up URL for PDF hash {pdf_hash}: {e}")
        return None
def list_result_files(s3_client, workspace_path, max_files=1000):
    """List JSON/JSONL result files under ``<workspace>/results/``.

    Args:
        s3_client: Client exposing ``get_paginator("list_objects_v2")``.
        workspace_path: Workspace location, split into bucket and key prefix
            by ``parse_s3_path``.
        max_files: Soft cap on the listing. Pagination stops once more than
            this many files have been collected, so the result may exceed the
            cap by up to one page of keys.

    Returns:
        List of ``s3://bucket/key`` strings whose key ends in ``.jsonl`` or
        ``.json``.
    """
    # Local import: S3 keys always use "/" as the separator, so the prefix
    # must be joined with POSIX rules rather than the host OS's os.path.
    import posixpath

    bucket, prefix = parse_s3_path(workspace_path)
    results_prefix = posixpath.join(prefix, "results").rstrip("/") + "/"
    all_files = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=results_prefix):
        # A page has no "Contents" entry when nothing matched the prefix.
        all_files.extend(f"s3://{bucket}/{obj['Key']}" for obj in page.get("Contents", []) if obj["Key"].endswith((".jsonl", ".json")))
        if len(all_files) > max_files:
            break
    return all_files
def get_random_pages(s3_client, result_files, count=30):
    """Sample random pages from random Dolma documents in the result files.

    Each attempt picks a random result file, a random non-blank line in it
    (one JSON document per line), and a random page span of that document.

    Args:
        s3_client: Client handed to ``get_s3_bytes`` to download result files.
        result_files: Candidate result file paths to sample from.
        count: Number of pages wanted. At most ``count * 3`` attempts are
            made, so fewer pages may be returned.

    Returns:
        List of ``(pdf_path, page_num, page_text, result_file)`` tuples.
    """
    random_pages = []
    if not result_files:
        print("No result files found!")
        print(f"Found {len(random_pages)} random pages from Dolma documents")
        return random_pages

    # Allow extra attempts to absorb unreadable files and unusable documents.
    max_attempts = count * 3
    for _ in range(max_attempts):
        if len(random_pages) >= count:
            break
        result_file = random.choice(result_files)
        try:
            content = get_s3_bytes(s3_client, result_file)
            # Drop blank lines (e.g. a trailing newline or an empty file) so
            # they are never picked and fed to json.loads.
            lines = [line for line in content.decode("utf-8").split("\n") if line.strip()]
            if not lines:
                continue
            # Each line holds one complete document.
            doc = json.loads(random.choice(lines))
            # A Dolma document has "text", "metadata", and "attributes" fields
            if not isinstance(doc, dict) or "text" not in doc or "metadata" not in doc or "attributes" not in doc:
                print(f"Document in {result_file} is not a valid Dolma document")
                continue
            pdf_path = doc["metadata"].get("Source-File")
            if not pdf_path:
                continue
            page_spans = doc["attributes"].get("pdf_page_numbers", [])
            if not page_spans:
                continue
            page_span = random.choice(page_spans)
            if len(page_span) >= 3:
                # Page spans are [start_pos, end_pos, page_num]
                start_pos, end_pos, page_num = page_span[0], page_span[1], page_span[2]
                page_text = doc["text"][start_pos:end_pos].strip()
                random_pages.append((pdf_path, page_num, page_text, result_file))
        except Exception as e:
            # Best-effort sampling: report the failure and try another file.
            print(f"Error processing {result_file}: {e}")
            continue
    print(f"Found {len(random_pages)} random pages from Dolma documents")
    return random_pages
def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
    """Return a presigned ``get_object`` URL for ``pdf_path``, or ``None`` on failure.

    ``expiration`` is passed through as ``ExpiresIn``. Any error is printed
    and turned into a ``None`` result.
    """
    try:
        bucket, key = parse_s3_path(pdf_path)
        request_params = {"Bucket": bucket, "Key": key}
        signed = s3_client.generate_presigned_url("get_object", Params=request_params, ExpiresIn=expiration)
    except Exception as e:
        print(f"Error creating presigned URL for {pdf_path}: {e}")
        return None
    else:
        return signed
def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path, db_path, prolific_code, resolution=2048):
    """Create an HTML file with rendered PDF pages and the annotation instructions.

    Args:
        random_pages: Iterable of ``(pdf_path, page_num, page_text, result_file)``
            tuples, as produced by ``get_random_pages``.
        pdf_s3_client: S3 client used to presign and download the source PDFs.
        output_path: Destination path of the HTML file.
        workspace_path: Workspace path shown in the report header.
        db_path: SQLite database path passed to ``get_original_url``.
        prolific_code: Completion code; embedded in obfuscated form.
        resolution: Passed through to ``render_pdf_to_base64webp``.

    A page that fails to download or render gets a fallback fragment instead
    of aborting the whole report. The temporary PDF is always removed.
    """
    # Obfuscate the provided Prolific code
    obfuscated_code = obfuscate_code(prolific_code)
    # Get current date and time for the report
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    html_content = f"""
OLMO OCR Samples
Task Overview
In this task, you will review {len(random_pages)} document pages and determine whether they contain any Personally Identifiable Information (PII). For each page, please follow the decision flow outlined in the "How to Annotate" section below.
Carefully but efficiently inspect each page and select the appropriate response. You do not need to read every word. Instead, focus on ascertaining the document's intended use and spotting information that would qualify as PII.
The entire task should take about 25-30 minutes.
How to Annotate
The current annotation will be highlighted with a blue outline and a set of response buttons will be displayed directly below the page preview. For each page, complete the following steps:
Determine if the document is intended for public release.
Inspect the page and answer: "Is this document intended for public release or dissemination?"
Yes - If the document appears to be a publication, research paper, public information, etc.
No - If the document appears to be private, personal, or not intended for public release
Cannot Read - If you are unable to read the page (e.g., foreign language, no text, etc.)
Report Content - If the content is inappropriate or disturbing
If you selected "Yes," "Cannot Read," or "Report Content," you will automatically move to the next document. If you selected "No," proceed to Step 2.
Identify the kind of PII found in the private document (if any).
You will be shown a checklist with a set of PII options.
Refer to the "How to Identify PII" section below and mark all options that apply.
If you select "Other," describe the kind of other PII in the expanded text box.
Press the blue Continue button to complete your annotation.
You will automatically be moved to the next annotation.
You may review and edit your previous annotations at any time. To do so, press the green Edit button directly above the page preview for the annotation you want to edit.
After completing all {len(random_pages)} document pages, you will receive a Prolific completion code.
How to Identify PII
Identifiers for PII
Some personal information needs to be accompanied by an identifier to be considered PII. Identifiers that trigger PII include:
Names
Email Addresses
Phone Numbers
PII that must co-occur with an Identifier
The following types of information should only be marked as PII if they occur alongside an identifier (commonly, a person's name):
Names (full, first, last, maiden, nicknames, aliases)
Contact Information (phone numbers, emails)
Addresses (street address, postal code, etc.)
Biographical Information (date of birth, place of birth, gender, sexual orientation, race, ethnicity, citizenship/immigration status, religion)
Location Information (geolocations, specific coordinates)
Employment Information (job titles, workplace names, employment history)
Education Information (school names, degrees, transcripts)
Medical Information (health records, diagnoses, genetic or neural data)
Note that some of these items are identifiers themselves.
Example: A street address might be personal information, but is not PII by itself. However, a street address associated with a name is regulated PII.
PII that occurs even without an Identifier
Certain types of sensitive information should always be classified as PII because the information is inherently self-identifying. The following should always be marked as PII even if they do not occur alongside an identifier:
Government IDs (SSNs, passport numbers, driver's license numbers, tax IDs)
Financial Information (credit card numbers, bank account/routing numbers)
Biometric Data (fingerprints, retina scans, facial recognition data, voice signatures)
Login information (only mark as PII when a username, password, and login location are present together)
Generated On
{current_time}
Workspace
{workspace_path}
Sample Size
{len(random_pages)} pages
"""
    for i, (pdf_path, page_num, page_text, result_file) in enumerate(tqdm(random_pages, desc="Rendering pages")):
        # NOTE(review): _original_url, display_path, presigned_url, base64_image
        # and active_class are computed but not interpolated into the template
        # text visible in this block - confirm whether markup that used them
        # was lost. They are kept so helper calls and their output still happen.
        # Get original URL from PDF hash
        pdf_hash = parse_pdf_hash(pdf_path)
        _original_url = get_original_url(pdf_hash, db_path) if pdf_hash else None
        # Create a truncated path for display
        display_path = pdf_path
        if len(display_path) > 60:
            display_path = "..." + display_path[-57:]
        # Generate presigned URL
        presigned_url = create_presigned_url(pdf_s3_client, pdf_path)
        # Set as soon as the temp file exists so the cleanup below can remove it.
        temp_file_path = None
        try:
            # Download PDF to temp file
            bucket, key = parse_s3_path(pdf_path)
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                temp_file_path = temp_file.name
                pdf_data = pdf_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
                temp_file.write(pdf_data)
            # Render PDF to base64 webp
            base64_image = render_pdf_to_base64webp(temp_file_path, page_num, resolution)
            # Add CSS class for the first annotation interface to be active by default
            active_class = " active" if i == 0 else ""
            # Add to HTML with the annotation interface
            html_content += f"""
Is this document meant for public dissemination? (ex. news article, research paper, etc.)
Select any PII found in this public document:
Select any PII found in this private document:
Identifiers (Select these if found)
PII that requires an identifier above
PII that is always sensitive (even without an identifier)
"""
        except Exception as e:
            # Best-effort: report the failure and emit the fallback fragment.
            print(f"Error rendering page {page_num} of {pdf_path}: {e}")
            # Add CSS class for the first annotation interface to be active by default
            active_class = " active" if i == 0 else ""
            html_content += f"""
PII that is always sensitive (even without an identifier)
"""
        finally:
            # Clean up temp file on success and failure alike. A failed delete
            # is reported but must not add a second fragment for this page.
            if temp_file_path is not None:
                try:
                    os.unlink(temp_file_path)
                except OSError as cleanup_error:
                    print(f"Could not remove temporary file {temp_file_path}: {cleanup_error}")
    html_content += (
        """
Thank you! All annotations are complete.
Your Prolific completion code is: Loading...
"""
        + obfuscated_code
        + """
"""
    )
    # Explicit UTF-8 so the file does not depend on the platform's default encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"Created HTML output at {output_path}")
def generate_sample_set(args, i, s3_client, pdf_s3_client, result_files):
    """Produce sample set number ``i + 1`` and return the path of its HTML file.

    Pages are sampled with ``get_random_pages`` and written out with
    ``create_html_output`` using the fixed Prolific code from ``args``.
    """
    set_number = i + 1
    output_filename = Path(args.output_dir) / f"dolma_samples_{set_number}.html"
    print(f"\nGenerating sample set {set_number} of {args.repeats}")

    sampled_pages = get_random_pages(s3_client, result_files, args.pages_per_output)
    create_html_output(
        sampled_pages,
        pdf_s3_client,
        output_filename,
        args.workspace,
        args.db_path,
        args.prolific_code,
    )
    return output_filename
def extract_datastore_url(html_content: str) -> Optional[str]:
    """Return the value of ``const presignedGetUrl = "..."`` found in the HTML.

    Only the first occurrence is used. Returns ``None`` when the declaration
    is not present.
    """
    declaration = re.search(r'const\s+presignedGetUrl\s*=\s*"([^"]+)"', html_content)
    return declaration.group(1) if declaration else None
def fetch_annotations(tinyhost_link: str, timeout: float = 30.0) -> Tuple[Dict[str, Any], str]:
    """Fetch and parse annotations from a tinyhost link.

    The page at ``tinyhost_link`` is downloaded, the presigned datastore URL
    is extracted from it, and the datastore is fetched and parsed as JSON.

    Args:
        tinyhost_link: URL of a previously generated annotation page.
        timeout: Seconds allowed for each HTTP request, so an unresponsive
            host cannot stall the run indefinitely.

    Returns:
        ``(annotations, tinyhost_link)``. ``annotations`` is ``{}`` when no
        datastore URL is found, the datastore cannot be fetched or parsed, or
        its payload is not a JSON object.

    Raises:
        requests.RequestException: If fetching the tinyhost page itself fails.
    """
    # Request the HTML content
    print(f"Fetching annotations from {tinyhost_link}")
    response = requests.get(tinyhost_link, timeout=timeout)
    response.raise_for_status()
    html_content = response.text
    # Extract the datastore URL
    datastore_url = extract_datastore_url(html_content)
    if not datastore_url:
        print(f"Could not find datastore URL in {tinyhost_link}")
        return {}, tinyhost_link
    # Fetch the datastore content
    print(f"Found datastore URL: {datastore_url}")
    try:
        datastore_response = requests.get(datastore_url, timeout=timeout)
        datastore_response.raise_for_status()
        annotations = datastore_response.json()
    except Exception as e:
        print(f"Error fetching datastore from {datastore_url}: {e}")
        return {}, tinyhost_link
    if not isinstance(annotations, dict):
        # Callers iterate the result with .items(), so anything other than a
        # JSON object is treated as "no annotations".
        print(f"Unexpected datastore payload type {type(annotations).__name__} from {datastore_url}")
        return {}, tinyhost_link
    return annotations, tinyhost_link
def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
    """Sort every page annotation into a bucket keyed by the annotator's primary choice.

    Args:
        annotations_by_link: ``(annotations, link)`` pairs, where
            ``annotations`` maps a page id to that page's annotation.

    Returns:
        Dict with the keys ``public_document``, ``private_document``,
        ``cannot_read``, ``report_content`` and ``no_annotation``. Every entry
        carries ``page_id``, ``link`` and ``pdf_path``; public and private
        entries also carry ``pii_types``, ``has_pii`` and ``description``.
    """
    categorized: Dict[str, List[Dict[str, Any]]] = {
        "public_document": [],
        "private_document": [],
        "cannot_read": [],
        "report_content": [],
        "no_annotation": [],
    }

    for page_annotations, source_link in annotations_by_link:
        for page_key, entry in page_annotations.items():
            answered = bool(entry) and "primaryOption" in entry
            if not answered:
                # Empty entry, or one saved without a primary choice.
                fallback_path = entry.get("pdfPath", "Unknown") if entry else "Unknown"
                categorized["no_annotation"].append({"page_id": page_key, "link": source_link, "pdf_path": fallback_path})
                continue

            choice = entry["primaryOption"]
            record = {"page_id": page_key, "link": source_link, "pdf_path": entry.get("pdfPath", "Unknown")}

            if choice == "cannot-read":
                categorized["cannot_read"].append(record)
            elif choice == "report-content":
                categorized["report_content"].append(record)
            elif choice == "yes-public":
                # The current flow collects no PII details for public documents.
                categorized["public_document"].append({**record, "pii_types": [], "has_pii": False, "description": ""})
            elif choice == "no-public":
                selected = entry.get("privatePiiOptions", [])
                free_text = entry.get("otherPrivateDesc", "")
                if selected:
                    # The free-text note only counts when "other" was ticked.
                    pii_details = {
                        "pii_types": selected,
                        "has_pii": True,
                        "description": free_text if "other" in selected else "",
                    }
                else:
                    pii_details = {"pii_types": [], "has_pii": False, "description": ""}
                categorized["private_document"].append({**record, **pii_details})
            else:
                # Unrecognized choice: treat as not annotated.
                categorized["no_annotation"].append(record)

    return categorized
def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
    """Print a summary report of annotations to stdout.

    Args:
        annotation_results: Mapping with the keys ``public_document``,
            ``private_document``, ``cannot_read``, ``report_content`` and
            ``no_annotation``, each holding a list of page entries (the shape
            returned by ``process_annotations``).

    Percentages use a denominator clamped to at least 1, so an empty result
    set prints 0.0% everywhere instead of raising ``ZeroDivisionError``.
    """

    def percent(part: int, whole: int) -> float:
        # Clamp the denominator so empty categories or results are safe.
        return part / max(1, whole) * 100

    public_docs = annotation_results["public_document"]
    private_docs = annotation_results["private_document"]
    cannot_read = annotation_results["cannot_read"]
    reported = annotation_results["report_content"]
    unannotated = annotation_results["no_annotation"]

    total_pages = sum(len(items) for items in annotation_results.values())
    print("\n" + "=" * 80)
    print(f"ANNOTATION REPORT - Total Pages: {total_pages}")
    print("=" * 80)
    # Split public and private pages by whether PII was flagged
    public_with_pii = [page for page in public_docs if page.get("has_pii", False)]
    public_without_pii = [page for page in public_docs if not page.get("has_pii", False)]
    private_with_pii = [page for page in private_docs if page.get("has_pii", False)]
    private_without_pii = [page for page in private_docs if not page.get("has_pii", False)]
    # Print summary statistics
    print("\nSummary:")
    print(f" Public documents (total): {len(public_docs)} ({percent(len(public_docs), total_pages):.1f}% of all pages)")
    print(f" - With PII: {len(public_with_pii)} ({percent(len(public_with_pii), len(public_docs)):.1f}% of public docs)")
    print(f" - Without PII: {len(public_without_pii)} ({percent(len(public_without_pii), len(public_docs)):.1f}% of public docs)")
    print(f" Private documents (total): {len(private_docs)} ({percent(len(private_docs), total_pages):.1f}% of all pages)")
    print(f" - With PII: {len(private_with_pii)} ({percent(len(private_with_pii), len(private_docs)):.1f}% of private docs)")
    print(f" - Without PII: {len(private_without_pii)} ({percent(len(private_without_pii), len(private_docs)):.1f}% of private docs)")
    print(f" Unreadable pages: {len(cannot_read)} ({percent(len(cannot_read), total_pages):.1f}%)")
    print(f" Pages with reported content: {len(reported)} ({percent(len(reported), total_pages):.1f}%)")
    print(f" Pages without annotation: {len(unannotated)} ({percent(len(unannotated), total_pages):.1f}%)")
    # With the updated flow, there should be no public documents with PII flags
    # as we don't collect PII information for public documents anymore
    if public_with_pii:
        print("\nNote: With the current annotation flow, public documents should not have PII flags.")
        print(f"Found {len(public_with_pii)} public documents incorrectly marked with PII.")
    # Analyze PII types in private documents
    if private_with_pii:
        # Categorize the PII types for clearer reporting
        pii_categories = {
            "Identifiers": ["names", "email", "phone"],
            "PII requiring identifiers": ["addresses", "biographical", "location", "employment", "education", "medical"],
            "Always sensitive PII": ["government-id", "financial", "biometric", "login-info"],
        }
        # Count how many flagged private pages mention each PII type
        pii_counts_private = {}
        for page in private_with_pii:
            for pii_type in page.get("pii_types", []):
                pii_counts_private[pii_type] = pii_counts_private.get(pii_type, 0) + 1
        # Print categorized PII counts
        print("\nPII Types in Private Documents:")
        for category, pii_types in pii_categories.items():
            print(f"\n {category}:")
            for pii_type in pii_types:
                count = pii_counts_private.get(pii_type, 0)
                if count > 0:
                    print(f" - {pii_type}: {count} ({percent(count, len(private_with_pii)):.1f}%)")
        # Print any other PII types not in our categories (like "other")
        known_types = {pii_type for types in pii_categories.values() for pii_type in types}
        other_pii = [pii_type for pii_type in pii_counts_private if pii_type not in known_types]
        if other_pii:
            print("\n Other PII types:")
            for pii_type in other_pii:
                count = pii_counts_private.get(pii_type, 0)
                print(f" - {pii_type}: {count} ({percent(count, len(private_with_pii)):.1f}%)")
    # Flags on public documents can only come from sessions that predate the
    # workflow change; this second note is kept so the output stays the same.
    if public_with_pii:
        print("\nNote: Public documents with PII flags found in old annotation results.")
        print("These are from annotation sessions before the workflow change and should be disregarded.")
    # Print detailed report for private documents with PII
    if private_with_pii:
        print("\nDetailed Report - Private Documents with PII:")
        print("-" * 80)
        for i, item in enumerate(private_with_pii, 1):
            print(f"{i}. PDF: {item['pdf_path']}")
            print(f" Page ID: {item['page_id']}")
            print(f" Link: {item['link']}#{item['page_id']}")
            print(f" PII Types: {', '.join(item['pii_types'])}")
            if item.get("description"):
                print(f" Description: {item['description']}")
            print("-" * 80)
    print("\nReport complete.")
def read_and_process_results(args):
    """Read tinyhost links from a file, fetch their annotations, and report on them.

    Args:
        args: Parsed arguments. ``args.read_results`` is a text file with one
            tinyhost link per line (blank lines are ignored), and
            ``args.output_dir`` is where ``annotation_report.csv`` is written.

    The summary is printed to stdout and a per-page CSV is saved. Links that
    fail to fetch are reported and skipped. Any other error is printed rather
    than raised.
    """
    try:
        # Read the links file, skipping blank lines
        with open(args.read_results, "r") as f:
            links = [line.strip() for line in f if line.strip()]
        if not links:
            print(f"No tinyhost links found in {args.read_results}")
            return
        print(f"Found {len(links)} tinyhost links in {args.read_results}")
        # Fetch annotations for every link
        annotations_by_link = []
        for link in tqdm(links, desc="Fetching annotations"):
            try:
                annotations, link_url = fetch_annotations(link)
                annotations_by_link.append((annotations, link_url))
            except Exception as e:
                print(f"Error processing {link}: {e}")
        # Process and categorize annotations
        annotation_results = process_annotations(annotations_by_link)
        # Print report
        print_annotation_report(annotation_results)
        # Save detailed report to file. The output directory is created here
        # because this path returns before the generation flow would create it.
        output_dir = Path(args.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / "annotation_report.csv"
        print(f"\nSaving detailed report to {output_file}")
        doc_type_labels = {"public_document": "Public", "private_document": "Private"}
        # UTF-8 so free-text descriptions do not fail on a narrower platform default.
        with open(output_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Category", "PDF Path", "Page ID", "Link", "Document Type", "PII Types", "Description"])
            for category, items in annotation_results.items():
                for item in items:
                    if category in doc_type_labels:
                        doc_type = doc_type_labels[category]
                        pii_types = ", ".join(item.get("pii_types", []))
                        description = item.get("description", "")
                    else:
                        # These categories carry no PII details.
                        doc_type = ""
                        pii_types = ""
                        description = ""
                    writer.writerow([category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}", doc_type, pii_types, description])
        print(f"Report saved to {output_file}")
    except Exception as e:
        print(f"Error processing results: {e}")
def main():
    """Command-line entry point.

    With ``--read_results`` the script only fetches and reports existing
    annotations. Otherwise it lists the workspace result files, generates
    ``--repeats`` HTML sample sets, uploads each one with tinyhost, and
    writes the resulting links to the ``--prolific_csv`` file.
    """
    args = parse_args()

    # Reporting mode: summarize a previous run and stop.
    if args.read_results:
        read_and_process_results(args)
        return

    # The default client reads the workspace; PDFs may need a separate profile.
    s3_client = boto3.client("s3")
    pdf_s3_client = boto3.Session(profile_name=args.pdf_profile).client("s3") if args.pdf_profile else s3_client

    Path(args.output_dir).mkdir(exist_ok=True, parents=True)

    print(f"Listing result files in {args.workspace}/results...")
    result_files = list_result_files(s3_client, args.workspace)
    print(f"Found {len(result_files)} result files")

    output_files = []
    if args.repeats > 1:
        # Several sample sets: build them concurrently.
        worker_count = min(args.max_workers, args.repeats)
        print(f"Using ThreadPoolExecutor with {worker_count} workers")
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            pending = [pool.submit(generate_sample_set, args, idx, s3_client, pdf_s3_client, result_files) for idx in range(args.repeats)]
            # Collect in submission order; a failed set is reported and skipped.
            for job in pending:
                try:
                    finished_path = job.result()
                    output_files.append(finished_path)
                    print(f"Completed generation of {finished_path}")
                except Exception as e:
                    print(f"Error generating sample set: {e}")
    else:
        # A single set does not need a thread pool.
        output_files.append(generate_sample_set(args, 0, s3_client, pdf_s3_client, result_files))

    # Upload every generated file and remember its link.
    print("Generated all files, uploading tinyhost links now")
    links = []
    for html_path in output_files:
        uploaded = tinyhost.tinyhost([str(html_path)])[0]
        links.append(uploaded)
        print(uploaded)

    # The "CSV" is just the tinyhost links, one per line.
    csv_path = args.prolific_csv
    print(f"Writing tinyhost links to {csv_path}")
    with open(csv_path, "w", newline="") as csvfile:
        csvfile.writelines(f"{link}\n" for link in links)
    print(f"Tinyhost links written to {csv_path}")
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()