Working on annotation for dolma docs release

Jake Poznanski 2025-04-16 19:29:45 +00:00
parent 9a67f50539
commit e16f66d6c5
2 changed files with 178 additions and 242 deletions

View File

@@ -1,51 +1,79 @@
import argparse
import base64
import json
import os
import random
import re
import tempfile
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from typing import Any, Dict, List, Optional
import boto3
import pydantic
import requests
from openai import OpenAI
from tqdm import tqdm
from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.s3_utils import get_s3_bytes, parse_s3_path
LanguageCode = Enum(
"LanguageCode",
{
"en": "English",
"zh": "Chinese",
"hi": "Hindi",
"es": "Spanish",
"fr": "French",
"ar": "Arabic",
"bn": "Bengali",
"ru": "Russian",
"pt": "Portuguese",
"ur": "Urdu",
"id": "Indonesian",
"de": "German",
"ja": "Japanese",
"sw": "Swahili",
"mr": "Marathi",
"te": "Telugu",
"tr": "Turkish",
"vi": "Vietnamese",
"ta": "Tamil",
"ko": "Korean",
"other": "Other",
},
)
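Note: the functional Enum API above keys members by language code and stores the display name as the value, so pydantic validates and serializes these fields by display name. A quick sketch of the behavior, assuming the definition above is in scope:

# Members are named by code; values carry the display name.
assert LanguageCode.en.value == "English"
assert LanguageCode["zh"] is LanguageCode.zh
# Pydantic coerces by value, so "English" parses back to LanguageCode.en.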
class PIIAnnotation(pydantic.BaseModel):
"""Structured model for PII annotations returned by ChatGPT"""
document_description: str
language_code: LanguageCode
cannot_read: bool
inappropriate_content: bool
is_public_document: bool
cannot_read: bool = False
inappropriate_content: bool = False
# PII identifiers
contains_names: bool = False
contains_email_addresses: bool = False
contains_phone_numbers: bool = False
contains_names: bool
contains_email_addresses: bool
contains_phone_numbers: bool
# PII that must co-occur with identifiers
contains_addresses: bool = False
contains_biographical_info: bool = False # DOB, gender, etc.
contains_location_info: bool = False
contains_employment_info: bool = False
contains_education_info: bool = False
contains_medical_info: bool = False
contains_addresses: bool
contains_biographical_info: bool # DOB, gender, etc.
contains_location_info: bool
contains_employment_info: bool
contains_education_info: bool
contains_medical_info: bool
# Always sensitive PII
contains_government_ids: bool = False # SSN, passport, etc.
contains_financial_info: bool = False # Credit card, bank account
contains_biometric_data: bool = False
contains_login_info: bool = False # Username + password
other_pii: str = ""
contains_government_ids: bool
contains_financial_info: bool
contains_biometric_data: bool
contains_login_info: bool
other_pii: str
@property
def has_pii(self) -> bool:
"""Check if the document contains any PII"""
@@ -62,14 +90,14 @@ class PIIAnnotation(pydantic.BaseModel):
self.contains_government_ids,
self.contains_financial_info,
self.contains_biometric_data,
self.contains_login_info
self.contains_login_info,
]
return any(pii_fields) or bool(self.other_pii.strip())
def get_pii_types(self) -> List[str]:
"""Get a list of all PII types found in the document"""
pii_types = []
if self.contains_names:
pii_types.append("names")
if self.contains_email_addresses:
@@ -98,7 +126,7 @@ class PIIAnnotation(pydantic.BaseModel):
pii_types.append("login-info")
if self.other_pii.strip():
pii_types.append("other")
return pii_types
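Dropping the `= False` defaults makes every field required, which is what the OpenAI structured-outputs schema expects. A round-trip sketch with hypothetical values:

# Sketch: all fields must now be supplied explicitly.
ann = PIIAnnotation(
    document_description="Sample invoice",  # hypothetical
    language_code="English",
    cannot_read=False,
    inappropriate_content=False,
    is_public_document=False,
    contains_names=True,
    contains_email_addresses=False,
    contains_phone_numbers=False,
    contains_addresses=False,
    contains_biographical_info=False,
    contains_location_info=False,
    contains_employment_info=False,
    contains_education_info=False,
    contains_medical_info=False,
    contains_government_ids=False,
    contains_financial_info=False,
    contains_biometric_data=False,
    contains_login_info=False,
    other_pii="",
)
assert ann.has_pii and ann.get_pii_types() == ["names"]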
@@ -125,7 +153,7 @@ def list_result_files(s3_client, workspace_path):
if "Contents" in page:
all_files.extend([f"s3://{bucket}/{obj['Key']}" for obj in page["Contents"] if obj["Key"].endswith(".jsonl") or obj["Key"].endswith(".json")])
if len(all_files)>1000:
if len(all_files) > 1000:
break
return all_files
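The hunk only shows the tail of the listing loop; the assumed shape of the full paginator loop is roughly as follows (bucket and prefix are illustrative):

import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="workspace/results/"):
    # "Contents" is absent on empty pages, hence the membership check above.
    for obj in page.get("Contents", []):
        print(obj["Key"])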
@@ -201,7 +229,7 @@ def get_random_pages(s3_client, result_files, count=30):
def chatgpt_analyze_page(pdf_path: str, page_num: int, pdf_s3_client, openai_api_key: str, openai_model: str) -> Optional[PIIAnnotation]:
"""Analyze a page using the ChatGPT vision model."""
"""Analyze a page using the ChatGPT vision model with structured outputs."""
try:
# Download PDF to temp file and render to image
bucket, key = parse_s3_path(pdf_path)
@@ -212,12 +240,15 @@ def chatgpt_analyze_page(pdf_path: str, page_num: int, pdf_s3_client, openai_api
# Render PDF to base64 image
base64_image = render_pdf_to_base64png(temp_file_path, page_num, target_longest_image_dim=2048)
# Clean up temp file
os.unlink(temp_file_path)
# Prepare the ChatGPT system prompt with PII guidelines
system_prompt = """
# Create OpenAI client
client = OpenAI(api_key=openai_api_key)
# Prepare the user message with all instructions
user_message = """
You are a document analyzer that identifies Personally Identifiable Information (PII) in documents.
Your task is to analyze the provided document image and determine:
1. Whether the document is intended for public release or dissemination (e.g., research paper, public report, etc.)
@@ -247,96 +278,24 @@ The following should ALWAYS be marked as PII even if they do not occur alongside
- Biometric Data (fingerprints, retina scans, facial recognition data, voice signatures)
- Login information (ONLY mark as PII when a username, password, and login location are present together)
Your response should be a valid JSON object matching the PIIAnnotation model.
If the document is a form, then only consider fields which are filled out with specific values as potential PII.
"""
# Prepare the user message
user_message = """
Please analyze this document page and determine if it contains any PII (Personally Identifiable Information).
Return your analysis in JSON format following this model structure:
{
"is_public_document": true/false,
"cannot_read": true/false,
"inappropriate_content": true/false,
"contains_names": true/false,
"contains_email_addresses": true/false,
"contains_phone_numbers": true/false,
"contains_addresses": true/false,
"contains_biographical_info": true/false,
"contains_location_info": true/false,
"contains_employment_info": true/false,
"contains_education_info": true/false,
"contains_medical_info": true/false,
"contains_government_ids": true/false,
"contains_financial_info": true/false,
"contains_biometric_data": true/false,
"contains_login_info": true/false,
"other_pii": ""
}
Follow the guidelines I provided carefully when determining if the document contains PII.
"""
# API request to ChatGPT vision model
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}"
}
payload = {
"model": openai_model,
"messages": [
{
"role": "system",
"content": system_prompt
},
# Use the chat completions API with the custom schema
completion = client.beta.chat.completions.parse(
model=openai_model,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": user_message
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/webp;base64,{base64_image}"
}
}
]
"content": [{"type": "text", "text": user_message}, {"type": "image_url", "image_url": {"url": f"data:image/webp;base64,{base64_image}"}}],
}
],
"max_tokens": 1000
}
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload
response_format=PIIAnnotation,
max_tokens=1000,
)
if response.status_code != 200:
print(f"Error from OpenAI API: {response.status_code} - {response.text}")
return None
# Extract the JSON from the response
response_data = response.json()
content = response_data["choices"][0]["message"]["content"]
# Try to extract JSON from the content (sometimes it might include explanatory text)
json_match = re.search(r'```json\s*({[\s\S]*?})\s*```|({[\s\S]*})', content)
if json_match:
json_str = json_match.group(1) or json_match.group(2)
try:
return PIIAnnotation.parse_raw(json_str)
except pydantic.ValidationError as e:
print(f"Error parsing JSON response: {e}")
return None
else:
print(f"No JSON found in response: {content}")
return None
return completion.choices[0].message.parsed
except Exception as e:
print(f"Error analyzing page {pdf_path} (page {page_num}): {e}")
return None
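The switch from a hand-rolled requests call plus regex JSON extraction to client.beta.chat.completions.parse lets the SDK enforce the PIIAnnotation schema directly. (One aside: the data URL declares image/webp while render_pdf_to_base64png suggests PNG bytes; the API appears tolerant, but the label looks inconsistent.) A minimal sketch of the new call path, with a hypothetical model name:

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment
completion = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",  # hypothetical; any structured-outputs-capable model
    messages=[{"role": "user", "content": "Does this page contain PII?"}],
    response_format=PIIAnnotation,
)
message = completion.choices[0].message
# .parsed is a validated PIIAnnotation; refusals surface separately.
annotation = message.parsed if message.refusal is None else None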
@@ -346,11 +305,7 @@ def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
"""Create a presigned URL for the given S3 path."""
try:
bucket, key = parse_s3_path(pdf_path)
url = s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=expiration
)
url = s3_client.generate_presigned_url("get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expiration)
return url
except Exception as e:
print(f"Error creating presigned URL for {pdf_path}: {e}")
@@ -360,7 +315,7 @@ def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
def process_pages(random_pages, pdf_s3_client, openai_api_key, openai_model, max_workers):
"""Process multiple pages in parallel using ThreadPoolExecutor."""
results = []
# First generate presigned URLs for all PDFs
print("Generating presigned URLs for PDFs...")
presigned_urls = {}
@@ -369,22 +324,15 @@ def process_pages(random_pages, pdf_s3_client, openai_api_key, openai_model, max
url = create_presigned_url(pdf_s3_client, pdf_path)
if url:
presigned_urls[pdf_path] = url
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {}
# Submit all tasks
for pdf_path, page_num, page_text, result_file in tqdm(random_pages, desc="Submitting pages for analysis"):
future = executor.submit(
chatgpt_analyze_page,
pdf_path,
page_num,
pdf_s3_client,
openai_api_key,
openai_model
)
future = executor.submit(chatgpt_analyze_page, pdf_path, page_num, pdf_s3_client, openai_api_key, openai_model)
futures[future] = (pdf_path, page_num, page_text, result_file)
# Process results as they complete
for future in tqdm(futures, desc="Processing results"):
pdf_path, page_num, page_text, result_file = futures[future]
@@ -395,13 +343,13 @@ def process_pages(random_pages, pdf_s3_client, openai_api_key, openai_model, max
presigned_url = None
if pdf_path in presigned_urls:
presigned_url = f"{presigned_urls[pdf_path]}#page={page_num}"
results.append((pdf_path, page_num, page_text, result_file, annotation, presigned_url))
else:
print(f"Failed to get annotation for {pdf_path} (page {page_num})")
except Exception as e:
print(f"Error processing {pdf_path} (page {page_num}): {e}")
return results
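Iterating the futures dict yields results in submission order; when completion order is preferable, the stdlib as_completed pattern is the usual alternative (a sketch, not what the diff does):

from concurrent.futures import as_completed

for future in as_completed(futures):
    pdf_path, page_num, page_text, result_file = futures[future]
    annotation = future.result()  # re-raises any worker exception here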
@@ -414,44 +362,38 @@ def categorize_results(all_results):
"report_content": [],
"no_annotation": [],
}
for pdf_path, page_num, page_text, result_file, annotation, presigned_url in all_results:
if annotation.cannot_read:
categories["cannot_read"].append({
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"presigned_url": presigned_url
})
if annotation.cannot_read or annotation.language_code != LanguageCode.en:
categories["cannot_read"].append({"pdf_path": pdf_path, "pdf_page": page_num, "result_file": result_file, "presigned_url": presigned_url})
elif annotation.inappropriate_content:
categories["report_content"].append({
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"presigned_url": presigned_url
})
categories["report_content"].append({"pdf_path": pdf_path, "pdf_page": page_num, "result_file": result_file, "presigned_url": presigned_url})
elif annotation.is_public_document:
categories["public_document"].append({
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"pii_types": annotation.get_pii_types(),
"has_pii": annotation.has_pii,
"description": annotation.other_pii,
"presigned_url": presigned_url
})
categories["public_document"].append(
{
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"pii_types": annotation.get_pii_types(),
"has_pii": annotation.has_pii,
"description": annotation.other_pii,
"presigned_url": presigned_url,
}
)
else:
# Private document
categories["private_document"].append({
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"pii_types": annotation.get_pii_types(),
"has_pii": annotation.has_pii,
"description": annotation.other_pii,
"presigned_url": presigned_url
})
categories["private_document"].append(
{
"pdf_path": pdf_path,
"pdf_page": page_num,
"result_file": result_file,
"pii_types": annotation.get_pii_types(),
"has_pii": annotation.has_pii,
"description": annotation.other_pii,
"presigned_url": presigned_url,
}
)
return categories
@@ -532,10 +474,10 @@ def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]])
print("\nDetailed Report - Private Documents with PII:")
print("-" * 80)
for i, item in enumerate(private_with_pii, 1):
pdf_path = item['pdf_path']
pdf_page = item['pdf_page']
presigned_url = item.get('presigned_url')
pdf_path = item["pdf_path"]
pdf_page = item["pdf_page"]
presigned_url = item.get("presigned_url")
print(f"{i}. PDF: {pdf_path}")
print(f" Page: {pdf_page}")
if presigned_url:
@@ -544,31 +486,31 @@ def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]])
if item.get("description"):
print(f" Description: {item['description']}")
print("-" * 80)
# Print links to unreadable pages
if annotation_results["cannot_read"]:
print("\nUnreadable Pages:")
print("-" * 80)
for i, item in enumerate(annotation_results["cannot_read"], 1):
pdf_path = item['pdf_path']
pdf_page = item['pdf_page']
presigned_url = item.get('presigned_url')
pdf_path = item["pdf_path"]
pdf_page = item["pdf_page"]
presigned_url = item.get("presigned_url")
print(f"{i}. PDF: {pdf_path}")
print(f" Page: {pdf_page}")
if presigned_url:
print(f" Presigned URL: {presigned_url}")
print("-" * 80)
# Print links to inappropriate content
if annotation_results["report_content"]:
print("\nReported Content:")
print("-" * 80)
for i, item in enumerate(annotation_results["report_content"], 1):
pdf_path = item['pdf_path']
pdf_page = item['pdf_page']
presigned_url = item.get('presigned_url')
pdf_path = item["pdf_path"]
pdf_page = item["pdf_page"]
presigned_url = item.get("presigned_url")
print(f"{i}. PDF: {pdf_path}")
print(f" Page: {pdf_page}")
if presigned_url:
@@ -581,72 +523,68 @@ def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]])
def save_results(results, output_dir):
"""Save the results to a JSON file."""
output_path = Path(output_dir) / "autoscan_results.json"
# Convert results to serializable format
serializable_results = []
for pdf_path, page_num, page_text, result_file, annotation, presigned_url in results:
serializable_results.append({
"pdf_path": pdf_path,
"page_num": page_num,
"page_text": page_text,
"result_file": result_file,
"annotation": annotation.dict(),
"presigned_url": presigned_url
})
serializable_results.append(
{
"pdf_path": pdf_path,
"page_num": page_num,
"page_text": page_text,
"result_file": result_file,
"annotation": annotation.dict(),
"presigned_url": presigned_url,
}
)
with open(output_path, "w") as f:
json.dump(serializable_results, f, indent=2)
json.dump(serializable_results, f, indent=2, default=lambda o: o.value if isinstance(o, Enum) else o)
print(f"Results saved to {output_path}")
def main():
args = parse_args()
# Get OpenAI API key from args or environment
openai_api_key = args.openai_api_key or os.environ.get("OPENAI_API_KEY")
if not openai_api_key:
raise ValueError("OpenAI API key must be provided via --openai_api_key or OPENAI_API_KEY environment variable")
# Set up S3 clients
s3_client = boto3.client("s3")
# Set up PDF S3 client with profile if specified
if args.pdf_profile:
pdf_session = boto3.Session(profile_name=args.pdf_profile)
pdf_s3_client = pdf_session.client("s3")
else:
pdf_s3_client = s3_client
# Create output directory
output_dir = Path(args.output_dir)
output_dir.mkdir(exist_ok=True, parents=True)
# List all result files
print(f"Listing result files in {args.workspace}/results...")
result_files = list_result_files(s3_client, args.workspace)
print(f"Found {len(result_files)} result files")
# Get random pages
random_pages = get_random_pages(s3_client, result_files, args.pages_per_run)
# Process pages with ChatGPT
print(f"Processing {len(random_pages)} pages with ChatGPT...")
all_results = process_pages(
random_pages,
pdf_s3_client,
openai_api_key,
args.openai_model,
args.max_workers
)
all_results = process_pages(random_pages, pdf_s3_client, openai_api_key, args.openai_model, args.max_workers)
# Save results
save_results(all_results, args.output_dir)
# Categorize and report results
categorized_results = categorize_results(all_results)
print_annotation_report(categorized_results)
if __name__ == "__main__":
main()
main()

View File

@@ -1304,20 +1304,20 @@ def extract_datastore_url(html_content: str) -> Optional[str]:
def extract_page_number_from_html(html_content: str, page_id: str) -> Optional[int]:
"""Extract PDF page number from HTML content for a specific page_id.
This is a fallback mechanism for older versions of the annotation page
This is a fallback mechanism for older versions of the annotation page
that didn't store the page number in a data attribute.
"""
# Try to find the page number in the "View Cached PDF (page X)" text
# Look for section with this page_id
page_section_pattern = f'<div class="page-container"[^>]*data-index="([^"]*)"[^>]*>.*?<div class="page-info">.*?<a href="[^"]*#page=([0-9]+)"[^>]*>View Cached PDF \\(page ([0-9]+)\\)</a>'
page_section_pattern = '<div class="page-container"[^>]*data-index="([^"]*)"[^>]*>.*?<div class="page-info">.*?<a href="[^"]*#page=([0-9]+)"[^>]*>View Cached PDF \\(page ([0-9]+)\\)</a>'
matches = re.finditer(page_section_pattern, html_content, re.DOTALL)
for match in matches:
container_index = match.group(1)
pdf_page_from_url = match.group(2)
pdf_page_from_text = match.group(3)
# Check if this container index matches our page_id (page-X)
if f"page-{container_index}" == page_id:
# Both numbers should be the same, but prefer the one from the URL fragment
@@ -1328,7 +1328,7 @@ def extract_page_number_from_html(html_content: str, page_id: str) -> Optional[i
return int(pdf_page_from_text)
except (ValueError, TypeError):
pass
return None
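To make the fallback concrete, the pattern matches fragments like the following hypothetical snippet, capturing the container index plus the page number from both the URL fragment and the link text (assumes page_section_pattern is in scope):

html = ('<div class="page-container" data-index="2"><div class="page-info">'
        '<a href="doc.pdf#page=5">View Cached PDF (page 5)</a></div></div>')
match = next(re.finditer(page_section_pattern, html, re.DOTALL), None)
# match.group(1) == "2"; match.group(2) == match.group(3) == "5"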
@@ -1379,28 +1379,27 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str, str
primary_option = annotation["primaryOption"]
pdf_path = annotation.get("pdfPath", "Unknown")
# Get PDF page number from annotation data
# This is the actual page number in the PDF that was annotated
pdf_page = None
# First try to get it from the annotation data (for new format)
if annotation.get("pdfPage"):
try:
pdf_page = int(annotation.get("pdfPage"))
except (ValueError, TypeError):
pass
# Fallback: try to extract page number from HTML content (for older format)
if pdf_page is None:
pdf_page = extract_page_number_from_html(html_content, page_id)
# Build a result item based on the new annotation structure
if primary_option == "yes-public":
# Public document - no PII info collected with new flow
results["public_document"].append(
{"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page,
"pii_types": [], "has_pii": False, "description": ""}
{"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page, "pii_types": [], "has_pii": False, "description": ""}
)
elif primary_option == "no-public":
@@ -1411,8 +1410,7 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str, str
if not private_pii_options:
# No PII selected in a private document
results["private_document"].append(
{"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page,
"pii_types": [], "has_pii": False, "description": ""}
{"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page, "pii_types": [], "has_pii": False, "description": ""}
)
else:
# PII found in a private document
@@ -1529,19 +1527,19 @@ def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]],
print("\nDetailed Report - Private Documents with PII:")
print("-" * 80)
for i, item in enumerate(private_with_pii, 1):
pdf_path = item['pdf_path']
page_id = item['page_id']
# Get the actual PDF page number
pdf_page = item.get('pdf_page')
pdf_path = item["pdf_path"]
page_id = item["page_id"]
# Get the actual PDF page number
pdf_page = item.get("pdf_page")
# Generate presigned URL with PDF page number if client is available
presigned_url = None
if pdf_s3_client and pdf_path.startswith("s3://"):
presigned_url = create_presigned_url(pdf_s3_client, pdf_path)
if presigned_url and pdf_page is not None:
presigned_url += f"#page={pdf_page}"
print(f"{i}. PDF: {pdf_path}")
print(f" Page ID: {page_id}")
print(f" Link: {item['link']}#{page_id}")
@@ -1570,7 +1568,7 @@ def read_and_process_results(args):
return
print(f"Found {len(links)} tinyhost links in {args.read_results}")
# Set up PDF S3 client with profile if specified
if args.pdf_profile:
pdf_session = boto3.Session(profile_name=args.pdf_profile)
@@ -1604,11 +1602,10 @@ def read_and_process_results(args):
for category, items in annotation_results.items():
for item in items:
pdf_path = item["pdf_path"]
page_id = item["page_id"]
# Get the actual PDF page number
pdf_page = item.get("pdf_page")
# Generate presigned URL with the PDF page number
presigned_url = ""
if pdf_path.startswith("s3://"):
@@ -1617,7 +1614,7 @@ def read_and_process_results(args):
presigned_url = f"{url}#page={pdf_page}"
elif url:
presigned_url = url
if category == "public_document":
doc_type = "Public"
pii_types = ", ".join(item.get("pii_types", []))
@@ -1631,8 +1628,9 @@ def read_and_process_results(args):
pii_types = ""
description = ""
writer.writerow([category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}",
presigned_url, doc_type, pii_types, description])
writer.writerow(
[category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}", presigned_url, doc_type, pii_types, description]
)
print(f"Report saved to {output_file}")