mirror of https://github.com/allenai/olmocr.git, synced 2025-10-27 16:12:13 +00:00
New Claude Sonnet, going to add multilingual tests to olmocr bench 1025 internal version
This commit is contained in:
parent da4ada33a0 · commit 743e48361c
@@ -26,6 +26,7 @@ from tqdm import tqdm
 
 from olmocr.data.renderpdf import render_pdf_to_base64png
 from olmocr.filter import PdfFilter
+from lingua import Language
 
 TARGET_IMAGE_DIM = 1024
 
@@ -146,7 +147,7 @@ def process_pdf(s3_path: str, temp_dir: str, output_dir: str, api_key: str) -> b
     if not download_pdf_from_s3(s3_path, local_pdf_path):
         return False
 
-    pdf_filter = PdfFilter()
+    pdf_filter = PdfFilter(languages_to_keep=Language.all())
 
     if pdf_filter.filter_out_pdf(local_pdf_path):
         print(f"Filtering out {pdf_filename}")
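For context on the new languages_to_keep argument: lingua's Language.all() returns the full set of languages the library knows about, so passing it here effectively disables language-based filtering in PdfFilter. A minimal sketch of that enum in use, assuming the lingua-language-detector package (the detector below is illustrative and not part of this diff):

    from lingua import Language, LanguageDetectorBuilder

    # Language.all() yields every Language member lingua supports.
    all_languages = Language.all()
    print(len(all_languages))

    # Illustrative only: a detector built over that same set of languages.
    detector = LanguageDetectorBuilder.from_languages(*all_languages).build()
    print(detector.detect_language_of("languages are awesome"))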
@@ -287,7 +288,7 @@ def main():
             print(f"Reached maximum number of PDFs with tables ({args.max_pdfs}), stopping")
             break
 
-    print(f"Found and copied {table_pdfs_found} PDFs with tables to {args.output_dir}")
+    print(f"Found and copied {table_pdfs_found} PDFs to {args.output_dir}")
 
 
 if __name__ == "__main__":
@@ -341,7 +341,7 @@ async def generate_html_from_image(client, image_base64):
     try:
         # Step 1: Initial analysis and column detection
        analysis_response = await client.messages.create(
-            model="claude-sonnet-4-20250514",
+            model="claude-sonnet-4-5-20250929",
            max_tokens=2000,
            temperature=0.1,
            messages=[
@@ -375,7 +375,7 @@ async def generate_html_from_image(client, image_base64):
 
         # Step 2: Initial HTML generation with detailed layout instructions
         initial_response = await client.messages.create(
-            model="claude-sonnet-4-20250514",
+            model="claude-sonnet-4-5-20250929",
             max_tokens=6000,
             temperature=0.2,
             messages=[
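Both hunks above swap the model id passed to the Anthropic messages API. As a reference point, a minimal sketch of that async call pattern, assuming the anthropic Python SDK with an ANTHROPIC_API_KEY in the environment (the prompt text is a placeholder, not the one used by this script):

    import asyncio

    import anthropic

    async def demo() -> None:
        client = anthropic.AsyncAnthropic()  # picks up ANTHROPIC_API_KEY from the environment
        response = await client.messages.create(
            model="claude-sonnet-4-5-20250929",  # the id this commit switches to
            max_tokens=2000,
            temperature=0.1,
            messages=[{"role": "user", "content": "Summarize the layout of this page."}],
        )
        print(response.content[0].text)

    asyncio.run(demo())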
olmocr/data/repackage_olmocrmix.py (353 lines, new file)
@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
Repackage locally processed OLMoCR-mix style data back into parquet metadata and PDF tarballs.

Given a directory that mirrors the layout produced by prepare_olmocrmix.py (folders of markdown/PDF
pairs), this script rebuilds a HuggingFace-style payload by:
* walking the processed directory to recover document ids, metadata, and natural text
* emitting a parquet file whose index/columns match what prepare_olmocrmix.py expects
* chunking PDFs into .tar.gz archives that stay under a user-configurable size (default 1 GiB)

The parquet rows contain the `response` JSON blob expected by downstream tooling, along with helper
columns (`doc_id`, `page_number`, `pdf_relpath`, `url`, etc.) that can be useful when mirroring to
remote storage.
"""

from __future__ import annotations

import argparse
import json
import tarfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple

import pandas as pd
import yaml
from tqdm import tqdm

DEFAULT_MAX_TAR_BYTES = 1_073_741_824  # 1 GiB

@dataclass(slots=True)
class DocumentRecord:
    doc_id: str
    markdown_path: Path
    pdf_path: Path
    response_json: str
    pdf_size: int
    page_number: Optional[int]
    url: Optional[str]
    pdf_relpath: str

def parse_front_matter(markdown_text: str) -> Tuple[Dict[str, object], str]:
    """
    Parse YAML front matter from a markdown string.

    Returns a tuple of (front_matter_dict, body_text).
    """
    if not markdown_text.startswith("---"):
        return {}, markdown_text.strip()

    closing_idx = markdown_text.find("\n---", 3)
    if closing_idx == -1:
        return {}, markdown_text.strip()

    fm_block = markdown_text[3:closing_idx]
    remainder = markdown_text[closing_idx + 4 :]

    front_matter = yaml.safe_load(fm_block) or {}
    # Preserve internal spacing but trim a single leading newline if present.
    if remainder.startswith("\n"):
        remainder = remainder[1:]
    return front_matter, remainder

def infer_doc_id(md_path: Path, processed_root: Path) -> str:
    """Reconstruct the doc_id used in parquet/index space."""
    rel = md_path.relative_to(processed_root)
    if len(rel.parts) < 2:
        stem = rel.stem
        prefix = rel.stem[:4]
    else:
        prefix = rel.parts[0]
        stem = Path(rel.parts[-1]).stem
    return f"{prefix}{stem}"


def infer_pdf_path(md_path: Path, doc_id: str, pdf_root: Optional[Path]) -> Path:
    """Locate the PDF file corresponding to the markdown doc."""
    pdf_candidate = md_path.with_suffix(".pdf")
    if pdf_candidate.exists():
        return pdf_candidate.resolve()

    if pdf_root is not None:
        alt_path = pdf_root / f"{doc_id}.pdf"
        if alt_path.exists():
            return alt_path.resolve()

    raise FileNotFoundError(f"No PDF found for {md_path}")

def normalize_response_payload(front_matter: Dict[str, object], body_text: str) -> Dict[str, object]:
    """Merge parsed fields with the natural text payload."""
    payload = dict(front_matter)
    text = body_text if body_text.strip() else None

    payload.setdefault("primary_language", None)
    payload.setdefault("is_rotation_valid", True)
    payload.setdefault("rotation_correction", 0)
    payload.setdefault("is_table", False)
    payload.setdefault("is_diagram", False)
    payload["natural_text"] = text
    return payload


def guess_url(front_matter: Dict[str, object], doc_id: str, source_url_template: Optional[str]) -> Optional[str]:
    """
    Infer a URL for the document.

    Priority:
    1. Front matter fields named url/source_url/pdf_url
    2. Provided template with placeholders {doc_id}, {prefix}, {base_id}
    """
    for key in ("url", "source_url", "pdf_url", "uri"):
        value = front_matter.get(key)
        if isinstance(value, str) and value:
            return value

    if source_url_template:
        prefix = doc_id[:4]
        base_id = doc_id[4:]
        base_pdf = base_id.rsplit("-", 1)[0] if "-" in base_id else base_id
        return source_url_template.format(doc_id=doc_id, prefix=prefix, base_id=base_id, base_pdf=base_pdf)

    return None

def parse_page_number(doc_id: str, front_matter: Dict[str, object]) -> Optional[int]:
    """Extract page number from front matter or doc_id suffix."""
    if "page_number" in front_matter:
        value = front_matter["page_number"]
        try:
            return int(value)
        except (TypeError, ValueError):
            pass

    if "-" in doc_id:
        suffix = doc_id.rsplit("-", 1)[-1]
        try:
            return int(suffix)
        except ValueError:
            return None
    return None

def collect_documents(
    processed_dir: Path,
    pdf_root: Optional[Path],
    url_template: Optional[str],
    strict: bool,
) -> List[DocumentRecord]:
    """Scan processed markdown/pdf pairs into DocumentRecord objects."""
    records: List[DocumentRecord] = []
    md_files = sorted(processed_dir.rglob("*.md"))

    for md_path in tqdm(md_files, desc="Scanning markdown files"):
        try:
            doc_id = infer_doc_id(md_path, processed_dir)
            pdf_path = infer_pdf_path(md_path, doc_id, pdf_root)
            markdown_text = md_path.read_text(encoding="utf-8")
            front_matter, body_text = parse_front_matter(markdown_text)
            response_payload = normalize_response_payload(front_matter, body_text)
            response_json = json.dumps(response_payload, ensure_ascii=False)
            pdf_size = pdf_path.stat().st_size
            page_number = parse_page_number(doc_id, front_matter)
            url = guess_url(front_matter, doc_id, url_template)
            pdf_relpath = f"{doc_id}.pdf"

            records.append(
                DocumentRecord(
                    doc_id=doc_id,
                    markdown_path=md_path,
                    pdf_path=pdf_path,
                    response_json=response_json,
                    pdf_size=pdf_size,
                    page_number=page_number,
                    url=url,
                    pdf_relpath=pdf_relpath,
                )
            )
        except Exception as exc:
            if strict:
                raise
            tqdm.write(f"[WARN] Skipping {md_path}: {exc}")

    return records

def write_parquet(records: List[DocumentRecord], parquet_path: Path, compression: str) -> None:
    """Emit the textual payload into a parquet file."""
    if not records:
        raise RuntimeError("No records to write into parquet")

    data = {
        "url": [rec.url for rec in records],
        "page_number": [rec.page_number for rec in records],
        "response": [rec.response_json for rec in records],
        "pdf_relpath": [rec.pdf_relpath for rec in records],
        "markdown_path": [str(rec.markdown_path) for rec in records],
    }
    index = [rec.doc_id for rec in records]
    df = pd.DataFrame(data, index=index)
    df.index.name = "id"

    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(parquet_path, compression=compression)

def chunk_records_by_size(records: List[DocumentRecord], max_bytes: int) -> Iterator[List[DocumentRecord]]:
    """Yield batches of records whose summed PDF sizes stay under max_bytes."""
    batch: List[DocumentRecord] = []
    batch_size = 0
    overhead = 1024  # rough tar header allowance per entry

    for record in records:
        entry_size = record.pdf_size + overhead
        if entry_size > max_bytes:
            raise RuntimeError(f"Single PDF {record.pdf_path} exceeds max tar size {max_bytes} bytes")

        if batch and batch_size + entry_size > max_bytes:
            yield batch
            batch = []
            batch_size = 0

        batch.append(record)
        batch_size += entry_size

    if batch:
        yield batch

def write_pdf_tarballs(records: List[DocumentRecord], pdf_dir: Path, chunk_prefix: str, max_bytes: int, manifest_path: Path) -> None:
    """Bundle PDFs into .tar.gz archives under the size cap."""
    pdf_dir.mkdir(parents=True, exist_ok=True)
    manifest_path.parent.mkdir(parents=True, exist_ok=True)

    manifest_rows: List[Dict[str, str]] = []
    batches = chunk_records_by_size(records, max_bytes)

    for chunk_idx, batch in enumerate(batches):
        tar_name = f"{chunk_prefix}_{chunk_idx:05d}.tar.gz"
        tar_path = pdf_dir / tar_name
        with tarfile.open(tar_path, "w:gz", dereference=True) as tar:
            for rec in batch:
                tar.add(rec.pdf_path, arcname=f"{rec.doc_id}.pdf", recursive=False)
                manifest_rows.append({"doc_id": rec.doc_id, "chunk": tar_name, "arcname": f"{rec.doc_id}.pdf"})

        actual_size = tar_path.stat().st_size
        if actual_size > max_bytes:
            raise RuntimeError(f"{tar_path} exceeded size cap ({actual_size} bytes > {max_bytes} bytes)")

    with manifest_path.open("w", encoding="utf-8") as manifest_file:
        for row in manifest_rows:
            manifest_file.write(json.dumps(row) + "\n")

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Repackage processed olmocr-mix data into parquet + PDF tarballs.")
    parser.add_argument("--processed-dir", required=True, type=Path, help="Directory with markdown/PDF pairs (output of prepare_olmocrmix.py).")
    parser.add_argument("--subset", required=True, help="Dataset subset identifier (e.g. 00_documents).")
    parser.add_argument("--split", required=True, help="Dataset split identifier (e.g. train_s2pdf).")
    parser.add_argument(
        "--output-dir",
        required=True,
        type=Path,
        help="Destination directory for the parquet file and pdf tarballs.",
    )
    parser.add_argument(
        "--parquet-name",
        default=None,
        help="Filename for the generated parquet file (defaults to {subset}_{split}.parquet).",
    )
    parser.add_argument(
        "--pdf-chunk-dir",
        default="pdf_chunks",
        help="Name of the subdirectory (under output-dir) to place PDF tarballs in.",
    )
    parser.add_argument(
        "--pdf-chunk-prefix",
        default=None,
        help="Prefix for generated tarball filenames (defaults to {subset}_{split}).",
    )
    parser.add_argument(
        "--max-tar-size-bytes",
        type=int,
        default=DEFAULT_MAX_TAR_BYTES,
        help="Maximum uncompressed size (in bytes) to pack into a single tarball (default 1 GiB).",
    )
    parser.add_argument(
        "--pdf-root",
        type=Path,
        default=None,
        help="Optional directory containing {doc_id}.pdf files if they are not alongside the markdown.",
    )
    parser.add_argument(
        "--url-template",
        type=str,
        default=None,
        help="Optional template to synthesize URLs, e.g. 's3://bucket/{prefix}/{base_pdf}.pdf'.",
    )
    parser.add_argument(
        "--parquet-compression",
        default="snappy",
        help="Compression codec passed to pandas.to_parquet (default: snappy).",
    )
    parser.add_argument(
        "--manifest-name",
        default="pdf_chunk_manifest.jsonl",
        help="Filename for the emitted chunk manifest (stored under output-dir).",
    )
    parser.add_argument("--strict", action="store_true", help="Fail immediately when a markdown/PDF pair cannot be processed.")
    return parser.parse_args()

def build_dataset_tag(subset: str, split: str) -> str:
    """Normalize subset/split into a filesystem-friendly tag."""
    return f"{subset.strip().replace('/', '_')}_{split.strip().replace('/', '_')}"

def main() -> None:
    args = parse_args()

    processed_dir = args.processed_dir.expanduser().resolve()
    if not processed_dir.exists():
        raise FileNotFoundError(f"Processed directory not found: {processed_dir}")

    pdf_root = args.pdf_root.expanduser().resolve() if args.pdf_root else None
    output_dir = args.output_dir.expanduser().resolve()
    dataset_tag = build_dataset_tag(args.subset, args.split)
    parquet_name = args.parquet_name or f"{dataset_tag}.parquet"
    chunk_prefix = args.pdf_chunk_prefix or dataset_tag
    parquet_path = output_dir / parquet_name
    pdf_dir = output_dir / args.pdf_chunk_dir
    manifest_path = output_dir / args.manifest_name

    records = collect_documents(processed_dir, pdf_root, args.url_template, args.strict)
    if not records:
        raise RuntimeError("No markdown/PDF pairs discovered - nothing to package.")

    records.sort(key=lambda rec: rec.doc_id)

    write_parquet(records, parquet_path, args.parquet_compression)
    write_pdf_tarballs(records, pdf_dir, chunk_prefix, args.max_tar_size_bytes, manifest_path)

    print(f"Wrote parquet: {parquet_path}")
    print(f"Wrote PDF tarballs to: {pdf_dir}")
    print(f"Wrote manifest: {manifest_path}")
    print(f"Total documents packaged: {len(records)}")

if __name__ == "__main__":
    main()
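To sanity-check an output directory produced by this new script (a typical invocation being python olmocr/data/repackage_olmocrmix.py --processed-dir <processed> --subset 00_documents --split train_s2pdf --output-dir repackaged), one can read the parquet, the manifest, and a tarball member back with pandas and tarfile. The paths below are illustrative placeholders assuming the default parquet, manifest, and chunk-dir names, not anything fixed by this commit:

    import json
    import tarfile
    from pathlib import Path

    import pandas as pd

    output_dir = Path("repackaged")  # hypothetical --output-dir

    # Index is the doc_id ("id"); columns include url, page_number, response, pdf_relpath, markdown_path.
    df = pd.read_parquet(output_dir / "00_documents_train_s2pdf.parquet")
    print(len(df), df.columns.tolist())

    # Each manifest row maps a doc_id to the tarball chunk and arcname holding its PDF.
    with (output_dir / "pdf_chunk_manifest.jsonl").open(encoding="utf-8") as fh:
        first = json.loads(fh.readline())

    # Pull that one PDF back out of its chunk to confirm the arcname round-trips.
    with tarfile.open(output_dir / "pdf_chunks" / first["chunk"], "r:gz") as tar:
        member = tar.getmember(first["arcname"])
        print(first["doc_id"], member.size, "bytes")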