Doing some work on annotations again...

Jake Poznanski 2025-04-15 22:27:07 +00:00
parent 1d0c560455
commit 9a67f50539
2 changed files with 765 additions and 25 deletions


@@ -0,0 +1,652 @@
import argparse
import base64
import json
import os
import random
import re
import tempfile
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
import boto3
import pydantic
import requests
from tqdm import tqdm
from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.s3_utils import get_s3_bytes, parse_s3_path


class PIIAnnotation(pydantic.BaseModel):
    """Structured model for PII annotations returned by ChatGPT"""

    is_public_document: bool
    cannot_read: bool = False
    inappropriate_content: bool = False

    # PII identifiers
    contains_names: bool = False
    contains_email_addresses: bool = False
    contains_phone_numbers: bool = False

    # PII that must co-occur with identifiers
    contains_addresses: bool = False
    contains_biographical_info: bool = False  # DOB, gender, etc.
    contains_location_info: bool = False
    contains_employment_info: bool = False
    contains_education_info: bool = False
    contains_medical_info: bool = False

    # Always sensitive PII
    contains_government_ids: bool = False  # SSN, passport, etc.
    contains_financial_info: bool = False  # Credit card, bank account
    contains_biometric_data: bool = False
    contains_login_info: bool = False  # Username + password

    other_pii: str = ""

    @property
    def has_pii(self) -> bool:
        """Check if the document contains any PII"""
        pii_fields = [
            self.contains_names,
            self.contains_email_addresses,
            self.contains_phone_numbers,
            self.contains_addresses,
            self.contains_biographical_info,
            self.contains_location_info,
            self.contains_employment_info,
            self.contains_education_info,
            self.contains_medical_info,
            self.contains_government_ids,
            self.contains_financial_info,
            self.contains_biometric_data,
            self.contains_login_info,
        ]
        return any(pii_fields) or bool(self.other_pii.strip())

    def get_pii_types(self) -> List[str]:
        """Get a list of all PII types found in the document"""
        pii_types = []
        if self.contains_names:
            pii_types.append("names")
        if self.contains_email_addresses:
            pii_types.append("email")
        if self.contains_phone_numbers:
            pii_types.append("phone")
        if self.contains_addresses:
            pii_types.append("addresses")
        if self.contains_biographical_info:
            pii_types.append("biographical")
        if self.contains_location_info:
            pii_types.append("location")
        if self.contains_employment_info:
            pii_types.append("employment")
        if self.contains_education_info:
            pii_types.append("education")
        if self.contains_medical_info:
            pii_types.append("medical")
        if self.contains_government_ids:
            pii_types.append("government-id")
        if self.contains_financial_info:
            pii_types.append("financial")
        if self.contains_biometric_data:
            pii_types.append("biometric")
        if self.contains_login_info:
            pii_types.append("login-info")
        if self.other_pii.strip():
            pii_types.append("other")
        return pii_types


def parse_args():
    parser = argparse.ArgumentParser(description="Automatically scan OLMO OCR workspace results using ChatGPT")
    parser.add_argument("workspace", help="OLMO OCR workspace path (s3://bucket/workspace)")
    parser.add_argument("--pages_per_run", type=int, default=30, help="Number of pages per run")
    parser.add_argument("--pdf_profile", help="AWS profile for accessing PDFs")
    parser.add_argument("--output_dir", default="dolma_samples", help="Directory to save output files")
    parser.add_argument("--max_workers", type=int, default=4, help="Maximum number of worker threads")
    parser.add_argument("--openai_api_key", help="OpenAI API key (or set OPENAI_API_KEY env var)")
    parser.add_argument("--openai_model", default="gpt-4.1", help="OpenAI model to use")
    return parser.parse_args()


def list_result_files(s3_client, workspace_path):
    """List all JSON result files in the workspace results directory."""
    bucket, prefix = parse_s3_path(workspace_path)
    results_prefix = os.path.join(prefix, "results").rstrip("/") + "/"
    all_files = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=results_prefix):
        if "Contents" in page:
            all_files.extend([f"s3://{bucket}/{obj['Key']}" for obj in page["Contents"] if obj["Key"].endswith(".jsonl") or obj["Key"].endswith(".json")])
        if len(all_files) > 1000:
            break
    return all_files


def get_random_pages(s3_client, result_files, count=30):
    """Get random pages from the result files."""
    random_pages = []

    # Try to collect the requested number of pages
    attempts = 0
    max_attempts = count * 3  # Allow extra attempts to handle potential failures

    while len(random_pages) < count and attempts < max_attempts:
        attempts += 1

        # Pick a random result file
        if not result_files:
            print("No result files found!")
            break

        result_file = random.choice(result_files)
        try:
            # Get the content of the file
            content = get_s3_bytes(s3_client, result_file)
            lines = content.decode("utf-8").strip().split("\n")
            if not lines:
                continue

            # Pick a random line (which contains a complete document)
            line = random.choice(lines)
            doc = json.loads(line)

            # A Dolma document has "text", "metadata", and "attributes" fields
            if "text" not in doc or "metadata" not in doc or "attributes" not in doc:
                print(f"Document in {result_file} is not a valid Dolma document")
                continue

            # Get the original PDF path from metadata
            pdf_path = doc["metadata"].get("Source-File")
            if not pdf_path:
                continue

            # Get page spans from attributes
            page_spans = doc["attributes"].get("pdf_page_numbers", [])
            if not page_spans:
                continue

            # Pick a random page span
            page_span = random.choice(page_spans)
            if len(page_span) >= 3:
                # Page spans are [start_pos, end_pos, page_num]
                page_num = page_span[2]

                # Extract text for this page
                start_pos, end_pos = page_span[0], page_span[1]
                page_text = doc["text"][start_pos:end_pos].strip()

                # Include the text snippet with the page info
                random_pages.append((pdf_path, page_num, page_text, result_file))

                if len(random_pages) >= count:
                    break
        except Exception as e:
            print(f"Error processing {result_file}: {e}")
            continue

    print(f"Found {len(random_pages)} random pages from Dolma documents")
    return random_pages


def chatgpt_analyze_page(pdf_path: str, page_num: int, pdf_s3_client, openai_api_key: str, openai_model: str) -> Optional[PIIAnnotation]:
    """Analyze a page using the ChatGPT vision model."""
    try:
        # Download PDF to temp file and render to image
        bucket, key = parse_s3_path(pdf_path)
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
            pdf_data = pdf_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
            temp_file.write(pdf_data)
            temp_file_path = temp_file.name

        # Render PDF to base64 image
        base64_image = render_pdf_to_base64png(temp_file_path, page_num, target_longest_image_dim=2048)

        # Clean up temp file
        os.unlink(temp_file_path)

        # Prepare the ChatGPT system prompt with PII guidelines
        system_prompt = """
You are a document analyzer that identifies Personally Identifiable Information (PII) in documents.
Your task is to analyze the provided document image and determine:
1. Whether the document is intended for public release or dissemination (e.g., research paper, public report, etc.)
2. If the document contains any PII

For PII identification, follow these specific guidelines:

IDENTIFIERS FOR PII:
The following are considered identifiers that can make information PII:
- Names (full names, first names, last names, nicknames)
- Email addresses
- Phone numbers

PII THAT MUST CO-OCCUR WITH AN IDENTIFIER:
The following types of information should ONLY be marked as PII if they occur ALONGSIDE an identifier (commonly, a person's name):
- Addresses (street address, postal code, etc.)
- Biographical Information (date of birth, place of birth, gender, sexual orientation, race, ethnicity, citizenship/immigration status, religion)
- Location Information (geolocations, specific coordinates)
- Employment Information (job titles, workplace names, employment history)
- Education Information (school names, degrees, transcripts)
- Medical Information (health records, diagnoses, genetic or neural data)

PII THAT OCCURS EVEN WITHOUT AN IDENTIFIER:
The following should ALWAYS be marked as PII even if they do not occur alongside an identifier:
- Government IDs (Social Security Numbers, passport numbers, driver's license numbers, tax IDs)
- Financial Information (credit card numbers, bank account/routing numbers)
- Biometric Data (fingerprints, retina scans, facial recognition data, voice signatures)
- Login information (ONLY mark as PII when a username, password, and login location are present together)

Your response should be a valid JSON object matching the PIIAnnotation model.
"""

        # Prepare the user message
        user_message = """
Please analyze this document page and determine if it contains any PII (Personally Identifiable Information).

Return your analysis in JSON format following this model structure:
{
    "is_public_document": true/false,
    "cannot_read": true/false,
    "inappropriate_content": true/false,
    "contains_names": true/false,
    "contains_email_addresses": true/false,
    "contains_phone_numbers": true/false,
    "contains_addresses": true/false,
    "contains_biographical_info": true/false,
    "contains_location_info": true/false,
    "contains_employment_info": true/false,
    "contains_education_info": true/false,
    "contains_medical_info": true/false,
    "contains_government_ids": true/false,
    "contains_financial_info": true/false,
    "contains_biometric_data": true/false,
    "contains_login_info": true/false,
    "other_pii": ""
}

Follow the guidelines I provided carefully when determining if the document contains PII.
"""

        # API request to ChatGPT vision model
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "model": openai_model,
            "messages": [
                {
                    "role": "system",
                    "content": system_prompt,
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_message,
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/webp;base64,{base64_image}",
                            },
                        },
                    ],
                },
            ],
            "max_tokens": 1000,
        }

        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
        )
        if response.status_code != 200:
            print(f"Error from OpenAI API: {response.status_code} - {response.text}")
            return None

        # Extract the JSON from the response
        response_data = response.json()
        content = response_data["choices"][0]["message"]["content"]

        # Try to extract JSON from the content (sometimes it might include explanatory text)
        json_match = re.search(r'```json\s*({[\s\S]*?})\s*```|({[\s\S]*})', content)
        if json_match:
            json_str = json_match.group(1) or json_match.group(2)
            try:
                return PIIAnnotation.parse_raw(json_str)
            except pydantic.ValidationError as e:
                print(f"Error parsing JSON response: {e}")
                return None
        else:
            print(f"No JSON found in response: {content}")
            return None
    except Exception as e:
        print(f"Error analyzing page {pdf_path} (page {page_num}): {e}")
        return None


def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
    """Create a presigned URL for the given S3 path."""
    try:
        bucket, key = parse_s3_path(pdf_path)
        url = s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expiration,
        )
        return url
    except Exception as e:
        print(f"Error creating presigned URL for {pdf_path}: {e}")
        return None


def process_pages(random_pages, pdf_s3_client, openai_api_key, openai_model, max_workers):
    """Process multiple pages in parallel using ThreadPoolExecutor."""
    results = []

    # First generate presigned URLs for all PDFs
    print("Generating presigned URLs for PDFs...")
    presigned_urls = {}
    for pdf_path, page_num, _, _ in random_pages:
        if pdf_path not in presigned_urls and pdf_path.startswith("s3://"):
            url = create_presigned_url(pdf_s3_client, pdf_path)
            if url:
                presigned_urls[pdf_path] = url

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}

        # Submit all tasks
        for pdf_path, page_num, page_text, result_file in tqdm(random_pages, desc="Submitting pages for analysis"):
            future = executor.submit(
                chatgpt_analyze_page,
                pdf_path,
                page_num,
                pdf_s3_client,
                openai_api_key,
                openai_model,
            )
            futures[future] = (pdf_path, page_num, page_text, result_file)

        # Process results as they complete
        for future in tqdm(futures, desc="Processing results"):
            pdf_path, page_num, page_text, result_file = futures[future]
            try:
                annotation = future.result()
                if annotation:
                    # Get presigned URL with page number
                    presigned_url = None
                    if pdf_path in presigned_urls:
                        presigned_url = f"{presigned_urls[pdf_path]}#page={page_num}"
                    results.append((pdf_path, page_num, page_text, result_file, annotation, presigned_url))
                else:
                    print(f"Failed to get annotation for {pdf_path} (page {page_num})")
            except Exception as e:
                print(f"Error processing {pdf_path} (page {page_num}): {e}")

    return results


def categorize_results(all_results):
    """Categorize results for reporting."""
    categories = {
        "public_document": [],
        "private_document": [],
        "cannot_read": [],
        "report_content": [],
        "no_annotation": [],
    }

    for pdf_path, page_num, page_text, result_file, annotation, presigned_url in all_results:
        if annotation.cannot_read:
            categories["cannot_read"].append({
                "pdf_path": pdf_path,
                "pdf_page": page_num,
                "result_file": result_file,
                "presigned_url": presigned_url,
            })
        elif annotation.inappropriate_content:
            categories["report_content"].append({
                "pdf_path": pdf_path,
                "pdf_page": page_num,
                "result_file": result_file,
                "presigned_url": presigned_url,
            })
        elif annotation.is_public_document:
            categories["public_document"].append({
                "pdf_path": pdf_path,
                "pdf_page": page_num,
                "result_file": result_file,
                "pii_types": annotation.get_pii_types(),
                "has_pii": annotation.has_pii,
                "description": annotation.other_pii,
                "presigned_url": presigned_url,
            })
        else:
            # Private document
            categories["private_document"].append({
                "pdf_path": pdf_path,
                "pdf_page": page_num,
                "result_file": result_file,
                "pii_types": annotation.get_pii_types(),
                "has_pii": annotation.has_pii,
                "description": annotation.other_pii,
                "presigned_url": presigned_url,
            })

    return categories


def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
    """Print a summary report of annotations."""
    total_pages = sum(len(items) for items in annotation_results.values())

    print("\n" + "=" * 80)
    print(f"ANNOTATION REPORT - Total Pages: {total_pages}")
    print("=" * 80)

    # Count pages with PII in public documents
    public_with_pii = [page for page in annotation_results["public_document"] if page.get("has_pii", False)]
    public_without_pii = [page for page in annotation_results["public_document"] if not page.get("has_pii", False)]

    # Count pages with PII in private documents
    private_with_pii = [page for page in annotation_results["private_document"] if page.get("has_pii", False)]
    private_without_pii = [page for page in annotation_results["private_document"] if not page.get("has_pii", False)]

    # Print summary statistics
    print("\nSummary:")
    print(
        f" Public documents (total): {len(annotation_results['public_document'])} ({len(annotation_results['public_document'])/total_pages*100:.1f}% of all pages)"
    )
    print(f" - With PII: {len(public_with_pii)} ({len(public_with_pii)/max(1, len(annotation_results['public_document']))*100:.1f}% of public docs)")
    print(
        f" - Without PII: {len(public_without_pii)} ({len(public_without_pii)/max(1, len(annotation_results['public_document']))*100:.1f}% of public docs)"
    )
    print(
        f" Private documents (total): {len(annotation_results['private_document'])} ({len(annotation_results['private_document'])/total_pages*100:.1f}% of all pages)"
    )
    print(f" - With PII: {len(private_with_pii)} ({len(private_with_pii)/max(1, len(annotation_results['private_document']))*100:.1f}% of private docs)")
    print(
        f" - Without PII: {len(private_without_pii)} ({len(private_without_pii)/max(1, len(annotation_results['private_document']))*100:.1f}% of private docs)"
    )
    print(f" Unreadable pages: {len(annotation_results['cannot_read'])} ({len(annotation_results['cannot_read'])/total_pages*100:.1f}%)")
    print(f" Pages with reported content: {len(annotation_results['report_content'])} ({len(annotation_results['report_content'])/total_pages*100:.1f}%)")
    print(f" Pages without annotation: {len(annotation_results['no_annotation'])} ({len(annotation_results['no_annotation'])/total_pages*100:.1f}%)")

    # Analyze PII types in private documents
    if private_with_pii:
        # Categorize the PII types for clearer reporting
        pii_categories = {
            "Identifiers": ["names", "email", "phone"],
            "PII requiring identifiers": ["addresses", "biographical", "location", "employment", "education", "medical"],
            "Always sensitive PII": ["government-id", "financial", "biometric", "login-info"],
        }

        # Dictionary to track all PII counts
        pii_counts_private = {}
        for page in private_with_pii:
            for pii_type in page.get("pii_types", []):
                pii_counts_private[pii_type] = pii_counts_private.get(pii_type, 0) + 1

        # Print categorized PII counts
        print("\nPII Types in Private Documents:")

        # Print each category
        for category, pii_types in pii_categories.items():
            print(f"\n {category}:")
            for pii_type in pii_types:
                count = pii_counts_private.get(pii_type, 0)
                if count > 0:
                    print(f" - {pii_type}: {count} ({count/len(private_with_pii)*100:.1f}%)")

        # Print any other PII types not in our categories (like "other")
        other_pii = [pii_type for pii_type in pii_counts_private.keys() if not any(pii_type in types for types in pii_categories.values())]
        if other_pii:
            print("\n Other PII types:")
            for pii_type in other_pii:
                count = pii_counts_private.get(pii_type, 0)
                print(f" - {pii_type}: {count} ({count/len(private_with_pii)*100:.1f}%)")

    # Print detailed report for private documents with PII
    if private_with_pii:
        print("\nDetailed Report - Private Documents with PII:")
        print("-" * 80)
        for i, item in enumerate(private_with_pii, 1):
            pdf_path = item['pdf_path']
            pdf_page = item['pdf_page']
            presigned_url = item.get('presigned_url')

            print(f"{i}. PDF: {pdf_path}")
            print(f" Page: {pdf_page}")
            if presigned_url:
                print(f" Presigned URL: {presigned_url}")
            print(f" PII Types: {', '.join(item['pii_types'])}")
            if item.get("description"):
                print(f" Description: {item['description']}")
            print("-" * 80)

    # Print links to unreadable pages
    if annotation_results["cannot_read"]:
        print("\nUnreadable Pages:")
        print("-" * 80)
        for i, item in enumerate(annotation_results["cannot_read"], 1):
            pdf_path = item['pdf_path']
            pdf_page = item['pdf_page']
            presigned_url = item.get('presigned_url')

            print(f"{i}. PDF: {pdf_path}")
            print(f" Page: {pdf_page}")
            if presigned_url:
                print(f" Presigned URL: {presigned_url}")
            print("-" * 80)

    # Print links to inappropriate content
    if annotation_results["report_content"]:
        print("\nReported Content:")
        print("-" * 80)
        for i, item in enumerate(annotation_results["report_content"], 1):
            pdf_path = item['pdf_path']
            pdf_page = item['pdf_page']
            presigned_url = item.get('presigned_url')

            print(f"{i}. PDF: {pdf_path}")
            print(f" Page: {pdf_page}")
            if presigned_url:
                print(f" Presigned URL: {presigned_url}")
            print("-" * 80)

    print("\nReport complete.")


def save_results(results, output_dir):
    """Save the results to a JSON file."""
    output_path = Path(output_dir) / "autoscan_results.json"

    # Convert results to serializable format
    serializable_results = []
    for pdf_path, page_num, page_text, result_file, annotation, presigned_url in results:
        serializable_results.append({
            "pdf_path": pdf_path,
            "page_num": page_num,
            "page_text": page_text,
            "result_file": result_file,
            "annotation": annotation.dict(),
            "presigned_url": presigned_url,
        })

    with open(output_path, "w") as f:
        json.dump(serializable_results, f, indent=2)

    print(f"Results saved to {output_path}")


def main():
    args = parse_args()

    # Get OpenAI API key from args or environment
    openai_api_key = args.openai_api_key or os.environ.get("OPENAI_API_KEY")
    if not openai_api_key:
        raise ValueError("OpenAI API key must be provided via --openai_api_key or OPENAI_API_KEY environment variable")

    # Set up S3 clients
    s3_client = boto3.client("s3")

    # Set up PDF S3 client with profile if specified
    if args.pdf_profile:
        pdf_session = boto3.Session(profile_name=args.pdf_profile)
        pdf_s3_client = pdf_session.client("s3")
    else:
        pdf_s3_client = s3_client

    # Create output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)

    # List all result files
    print(f"Listing result files in {args.workspace}/results...")
    result_files = list_result_files(s3_client, args.workspace)
    print(f"Found {len(result_files)} result files")

    # Get random pages
    random_pages = get_random_pages(s3_client, result_files, args.pages_per_run)

    # Process pages with ChatGPT
    print(f"Processing {len(random_pages)} pages with ChatGPT...")
    all_results = process_pages(
        random_pages,
        pdf_s3_client,
        openai_api_key,
        args.openai_model,
        args.max_workers,
    )

    # Save results
    save_results(all_results, args.output_dir)

    # Categorize and report results
    categorized_results = categorize_results(all_results)
    print_annotation_report(categorized_results)


if __name__ == "__main__":
    main()
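
For reference, a minimal sketch of how a ChatGPT reply flows through PIIAnnotation in this script; the JSON values below are hypothetical, and the snippet assumes the class defined above is in scope:

# Hypothetical sample reply; unspecified fields fall back to their declared defaults.
sample_reply = """
{
    "is_public_document": false,
    "contains_names": true,
    "contains_medical_info": true,
    "other_pii": ""
}
"""

annotation = PIIAnnotation.parse_raw(sample_reply)  # pydantic v1-style API, as used by the script
print(annotation.has_pii)          # True: names and medical info are flagged
print(annotation.get_pii_types())  # ['names', 'medical']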


@@ -746,7 +746,7 @@ def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path,
 <div class="page-image-wrapper">
     <img class="page-image" src="data:image/webp;base64,{base64_image}" alt="PDF Page {page_num}" loading="lazy" />
 </div>
-<div class="annotation-interface{active_class}" data-id="page-{i}" data-pdf-path="{pdf_path}">
+<div class="annotation-interface{active_class}" data-id="page-{i}" data-pdf-path="{pdf_path}" data-pdf-page="{page_num}">
     <div class="question-container" id="question1-{i}">
         <p class="question-text">Is this document meant for public dissemination? (ex. news article, research paper, etc.)</p>
         <span class="btn-group">
@@ -822,7 +822,7 @@ def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path,
     </p>
 </div>
 <div class="error">Error: {str(e)}</div>
-<div class="annotation-interface{active_class}" data-id="page-{i}" data-pdf-path="{pdf_path}">
+<div class="annotation-interface{active_class}" data-id="page-{i}" data-pdf-path="{pdf_path}" data-pdf-page="{page_num}">
     <div class="question-container" id="question1-{i}">
         <p class="question-text">Is this document meant for public dissemination?</p>
         <span class="btn-group">
@@ -1039,6 +1039,7 @@ def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path,
 const otherPrivateDesc = interfaceDiv.querySelector('#other-pii-private-' + id.split('-')[1])?.value || '';
 const pdfPath = interfaceDiv.getAttribute('data-pdf-path');
+const pdfPage = interfaceDiv.getAttribute('data-pdf-page');

 const datastore = await fetchDatastore() || {};
 datastore[id] = {
@@ -1047,7 +1048,8 @@ def create_html_output(random_pages, pdf_s3_client, output_path, workspace_path,
     privatePiiOptions: privatePiiOptions,
     otherPublicDesc: otherPublicDesc,
     otherPrivateDesc: otherPrivateDesc,
-    pdfPath: pdfPath
+    pdfPath: pdfPath,
+    pdfPage: pdfPage
 };

 await putDatastore(datastore);
@@ -1300,7 +1302,37 @@ def extract_datastore_url(html_content: str) -> Optional[str]:
     return None


-def fetch_annotations(tinyhost_link: str) -> Tuple[Dict[str, Any], str]:
+def extract_page_number_from_html(html_content: str, page_id: str) -> Optional[int]:
+    """Extract PDF page number from HTML content for a specific page_id.
+
+    This is a fallback mechanism for older versions of the annotation page
+    that didn't store the page number in a data attribute.
+    """
+    # Try to find the page number in the "View Cached PDF (page X)" text
+    # Look for section with this page_id
+    page_section_pattern = f'<div class="page-container"[^>]*data-index="([^"]*)"[^>]*>.*?<div class="page-info">.*?<a href="[^"]*#page=([0-9]+)"[^>]*>View Cached PDF \\(page ([0-9]+)\\)</a>'
+    matches = re.finditer(page_section_pattern, html_content, re.DOTALL)
+
+    for match in matches:
+        container_index = match.group(1)
+        pdf_page_from_url = match.group(2)
+        pdf_page_from_text = match.group(3)
+
+        # Check if this container index matches our page_id (page-X)
+        if f"page-{container_index}" == page_id:
+            # Both numbers should be the same, but prefer the one from the URL fragment
+            try:
+                return int(pdf_page_from_url)
+            except (ValueError, TypeError):
+                try:
+                    return int(pdf_page_from_text)
+                except (ValueError, TypeError):
+                    pass
+
+    return None
+
+
+def fetch_annotations(tinyhost_link: str) -> Tuple[Dict[str, Any], str, str]:
     """Fetch and parse annotations from a tinyhost link."""
     # Request the HTML content
     print(f"Fetching annotations from {tinyhost_link}")
@@ -1312,7 +1344,7 @@ def fetch_annotations(tinyhost_link: str) -> Tuple[Dict[str, Any], str]:
     datastore_url = extract_datastore_url(html_content)
     if not datastore_url:
         print(f"Could not find datastore URL in {tinyhost_link}")
-        return {}, tinyhost_link
+        return {}, tinyhost_link, html_content

     # Fetch the datastore content
     print(f"Found datastore URL: {datastore_url}")
@@ -1320,13 +1352,13 @@ def fetch_annotations(tinyhost_link: str) -> Tuple[Dict[str, Any], str]:
         datastore_response = requests.get(datastore_url)
         datastore_response.raise_for_status()
         annotations = datastore_response.json()
-        return annotations, tinyhost_link
+        return annotations, tinyhost_link, html_content
     except Exception as e:
         print(f"Error fetching datastore from {datastore_url}: {e}")
-        return {}, tinyhost_link
+        return {}, tinyhost_link, html_content


-def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
+def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str, str]]) -> Dict[str, List[Dict[str, Any]]]:
     """Process and categorize annotations by feedback type."""
     results = {
         "public_document": [],
@@ -1337,7 +1369,7 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
     }

     # Process each annotation
-    for annotations, link in annotations_by_link:
+    for annotations, link, html_content in annotations_by_link:
         for page_id, annotation in annotations.items():
             if not annotation or "primaryOption" not in annotation:
                 results["no_annotation"].append(
@@ -1347,12 +1379,28 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
             primary_option = annotation["primaryOption"]
             pdf_path = annotation.get("pdfPath", "Unknown")

+            # Get PDF page number from annotation data
+            # This is the actual page number in the PDF that was annotated
+            pdf_page = None
+
+            # First try to get it from the annotation data (for new format)
+            if annotation.get("pdfPage"):
+                try:
+                    pdf_page = int(annotation.get("pdfPage"))
+                except (ValueError, TypeError):
+                    pass
+
+            # Fallback: try to extract page number from HTML content (for older format)
+            if pdf_page is None:
+                pdf_page = extract_page_number_from_html(html_content, page_id)
+
             # Build a result item based on the new annotation structure
             if primary_option == "yes-public":
                 # Public document - no PII info collected with new flow
                 results["public_document"].append(
-                    {"page_id": page_id, "link": link, "pdf_path": pdf_path, "pii_types": [], "has_pii": False, "description": ""}
+                    {"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page,
+                     "pii_types": [], "has_pii": False, "description": ""}
                 )

             elif primary_option == "no-public":
@@ -1363,7 +1411,8 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
                 if not private_pii_options:
                     # No PII selected in a private document
                     results["private_document"].append(
-                        {"page_id": page_id, "link": link, "pdf_path": pdf_path, "pii_types": [], "has_pii": False, "description": ""}
+                        {"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page,
+                         "pii_types": [], "has_pii": False, "description": ""}
                     )
                 else:
                     # PII found in a private document
@@ -1372,6 +1421,7 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
                             "page_id": page_id,
                             "link": link,
                             "pdf_path": pdf_path,
+                            "pdf_page": pdf_page,
                             "pii_types": private_pii_options,
                             "has_pii": True,
                             "description": other_desc if "other" in private_pii_options else "",
@@ -1379,18 +1429,18 @@ def process_annotations(annotations_by_link: List[Tuple[Dict[str, Any], str]]) -> Dict[str, List[Dict[str, Any]]]:
                     )

             elif primary_option == "cannot-read":
-                results["cannot_read"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path})
+                results["cannot_read"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page})

             elif primary_option == "report-content":
-                results["report_content"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path})
+                results["report_content"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page})

             else:
-                results["no_annotation"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path})
+                results["no_annotation"].append({"page_id": page_id, "link": link, "pdf_path": pdf_path, "pdf_page": pdf_page})

     return results


-def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
+def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]], pdf_s3_client=None):
     """Print a summary report of annotations."""
     total_pages = sum(len(items) for items in annotation_results.values())
@@ -1479,9 +1529,24 @@ def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
         print("\nDetailed Report - Private Documents with PII:")
         print("-" * 80)
         for i, item in enumerate(private_with_pii, 1):
-            print(f"{i}. PDF: {item['pdf_path']}")
-            print(f" Page ID: {item['page_id']}")
-            print(f" Link: {item['link']}#{item['page_id']}")
+            pdf_path = item['pdf_path']
+            page_id = item['page_id']
+
+            # Get the actual PDF page number
+            pdf_page = item.get('pdf_page')
+
+            # Generate presigned URL with PDF page number if client is available
+            presigned_url = None
+            if pdf_s3_client and pdf_path.startswith("s3://"):
+                presigned_url = create_presigned_url(pdf_s3_client, pdf_path)
+                if presigned_url and pdf_page is not None:
+                    presigned_url += f"#page={pdf_page}"
+
+            print(f"{i}. PDF: {pdf_path}")
+            print(f" Page ID: {page_id}")
+            print(f" Link: {item['link']}#{page_id}")
+            if presigned_url:
+                print(f" Presigned URL: {presigned_url}")
             print(f" PII Types: {', '.join(item['pii_types'])}")
             if item.get("description"):
                 print(f" Description: {item['description']}")
@@ -1505,21 +1570,28 @@ def read_and_process_results(args):
         return

     print(f"Found {len(links)} tinyhost links in {args.read_results}")

+    # Set up PDF S3 client with profile if specified
+    if args.pdf_profile:
+        pdf_session = boto3.Session(profile_name=args.pdf_profile)
+        pdf_s3_client = pdf_session.client("s3")
+    else:
+        pdf_s3_client = boto3.client("s3")
+
     # Fetch and process annotations
     annotations_by_link = []
     for link in tqdm(links, desc="Fetching annotations"):
         try:
-            annotations, link_url = fetch_annotations(link)
-            annotations_by_link.append((annotations, link_url))
+            annotations, link_url, html_content = fetch_annotations(link)
+            annotations_by_link.append((annotations, link_url, html_content))
         except Exception as e:
            print(f"Error processing {link}: {e}")

     # Process and categorize annotations
     annotation_results = process_annotations(annotations_by_link)

-    # Print report
-    print_annotation_report(annotation_results)
+    # Print report with presigned URLs
+    print_annotation_report(annotation_results, pdf_s3_client)

     # Save detailed report to file
     output_file = Path(args.output_dir) / "annotation_report.csv"
@@ -1527,10 +1599,25 @@ def read_and_process_results(args):
     with open(output_file, "w", newline="") as f:
         writer = csv.writer(f)
-        writer.writerow(["Category", "PDF Path", "Page ID", "Link", "Document Type", "PII Types", "Description"])
+        writer.writerow(["Category", "PDF Path", "Page ID", "Link", "Presigned URL", "Document Type", "PII Types", "Description"])

         for category, items in annotation_results.items():
             for item in items:
+                pdf_path = item["pdf_path"]
+                page_id = item["page_id"]
+
+                # Get the actual PDF page number
+                pdf_page = item.get("pdf_page")
+
+                # Generate presigned URL with the PDF page number
+                presigned_url = ""
+                if pdf_path.startswith("s3://"):
+                    url = create_presigned_url(pdf_s3_client, pdf_path)
+                    if url and pdf_page is not None:
+                        presigned_url = f"{url}#page={pdf_page}"
+                    elif url:
+                        presigned_url = url
+
                 if category == "public_document":
                     doc_type = "Public"
                     pii_types = ", ".join(item.get("pii_types", []))
@@ -1544,7 +1631,8 @@ def read_and_process_results(args):
                     pii_types = ""
                     description = ""

-                writer.writerow([category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}", doc_type, pii_types, description])
+                writer.writerow([category, item["pdf_path"], item["page_id"], f"{item['link']}#{item['page_id']}",
+                                 presigned_url, doc_type, pii_types, description])

     print(f"Report saved to {output_file}")