mirror of
https://github.com/allenai/olmocr.git
synced 2025-12-24 13:44:41 +00:00
Adding image merging to pdf report/hint/anchor
This commit is contained in:
parent
57d9a21eeb
commit
c8a4d14c57
@ -9,8 +9,7 @@
|
||||
|
||||
# coherency score best of these three
|
||||
import subprocess
|
||||
import sys
|
||||
import json
|
||||
import math
|
||||
import ftfy
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, List
|
||||
@ -103,7 +102,6 @@ class BoundingBox:
|
||||
def from_rectangle(rect: RectangleObject) -> "BoundingBox":
|
||||
return BoundingBox(rect[0], rect[1], rect[2], rect[3])
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextElement(Element):
|
||||
text: str
|
||||
@ -118,18 +116,20 @@ class ImageElement(Element):
|
||||
@dataclass
|
||||
class PageReport:
|
||||
mediabox: BoundingBox
|
||||
elements: List[Element]
|
||||
text_elements: List[TextElement]
|
||||
image_elements: List[ImageElement]
|
||||
|
||||
def _pdf_report(local_pdf_path: str, page: int) -> PageReport:
|
||||
reader = PdfReader(local_pdf_path)
|
||||
page = reader.pages[page - 1]
|
||||
resources = page.get("/Resources", {})
|
||||
xobjects = resources.get("/XObject", {})
|
||||
elements = []
|
||||
text_elements, image_elements = [], []
|
||||
|
||||
|
||||
def visitor_body(text, cm, tm, font_dict, font_size):
|
||||
txt2user = mult(tm, cm)
|
||||
elements.append(TextElement(text, txt2user[4], txt2user[5]))
|
||||
text_elements.append(TextElement(text, txt2user[4], txt2user[5]))
|
||||
|
||||
def visitor_op(op, args, cm, tm):
|
||||
if op == b"Do":
|
||||
@ -142,30 +142,104 @@ def _pdf_report(local_pdf_path: str, page: int) -> PageReport:
|
||||
height = xobject.get("/Height")
|
||||
x0, y0 = _transform_point(0, 0, cm)
|
||||
x1, y1 = _transform_point(1, 1, cm)
|
||||
elements.append(ImageElement(xobject_name, BoundingBox(min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))))
|
||||
image_elements.append(ImageElement(xobject_name, BoundingBox(min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))))
|
||||
|
||||
page.extract_text(visitor_text=visitor_body, visitor_operand_before=visitor_op)
|
||||
|
||||
return PageReport(
|
||||
mediabox=BoundingBox.from_rectangle(page.mediabox),
|
||||
elements=elements,
|
||||
text_elements=text_elements,
|
||||
image_elements=image_elements,
|
||||
)
|
||||
|
||||
|
||||
def _merge_image_elements(images: List[ImageElement], tolerance: float=0.5) -> List[ImageElement]:
|
||||
n = len(images)
|
||||
parent = list(range(n)) # Initialize Union-Find parent pointers
|
||||
|
||||
def find(i):
|
||||
# Find with path compression
|
||||
root = i
|
||||
while parent[root] != root:
|
||||
root = parent[root]
|
||||
while parent[i] != i:
|
||||
parent_i = parent[i]
|
||||
parent[i] = root
|
||||
i = parent_i
|
||||
return root
|
||||
|
||||
def union(i, j):
|
||||
# Union by attaching root of one tree to another
|
||||
root_i = find(i)
|
||||
root_j = find(j)
|
||||
if root_i != root_j:
|
||||
parent[root_i] = root_j
|
||||
|
||||
def bboxes_overlap(b1: BoundingBox, b2: BoundingBox, tolerance: float) -> bool:
|
||||
# Compute horizontal and vertical distances between boxes
|
||||
h_dist = max(0, max(b1.x0, b2.x0) - min(b1.x1, b2.x1))
|
||||
v_dist = max(0, max(b1.y0, b2.y0) - min(b1.y1, b2.y1))
|
||||
# Check if distances are within tolerance
|
||||
return h_dist <= tolerance and v_dist <= tolerance
|
||||
|
||||
# Union overlapping images
|
||||
for i in range(n):
|
||||
for j in range(i + 1, n):
|
||||
if bboxes_overlap(images[i].bbox, images[j].bbox, tolerance):
|
||||
union(i, j)
|
||||
|
||||
# Group images by their root parent
|
||||
groups = {}
|
||||
for i in range(n):
|
||||
root = find(i)
|
||||
groups.setdefault(root, []).append(i)
|
||||
|
||||
# Merge images in the same group
|
||||
merged_images = []
|
||||
for indices in groups.values():
|
||||
# Initialize merged bounding box
|
||||
merged_bbox = images[indices[0]].bbox
|
||||
merged_name = images[indices[0]].name
|
||||
|
||||
for idx in indices[1:]:
|
||||
bbox = images[idx].bbox
|
||||
# Expand merged_bbox to include the current bbox
|
||||
merged_bbox = BoundingBox(
|
||||
x0=min(merged_bbox.x0, bbox.x0),
|
||||
y0=min(merged_bbox.y0, bbox.y0),
|
||||
x1=max(merged_bbox.x1, bbox.x1),
|
||||
y1=max(merged_bbox.y1, bbox.y1),
|
||||
)
|
||||
# Optionally, update the name
|
||||
merged_name += f"+{images[idx].name}"
|
||||
|
||||
merged_images.append(ImageElement(name=merged_name, bbox=merged_bbox))
|
||||
|
||||
# Return the merged images along with other elements
|
||||
return merged_images
|
||||
|
||||
|
||||
def _linearize_pdf_report(report: PageReport) -> str:
|
||||
result = ""
|
||||
|
||||
result += f"Page dimensions: {report.mediabox.x1:.1f}x{report.mediabox.y1:.1f}\n"
|
||||
|
||||
for index, element in enumerate(report.elements):
|
||||
if isinstance(element, ImageElement):
|
||||
result += f"[Image {element.bbox.x0:.0f}x{element.bbox.y0:.0f} to {element.bbox.x1:.0f}x{element.bbox.y1:.0f}]"
|
||||
if isinstance(element, TextElement):
|
||||
if len(element.text.strip()) == 0:
|
||||
continue
|
||||
|
||||
# Need to use ftfy to fix text, because occasionally there are invalid surrogate pairs and other UTF issues that cause
|
||||
# pyarrow to fail to load the json later
|
||||
result += f"[{element.x:.0f}x{element.y:.0f}]{ftfy.fix_text(element.text)}"
|
||||
#images = report.image_elements
|
||||
images = _merge_image_elements(report.image_elements)
|
||||
|
||||
for index, element in enumerate(images):
|
||||
result += f"[Image {element.bbox.x0:.0f}x{element.bbox.y0:.0f} to {element.bbox.x1:.0f}x{element.bbox.y1:.0f}]"
|
||||
|
||||
for index, element in enumerate(report.text_elements):
|
||||
if len(element.text.strip()) == 0:
|
||||
continue
|
||||
|
||||
element_text = ftfy.fix_text(element.text)
|
||||
# Replace square brackets with something else not to throw off the syntax
|
||||
element_text = element_text.replace("[", "\[").replace("]", "\[")
|
||||
|
||||
# Need to use ftfy to fix text, because occasionally there are invalid surrogate pairs and other UTF issues that cause
|
||||
# pyarrow to fail to load the json later
|
||||
result += f"[{element.x:.0f}x{element.y:.0f}]{element_text}"
|
||||
|
||||
return result
|
||||
|
||||
4779
tests/gnarly_pdfs/large_prompt_hint1.pdf
Normal file
4779
tests/gnarly_pdfs/large_prompt_hint1.pdf
Normal file
File diff suppressed because one or more lines are too long
BIN
tests/gnarly_pdfs/large_prompt_hint2.pdf
Normal file
BIN
tests/gnarly_pdfs/large_prompt_hint2.pdf
Normal file
Binary file not shown.
BIN
tests/gnarly_pdfs/newspaper.pdf
Normal file
BIN
tests/gnarly_pdfs/newspaper.pdf
Normal file
Binary file not shown.
@ -66,6 +66,31 @@ class AnchorTest(unittest.TestCase):
|
||||
buffer = io.BytesIO(jsondata.encode('utf-8'))
|
||||
paj.read_json(buffer, read_options=paj.ReadOptions(use_threads=False, block_size=len(jsondata)))
|
||||
|
||||
def testLargePromptHint1(self):
|
||||
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "large_prompt_hint1.pdf")
|
||||
|
||||
anchor_text = get_anchor_text(local_pdf_path, 4, pdf_engine="pdfreport")
|
||||
|
||||
print(anchor_text)
|
||||
print(len(anchor_text))
|
||||
|
||||
def testLargePromptHint2(self):
|
||||
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "large_prompt_hint2.pdf")
|
||||
|
||||
anchor_text = get_anchor_text(local_pdf_path, 2, pdf_engine="pdfreport")
|
||||
|
||||
print(anchor_text)
|
||||
print(len(anchor_text))
|
||||
|
||||
def testNewsPaperPromptHint(self):
|
||||
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "newspaper.pdf")
|
||||
|
||||
anchor_text = get_anchor_text(local_pdf_path, 1, pdf_engine="pdfreport")
|
||||
|
||||
print(anchor_text)
|
||||
print(len(anchor_text))
|
||||
|
||||
|
||||
|
||||
|
||||
class BuildSilverTest(unittest.TestCase):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user