# -*- coding: utf-8 -*-
import os
import random

import fitz
import requests
import xgboost as xgb
from io import BytesIO
import torch
import re
import pdfplumber
import logging
from PIL import Image
import numpy as np

from api.db import ParserType
from deepdoc.visual import OCR, Recognizer
from rag.nlp import huqie
from collections import Counter
from copy import deepcopy
from huggingface_hub import hf_hub_download

logging.getLogger("pdfminer").setLevel(logging.WARNING)


class HuParser:
    def __init__(self):
        self.ocr = OCR()
        if not hasattr(self, "model_speciess"):
            self.model_speciess = ParserType.GENERAL.value
        self.layout_labels = [
            "_background_",
            "Text",
            "Title",
            "Figure",
            "Figure caption",
            "Table",
            "Table caption",
            "Header",
            "Footer",
            "Reference",
            "Equation",
        ]
        self.tsr_labels = [
            "table",
            "table column",
            "table row",
            "table column header",
            "table projected row header",
            "table spanning cell",
        ]
        self.layouter = Recognizer(self.layout_labels, "layout", "/data/newpeak/medical-gpt/res/ppdet/")
        self.tbl_det = Recognizer(self.tsr_labels, "tsr", "/data/newpeak/medical-gpt/res/ppdet.tbl/")

        self.updown_cnt_mdl = xgb.Booster()
        if torch.cuda.is_available():
            self.updown_cnt_mdl.set_param({"device": "cuda"})
        self.updown_cnt_mdl.load_model(hf_hub_download(repo_id="InfiniFlow/text_concat_xgb_v1.0",
                                                       filename="updown_concat_xgb.model"))
        """
        If you have trouble downloading HuggingFace models, -_^ this might help!!
        For Linux:
        export HF_ENDPOINT=https://hf-mirror.com
        For Windows:
        Good luck
        ^_-
        """

    def __remote_call(self, species, images, thr=0.7):
        url = os.environ.get("INFINIFLOW_SERVER")
        token = os.environ.get("INFINIFLOW_TOKEN")
        if not url or not token:
            logging.warning(
                "INFINIFLOW_SERVER is not specified. To maximize the effectiveness, please visit "
                "https://github.com/infiniflow/ragflow and sign in to our demo web site to get a token. "
                "It's FREE! Use 'export' to set both environment variables: INFINIFLOW_SERVER and INFINIFLOW_TOKEN.")
            return [[] for _ in range(len(images))]

        def convert_image_to_bytes(PILimage):
            image = BytesIO()
            PILimage.save(image, format='png')
            image.seek(0)
            return image.getvalue()

        images = [convert_image_to_bytes(img) for img in images]

        def remote_call():
            nonlocal images, thr
            res = requests.post(url + "/v1/layout/detect/" + species,
                                files=[("image", img) for img in images],
                                data={"threashold": thr},
                                headers={"Authorization": token}, timeout=len(images) * 10)
            res = res.json()
            if res["retcode"] != 0:
                raise RuntimeError(res["retmsg"])
            return res["data"]

        for _ in range(3):
            try:
                return remote_call()
            except RuntimeError as e:
                raise e
            except Exception as e:
                logging.error("layout_predict: " + str(e))
        return remote_call()

    def __char_width(self, c):
        return (c["x1"] - c["x0"]) // len(c["text"])

    def __height(self, c):
        return c["bottom"] - c["top"]

    def _x_dis(self, a, b):
        return min(abs(a["x1"] - b["x0"]), abs(a["x0"] - b["x1"]),
                   abs(a["x0"] + a["x1"] - b["x0"] - b["x1"]) / 2)

    def _y_dis(
            self, a, b):
        return (
            b["top"] + b["bottom"] - a["top"] - a["bottom"]) / 2

    def _match_proj(self, b):
        proj_patt = [
            r"第[零一二三四五六七八九十百]+章",
            r"第[零一二三四五六七八九十百]+[条节]",
            r"[零一二三四五六七八九十百]+[、是 ]",
            r"[\(（][零一二三四五六七八九十百]+[）\)]",
            r"[\(（][0-9]+[）\)]",
            r"[0-9]+(、|\.[ ]|）|\.[^0-9./a-zA-Z_%><-]{4,})",
            r"[0-9]+\.[0-9.]+(、|\.[ ])",
            r"[⚫•➢①② ]",
        ]
        return any([re.match(p, b["text"]) for p in proj_patt])
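
    # Illustrative matches for the patterns above: "第三章", "（三）", "3.2、" and "①"
    # are all treated as project/outline headings by _match_proj.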

    def _updown_concat_features(self, up, down):
        w = max(self.__char_width(up), self.__char_width(down))
        h = max(self.__height(up), self.__height(down))
        y_dis = self._y_dis(up, down)
        LEN = 6
        tks_down = huqie.qie(down["text"][:LEN]).split(" ")
        tks_up = huqie.qie(up["text"][-LEN:]).split(" ")
        tks_all = up["text"][-LEN:].strip() \
            + (" " if re.match(r"[a-zA-Z0-9]+",
                               up["text"][-1] + down["text"][0]) else "") \
            + down["text"][:LEN].strip()
        tks_all = huqie.qie(tks_all).split(" ")
        fea = [
            up.get("R", -1) == down.get("R", -1),
            y_dis / h,
            down["page_number"] - up["page_number"],
            up["layout_type"] == down["layout_type"],
            up["layout_type"] == "text",
            down["layout_type"] == "text",
            up["layout_type"] == "table",
            down["layout_type"] == "table",
            True if re.search(
                r"([。？！；!?;+)）]|[a-z]\.)$",
                up["text"]) else False,
            True if re.search(r"[,:‘“、0-9(+-]$", up["text"]) else False,
            True if re.search(
                r"(^.?[/,?;:\]，。；：’”？！》】）-])",
                down["text"]) else False,
            True if re.match(r"[\(（][^\(\)（）]+[）\)]$", up["text"]) else False,
            True if re.search(r"[,，][^。.]+$", up["text"]) else False,
            True if re.search(r"[,，][^。.]+$", up["text"]) else False,
            True if re.search(r"[\(（][^\)）]+$", up["text"])
            and re.search(r"[\)）]", down["text"]) else False,
            self._match_proj(down),
            True if re.match(r"[A-Z]", down["text"]) else False,
            True if re.match(r"[A-Z]", up["text"][-1]) else False,
            True if re.match(r"[a-z0-9]", up["text"][-1]) else False,
            True if re.match(r"[0-9.%,-]+$", down["text"]) else False,
            up["text"].strip()[-2:] == down["text"].strip()[-2:] if len(up["text"].strip()) > 1 and len(
                down["text"].strip()) > 1 else False,
            up["x0"] > down["x1"],
            abs(self.__height(up) - self.__height(down)) / min(self.__height(up),
                                                               self.__height(down)),
            self._x_dis(up, down) / max(w, 0.000001),
            (len(up["text"]) - len(down["text"])) /
            max(len(up["text"]), len(down["text"])),
            len(tks_all) - len(tks_up) - len(tks_down),
            len(tks_down) - len(tks_up),
            tks_down[-1] == tks_up[-1],
            max(down["in_row"], up["in_row"]),
            abs(down["in_row"] - up["in_row"]),
            len(tks_down) == 1 and huqie.tag(tks_down[0]).find("n") >= 0,
            len(tks_up) == 1 and huqie.tag(tks_up[0]).find("n") >= 0
        ]
        return fea
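
    # Note: the Boolean/numeric features returned above are fed to self.updown_cnt_mdl
    # (the XGBoost booster loaded in __init__) inside _concat_downward() to decide
    # whether the "up" and "down" boxes should be concatenated.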

    @staticmethod
    def sort_Y_firstly(arr, threashold):
        # sort using y1 first and then x1
        arr = sorted(arr, key=lambda r: (r["top"], r["x0"]))
        for i in range(len(arr) - 1):
            for j in range(i, -1, -1):
                # restore the order using th
                if abs(arr[j + 1]["top"] - arr[j]["top"]) < threashold \
                        and arr[j + 1]["x0"] < arr[j]["x0"]:
                    tmp = deepcopy(arr[j])
                    arr[j] = deepcopy(arr[j + 1])
                    arr[j + 1] = deepcopy(tmp)
        return arr
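
    # Illustrative example (made-up numbers): with threashold=4, boxes whose "top" values
    # are 100 and 102 count as the same visual line, so the one with the smaller "x0" is
    # bubbled in front of the other after the initial (top, x0) sort.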

    @staticmethod
    def sort_X_by_page(arr, threashold):
        # sort using y1 first and then x1
        arr = sorted(arr, key=lambda r: (r["page_number"], r["x0"], r["top"]))
        for i in range(len(arr) - 1):
            for j in range(i, -1, -1):
                # restore the order using th
                if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \
                        and arr[j + 1]["top"] < arr[j]["top"] \
                        and arr[j + 1]["page_number"] == arr[j]["page_number"]:
                    tmp = arr[j]
                    arr[j] = arr[j + 1]
                    arr[j + 1] = tmp
        return arr

    @staticmethod
    def sort_R_firstly(arr, thr=0):
        # sort using y1 first and then x1
        # sorted(arr, key=lambda r: (r["top"], r["x0"]))
        arr = HuParser.sort_Y_firstly(arr, thr)
        for i in range(len(arr) - 1):
            for j in range(i, -1, -1):
                if "R" not in arr[j] or "R" not in arr[j + 1]:
                    continue
                if arr[j + 1]["R"] < arr[j]["R"] \
                        or (
                        arr[j + 1]["R"] == arr[j]["R"]
                        and arr[j + 1]["x0"] < arr[j]["x0"]
                ):
                    tmp = arr[j]
                    arr[j] = arr[j + 1]
                    arr[j + 1] = tmp
        return arr

    @staticmethod
    def sort_X_firstly(arr, threashold, copy=True):
        # sort using y1 first and then x1
        arr = sorted(arr, key=lambda r: (r["x0"], r["top"]))
        for i in range(len(arr) - 1):
            for j in range(i, -1, -1):
                # restore the order using th
                if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \
                        and arr[j + 1]["top"] < arr[j]["top"]:
                    tmp = deepcopy(arr[j]) if copy else arr[j]
                    arr[j] = deepcopy(arr[j + 1]) if copy else arr[j + 1]
                    arr[j + 1] = deepcopy(tmp) if copy else tmp
        return arr

    @staticmethod
    def sort_C_firstly(arr, thr=0):
        # sort using y1 first and then x1
        # sorted(arr, key=lambda r: (r["x0"], r["top"]))
        arr = HuParser.sort_X_firstly(arr, thr)
        for i in range(len(arr) - 1):
            for j in range(i, -1, -1):
                # restore the order using th
                if "C" not in arr[j] or "C" not in arr[j + 1]:
                    continue
                if arr[j + 1]["C"] < arr[j]["C"] \
                        or (
                        arr[j + 1]["C"] == arr[j]["C"]
                        and arr[j + 1]["top"] < arr[j]["top"]
                ):
                    tmp = arr[j]
                    arr[j] = arr[j + 1]
                    arr[j + 1] = tmp
        return arr

        return sorted(arr, key=lambda r: (r.get("C", r["x0"]), r["top"]))

    def _has_color(self, o):
        if o.get("ncs", "") == "DeviceGray":
            if o["stroking_color"] and o["stroking_color"][0] == 1 and o["non_stroking_color"] and \
                    o["non_stroking_color"][0] == 1:
                if re.match(r"[a-zT_\[\]\(\)-]+", o.get("text", "")):
                    return False
        return True

    def __overlapped_area(self, a, b, ratio=True):
        tp, btm, x0, x1 = a["top"], a["bottom"], a["x0"], a["x1"]
        if b["x0"] > x1 or b["x1"] < x0:
            return 0
        if b["bottom"] < tp or b["top"] > btm:
            return 0
        x0_ = max(b["x0"], x0)
        x1_ = min(b["x1"], x1)
        assert x0_ <= x1_, "Bad overlap! T:{},B:{},X0:{},X1:{} ==> {}".format(
            tp, btm, x0, x1, b)
        tp_ = max(b["top"], tp)
        btm_ = min(b["bottom"], btm)
        assert tp_ <= btm_, "Bad overlap! T:{},B:{},X0:{},X1:{} => {}".format(
            tp, btm, x0, x1, b)
        ov = (btm_ - tp_) * (x1_ - x0_) if x1 - \
            x0 != 0 and btm - tp != 0 else 0
        if ov > 0 and ratio:
            ov /= (x1 - x0) * (btm - tp)
        return ov
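
    # Illustrative example (made-up coordinates): if a covers x0=0..x1=10, top=0..bottom=10
    # and b covers x0=5..x1=15, top=0..bottom=10, the intersection is 5 * 10 = 50, so with
    # ratio=True the method returns 50 / (10 * 10) = 0.5 of a's area.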

    def __find_overlapped_with_threashold(self, box, boxes, thr=0.3):
        if not boxes:
            return
        max_overlaped_i, max_overlaped, _max_overlaped = None, thr, 0
        s, e = 0, len(boxes)
        for i in range(s, e):
            ov = self.__overlapped_area(box, boxes[i])
            _ov = self.__overlapped_area(boxes[i], box)
            if (ov, _ov) < (max_overlaped, _max_overlaped):
                continue
            max_overlaped_i = i
            max_overlaped = ov
            _max_overlaped = _ov
        return max_overlaped_i

    def __find_overlapped(self, box, boxes_sorted_by_y, naive=False):
        if not boxes_sorted_by_y:
            return
        bxs = boxes_sorted_by_y
        s, e, ii = 0, len(bxs), 0
        while s < e and not naive:
            ii = (e + s) // 2
            pv = bxs[ii]
            if box["bottom"] < pv["top"]:
                e = ii
                continue
            if box["top"] > pv["bottom"]:
                s = ii + 1
                continue
            break
        while s < ii:
            if box["top"] > bxs[s]["bottom"]:
                s += 1
            break
        while e - 1 > ii:
            if box["bottom"] < bxs[e - 1]["top"]:
                e -= 1
            break

        max_overlaped_i, max_overlaped = None, 0
        for i in range(s, e):
            ov = self.__overlapped_area(bxs[i], box)
            if ov <= max_overlaped:
                continue
            max_overlaped_i = i
            max_overlaped = ov
        return max_overlaped_i

    def _is_garbage(self, b):
        patt = [r"^•+$", r"(版权归©|免责条款|地址[:：])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$",
                r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}",
                "(资料|数据)来源[：:]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}",
                "\\(cid *: *[0-9]+ *\\)"
                ]
        return any([re.search(p, b["text"]) for p in patt])

    def __layouts_cleanup(self, boxes, layouts, far=2, thr=0.7):
        def notOverlapped(a, b):
            return any([a["x1"] < b["x0"],
                        a["x0"] > b["x1"],
                        a["bottom"] < b["top"],
                        a["top"] > b["bottom"]])

        i = 0
        while i + 1 < len(layouts):
            j = i + 1
            while j < min(i + far, len(layouts)) \
                    and (layouts[i].get("type", "") != layouts[j].get("type", "")
                         or notOverlapped(layouts[i], layouts[j])):
                j += 1
            if j >= min(i + far, len(layouts)):
                i += 1
                continue
            if self.__overlapped_area(layouts[i], layouts[j]) < thr \
                    and self.__overlapped_area(layouts[j], layouts[i]) < thr:
                i += 1
                continue

            if layouts[i].get("score") and layouts[j].get("score"):
                if layouts[i]["score"] > layouts[j]["score"]:
                    layouts.pop(j)
                else:
                    layouts.pop(i)
                continue

            area_i, area_i_1 = 0, 0
            for b in boxes:
                if not notOverlapped(b, layouts[i]):
                    area_i += self.__overlapped_area(b, layouts[i], False)
                if not notOverlapped(b, layouts[j]):
                    area_i_1 += self.__overlapped_area(b, layouts[j], False)

            if area_i > area_i_1:
                layouts.pop(j)
            else:
                layouts.pop(i)

        return layouts

    def __table_tsr(self, images):
        tbls = self.tbl_det(images, thr=0.5)
        res = []
        # align left&right for rows, align top&bottom for columns
        for tbl in tbls:
            lts = [{"label": b["type"],
                    "score": b["score"],
                    "x0": b["bbox"][0], "x1": b["bbox"][2],
                    "top": b["bbox"][1], "bottom": b["bbox"][-1]
                    } for b in tbl]
            if not lts:
                continue

            left = [b["x0"] for b in lts if b["label"].find(
                "row") > 0 or b["label"].find("header") > 0]
            right = [b["x1"] for b in lts if b["label"].find(
                "row") > 0 or b["label"].find("header") > 0]
            if not left:
                continue
            left = np.median(left) if len(left) > 4 else np.min(left)
            right = np.median(right) if len(right) > 4 else np.max(right)
            for b in lts:
                if b["label"].find("row") > 0 or b["label"].find("header") > 0:
                    if b["x0"] > left:
                        b["x0"] = left
                    if b["x1"] < right:
                        b["x1"] = right

            top = [b["top"] for b in lts if b["label"] == "table column"]
            bottom = [b["bottom"] for b in lts if b["label"] == "table column"]
            if not top:
                res.append(lts)
                continue
            top = np.median(top) if len(top) > 4 else np.min(top)
            bottom = np.median(bottom) if len(bottom) > 4 else np.max(bottom)
            for b in lts:
                if b["label"] == "table column":
                    if b["top"] > top:
                        b["top"] = top
                    if b["bottom"] < bottom:
                        b["bottom"] = bottom

            res.append(lts)
        return res

    def _table_transformer_job(self, ZM):
        logging.info("Table processing...")
        imgs, pos = [], []
        tbcnt = [0]
        MARGIN = 10
        self.tb_cpns = []
        assert len(self.page_layout) == len(self.page_images)
        for p, tbls in enumerate(self.page_layout):  # for page
            tbls = [f for f in tbls if f["type"] == "table"]
            tbcnt.append(len(tbls))
            if not tbls:
                continue
            for tb in tbls:  # for table
                left, top, right, bott = tb["x0"] - MARGIN, tb["top"] - MARGIN, \
                    tb["x1"] + MARGIN, tb["bottom"] + MARGIN
                left *= ZM
                top *= ZM
                right *= ZM
                bott *= ZM
                pos.append((left, top))
                imgs.append(self.page_images[p].crop((left, top, right, bott)))

        assert len(self.page_images) == len(tbcnt) - 1
        if not imgs:
            return
        recos = self.__table_tsr(imgs)
        tbcnt = np.cumsum(tbcnt)
        for i in range(len(tbcnt) - 1):  # for page
            pg = []
            for j, tb_items in enumerate(
                    recos[tbcnt[i]: tbcnt[i + 1]]):  # for table
                poss = pos[tbcnt[i]: tbcnt[i + 1]]
                for it in tb_items:  # for table components
                    it["x0"] = (it["x0"] + poss[j][0])
                    it["x1"] = (it["x1"] + poss[j][0])
                    it["top"] = (it["top"] + poss[j][1])
                    it["bottom"] = (it["bottom"] + poss[j][1])
                    for n in ["x0", "x1", "top", "bottom"]:
                        it[n] /= ZM
                    it["top"] += self.page_cum_height[i]
                    it["bottom"] += self.page_cum_height[i]
                    it["pn"] = i
                    it["layoutno"] = j
                    pg.append(it)
            self.tb_cpns.extend(pg)

        def gather(kwd, fzy=10, ption=0.6):
            eles = self.sort_Y_firstly(
                [r for r in self.tb_cpns if re.match(kwd, r["label"])], fzy)
            eles = self.__layouts_cleanup(self.boxes, eles, 5, ption)
            return self.sort_Y_firstly(eles, 0)

        # add R,H,C,SP tag to boxes within table layout
        headers = gather(r".*header$")
        rows = gather(r".* (row|header)")
        spans = gather(r".*spanning")
        clmns = sorted([r for r in self.tb_cpns if re.match(
            r"table column$", r["label"])], key=lambda x: (x["pn"], x["layoutno"], x["x0"]))
        clmns = self.__layouts_cleanup(self.boxes, clmns, 5, 0.5)
        for b in self.boxes:
            if b.get("layout_type", "") != "table":
                continue
            ii = self.__find_overlapped_with_threashold(b, rows, thr=0.3)
            if ii is not None:
                b["R"] = ii
                b["R_top"] = rows[ii]["top"]
                b["R_bott"] = rows[ii]["bottom"]

            ii = self.__find_overlapped_with_threashold(b, headers, thr=0.3)
            if ii is not None:
                b["H_top"] = headers[ii]["top"]
                b["H_bott"] = headers[ii]["bottom"]
                b["H_left"] = headers[ii]["x0"]
                b["H_right"] = headers[ii]["x1"]
                b["H"] = ii

            ii = self.__find_overlapped_with_threashold(b, clmns, thr=0.3)
            if ii is not None:
                b["C"] = ii
                b["C_left"] = clmns[ii]["x0"]
                b["C_right"] = clmns[ii]["x1"]

            ii = self.__find_overlapped_with_threashold(b, spans, thr=0.3)
            if ii is not None:
                b["H_top"] = spans[ii]["top"]
                b["H_bott"] = spans[ii]["bottom"]
                b["H_left"] = spans[ii]["x0"]
                b["H_right"] = spans[ii]["x1"]
                b["SP"] = ii
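
        # The tags attached above are read later by __construct_table/__cal_spans:
        # "R" is the table-row index, "C" the column index, "H" a header component,
        # and "SP" a spanning cell, each with its bounding coordinates.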

    def __ocr(self, pagenum, img, chars, ZM=3):
        bxs = self.ocr(np.array(img))
        if not bxs:
            self.boxes.append([])
            return
        bxs = [(line[0], line[1][0]) for line in bxs]
        bxs = self.sort_Y_firstly(
            [{"x0": b[0][0] / ZM, "x1": b[1][0] / ZM,
              "top": b[0][1] / ZM, "text": "", "txt": t,
              "bottom": b[-1][1] / ZM,
              "page_number": pagenum} for b, t in bxs if b[0][0] <= b[1][0] and b[0][1] <= b[-1][1]],
            self.mean_height[-1] / 3
        )

        # merge chars in the same rect
        for c in self.sort_X_firstly(chars, self.mean_width[pagenum - 1] // 4):
            ii = self.__find_overlapped(c, bxs)
            if ii is None:
                self.lefted_chars.append(c)
                continue
            ch = c["bottom"] - c["top"]
            bh = bxs[ii]["bottom"] - bxs[ii]["top"]
            if abs(ch - bh) / max(ch, bh) >= 0.7 and c["text"] != ' ':
                self.lefted_chars.append(c)
                continue
            if c["text"] == " " and bxs[ii]["text"]:
                if re.match(r"[0-9a-zA-Z,.?;:!%%]", bxs[ii]["text"][-1]):
                    bxs[ii]["text"] += " "
            else:
                bxs[ii]["text"] += c["text"]

        for b in bxs:
            if not b["text"]:
                b["text"] = b["txt"]
            del b["txt"]
        if self.mean_height[-1] == 0:
            self.mean_height[-1] = np.median([b["bottom"] - b["top"]
                                              for b in bxs])

        self.boxes.append(bxs)

    def _layouts_rec(self, ZM):
        assert len(self.page_images) == len(self.boxes)
        # Tag layout type
        boxes = []
        layouts = self.layouter(self.page_images)
        # save_results(self.page_images, layouts, self.layout_labels, output_dir='output/', threshold=0.7)
        assert len(self.page_images) == len(layouts)
        for pn, lts in enumerate(layouts):
            bxs = self.boxes[pn]
            lts = [{"type": b["type"],
                    "score": float(b["score"]),
                    "x0": b["bbox"][0] / ZM, "x1": b["bbox"][2] / ZM,
                    "top": b["bbox"][1] / ZM, "bottom": b["bbox"][-1] / ZM,
                    "page_number": pn,
                    } for b in lts]
            lts = self.sort_Y_firstly(lts, self.mean_height[pn] / 2)
            lts = self.__layouts_cleanup(bxs, lts)
            self.page_layout.append(lts)

            # Tag layout type, layouts are ready
            def findLayout(ty):
                nonlocal bxs, lts
                lts_ = [lt for lt in lts if lt["type"] == ty]
                i = 0
                while i < len(bxs):
                    if bxs[i].get("layout_type"):
                        i += 1
                        continue
                    if self._is_garbage(bxs[i]):
                        logging.debug("GARBAGE: " + bxs[i]["text"])
                        bxs.pop(i)
                        continue

                    ii = self.__find_overlapped_with_threashold(bxs[i], lts_,
                                                                thr=0.4)
                    if ii is None:  # belongs to nothing
                        bxs[i]["layout_type"] = ""
                        i += 1
                        continue
                    lts_[ii]["visited"] = True
                    if lts_[ii]["type"] in ["footer", "header", "reference"]:
                        if lts_[ii]["type"] not in self.garbages:
                            self.garbages[lts_[ii]["type"]] = []
                        self.garbages[lts_[ii]["type"]].append(bxs[i]["text"])
                        logging.debug("GARBAGE: " + bxs[i]["text"])
                        bxs.pop(i)
                        continue

                    bxs[i]["layoutno"] = f"{ty}-{ii}"
                    bxs[i]["layout_type"] = lts_[ii]["type"]
                    i += 1

            for lt in ["footer", "header", "reference", "figure caption",
                       "table caption", "title", "text", "table", "figure"]:
                findLayout(lt)

            # add a box for figure layouts which have no text box
            for i, lt in enumerate(
                    [lt for lt in lts if lt["type"] == "figure"]):
                if lt.get("visited"):
                    continue
                lt = deepcopy(lt)
                del lt["type"]
                lt["text"] = ""
                lt["layout_type"] = "figure"
                lt["layoutno"] = f"figure-{i}"
                bxs.append(lt)

            boxes.extend(bxs)

        self.boxes = boxes

        garbage = set()
        for k in self.garbages.keys():
            self.garbages[k] = Counter(self.garbages[k])
            for g, c in self.garbages[k].items():
                if c > 1:
                    garbage.add(g)

        logging.debug("GARBAGE: " + ",".join(garbage))
        self.boxes = [b for b in self.boxes if b["text"].strip() not in garbage]

        # cumulative Y
        for i in range(len(self.boxes)):
            self.boxes[i]["top"] += \
                self.page_cum_height[self.boxes[i]["page_number"] - 1]
            self.boxes[i]["bottom"] += \
                self.page_cum_height[self.boxes[i]["page_number"] - 1]

    def _text_merge(self):
        # merge adjacent boxes
        bxs = self.boxes

        def end_with(b, txt):
            txt = txt.strip()
            tt = b.get("text", "").strip()
            return tt and tt.find(txt) == len(tt) - len(txt)

        def start_with(b, txts):
            tt = b.get("text", "").strip()
            return tt and any([tt.find(t.strip()) == 0 for t in txts])

        # horizontally merge adjacent boxes with the same layout
        i = 0
        while i < len(bxs) - 1:
            b = bxs[i]
            b_ = bxs[i + 1]
            if b.get("layoutno", "0") != b_.get("layoutno", "1"):
                i += 1
                continue
            dis_thr = 1
            dis = b["x1"] - b_["x0"]
            if b.get("layout_type", "") != "text" or b_.get(
                    "layout_type", "") != "text":
                if end_with(b, ",") or start_with(b_, "(,"):
                    dis_thr = -8
                else:
                    i += 1
                    continue

            if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 \
                    and dis >= dis_thr and b["x1"] < b_["x1"]:
                # merge
                bxs[i]["x1"] = b_["x1"]
                bxs[i]["top"] = (b["top"] + b_["top"]) / 2
                bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2
                bxs[i]["text"] += b_["text"]
                bxs.pop(i + 1)
                continue
            i += 1
        self.boxes = bxs

    def _naive_vertical_merge(self):
        bxs = self.sort_Y_firstly(self.boxes, np.median(self.mean_height) / 3)
        i = 0
        while i + 1 < len(bxs):
            b = bxs[i]
            b_ = bxs[i + 1]
            if b["page_number"] < b_["page_number"] and re.match(r"[0-9 •一—-]+$", b["text"]):
                bxs.pop(i)
                continue
            concatting_feats = [
                b["text"].strip()[-1] in ",;:'\",、‘“;:-",
                len(b["text"].strip()) > 1 and b["text"].strip()[-2] in ",;:'\",‘“、;:",
                b_["text"].strip()[0] in "。;?!?”)),,、:",
            ]
            # features for not concating
            feats = [
                b.get("layoutno", 0) != b_.get("layoutno", 0),
                b["text"].strip()[-1] in "。?!?",
                self.is_english and b["text"].strip()[-1] in ".!?",
                b["page_number"] == b_["page_number"] and b_["top"] -
                b["bottom"] > self.mean_height[b["page_number"] - 1] * 1.5,
                b["page_number"] < b_["page_number"] and abs(
                    b["x0"] - b_["x0"]) > self.mean_width[b["page_number"] - 1] * 4
            ]
            if any(feats) and not any(concatting_feats):
                i += 1
                continue
            # merge up and down
            b["bottom"] = b_["bottom"]
            b["text"] += b_["text"]
            b["x0"] = min(b["x0"], b_["x0"])
            b["x1"] = max(b["x1"], b_["x1"])
            bxs.pop(i + 1)
        self.boxes = bxs

    def _concat_downward(self, concat_between_pages=True):
        # count boxes in the same row as a feature
        for i in range(len(self.boxes)):
            mh = self.mean_height[self.boxes[i]["page_number"] - 1]
            self.boxes[i]["in_row"] = 0
            j = max(0, i - 12)
            while j < min(i + 12, len(self.boxes)):
                if j == i:
                    j += 1
                    continue
                ydis = self._y_dis(self.boxes[i], self.boxes[j]) / mh
                if abs(ydis) < 1:
                    self.boxes[i]["in_row"] += 1
                elif ydis > 0:
                    break
                j += 1

        # concat between rows
        boxes = deepcopy(self.boxes)
        blocks = []
        while boxes:
            chunks = []

            def dfs(up, dp):
                chunks.append(up)
                i = dp
                while i < min(dp + 12, len(boxes)):
                    ydis = self._y_dis(up, boxes[i])
                    smpg = up["page_number"] == boxes[i]["page_number"]
                    mh = self.mean_height[up["page_number"] - 1]
                    mw = self.mean_width[up["page_number"] - 1]
                    if smpg and ydis > mh * 4:
                        break
                    if not smpg and ydis > mh * 16:
                        break
                    down = boxes[i]
                    if not concat_between_pages and down["page_number"] > up["page_number"]:
                        break

                    if up.get("R", "") != down.get(
                            "R", "") and up["text"][-1] != ",":
                        i += 1
                        continue

                    if re.match(r"[0-9]{2,3}/[0-9]{3}$", up["text"]) \
                            or re.match(r"[0-9]{2,3}/[0-9]{3}$", down["text"]):
                        i += 1
                        continue

                    if not down["text"].strip():
                        i += 1
                        continue

                    if up["x1"] < down["x0"] - 10 * \
                            mw or up["x0"] > down["x1"] + 10 * mw:
                        i += 1
                        continue

                    if i - dp < 5 and up.get("layout_type") == "text":
                        if up.get("layoutno", "1") == down.get(
                                "layoutno", "2"):
                            dfs(down, i + 1)
                            boxes.pop(i)
                            return
                        i += 1
                        continue

                    fea = self._updown_concat_features(up, down)
                    if self.updown_cnt_mdl.predict(
                            xgb.DMatrix([fea]))[0] <= 0.5:
                        i += 1
                        continue
                    dfs(down, i + 1)
                    boxes.pop(i)
                    return

            dfs(boxes[0], 1)
            boxes.pop(0)
            if chunks:
                blocks.append(chunks)

        # concat within each block
        boxes = []
        for b in blocks:
            if len(b) == 1:
                boxes.append(b[0])
                continue
            t = b[0]
            for c in b[1:]:
                t["text"] = t["text"].strip()
                c["text"] = c["text"].strip()
                if not c["text"]:
                    continue
                if t["text"] and re.match(
                        r"[0-9\.a-zA-Z]+$", t["text"][-1] + c["text"][-1]):
                    t["text"] += " "
                t["text"] += c["text"]
                t["x0"] = min(t["x0"], c["x0"])
                t["x1"] = max(t["x1"], c["x1"])
                t["page_number"] = min(t["page_number"], c["page_number"])
                t["bottom"] = c["bottom"]
                if not t["layout_type"] \
                        and c["layout_type"]:
                    t["layout_type"] = c["layout_type"]
            boxes.append(t)

        self.boxes = self.sort_Y_firstly(boxes, 0)

    def _filter_forpages(self):
        if not self.boxes:
            return
        findit = False
        i = 0
        while i < len(self.boxes):
            if not re.match(r"(contents|目录|目次|table of contents|致谢|acknowledge)$",
                            re.sub(r"( | |\u3000)+", "", self.boxes[i]["text"].lower())):
                i += 1
                continue
            findit = True
            eng = re.match(r"[0-9a-zA-Z :'.-]{5,}", self.boxes[i]["text"].strip())
            self.boxes.pop(i)
            if i >= len(self.boxes):
                break
            prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join(
                self.boxes[i]["text"].strip().split(" ")[:2])
            while not prefix:
                self.boxes.pop(i)
                if i >= len(self.boxes):
                    break
                prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join(
                    self.boxes[i]["text"].strip().split(" ")[:2])
            self.boxes.pop(i)
            if i >= len(self.boxes) or not prefix:
                break
            for j in range(i, min(i + 128, len(self.boxes))):
                if not re.match(prefix, self.boxes[j]["text"]):
                    continue
                for k in range(i, j):
                    self.boxes.pop(i)
                break
        if findit:
            return

        page_dirty = [0] * len(self.page_images)
        for b in self.boxes:
            if re.search(r"(··|··|··)", b["text"]):
                page_dirty[b["page_number"] - 1] += 1
        page_dirty = set([i + 1 for i, t in enumerate(page_dirty) if t > 3])
        if not page_dirty:
            return
        i = 0
        while i < len(self.boxes):
            if self.boxes[i]["page_number"] in page_dirty:
                self.boxes.pop(i)
                continue
            i += 1

    def _merge_with_same_bullet(self):
        i = 0
        while i + 1 < len(self.boxes):
            b = self.boxes[i]
            b_ = self.boxes[i + 1]
            if not b["text"].strip():
                self.boxes.pop(i)
                continue
            if not b_["text"].strip():
                self.boxes.pop(i + 1)
                continue

            if b["text"].strip()[0] != b_["text"].strip()[0] \
                    or b["text"].strip()[0].lower() in set("qwertyuopasdfghjklzxcvbnm") \
                    or huqie.is_chinese(b["text"].strip()[0]) \
                    or b["top"] > b_["bottom"]:
                i += 1
                continue
            b_["text"] = b["text"] + "\n" + b_["text"]
            b_["x0"] = min(b["x0"], b_["x0"])
            b_["x1"] = max(b["x1"], b_["x1"])
            b_["top"] = b["top"]
            self.boxes.pop(i)

    def _blockType(self, b):
        patt = [
            ("^(20|19)[0-9]{2}[年/-][0-9]{1,2}[月/-][0-9]{1,2}日*$", "Dt"),
            (r"^(20|19)[0-9]{2}年$", "Dt"),
            (r"^(20|19)[0-9]{2}[年-][0-9]{1,2}月*$", "Dt"),
            ("^[0-9]{1,2}[月-][0-9]{1,2}日*$", "Dt"),
            (r"^第*[一二三四1-4]季度$", "Dt"),
            (r"^(20|19)[0-9]{2}年*[一二三四1-4]季度$", "Dt"),
            (r"^(20|19)[0-9]{2}[ABCDE]$", "Dt"),
            ("^[0-9.,+%/ -]+$", "Nu"),
            (r"^[0-9A-Z/\._~-]+$", "Ca"),
            (r"^[A-Z]*[a-z' -]+$", "En"),
            (r"^[0-9.,+-]+[0-9A-Za-z/$¥%<>（）()' -]+$", "NE"),
            (r"^.{1}$", "Sg")
        ]
        for p, n in patt:
            if re.search(p, b["text"].strip()):
                return n
        tks = [t for t in huqie.qie(b["text"]).split(" ") if len(t) > 1]
        if len(tks) > 3:
            if len(tks) < 12:
                return "Tx"
            else:
                return "Lx"

        if len(tks) == 1 and huqie.tag(tks[0]) == "nr":
            return "Nr"

        return "Ot"
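
    # Reading of the type codes above (hedged interpretation, not from the original
    # comments): "Dt" date-like, "Nu" numeric, "Ca" code/identifier, "En" English word,
    # "NE" number + entity, "Sg" single character, "Tx" short text, "Lx" long text,
    # "Nr" person name (huqie POS tag "nr"), "Ot" everything else.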

    def __cal_spans(self, boxes, rows, cols, tbl, html=True):
        # calculate spans
        clft = [np.mean([c.get("C_left", c["x0"]) for c in cln])
                for cln in cols]
        crgt = [np.mean([c.get("C_right", c["x1"]) for c in cln])
                for cln in cols]
        rtop = [np.mean([c.get("R_top", c["top"]) for c in row])
                for row in rows]
        rbtm = [np.mean([c.get("R_btm", c["bottom"])
                         for c in row]) for row in rows]
        for b in boxes:
            if "SP" not in b:
                continue
            b["colspan"] = [b["cn"]]
            b["rowspan"] = [b["rn"]]
            # col span
            for j in range(0, len(clft)):
                if j == b["cn"]:
                    continue
                if clft[j] + (crgt[j] - clft[j]) / 2 < b["H_left"]:
                    continue
                if crgt[j] - (crgt[j] - clft[j]) / 2 > b["H_right"]:
                    continue
                b["colspan"].append(j)
            # row span
            for j in range(0, len(rtop)):
                if j == b["rn"]:
                    continue
                if rtop[j] + (rbtm[j] - rtop[j]) / 2 < b["H_top"]:
                    continue
                if rbtm[j] - (rbtm[j] - rtop[j]) / 2 > b["H_bott"]:
                    continue
                b["rowspan"].append(j)

        def join(arr):
            if not arr:
                return ""
            return "".join([t["text"] for t in arr])

        # rm the spanning cells
        for i in range(len(tbl)):
            for j, arr in enumerate(tbl[i]):
                if not arr:
                    continue
                if all(["rowspan" not in a and "colspan" not in a for a in arr]):
                    continue
                rowspan, colspan = [], []
                for a in arr:
                    if isinstance(a.get("rowspan", 0), list):
                        rowspan.extend(a["rowspan"])
                    if isinstance(a.get("colspan", 0), list):
                        colspan.extend(a["colspan"])
                rowspan, colspan = set(rowspan), set(colspan)
                if len(rowspan) < 2 and len(colspan) < 2:
                    for a in arr:
                        if "rowspan" in a:
                            del a["rowspan"]
                        if "colspan" in a:
                            del a["colspan"]
                    continue
                rowspan, colspan = sorted(rowspan), sorted(colspan)
                rowspan = list(range(rowspan[0], rowspan[-1] + 1))
                colspan = list(range(colspan[0], colspan[-1] + 1))
                assert i in rowspan, rowspan
                assert j in colspan, colspan
                arr = []
                for r in rowspan:
                    for c in colspan:
                        arr_txt = join(arr)
                        if tbl[r][c] and join(tbl[r][c]) != arr_txt:
                            arr.extend(tbl[r][c])
                        tbl[r][c] = None if html else arr
                for a in arr:
                    if len(rowspan) > 1:
                        a["rowspan"] = len(rowspan)
                    elif "rowspan" in a:
                        del a["rowspan"]
                    if len(colspan) > 1:
                        a["colspan"] = len(colspan)
                    elif "colspan" in a:
                        del a["colspan"]
                tbl[rowspan[0]][colspan[0]] = arr

        return tbl

    def __construct_table(self, boxes, html=False):
        cap = ""
        i = 0
        while i < len(boxes):
            if self.is_caption(boxes[i]):
                cap += boxes[i]["text"]
                boxes.pop(i)
                i -= 1
            i += 1

        if not boxes:
            return []
        for b in boxes:
            b["btype"] = self._blockType(b)
        max_type = Counter([b["btype"] for b in boxes]).items()
        max_type = max(max_type, key=lambda x: x[1])[0] if max_type else ""
        logging.debug("MAXTYPE: " + max_type)

        rowh = [b["R_bott"] - b["R_top"] for b in boxes if "R" in b]
        rowh = np.min(rowh) if rowh else 0
        # boxes = self.sort_Y_firstly(boxes, rowh/5)
        boxes = self.sort_R_firstly(boxes, rowh / 2)
        boxes[0]["rn"] = 0
        rows = [[boxes[0]]]
        btm = boxes[0]["bottom"]
        for b in boxes[1:]:
            b["rn"] = len(rows) - 1
            lst_r = rows[-1]
            if lst_r[-1].get("R", "") != b.get("R", "") \
                    or (b["top"] >= btm - 3 and lst_r[-1].get("R", "-1") != b.get("R", "-2")
                        ):  # new row
                btm = b["bottom"]
                b["rn"] += 1
                rows.append([b])
                continue
            btm = (btm + b["bottom"]) / 2.
            rows[-1].append(b)

        colwm = [b["C_right"] - b["C_left"] for b in boxes if "C" in b]
        colwm = np.min(colwm) if colwm else 0
        crosspage = len(set([b["page_number"] for b in boxes])) > 1
        if crosspage:
            boxes = self.sort_X_firstly(boxes, colwm / 2, False)
        else:
            boxes = self.sort_C_firstly(boxes, colwm / 2)
        boxes[0]["cn"] = 0
        cols = [[boxes[0]]]
        right = boxes[0]["x1"]
        for b in boxes[1:]:
            b["cn"] = len(cols) - 1
            lst_c = cols[-1]
            if (int(b.get("C", "1")) - int(lst_c[-1].get("C", "1")) == 1 and b["page_number"] == lst_c[-1][
                    "page_number"]) \
                    or (b["x0"] >= right and lst_c[-1].get("C", "-1") != b.get("C", "-2")):  # new col
                right = b["x1"]
                b["cn"] += 1
                cols.append([b])
                continue
            right = (right + b["x1"]) / 2.
            cols[-1].append(b)

        tbl = [[[] for _ in range(len(cols))] for _ in range(len(rows))]
        for b in boxes:
            tbl[b["rn"]][b["cn"]].append(b)

        if len(rows) >= 4:
            # remove single in column
            j = 0
            while j < len(tbl[0]):
                e, ii = 0, 0
                for i in range(len(tbl)):
                    if tbl[i][j]:
                        e += 1
                        ii = i
                    if e > 1:
                        break
                if e > 1:
                    j += 1
                    continue
                f = (j > 0 and tbl[ii][j - 1] and tbl[ii]
                     [j - 1][0].get("text")) or j == 0
                ff = (j + 1 < len(tbl[ii]) and tbl[ii][j + 1] and tbl[ii]
                      [j + 1][0].get("text")) or j + 1 >= len(tbl[ii])
                if f and ff:
                    j += 1
                    continue
                bx = tbl[ii][j][0]
                logging.debug("Relocate column single: " + bx["text"])
                # j column only has one value
                left, right = 100000, 100000
                if j > 0 and not f:
                    for i in range(len(tbl)):
                        if tbl[i][j - 1]:
                            left = min(left, np.min(
                                [bx["x0"] - a["x1"] for a in tbl[i][j - 1]]))
                if j + 1 < len(tbl[0]) and not ff:
                    for i in range(len(tbl)):
                        if tbl[i][j + 1]:
                            right = min(right, np.min(
                                [a["x0"] - bx["x1"] for a in tbl[i][j + 1]]))
                assert left < 100000 or right < 100000
                if left < right:
                    for jj in range(j, len(tbl[0])):
                        for i in range(len(tbl)):
                            for a in tbl[i][jj]:
                                a["cn"] -= 1
                    if tbl[ii][j - 1]:
                        tbl[ii][j - 1].extend(tbl[ii][j])
                    else:
                        tbl[ii][j - 1] = tbl[ii][j]
                    for i in range(len(tbl)):
                        tbl[i].pop(j)
                else:
                    for jj in range(j + 1, len(tbl[0])):
                        for i in range(len(tbl)):
                            for a in tbl[i][jj]:
                                a["cn"] -= 1
                    if tbl[ii][j + 1]:
                        tbl[ii][j + 1].extend(tbl[ii][j])
                    else:
                        tbl[ii][j + 1] = tbl[ii][j]
                    for i in range(len(tbl)):
                        tbl[i].pop(j)
                cols.pop(j)
        assert len(cols) == len(tbl[0]), "Column NO. mismatched: %d vs %d" % (
            len(cols), len(tbl[0]))

        if len(cols) >= 4:
            # remove single in row
            i = 0
            while i < len(tbl):
                e, jj = 0, 0
                for j in range(len(tbl[i])):
                    if tbl[i][j]:
                        e += 1
                        jj = j
                    if e > 1:
                        break
                if e > 1:
                    i += 1
                    continue
                f = (i > 0 and tbl[i - 1][jj] and tbl[i - 1]
                     [jj][0].get("text")) or i == 0
                ff = (i + 1 < len(tbl) and tbl[i + 1][jj] and tbl[i + 1]
                      [jj][0].get("text")) or i + 1 >= len(tbl)
                if f and ff:
                    i += 1
                    continue

                bx = tbl[i][jj][0]
                logging.debug("Relocate row single: " + bx["text"])
                # i row only has one value
                up, down = 100000, 100000
                if i > 0 and not f:
                    for j in range(len(tbl[i - 1])):
                        if tbl[i - 1][j]:
                            up = min(up, np.min(
                                [bx["top"] - a["bottom"] for a in tbl[i - 1][j]]))
                if i + 1 < len(tbl) and not ff:
                    for j in range(len(tbl[i + 1])):
                        if tbl[i + 1][j]:
                            down = min(down, np.min(
                                [a["top"] - bx["bottom"] for a in tbl[i + 1][j]]))
                assert up < 100000 or down < 100000
                if up < down:
                    for ii in range(i, len(tbl)):
                        for j in range(len(tbl[ii])):
                            for a in tbl[ii][j]:
                                a["rn"] -= 1
                    if tbl[i - 1][jj]:
                        tbl[i - 1][jj].extend(tbl[i][jj])
                    else:
                        tbl[i - 1][jj] = tbl[i][jj]
                    tbl.pop(i)
                else:
                    for ii in range(i + 1, len(tbl)):
                        for j in range(len(tbl[ii])):
                            for a in tbl[ii][j]:
                                a["rn"] -= 1
                    if tbl[i + 1][jj]:
                        tbl[i + 1][jj].extend(tbl[i][jj])
                    else:
                        tbl[i + 1][jj] = tbl[i][jj]
                    tbl.pop(i)
                rows.pop(i)

        # which rows are headers
        hdset = set([])
        for i in range(len(tbl)):
            cnt, h = 0, 0
            for j, arr in enumerate(tbl[i]):
                if not arr:
                    continue
                cnt += 1
                if max_type == "Nu" and arr[0]["btype"] == "Nu":
                    continue
                if any([a.get("H") for a in arr]) \
                        or (max_type == "Nu" and arr[0]["btype"] != "Nu"):
                    h += 1
            if h / cnt > 0.5:
                hdset.add(i)

        if html:
            return [self.__html_table(cap, hdset,
                                      self.__cal_spans(boxes, rows,
                                                       cols, tbl, True)
                                      )]

        return self.__desc_table(cap, hdset,
                                 self.__cal_spans(boxes, rows, cols, tbl, False))

    def __html_table(self, cap, hdset, tbl):
        # construct HTML
        html = "<table>"
        if cap:
            html += f"<caption>{cap}</caption>"
        for i in range(len(tbl)):
            row = "<tr>"
            txts = []
            for j, arr in enumerate(tbl[i]):
                if arr is None:
                    continue
                if not arr:
                    row += "<td></td>" if i not in hdset else "<th></th>"
                    continue
                txt = ""
                if arr:
                    h = min(np.min([c["bottom"] - c["top"] for c in arr]) / 2,
                            self.mean_height[arr[0]["page_number"] - 1] / 2)
                    txt = "".join([c["text"]
                                   for c in self.sort_Y_firstly(arr, h)])
                txts.append(txt)
                sp = ""
                if arr[0].get("colspan"):
                    sp = "colspan={}".format(arr[0]["colspan"])
                if arr[0].get("rowspan"):
                    sp += " rowspan={}".format(arr[0]["rowspan"])
                if i in hdset:
                    row += f"<th {sp}>" + txt + "</th>"
                else:
                    row += f"<td {sp}>" + txt + "</td>"

            if i in hdset:
                if all([t in hdset for t in txts]):
                    continue
                for t in txts:
                    hdset.add(t)

            if row != "<tr>":
                row += "</tr>"
            else:
                row = ""
            html += "\n" + row
        html += "\n</table>"
        return html

    def __desc_table(self, cap, hdr_rowno, tbl):
        # get the text of every column in the header rows to build header text
        clmno = len(tbl[0])
        rowno = len(tbl)
        headers = {}
        hdrset = set()
        lst_hdr = []
        de = "的" if not self.is_english else " for "
        for r in sorted(list(hdr_rowno)):
            headers[r] = ["" for _ in range(clmno)]
            for i in range(clmno):
                if not tbl[r][i]:
                    continue
                txt = "".join([a["text"].strip() for a in tbl[r][i]])
                headers[r][i] = txt
                hdrset.add(txt)
            if all([not t for t in headers[r]]):
                del headers[r]
                hdr_rowno.remove(r)
                continue
            for j in range(clmno):
                if headers[r][j]:
                    continue
                if j >= len(lst_hdr):
                    break
                headers[r][j] = lst_hdr[j]
            lst_hdr = headers[r]
        for i in range(rowno):
            if i not in hdr_rowno:
                continue
            for j in range(i + 1, rowno):
                if j not in hdr_rowno:
                    break
                for k in range(clmno):
                    if not headers[j - 1][k]:
                        continue
                    if headers[j][k].find(headers[j - 1][k]) >= 0:
                        continue
                    if len(headers[j][k]) > len(headers[j - 1][k]):
                        headers[j][k] += (de if headers[j][k]
                                          else "") + headers[j - 1][k]
                    else:
                        headers[j][k] = headers[j - 1][k] \
                            + (de if headers[j - 1][k] else "") \
                            + headers[j][k]

        logging.debug(
            f">>>>>>>>>>>>>>>>>{cap}:SIZE:{rowno}X{clmno} Header: {hdr_rowno}")
        row_txt = []
        for i in range(rowno):
            if i in hdr_rowno:
                continue
            rtxt = []

            def append(delimer):
                nonlocal rtxt, row_txt
                rtxt = delimer.join(rtxt)
                if row_txt and len(row_txt[-1]) + len(rtxt) < 64:
                    row_txt[-1] += "\n" + rtxt
                else:
                    row_txt.append(rtxt)

            r = 0
            if len(headers.items()):
                _arr = [(i - r, r) for r, _ in headers.items() if r < i]
                if _arr:
                    _, r = min(_arr, key=lambda x: x[0])

            if r not in headers and clmno <= 2:
                for j in range(clmno):
                    if not tbl[i][j]:
                        continue
                    txt = "".join([a["text"].strip() for a in tbl[i][j]])
                    if txt:
                        rtxt.append(txt)
                if rtxt:
                    append("：")
                continue

            for j in range(clmno):
                if not tbl[i][j]:
                    continue
                txt = "".join([a["text"].strip() for a in tbl[i][j]])
                if not txt:
                    continue
                ctt = headers[r][j] if r in headers else ""
                if ctt:
                    ctt += "："
                ctt += txt
                if ctt:
                    rtxt.append(ctt)

            if rtxt:
                row_txt.append("; ".join(rtxt))

        if cap:
            if self.is_english:
                from_ = " in "
            else:
                from_ = "来自"
            row_txt = [t + f"\t——{from_}“{cap}”" for t in row_txt]
        return row_txt

    @staticmethod
    def is_caption(bx):
        patt = [
            r"[图表]+[ 0-9：:]{2,}"
        ]
        if any([re.match(p, bx["text"].strip()) for p in patt]) \
                or bx["layout_type"].find("caption") >= 0:
            return True
        return False

    def _extract_table_figure(self, need_image, ZM, return_html):
        tables = {}
        figures = {}
        # extract figure and table boxes
        i = 0
        lst_lout_no = ""
        nomerge_lout_no = []
        while i < len(self.boxes):
            if "layoutno" not in self.boxes[i]:
                i += 1
                continue
            lout_no = str(self.boxes[i]["page_number"]) + \
                "-" + str(self.boxes[i]["layoutno"])
            if self.is_caption(self.boxes[i]) or self.boxes[i]["layout_type"] in ["table caption", "title",
                                                                                  "figure caption", "reference"]:
                nomerge_lout_no.append(lst_lout_no)
            if self.boxes[i]["layout_type"] == "table":
                if re.match(r"(数据|资料|图表)*来源[:：]", self.boxes[i]["text"]):
                    self.boxes.pop(i)
                    continue
                if lout_no not in tables:
                    tables[lout_no] = []
                tables[lout_no].append(self.boxes[i])
                self.boxes.pop(i)
                lst_lout_no = lout_no
                continue
            if need_image and self.boxes[i]["layout_type"] == "figure":
                if re.match(r"(数据|资料|图表)*来源[:：]", self.boxes[i]["text"]):
                    self.boxes.pop(i)
                    continue
                if lout_no not in figures:
                    figures[lout_no] = []
                figures[lout_no].append(self.boxes[i])
                self.boxes.pop(i)
                lst_lout_no = lout_no
                continue
            i += 1

        # merge tables that continue on different pages
        nomerge_lout_no = set(nomerge_lout_no)
        tbls = sorted([(k, bxs) for k, bxs in tables.items()],
                      key=lambda x: (x[1][0]["top"], x[1][0]["x0"]))

        i = len(tbls) - 1
        while i - 1 >= 0:
            k0, bxs0 = tbls[i - 1]
            k, bxs = tbls[i]
            i -= 1
            if k0 in nomerge_lout_no:
                continue
            if bxs[0]["page_number"] == bxs0[0]["page_number"]:
                continue
            if bxs[0]["page_number"] - bxs0[0]["page_number"] > 1:
                continue
            mh = self.mean_height[bxs[0]["page_number"] - 1]
            if self._y_dis(bxs0[-1], bxs[0]) > mh * 23:
                continue
            tables[k0].extend(tables[k])
            del tables[k]

        def x_overlapped(a, b):
            return not any([a["x1"] < b["x0"], a["x0"] > b["x1"]])

        # find captions and pop out
        i = 0
        while i < len(self.boxes):
            c = self.boxes[i]
            # mh = self.mean_height[c["page_number"]-1]
            if not self.is_caption(c):
                i += 1
                continue

            # find the nearest layouts
            def nearest(tbls):
                nonlocal c
                mink = ""
                minv = 1000000000
                for k, bxs in tbls.items():
                    for b in bxs[:10]:
                        if b.get("layout_type", "").find("caption") >= 0:
                            continue
                        y_dis = self._y_dis(c, b)
                        x_dis = self._x_dis(
                            c, b) if not x_overlapped(
                            c, b) else 0
                        dis = y_dis * y_dis + x_dis * x_dis
                        if dis < minv:
                            mink = k
                            minv = dis
                return mink, minv

            tk, tv = nearest(tables)
            fk, fv = nearest(figures)
            if min(tv, fv) > 2000:
                i += 1
                continue
            if tv < fv:
                tables[tk].insert(0, c)
                logging.debug(
                    "TABLE:" +
                    self.boxes[i]["text"] +
                    "; Cap: " +
                    tk)
            else:
                figures[fk].insert(0, c)
                logging.debug(
                    "FIGURE:" +
                    self.boxes[i]["text"] +
                    "; Cap: " +
                    fk)
            self.boxes.pop(i)

        res = []

        def cropout(bxs, ltype):
            nonlocal ZM
            pn = set([b["page_number"] - 1 for b in bxs])
            if len(pn) < 2:
                pn = list(pn)[0]
                ht = self.page_cum_height[pn]
                b = {
                    "x0": np.min([b["x0"] for b in bxs]),
                    "top": np.min([b["top"] for b in bxs]) - ht,
                    "x1": np.max([b["x1"] for b in bxs]),
                    "bottom": np.max([b["bottom"] for b in bxs]) - ht
                }
                louts = [l for l in self.page_layout[pn] if l["type"] == ltype]
                ii = self.__find_overlapped(b, louts, naive=True)
                if ii is not None:
                    b = louts[ii]
                else:
                    logging.warn(
                        f"Missing layout match: {pn + 1},%s" %
                        (bxs[0].get(
                            "layoutno", "")))

                left, top, right, bott = b["x0"], b["top"], b["x1"], b["bottom"]
                return self.page_images[pn] \
                    .crop((left * ZM, top * ZM,
                           right * ZM, bott * ZM))
            pn = {}
            for b in bxs:
                p = b["page_number"] - 1
                if p not in pn:
                    pn[p] = []
                pn[p].append(b)
            pn = sorted(pn.items(), key=lambda x: x[0])
            imgs = [cropout(arr, ltype) for p, arr in pn]
            pic = Image.new("RGB",
                            (int(np.max([i.size[0] for i in imgs])),
                             int(np.sum([m.size[1] for m in imgs]))),
                            (245, 245, 245))
            height = 0
            for img in imgs:
                pic.paste(img, (0, int(height)))
                height += img.size[1]
            return pic

        # crop figure out and add caption
        for k, bxs in figures.items():
            txt = "\n".join(
                [b["text"] for b in bxs
                 if not re.match(r"[0-9a-z.\+%-]", b["text"].strip())
                 and len(b["text"].strip()) >= 4
                 ]
            )
            if not txt:
                continue

            res.append(
                (cropout(
                    bxs,
                    "figure"),
                 [txt] if not return_html else [f"<p>{txt}</p>"]))

        for k, bxs in tables.items():
            if not bxs:
                continue
            res.append((cropout(bxs, "table"),
                        self.__construct_table(bxs, html=return_html)))

        return res

    def proj_match(self, line):
        if len(line) <= 2:
            return
        if re.match(r"[0-9 ().,%%+/-]+$", line):
            return False
        for p, j in [
            (r"第[零一二三四五六七八九十百]+章", 1),
            (r"第[零一二三四五六七八九十百]+[条节]", 2),
            (r"[零一二三四五六七八九十百]+[、 ]", 3),
            (r"[\(（][零一二三四五六七八九十百]+[）\)]", 4),
            (r"[0-9]+(、|\.[ ]|\.[^0-9])", 5),
            (r"[0-9]+\.[0-9]+(、|[. ]|[^0-9])", 6),
            (r"[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 7),
            (r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 8),
            (r".{,48}[：:？?]$", 9),
            (r"[0-9]+）", 10),
            (r"[\(（][0-9]+[）\)]", 11),
            (r"[零一二三四五六七八九十百]+是", 12),
            (r"[⚫•➢✓]", 12)
        ]:
            if re.match(p, line):
                return j
        return

    def _line_tag(self, bx, ZM):
        pn = [bx["page_number"]]
        top = bx["top"] - self.page_cum_height[pn[0] - 1]
        bott = bx["bottom"] - self.page_cum_height[pn[0] - 1]
        while bott * ZM > self.page_images[pn[-1] - 1].size[1]:
            bott -= self.page_images[pn[-1] - 1].size[1] / ZM
            pn.append(pn[-1] + 1)

        return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##" \
            .format("-".join([str(p) for p in pn]),
                    bx["x0"], bx["x1"], top, bott)
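
    # Example of the positional tag produced above (illustrative values only):
    #   "@@1-2\t72.0\t520.5\t641.3\t88.7##" marks a line spanning pages 1-2 with the
    #   given x0/x1/top/bottom; crop() parses these tags and remove_tag() strips them.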

    def __filterout_scraps(self, boxes, ZM):
        def width(b):
            return b["x1"] - b["x0"]

        def height(b):
            return b["bottom"] - b["top"]

        def usefull(b):
            if b.get("layout_type"):
                return True
            if width(
                    b) > self.page_images[b["page_number"] - 1].size[0] / ZM / 3:
                return True
            if b["bottom"] - b["top"] > self.mean_height[b["page_number"] - 1]:
                return True
            return False

        res = []
        while boxes:
            lines = []
            widths = []
            pw = self.page_images[boxes[0]["page_number"] - 1].size[0] / ZM
            mh = self.mean_height[boxes[0]["page_number"] - 1]
            mj = self.proj_match(
                boxes[0]["text"]) or boxes[0].get(
                "layout_type",
                "") == "title"

            def dfs(line, st):
                nonlocal mh, pw, lines, widths
                lines.append(line)
                widths.append(width(line))
                width_mean = np.mean(widths)
                mmj = self.proj_match(
                    line["text"]) or line.get(
                    "layout_type",
                    "") == "title"
                for i in range(st + 1, min(st + 20, len(boxes))):
                    if (boxes[i]["page_number"] - line["page_number"]) > 0:
                        break
                    if not mmj and self._y_dis(
                            line, boxes[i]) >= 3 * mh and height(line) < 1.5 * mh:
                        break

                    if not usefull(boxes[i]):
                        continue
                    if mmj or \
                            (self._x_dis(boxes[i], line) < pw / 10): \
                            # and abs(width(boxes[i])-width_mean)/max(width(boxes[i]),width_mean)<0.5):
                        # concat following
                        dfs(boxes[i], i)
                        boxes.pop(i)
                        break

            try:
                if usefull(boxes[0]):
                    dfs(boxes[0], 0)
                else:
                    logging.debug("WASTE: " + boxes[0]["text"])
            except Exception as e:
                pass
            boxes.pop(0)
            mw = np.mean(widths)
            if mj or mw / pw >= 0.35 or mw > 200:
                res.append(
                    "\n".join([c["text"] + self._line_tag(c, ZM) for c in lines]))
            else:
                logging.debug("REMOVED: " +
                              "<<".join([c["text"] for c in lines]))

        return "\n\n".join(res)

    @staticmethod
    def total_page_number(fnm, binary=None):
        try:
            pdf = pdfplumber.open(fnm) if not binary else pdfplumber.open(BytesIO(binary))
            return len(pdf.pages)
        except Exception as e:
            pdf = fitz.open(fnm) if not binary else fitz.open(stream=binary, filetype="pdf")
            return len(pdf)
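
    # Usage note: total_page_number accepts either a file path or, via binary=..., the raw
    # PDF bytes; pdfplumber is tried first and PyMuPDF (fitz) is the fallback.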

    def __images__(self, fnm, zoomin=3, page_from=0, page_to=299):
        self.lefted_chars = []
        self.mean_height = []
        self.mean_width = []
        self.boxes = []
        self.garbages = {}
        self.page_cum_height = [0]
        self.page_layout = []
        try:
            self.pdf = pdfplumber.open(fnm) if isinstance(fnm, str) else pdfplumber.open(BytesIO(fnm))
            self.page_images = [p.to_image(resolution=72 * zoomin).annotated for i, p in
                                enumerate(self.pdf.pages[page_from:page_to])]
            self.page_chars = [[c for c in page.chars if self._has_color(c)] for page in
                               self.pdf.pages[page_from:page_to]]
            self.total_page = len(self.pdf.pages)
        except Exception as e:
            self.pdf = fitz.open(fnm) if isinstance(fnm, str) else fitz.open(stream=fnm, filetype="pdf")
            self.page_images = []
            self.page_chars = []
            mat = fitz.Matrix(zoomin, zoomin)
            self.total_page = len(self.pdf)
            for i, page in enumerate(self.pdf):
                if i < page_from:
                    continue
                if i >= page_to:
                    break
                pix = page.get_pixmap(matrix=mat)
                img = Image.frombytes("RGB", [pix.width, pix.height],
                                      pix.samples)
                self.page_images.append(img)
                self.page_chars.append([])

        logging.info("Images converted.")
        self.is_english = [
            re.search(r"[a-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}",
                      "".join(random.choices([c["text"] for c in self.page_chars[i]],
                                             k=min(100, len(self.page_chars[i])))))
            for i in range(len(self.page_chars))]
        if sum([1 if e else 0 for e in self.is_english]) > len(self.page_images) / 2:
            self.is_english = True
        else:
            self.is_english = False

        for i, img in enumerate(self.page_images):
            chars = self.page_chars[i] if not self.is_english else []
            self.mean_height.append(
                np.median(sorted([c["height"] for c in chars])) if chars else 0
            )
            self.mean_width.append(
                np.median(sorted([c["width"] for c in chars])) if chars else 8
            )
            self.page_cum_height.append(img.size[1] / zoomin)
            j = 0
            while j + 1 < len(chars):
                if chars[j]["text"] and chars[j + 1]["text"] \
                        and re.match(r"[0-9a-zA-Z,.:;!%]+", chars[j]["text"] + chars[j + 1]["text"]) \
                        and chars[j + 1]["x0"] - chars[j]["x1"] >= min(chars[j + 1]["width"],
                                                                       chars[j]["width"]) / 2:
                    chars[j]["text"] += " "
                j += 1
            # if i > 0:
            #     if not chars:
            #         self.page_cum_height.append(img.size[1] / zoomin)
            #     else:
            #         self.page_cum_height.append(
            #             np.max([c["bottom"] for c in chars]))
            self.__ocr(i + 1, img, chars, zoomin)

        if not self.is_english and not any([c for c in self.page_chars]) and self.boxes:
            bxes = [b for bxs in self.boxes for b in bxs]
            self.is_english = re.search(
                r"[\na-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}",
                "".join([b["text"] for b in random.choices(bxes, k=min(30, len(bxes)))]))

        logging.info("Is it English: {}".format(self.is_english))

        self.page_cum_height = np.cumsum(self.page_cum_height)
        assert len(self.page_cum_height) == len(self.page_images) + 1

    def __call__(self, fnm, need_image=True, zoomin=3, return_html=False):
        self.__images__(fnm, zoomin)
        self._layouts_rec(zoomin)
        self._table_transformer_job(zoomin)
        self._text_merge()
        self._concat_downward()
        self._filter_forpages()
        tbls = self._extract_table_figure(need_image, zoomin, return_html)
        return self.__filterout_scraps(deepcopy(self.boxes), zoomin), tbls

    def remove_tag(self, txt):
        return re.sub(r"@@[\t0-9.-]+?##", "", txt)

    def crop(self, text, ZM=3):
        imgs = []
        for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text):
            pn, left, right, top, bottom = tag.strip(
                "#").strip("@").split("\t")
            left, right, top, bottom = float(left), float(
                right), float(top), float(bottom)
            bottom *= ZM
            pns = [int(p) - 1 for p in pn.split("-")]
            for pn in pns[1:]:
                bottom += self.page_images[pn - 1].size[1]
            imgs.append(
                self.page_images[pns[0]].crop((left * ZM, top * ZM,
                                               right * ZM,
                                               min(bottom, self.page_images[pns[0]].size[1])))
            )
            bottom -= self.page_images[pns[0]].size[1]
            for pn in pns[1:]:
                imgs.append(
                    self.page_images[pn].crop((left * ZM, 0,
                                               right * ZM,
                                               min(bottom, self.page_images[pn].size[1])))
                )
                bottom -= self.page_images[pn].size[1]

        if not imgs:
            return
        GAP = 2
        height = 0
        for img in imgs:
            height += img.size[1] + GAP
        height = int(height)
        pic = Image.new("RGB",
                        (int(np.max([i.size[0] for i in imgs])), height),
                        (245, 245, 245))
        height = 0
        for img in imgs:
            pic.paste(img, (0, int(height)))
            height += img.size[1] + GAP
        return pic


if __name__ == "__main__":
    pass
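    # Minimal usage sketch (assumptions: the model/resource paths wired into
    # HuParser.__init__ exist in this environment, and "sample.pdf" is an
    # illustrative path):
    #   parser = HuParser()
    #   text, tbls = parser("sample.pdf", need_image=True, zoomin=3, return_html=False)
    #   print(parser.remove_tag(text))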