#!/usr/bin/env python3
"""
Repackage locally processed OLMoCR-mix style data back into parquet metadata and PDF tarballs.

Given a directory that mirrors the layout produced by prepare_olmocrmix.py (folders of markdown/PDF
pairs), this script rebuilds a HuggingFace-style payload by:
  * walking the processed directory to recover document ids, metadata, and natural text
  * emitting a parquet file whose index/columns match what prepare_olmocrmix.py expects
  * chunking PDFs into .tar.gz archives that stay under a user-configurable size (default 1 GiB)

The parquet rows are indexed by document id (index name `id`) and contain the `response` JSON blob
expected by downstream tooling, along with helper columns (`url`, `page_number`, `pdf_relpath`,
`markdown_path`) that can be useful when mirroring to remote storage.
"""

from __future__ import annotations

import argparse
import json
import tarfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple

import pandas as pd
import yaml
from tqdm import tqdm

DEFAULT_MAX_TAR_BYTES = 1_073_741_824  # 1 GiB


@dataclass(slots=True)
class DocumentRecord:
    """Everything needed to emit one parquet row and one tar member for a document."""

    # Identifier reconstructed by infer_doc_id(); used as the parquet index and tar member stem.
    doc_id: str
    # Markdown file the record was parsed from.
    markdown_path: Path
    # Resolved location of the matching PDF on disk.
    pdf_path: Path
    # JSON-serialized payload built by normalize_response_payload().
    response_json: str
    # Size of the PDF in bytes (from stat), used for tarball chunking.
    pdf_size: int
    # Page number from the front matter or the doc_id suffix, when one could be parsed.
    page_number: Optional[int]
    # Source URL from the front matter or the URL template, when available.
    url: Optional[str]
    # "{doc_id}.pdf" - matches the member name used inside the tarballs.
    pdf_relpath: str


def parse_front_matter(markdown_text: str) -> Tuple[Dict[str, object], str]:
    """
    Parse YAML front matter from a markdown string.

    Returns a tuple of (front_matter_dict, body_text). When the text does not start with
    ``---`` or no closing ``---`` line is found, the front matter is empty and the body is
    the whole text with surrounding whitespace stripped.

    Raises:
        ValueError: if the front matter block parses to something other than a mapping
            (for example when a document merely opens with a ``---`` horizontal rule).
        yaml.YAMLError: if the front matter block is not valid YAML.
    """
    if not markdown_text.startswith("---"):
        return {}, markdown_text.strip()

    # Search past the opening delimiter for the first line that begins with "---".
    closing_idx = markdown_text.find("\n---", 3)
    if closing_idx == -1:
        return {}, markdown_text.strip()

    fm_block = markdown_text[3:closing_idx]
    remainder = markdown_text[closing_idx + 4 :]

    front_matter = yaml.safe_load(fm_block) or {}
    if not isinstance(front_matter, dict):
        # Callers treat the result as a mapping (.get / dict()); fail here with a clear message
        # instead of an obscure error further downstream.
        raise ValueError(f"Front matter must be a YAML mapping, got {type(front_matter).__name__}")

    # Preserve internal spacing but trim a single leading newline if present.
    if remainder.startswith("\n"):
        remainder = remainder[1:]
    return front_matter, remainder


def infer_doc_id(md_path: Path, processed_root: Path) -> str:
    """
    Reconstruct the doc_id used in parquet/index space.

    For a file nested under ``processed_root`` the id is the first path component followed by
    the file stem; any intermediate directories are ignored.
    """
    rel = md_path.relative_to(processed_root)
    if len(rel.parts) < 2:
        # NOTE(review): for a file directly under processed_root the id becomes
        # stem[:4] + stem. Presumably this keeps "first 4 chars = folder, rest = filename"
        # round-trippable - confirm against prepare_olmocrmix.py.
        stem = rel.stem
        prefix = rel.stem[:4]
    else:
        prefix = rel.parts[0]
        stem = Path(rel.parts[-1]).stem
    return f"{prefix}{stem}"


def infer_pdf_path(md_path: Path, doc_id: str, pdf_root: Optional[Path]) -> Path:
    """
    Locate the PDF file corresponding to the markdown doc.

    Looks for a sibling file with a ``.pdf`` suffix first, then for ``{doc_id}.pdf`` under
    ``pdf_root`` when one is given.

    Raises:
        FileNotFoundError: if neither candidate exists.
    """
    pdf_candidate = md_path.with_suffix(".pdf")
    if pdf_candidate.exists():
        return pdf_candidate.resolve()

    if pdf_root is not None:
        alt_path = pdf_root / f"{doc_id}.pdf"
        if alt_path.exists():
            return alt_path.resolve()

    raise FileNotFoundError(f"No PDF found for {md_path}")


def normalize_response_payload(front_matter: Dict[str, object], body_text: str) -> Dict[str, object]:
    """
    Merge parsed fields with the natural text payload.

    Returns a copy of ``front_matter`` with defaults filled in for missing keys and
    ``natural_text`` set to the body, or ``None`` when the body is empty or whitespace only.
    """
    payload = dict(front_matter)
    text = body_text if body_text.strip() else None

    payload.setdefault("primary_language", None)
    payload.setdefault("is_rotation_valid", True)
    payload.setdefault("rotation_correction", 0)
    payload.setdefault("is_table", False)
    payload.setdefault("is_diagram", False)
    payload["natural_text"] = text
    return payload


def guess_url(front_matter: Dict[str, object], doc_id: str, source_url_template: Optional[str]) -> Optional[str]:
    """
    Infer a URL for the document.

    Priority:
        1. Front matter fields named url/source_url/pdf_url/uri (first non-empty string wins)
        2. Provided template with placeholders {doc_id}, {prefix}, {base_id}, {base_pdf}

    Placeholders are derived from ``doc_id``: ``prefix`` is its first four characters,
    ``base_id`` is the rest, and ``base_pdf`` is ``base_id`` with its last ``-suffix`` removed.
    Returns ``None`` when neither source yields a URL.
    """
    for key in ("url", "source_url", "pdf_url", "uri"):
        value = front_matter.get(key)
        if isinstance(value, str) and value:
            return value

    if source_url_template:
        prefix = doc_id[:4]
        base_id = doc_id[4:]
        base_pdf = base_id.rsplit("-", 1)[0] if "-" in base_id else base_id
        return source_url_template.format(doc_id=doc_id, prefix=prefix, base_id=base_id, base_pdf=base_pdf)

    return None


def parse_page_number(doc_id: str, front_matter: Dict[str, object]) -> Optional[int]:
    """
    Extract page number from front matter or doc_id suffix.

    A front matter ``page_number`` that converts to ``int`` wins; otherwise the text after the
    last ``-`` in ``doc_id`` is tried. Returns ``None`` when neither is an integer.
    """
    if "page_number" in front_matter:
        value = front_matter["page_number"]
        try:
            return int(value)
        except (TypeError, ValueError):
            # Unusable front matter value: fall back to the doc_id suffix below.
            pass

    if "-" in doc_id:
        suffix = doc_id.rsplit("-", 1)[-1]
        try:
            return int(suffix)
        except ValueError:
            return None
    return None


def collect_documents(
    processed_dir: Path,
    pdf_root: Optional[Path],
    url_template: Optional[str],
    strict: bool,
) -> List[DocumentRecord]:
    """
    Scan processed markdown/pdf pairs into DocumentRecord objects.

    Markdown files are discovered recursively and visited in sorted path order. A file that
    cannot be processed (missing PDF, unparsable front matter, or a doc_id already produced by
    an earlier file) is re-raised when ``strict`` is true; otherwise a warning is written and
    the file is skipped.
    """
    records: List[DocumentRecord] = []
    # doc_id -> markdown file that produced it. infer_doc_id() ignores intermediate
    # directories, so distinct files can collide on the same id.
    seen: Dict[str, Path] = {}
    md_files = sorted(processed_dir.rglob("*.md"))

    for md_path in tqdm(md_files, desc="Scanning markdown files"):
        try:
            doc_id = infer_doc_id(md_path, processed_dir)
            if doc_id in seen:
                raise ValueError(f"Duplicate doc_id {doc_id!r} (already produced by {seen[doc_id]})")

            pdf_path = infer_pdf_path(md_path, doc_id, pdf_root)
            markdown_text = md_path.read_text(encoding="utf-8")
            front_matter, body_text = parse_front_matter(markdown_text)
            response_payload = normalize_response_payload(front_matter, body_text)
            response_json = json.dumps(response_payload, ensure_ascii=False)
            pdf_size = pdf_path.stat().st_size
            page_number = parse_page_number(doc_id, front_matter)
            url = guess_url(front_matter, doc_id, url_template)
            pdf_relpath = f"{doc_id}.pdf"

            records.append(
                DocumentRecord(
                    doc_id=doc_id,
                    markdown_path=md_path,
                    pdf_path=pdf_path,
                    response_json=response_json,
                    pdf_size=pdf_size,
                    page_number=page_number,
                    url=url,
                    pdf_relpath=pdf_relpath,
                )
            )
            # Reserve the id only once the record was built, so a failed file does not block
            # a later valid one.
            seen[doc_id] = md_path
        except Exception as exc:
            # Deliberate best-effort boundary: one bad document must not abort the whole scan
            # unless the caller asked for strict mode.
            if strict:
                raise
            tqdm.write(f"[WARN] Skipping {md_path}: {exc}")

    return records


def write_parquet(records: List[DocumentRecord], parquet_path: Path, compression: str) -> None:
    """
    Emit the textual payload into a parquet file.

    The frame is indexed by doc_id (index name ``id``) with columns ``url``, ``page_number``,
    ``response``, ``pdf_relpath`` and ``markdown_path``. Parent directories are created.

    Raises:
        RuntimeError: if ``records`` is empty.
    """
    if not records:
        raise RuntimeError("No records to write into parquet")

    data = {
        "url": [rec.url for rec in records],
        "page_number": [rec.page_number for rec in records],
        "response": [rec.response_json for rec in records],
        "pdf_relpath": [rec.pdf_relpath for rec in records],
        "markdown_path": [str(rec.markdown_path) for rec in records],
    }
    index = [rec.doc_id for rec in records]
    df = pd.DataFrame(data, index=index)
    df.index.name = "id"

    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(parquet_path, compression=compression)


def chunk_records_by_size(records: List[DocumentRecord], max_bytes: int) -> Iterator[List[DocumentRecord]]:
    """
    Yield batches of records whose summed PDF sizes do not exceed max_bytes.

    Each record is charged its PDF size plus a fixed per-entry allowance. Input order is
    preserved and every yielded batch is non-empty.

    Raises:
        RuntimeError: if a single record on its own is larger than ``max_bytes``.
    """
    batch: List[DocumentRecord] = []
    batch_size = 0
    overhead = 1024  # rough tar header allowance per entry

    for record in records:
        entry_size = record.pdf_size + overhead
        if entry_size > max_bytes:
            raise RuntimeError(f"Single PDF {record.pdf_path} exceeds max tar size {max_bytes} bytes")

        # Close the current batch before this entry would push it over the cap.
        if batch and batch_size + entry_size > max_bytes:
            yield batch
            batch = []
            batch_size = 0

        batch.append(record)
        batch_size += entry_size

    if batch:
        yield batch


def write_pdf_tarballs(records: List[DocumentRecord], pdf_dir: Path, chunk_prefix: str, max_bytes: int, manifest_path: Path) -> None:
    """
    Bundle PDFs into .tar.gz archives under the size cap.

    Archives are named ``{chunk_prefix}_{index:05d}.tar.gz`` inside ``pdf_dir`` and each PDF is
    stored as ``{doc_id}.pdf``. After all archives are written, a JSON-lines manifest mapping
    every doc_id to its archive and member name is written to ``manifest_path``.

    Raises:
        RuntimeError: if a PDF is larger than ``max_bytes`` or a finished archive exceeds it.
    """
    pdf_dir.mkdir(parents=True, exist_ok=True)
    manifest_path.parent.mkdir(parents=True, exist_ok=True)

    manifest_rows: List[Dict[str, str]] = []
    batches = chunk_records_by_size(records, max_bytes)

    for chunk_idx, batch in enumerate(batches):
        tar_name = f"{chunk_prefix}_{chunk_idx:05d}.tar.gz"
        tar_path = pdf_dir / tar_name
        # dereference=True stores the target file's contents when a PDF path is a symlink.
        with tarfile.open(tar_path, "w:gz", dereference=True) as tar:
            for rec in batch:
                tar.add(rec.pdf_path, arcname=f"{rec.doc_id}.pdf", recursive=False)
                manifest_rows.append({"doc_id": rec.doc_id, "chunk": tar_name, "arcname": f"{rec.doc_id}.pdf"})

        # Batches are planned from estimated sizes; verify the archive that was actually written.
        actual_size = tar_path.stat().st_size
        if actual_size > max_bytes:
            raise RuntimeError(f"{tar_path} exceeded size cap ({actual_size} bytes > {max_bytes} bytes)")

    with manifest_path.open("w", encoding="utf-8") as manifest_file:
        manifest_file.writelines(json.dumps(row) + "\n" for row in manifest_rows)


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.

    Exits with a usage error when ``--max-tar-size-bytes`` is not a positive integer.
    """
    parser = argparse.ArgumentParser(description="Repackage processed olmocr-mix data into parquet + PDF tarballs.")
    parser.add_argument("--processed-dir", required=True, type=Path, help="Directory with markdown/PDF pairs (output of prepare_olmocrmix.py).")
    parser.add_argument("--subset", required=True, help="Dataset subset identifier (e.g. 00_documents).")
    parser.add_argument("--split", required=True, help="Dataset split identifier (e.g. train_s2pdf).")
    parser.add_argument(
        "--output-dir",
        required=True,
        type=Path,
        help="Destination directory for the parquet file and pdf tarballs.",
    )
    parser.add_argument(
        "--parquet-name",
        default=None,
        help="Filename for the generated parquet file (defaults to {subset}_{split}.parquet).",
    )
    parser.add_argument(
        "--pdf-chunk-dir",
        default="pdf_chunks",
        help="Name of the subdirectory (under output-dir) to place PDF tarballs in.",
    )
    parser.add_argument(
        "--pdf-chunk-prefix",
        default=None,
        help="Prefix for generated tarball filenames (defaults to {subset}_{split}).",
    )
    parser.add_argument(
        "--max-tar-size-bytes",
        type=int,
        default=DEFAULT_MAX_TAR_BYTES,
        help="Maximum uncompressed size (in bytes) to pack into a single tarball (default 1 GiB).",
    )
    parser.add_argument(
        "--pdf-root",
        type=Path,
        default=None,
        help="Optional directory containing {doc_id}.pdf files if they are not alongside the markdown.",
    )
    parser.add_argument(
        "--url-template",
        type=str,
        default=None,
        help="Optional template to synthesize URLs, e.g. 's3://bucket/{prefix}/{base_pdf}.pdf'.",
    )
    parser.add_argument(
        "--parquet-compression",
        default="snappy",
        help="Compression codec passed to pandas.to_parquet (default: snappy).",
    )
    parser.add_argument(
        "--manifest-name",
        default="pdf_chunk_manifest.jsonl",
        help="Filename for the emitted chunk manifest (stored under output-dir).",
    )
    parser.add_argument("--strict", action="store_true", help="Fail immediately when a markdown/PDF pair cannot be processed.")

    args = parser.parse_args()
    if args.max_tar_size_bytes <= 0:
        # Without this check a non-positive cap surfaces later as a misleading
        # "Single PDF ... exceeds max tar size" error on the first document.
        parser.error("--max-tar-size-bytes must be a positive integer")
    return args


def build_dataset_tag(subset: str, split: str) -> str:
    """Normalize subset/split into a filesystem-friendly tag (trimmed, with '/' replaced by '_')."""
    return f"{subset.strip().replace('/', '_')}_{split.strip().replace('/', '_')}"


def main() -> None:
    """Collect documents from the processed directory, then write the parquet, tarballs and manifest."""
    args = parse_args()

    processed_dir = args.processed_dir.expanduser().resolve()
    if not processed_dir.exists():
        raise FileNotFoundError(f"Processed directory not found: {processed_dir}")
    if not processed_dir.is_dir():
        raise NotADirectoryError(f"Processed path is not a directory: {processed_dir}")

    pdf_root = args.pdf_root.expanduser().resolve() if args.pdf_root else None
    output_dir = args.output_dir.expanduser().resolve()
    dataset_tag = build_dataset_tag(args.subset, args.split)
    parquet_name = args.parquet_name or f"{dataset_tag}.parquet"
    chunk_prefix = args.pdf_chunk_prefix or dataset_tag
    parquet_path = output_dir / parquet_name
    pdf_dir = output_dir / args.pdf_chunk_dir
    manifest_path = output_dir / args.manifest_name

    records = collect_documents(processed_dir, pdf_root, args.url_template, args.strict)
    if not records:
        raise RuntimeError("No markdown/PDF pairs discovered - nothing to package.")

    # Sort by id so parquet rows and tarball membership come out in a stable order.
    records.sort(key=lambda rec: rec.doc_id)

    write_parquet(records, parquet_path, args.parquet_compression)
    write_pdf_tarballs(records, pdf_dir, chunk_prefix, args.max_tar_size_bytes, manifest_path)

    print(f"Wrote parquet: {parquet_path}")
    print(f"Wrote PDF tarballs to: {pdf_dir}")
    print(f"Wrote manifest: {manifest_path}")
    print(f"Total documents packaged: {len(records)}")


if __name__ == "__main__":
    main()