olmocr/pdelfin/prompts/anchor.py

351 lines
12 KiB
Python

# This file generates anchor text in a variety of different ways
# The goal here is to generate a bit of text which can be used to help prompt a VLM
# to better understand a document
# pdftotext
# pdfium
# pymupdf
# pypdf
# topcoherency: the output of whichever of the four engines above gets the best coherency score
import subprocess
import re
import ftfy
from dataclasses import dataclass
from typing import Literal, List
import pypdfium2 as pdfium
import pymupdf
from pdelfin.filter.coherency import get_document_coherency
from pypdf import PdfReader
from pypdf.generic import RectangleObject
from pdelfin.prompts._adv_anchor import mult
def get_anchor_text(local_pdf_path: str, page: int, pdf_engine: Literal["pdftotext", "pdfium", "pymupdf", "pypdf", "topcoherency", "pdfreport"]) -> str:
    """Produce anchor text for a single page of a PDF with the requested engine.

    Args:
        local_pdf_path: Path to the PDF file on the local filesystem.
        page: 1-indexed page number.
        pdf_engine: Which extraction strategy to use. ``"topcoherency"`` runs
            the four plain-text engines and keeps the output with the highest
            ``get_document_coherency`` score; ``"pdfreport"`` linearizes the
            positional report built by ``_pdf_report``.

    Returns:
        The extracted text for the page.

    Raises:
        AssertionError: If ``page`` is not positive.
        NotImplementedError: If ``pdf_engine`` is not a recognised name.
    """
    assert page > 0, "Pages are 1-indexed in pdf-land"

    if pdf_engine == "pdftotext":
        return _get_pdftotext(local_pdf_path, page)

    if pdf_engine == "pdfium":
        return _get_pdfium(local_pdf_path, page)

    if pdf_engine == "pypdf":
        return _get_pypdf_raw(local_pdf_path, page)

    if pdf_engine == "pymupdf":
        return _get_pymupdf(local_pdf_path, page)

    if pdf_engine == "topcoherency":
        # Run every plain-text engine; insertion order decides ties in max().
        candidates = {
            "pdftotext": _get_pdftotext(local_pdf_path, page),
            "pymupdf": _get_pymupdf(local_pdf_path, page),
            "pdfium": _get_pdfium(local_pdf_path, page),
            "pypdf_raw": _get_pypdf_raw(local_pdf_path, page),
        }
        coherency = {
            engine_name: get_document_coherency(extracted)
            for engine_name, extracted in candidates.items()
        }
        best_option_label = max(coherency, key=coherency.get)
        print(f"topcoherency chosen: {best_option_label}")
        return candidates[best_option_label]

    if pdf_engine == "pdfreport":
        page_report = _pdf_report(local_pdf_path, page)
        return _linearize_pdf_report(page_report)

    raise NotImplementedError("Unknown engine")
def _get_pdftotext(local_pdf_path: str, page: int) -> str:
    """Extract one page's text by shelling out to the ``pdftotext`` binary.

    The command is given ``page`` as both the ``-f`` and ``-l`` argument and
    ``-`` as the output target; its captured stdout is decoded as UTF-8.

    Raises:
        AssertionError: If ``pdftotext`` exits with a non-zero status.
        subprocess.TimeoutExpired: If the command runs longer than 60 seconds.
    """
    page_arg = str(page)
    command = ["pdftotext", "-f", page_arg, "-l", page_arg, local_pdf_path, "-"]

    # capture_output=True pipes both stdout and stderr.
    completed = subprocess.run(command, timeout=60, capture_output=True)

    assert completed.returncode == 0
    return completed.stdout.decode("utf-8")
def _get_pymupdf(local_pdf_path: str, page: int) -> str:
    """Extract one page's text with PyMuPDF.

    Args:
        local_pdf_path: Path to the PDF file on the local filesystem.
        page: 1-indexed page number.

    Returns:
        The result of ``get_text()`` for the requested page.
    """
    # Use the document as a context manager so the underlying file is closed
    # as soon as the text has been read, instead of lingering until GC.
    with pymupdf.open(local_pdf_path) as pm_doc:
        return pm_doc[page - 1].get_text()
def _get_pypdf_raw(local_pdf_path: str, page: int) -> str:
    """Extract one page's text with pypdf's default ``extract_text``.

    ``page`` is 1-indexed; it is shifted by one to index ``reader.pages``.
    """
    return PdfReader(local_pdf_path).pages[page - 1].extract_text()
def _get_pdfium(local_pdf_path: str, page: int) -> str:
    """Extract one page's text with pypdfium2.

    ``page`` is 1-indexed; it is shifted by one to index the document.
    """
    document = pdfium.PdfDocument(local_pdf_path)
    zero_based_index = page - 1
    text_page = document[zero_based_index].get_textpage()
    return text_page.get_text_bounded()
def _transform_point(x, y, m):
x_new = m[0]*x + m[2]*y + m[4]
y_new = m[1]*x + m[3]*y + m[5]
return x_new, y_new
@dataclass(frozen=True)
class Element:
    """Common base class for page elements; it declares no fields of its own."""
    pass
@dataclass(frozen=True)
class BoundingBox:
    """Immutable rectangle described by the coordinates of two corners."""

    x0: float
    y0: float
    x1: float
    y1: float

    @staticmethod
    def from_rectangle(rect: RectangleObject) -> "BoundingBox":
        """Build a box from the first four entries of ``rect``, in order."""
        return BoundingBox(
            x0=rect[0],
            y0=rect[1],
            x1=rect[2],
            y1=rect[3],
        )
@dataclass(frozen=True)
class TextElement(Element):
    """A run of extracted text together with one anchor coordinate."""
    # The text as handed to the extraction visitor (not yet cleaned up).
    text: str
    # _pdf_report fills x and y from entries 4 and 5 of mult(tm, cm).
    x: float
    y: float
@dataclass(frozen=True)
class ImageElement(Element):
    """An image placed on the page, identified by name and bounding box."""
    # XObject name from the page; _merge_image_elements joins names with "+".
    name: str
    # Extent of the image on the page.
    bbox: BoundingBox
@dataclass(frozen=True)
class PageReport:
    """Positional summary of one page: its media box plus text and image elements."""
    # Built by _pdf_report from the pypdf page's mediabox.
    mediabox: BoundingBox
    # Text runs in the order the extraction visitor reported them.
    text_elements: List[TextElement]
    # Image placements in the order the extraction visitor reported them.
    image_elements: List[ImageElement]
def _pdf_report(local_pdf_path: str, page: int) -> PageReport:
    """Collect positioned text runs and image placements for one page via pypdf.

    The page's content stream is walked with ``extract_text`` visitors: every
    text run becomes a ``TextElement`` and every ``Do`` operator that paints an
    ``/Image`` XObject becomes an ``ImageElement``.

    Args:
        local_pdf_path: Path to the PDF file on the local filesystem.
        page: 1-indexed page number.

    Returns:
        A ``PageReport`` holding the page media box and the collected elements.
    """
    reader = PdfReader(local_pdf_path)
    # Keep the page object under its own name rather than rebinding the
    # integer ``page`` parameter.
    pdf_page = reader.pages[page - 1]
    resources = pdf_page.get("/Resources", {})
    xobjects = resources.get("/XObject", {})
    text_elements, image_elements = [], []

    # Corners of the unit square that an image is painted into before the
    # current transformation matrix is applied.
    unit_square = ((0, 0), (1, 0), (0, 1), (1, 1))

    def visitor_body(text, cm, tm, font_dict, font_size):
        # NOTE(review): mult is imported from _adv_anchor; presumably it
        # composes the text matrix with the CTM - confirm there. Entries 4
        # and 5 of its result are used as the text position.
        txt2user = mult(tm, cm)
        text_elements.append(TextElement(text, txt2user[4], txt2user[5]))

    def visitor_op(op, args, cm, tm):
        if op != b"Do":
            return
        xobject_name = args[0]
        xobject = xobjects.get(xobject_name)
        if not (xobject and xobject["/Subtype"] == "/Image"):
            return
        # Transform all four corners: with a rotated or sheared matrix the
        # (0,0)/(1,1) diagonal alone does not span the image's full extent.
        corners = [_transform_point(cx, cy, cm) for cx, cy in unit_square]
        xs = [corner[0] for corner in corners]
        ys = [corner[1] for corner in corners]
        image_elements.append(
            ImageElement(xobject_name, BoundingBox(min(xs), min(ys), max(xs), max(ys)))
        )

    pdf_page.extract_text(visitor_text=visitor_body, visitor_operand_before=visitor_op)

    return PageReport(
        mediabox=BoundingBox.from_rectangle(pdf_page.mediabox),
        text_elements=text_elements,
        image_elements=image_elements,
    )
def _merge_image_elements(images: List[ImageElement], tolerance: float=0.5) -> List[ImageElement]:
    """Fuse images whose bounding boxes touch or nearly touch into single elements.

    Two boxes are linked when both their horizontal and vertical gaps are at
    most ``tolerance``; links are transitive. Each resulting group becomes one
    ``ImageElement`` whose box encloses all members and whose name is the
    members' names joined with ``+`` in input order.

    Args:
        images: Image elements to cluster.
        tolerance: Largest gap, on each axis, at which two boxes still link.

    Returns:
        One merged ``ImageElement`` per group.
    """
    count = len(images)
    # Disjoint-set forest: leader[k] is the parent of node k.
    leader = list(range(count))

    def find(node):
        # Walk to the root, shortening the path as we go (path halving).
        while leader[node] != node:
            leader[node] = leader[leader[node]]
            node = leader[node]
        return node

    def union(a, b):
        # Always hang a's root under b's root.
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            leader[root_a] = root_b

    def bboxes_overlap(b1: BoundingBox, b2: BoundingBox, tolerance: float) -> bool:
        # Gap on each axis; zero when the projections intersect.
        gap_x = max(0, max(b1.x0, b2.x0) - min(b1.x1, b2.x1))
        gap_y = max(0, max(b1.y0, b2.y0) - min(b1.y1, b2.y1))
        return gap_x <= tolerance and gap_y <= tolerance

    # Link every pair of images that sit close enough together.
    for first_idx, first_image in enumerate(images):
        for second_idx in range(first_idx + 1, count):
            if bboxes_overlap(first_image.bbox, images[second_idx].bbox, tolerance):
                union(first_idx, second_idx)

    # Bucket image indices by the root of their set.
    clusters = {}
    for idx in range(count):
        clusters.setdefault(find(idx), []).append(idx)

    # Collapse each bucket into one element.
    merged_images = []
    for members in clusters.values():
        seed = images[members[0]]
        hull = seed.bbox
        label = seed.name
        for member_idx in members[1:]:
            member = images[member_idx]
            # Grow the hull so it also covers this member.
            hull = BoundingBox(
                x0=min(hull.x0, member.bbox.x0),
                y0=min(hull.y0, member.bbox.y0),
                x1=max(hull.x1, member.bbox.x1),
                y1=max(hull.y1, member.bbox.y1),
            )
            label += f"+{member.name}"
        merged_images.append(ImageElement(name=label, bbox=hull))

    return merged_images
def _cap_split_string(text: str, max_length: int) -> str:
if len(text) <= max_length:
return text
head_length = max_length // 2 - 3
tail_length = head_length
head = text[:head_length].rsplit(' ', 1)[0] or text[:head_length]
tail = text[-tail_length:].split(' ', 1)[-1] or text[-tail_length:]
return f"{head} ... {tail}"
def _cleanup_element_text(element_text: str) -> str:
    """Normalise one text element for inclusion in the linearized report.

    The text is passed through ``ftfy.fix_text``, stripped, has square
    brackets and newline/carriage-return/tab characters replaced by
    backslash escapes, and is finally capped at 250 characters with
    ``_cap_split_string``.
    """
    MAX_TEXT_ELEMENT_LENGTH = 250
    TEXT_REPLACEMENTS = {
        "[": "\\[",
        "]": "\\]",
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t"
    }

    def escape(match):
        # Look up the escape sequence for the single character that matched.
        return TEXT_REPLACEMENTS[match.group(0)]

    # Alternation over the (regex-escaped) characters that need escaping.
    alternation = "|".join(map(re.escape, TEXT_REPLACEMENTS))

    repaired = ftfy.fix_text(element_text).strip()
    escaped = re.sub(alternation, escape, repaired)
    return _cap_split_string(escaped, MAX_TEXT_ELEMENT_LENGTH)
def _linearize_pdf_report(report: PageReport, max_length: int = 4000) -> str:
    """Render a ``PageReport`` as a compact, line-oriented text description.

    The output starts with a ``Page dimensions`` line, followed by one
    ``[Image x0xy0 to x1xy1]`` line per merged image and one ``[xxy]text``
    line per non-blank text element.

    If everything fits in ``max_length`` characters, the lines are emitted in
    collection order (images first, then text). Otherwise the elements with
    the extreme coordinates are always kept, the remaining ones are added in
    (x, y) order until the next one would exceed the budget, and the kept
    lines are emitted sorted by (x, y). The edge elements are kept even if
    they alone exceed ``max_length``.

    Args:
        report: The page report to render.
        max_length: Target upper bound on the length of the returned string.

    Returns:
        The linearized description of the page.
    """
    header = f"Page dimensions: {report.mediabox.x1:.1f}x{report.mediabox.y1:.1f}\n"

    images = _merge_image_elements(report.image_elements)
    # Whitespace-only text runs are dropped everywhere below.
    visible_text = [e for e in report.text_elements if e.text.strip()]

    # Each entry is (kind, element, rendered line, (x, y) sort position).
    all_elements = []
    for image in images:
        box = image.bbox
        line = f"[Image {box.x0:.0f}x{box.y0:.0f} to {box.x1:.0f}x{box.y1:.0f}]\n"
        all_elements.append(('image', image, line, (box.x0, box.y0)))
    for element in visible_text:
        element_text = _cleanup_element_text(element.text)
        line = f"[{element.x:.0f}x{element.y:.0f}]{element_text}\n"
        all_elements.append(('text', element, line, (element.x, element.y)))

    total_length = len(header) + sum(len(entry[2]) for entry in all_elements)
    if total_length <= max_length:
        return header + "".join(entry[2] for entry in all_elements)

    # Over budget: always keep the elements that mark the page's extremes.
    edge_elements = set()
    if images:
        edge_elements.update([
            min(images, key=lambda e: e.bbox.x0),
            max(images, key=lambda e: e.bbox.x1),
            min(images, key=lambda e: e.bbox.y0),
            max(images, key=lambda e: e.bbox.y1),
        ])
    # Guard on the filtered list: min()/max() raise ValueError on an empty
    # sequence, which happens when every text element is whitespace-only.
    if visible_text:
        edge_elements.update([
            min(visible_text, key=lambda e: e.x),
            max(visible_text, key=lambda e: e.x),
            min(visible_text, key=lambda e: e.y),
            max(visible_text, key=lambda e: e.y),
        ])

    # Track by id() so equal-valued but distinct elements are each considered,
    # while the same object is never emitted twice as an edge element.
    selected_element_ids = set()
    selected_elements = []
    for entry in all_elements:
        elem = entry[1]
        if elem in edge_elements and id(elem) not in selected_element_ids:
            selected_elements.append(entry)
            selected_element_ids.add(id(elem))

    current_length = len(header) + sum(len(entry[2]) for entry in selected_elements)

    # Fill the rest of the budget in (x, y) order, stopping at the first
    # element that would not fit.
    remaining_elements = sorted(
        (entry for entry in all_elements if id(entry[1]) not in selected_element_ids),
        key=lambda entry: (entry[3][0], entry[3][1]),
    )
    for entry in remaining_elements:
        if current_length + len(entry[2]) > max_length:
            break
        selected_elements.append(entry)
        current_length += len(entry[2])

    # Emit the kept lines in (x, y) order.
    selected_elements.sort(key=lambda entry: (entry[3][0], entry[3][1]))
    return header + "".join(entry[2] for entry in selected_elements)