#!/usr/bin/env python3
"""OCRmyPDF command line: argument parsing, logging and the ruffus pipeline."""

from contextlib import suppress
from tempfile import NamedTemporaryFile
import sys
import os
import fileinput
import re
import shutil
import warnings
import multiprocessing

import PyPDF2 as pypdf
from parse import parse

from subprocess import Popen, check_call, PIPE, CalledProcessError, \
    TimeoutExpired
try:
    from subprocess import DEVNULL
except ImportError:
    DEVNULL = open(os.devnull, 'wb')

from ruffus import transform, suffix, merge, active_if, regex, jobs_limit, \
    mkdir, formatter, follows, subdivide
import ruffus.cmdline as cmdline
from .hocrtransform import HocrTransform
from .pageinfo import pdf_get_all_pageinfo
from .pdfa import generate_pdfa_def


warnings.simplefilter('ignore', pypdf.utils.PdfReadWarning)

basedir = os.path.dirname(os.path.realpath(__file__))

parser = cmdline.get_argparse(
    prog="OCRmyPDF",
    description="Generate searchable PDF file from an image-only PDF file.")

parser.add_argument(
    'input_file',
    help="PDF file containing the images to be OCRed")
parser.add_argument(
    'output_file',
    help="output searchable PDF file")
parser.add_argument(
    '-l', '--language', nargs='*', default=['eng'],
    help="language of the file to be OCRed")

preprocessing = parser.add_argument_group(
    "Preprocessing options",
    "Improve OCR quality and final image")
preprocessing.add_argument(
    '-d', '--deskew', action='store_true',
    help="deskew each page before performing OCR")
preprocessing.add_argument(
    '-c', '--clean', action='store_true',
    help="clean pages with unpaper before performing OCR")
preprocessing.add_argument(
    '-i', '--clean-final', action='store_true',
    help="incorporate the cleaned image in the final PDF file")
preprocessing.add_argument(
    '--oversample', metavar='DPI', type=int,
    help="oversample images to improve OCR results slightly")

parser.add_argument(
    '--force-ocr', action='store_true',
    help="Force to OCR, even if the page already contains fonts")
parser.add_argument(
    '--skip-text', action='store_true',
    help="Skip OCR on pages that contain fonts and include the page anyway")
parser.add_argument(
    '--skip-big', action='store_true',
    help="Skip OCR for pages that are very large")
parser.add_argument(
    '--exact-image', action='store_true',
    help="Use original page from PDF without re-rendering")

advanced = parser.add_argument_group(
    "Advanced",
    "Advanced options for power users and debugging")
advanced.add_argument(
    '--deskew-provider', choices=['imagemagick', 'leptonica'],
    default='leptonica')
advanced.add_argument(
    '--page-renderer', choices=['pdftoppm', 'ghostscript'],
    default='ghostscript')
advanced.add_argument(
    '--temp-folder', default='', type=str,
    help="folder where the temporary files should be placed")
advanced.add_argument(
    '--tesseract-config', default='', nargs='*',  # Implemented
    help="Tesseract configuration")

debugging = parser.add_argument_group(
    "Debugging",
    "Arguments to help with troubleshooting and debugging")
debugging.add_argument(
    '-k', '--keep-temporary-files', action='store_true',
    help="keep temporary files (helpful for debugging)")
debugging.add_argument(
    '-g', '--debug-rendering', action='store_true',
    help="render each page twice with debug information on second page")


options = parser.parse_args()

# ----------
# Logging


_logger, _logger_mutex = cmdline.setup_logging(__name__, options.log_file,
                                               options.verbose)


class WrappedLogger:
    """Logger facade that holds a mutex around every call to the real logger.

    Each method forwards its arguments unchanged to the method of the same
    name on the wrapped logger while the mutex is held.
    """

    def __init__(self, my_logger, my_mutex):
        """Store the logger to forward to and the mutex guarding it."""
        self.logger = my_logger
        self.mutex = my_mutex

    def log(self, *args, **kwargs):
        with self.mutex:
            self.logger.log(*args, **kwargs)

    def debug(self, *args, **kwargs):
        with self.mutex:
            self.logger.debug(*args, **kwargs)

    def info(self, *args, **kwargs):
        with self.mutex:
            self.logger.info(*args, **kwargs)

    def warning(self, *args, **kwargs):
        with self.mutex:
            self.logger.warning(*args, **kwargs)

    def error(self, *args, **kwargs):
        with self.mutex:
            self.logger.error(*args, **kwargs)

    def critical(self, *args, **kwargs):
        with self.mutex:
            self.logger.critical(*args, **kwargs)


_log = WrappedLogger(_logger, _logger_mutex)


def re_symlink(input_file, soft_link_name, log=_log):
    """Create (or re-create) the symbolic link soft_link_name -> input_file.

    The link always points at the absolute path of ``input_file``.

    Args:
        input_file: path the link should point to; it must exist.
        soft_link_name: path of the link to create. An existing symbolic
            link at this path is replaced.
        log: object with a ``debug`` method used for diagnostics.

    Raises:
        Exception: if ``soft_link_name`` exists and is not a symbolic link,
            or if ``input_file`` does not exist.
        OSError: from ``os.symlink`` if the link cannot be created.
    """
    # Guard against soft linking to oneself
    if input_file == soft_link_name:
        log.debug("Warning: No symbolic link made. You are using " +
                  "the original data directory as the working directory.")
        return

    # Soft link already exists: delete for relink?
    if os.path.lexists(soft_link_name):
        # do not delete or overwrite real (non-soft link) file
        if not os.path.islink(soft_link_name):
            raise Exception("%s exists and is not a link" % soft_link_name)
        try:
            os.unlink(soft_link_name)
        except OSError:
            # Best effort only: a filesystem failure is logged and the
            # os.symlink call below reports the real problem. Catching
            # OSError (not everything) lets KeyboardInterrupt and
            # SystemExit propagate.
            log.debug("Can't unlink %s" % (soft_link_name))

    if not os.path.exists(input_file):
        raise Exception("trying to create a broken symlink to %s" % input_file)

    log.debug("os.symlink(%s, %s)" % (input_file, soft_link_name))

    # Create symbolic link using absolute path
    os.symlink(
        os.path.abspath(input_file),
        soft_link_name
    )


# -------------
# The Pipeline

manager = multiprocessing.Manager()
_pdfinfo = manager.list()
_pdfinfo_lock = manager.Lock()

if options.temp_folder == '':
    options.temp_folder = 'tmp'


@follows(mkdir(options.temp_folder))
@transform(
    input=options.input_file,
    filter=suffix('.pdf'),
    output='.cleaned.pdf',
    output_dir=options.temp_folder,
    extras=[_log, _pdfinfo, _pdfinfo_lock])
def clean_pdf(
        input_file,
        output_file,
        log,
        pdfinfo,
        pdfinfo_lock):
    """Run ``mutool clean`` on the input PDF and record its page info.

    The page information returned by ``pdf_get_all_pageinfo`` for the
    cleaned file is appended to ``pdfinfo`` while ``pdfinfo_lock`` is held.

    Raises:
        CalledProcessError: if ``mutool`` exits with a non-zero status.
    """
    args_mutool = [
        'mutool', 'clean',
        input_file, output_file
    ]
    check_call(args_mutool)

    with pdfinfo_lock:
        pdfinfo.extend(pdf_get_all_pageinfo(output_file))
        log.info(pdfinfo)


# pageno, width_pt, height_pt = map(int, options.page_info.split(' ', 3))
# pageinfo = pdf_get_pageinfo(options.input_file, pageno, width_pt, height_pt)

# if not pageinfo['images']:
#     # If the page has no images, then it contains vector content or text
#     # or both. It seems quite unlikely that one would find meaningful text
#     # from rasterizing vector content. So skip the page.
#     log.info(
#         "Page {0} has no images - skipping OCR".format(pageno)
#     )
# elif pageinfo['has_text']:
#     s = "Page {0} already has text! – {1}"

#     if not options.force_ocr and not options.skip_text:
#         log.error(s.format(pageno,
#                            "aborting (use -f or -s to force OCR)"))
#         sys.exit(1)
#     elif options.force_ocr:
#         log.info(s.format(pageno,
#                           "rasterizing text and running OCR anyway"))
#     elif options.skip_text:
#         log.info(s.format(pageno,
#                           "skipping all processing on this page"))

# ocr_required = pageinfo['images'] and \
#     (options.force_ocr or
#         (not (pageinfo['has_text'] and options.skip_text)))

# if ocr_required and options.skip_big:
#     area = pageinfo['width_inches'] * pageinfo['height_inches']
#     pixel_count = pageinfo['width_pixels'] * pageinfo['height_pixels']
#     if area > (11.0 * 17.0) or pixel_count > (300.0 * 300.0 * 11 * 17):
#         ocr_required = False
#         log.info(
#             "Page {0} is very large; skipping due to -b".format(pageno))


@subdivide(
    clean_pdf,
    formatter(),
    "{path[0]}/*.page.pdf",
    "{path[0]}/",
    _log,
    _pdfinfo,
    _pdfinfo_lock)
def split_pages(
        input_file,
        output_files,
        output_file_name_root,
        log,
        pdfinfo,
        pdfinfo_lock):
    """Split the cleaned PDF into one file per page using ``pdfseparate``.

    Any files already listed in ``output_files`` are removed first; the
    pages are then written as ``<output_file_name_root>%06d.page.pdf``.
    """
    # Drop leftovers from an earlier run; a file that is already gone is fine.
    for stale_page in output_files:
        try:
            os.unlink(stale_page)
        except FileNotFoundError:
            pass

    page_pattern = output_file_name_root + '%06d.page.pdf'
    check_call(['pdfseparate', input_file, page_pattern])


@transform(
    input=split_pages,
    filter=suffix('.page.pdf'),
    output='.done.pdf',
    output_dir=options.temp_folder,
    extras=[_log, _pdfinfo, _pdfinfo_lock])
def noop(
        input_file,
        output_file,
        log,
        pdfinfo,
        pdfinfo_lock):
    """Copy a split page to its ``.done.pdf`` name without modifying it."""
    shutil.copy(input_file, output_file)


@transform(
    input=clean_pdf,
    filter=suffix('.cleaned.pdf'),
    output='.pdfa_def.ps',
    output_dir=options.temp_folder,
    extras=[_log])
def generate_postscript_stub(
        input_file,
        output_file,
        log):
    """Write the PDF/A definition file to ``output_file``."""
    generate_pdfa_def(output_file)


@merge(
    input=[noop, generate_postscript_stub],
    output=options.output_file,
    extras=[_log, _pdfinfo, _pdfinfo_lock])
def merge_pages(
        input_files,
        output_file,
        log,
        pdfinfo,
        pdfinfo_lock):
    """Combine the page PDFs into one PDF/A with Ghostscript.

    The last entry of ``input_files`` is taken to be the PDF/A definition
    header and every entry before it a page PDF. Ghostscript writes to a
    temporary file, which is then copied to ``output_file``.
    """
    pdfa_header = input_files[-1]
    page_pdfs = input_files[:-1]

    with NamedTemporaryFile(delete=True) as merged_pdf:
        gs_command = [
            "gs",
            "-dQUIET",
            "-dBATCH",
            "-dNOPAUSE",
            "-sDEVICE=pdfwrite",
            "-sColorConversionStrategy=/RGB",
            "-sProcessColorModel=DeviceRGB",
            "-dPDFA",
            "-sPDFACompatibilityPolicy=2",
            "-sOutputICCProfile=srgb.icc",
            "-sOutputFile=" + merged_pdf.name,
            pdfa_header,  # the PDF/A definition header
            *page_pdfs,
        ]
        check_call(gs_command)
        shutil.copy(merged_pdf.name, output_file)


# @active_if(not ocr_required or (ocr_required and options.exact_image))
# @transform(setup_working_directory,
#            formatter(),
#            os.path.join(options.temp_folder, '%04i.page.pdf' % pageno))
# def extract_single_page(
#         input_file,
#         output_file):
#     args_pdfseparate = [
#         'pdfseparate',
#         '-f', str(pageinfo['pageno']), '-l', str(pageinfo['pageno']),
#         input_file,
#         output_file
#     ]
#     check_call(args_pdfseparate)


# @active_if(ocr_required)
# @active_if(options.page_renderer == 'pdftoppm')
# @transform(setup_working_directory,
#            formatter(),
#            "{path[0]}/%04i.pnm" % pageno)
# def unpack_with_pdftoppm(
#         input_file,
#         output_file):
#     force_ppm = True
#     allow_jpeg = False
#     colorspace = 'color'
#     compression = 'deflate'
#     output_format = 'tiff'
#     if all(image['comp'] == 1 for image in pageinfo['images']):
#         if all(image['bpc'] == 1 for image in pageinfo['images']):
#             colorspace = 'mono'
#             compression = 'deflate'
#         elif not any(image['color'] == 'color'
#                      for image in pageinfo['images']):
#             colorspace = 'gray'

#     if allow_jpeg and \
#             all(image['enc'] == 'jpeg' for image in pageinfo['images']):
#         output_format = 'jpeg'

#     args_pdftoppm = [
#         'pdftoppm',
#         '-f', str(pageinfo['pageno']), '-l', str(pageinfo['pageno']),
#         '-rx', str(pageinfo['xres_render']),
#         '-ry', str(pageinfo['yres_render'])
#     ]

#     if not force_ppm:
#         if output_format == 'tiff':
#             args_pdftoppm.append('-tiff')
#             if False and compression:
#                 args_pdftoppm.append('-tiffcompression')
#                 args_pdftoppm.append(compression)
#         elif output_format ==
#                 'jpeg':
#             args_pdftoppm.append('-jpeg')

#     if colorspace == 'mono':
#         args_pdftoppm.append('-mono')
#     elif colorspace == 'gray':
#         args_pdftoppm.append('-gray')

#     args_pdftoppm.extend([str(input_file)])

#     # Ask pdftoppm to write the binary output to stdout; therefore set
#     # universal_newlines=False
#     p = Popen(args_pdftoppm, close_fds=True, stdout=open(output_file, 'wb'),
#               stderr=PIPE, universal_newlines=False)
#     _, stderr = p.communicate()
#     if stderr:
#         # Because universal_newlines=False, stderr is bytes(), so we must
#         # manually convert it to str for logging
#         from codecs import decode
#         log.error(decode(stderr, sys.getdefaultencoding(), 'ignore'))
#     if p.returncode != 0:
#         raise CalledProcessError(p.returncode, args_pdftoppm)


# @active_if(ocr_required)
# @transform(unpack_with_pdftoppm, suffix(".pnm"), ".png")
# def convert_to_png(input_file, output_file):
#     args_convert = [
#         'convert',
#         input_file,
#         output_file
#     ]
#     check_call(args_convert)


# @active_if(ocr_required)
# @active_if(options.page_renderer == 'ghostscript')
# @transform(setup_working_directory,
#            formatter(),
#            "{path[0]}/%04i.png" % pageno)
# def unpack_with_ghostscript(
#         input_file,
#         output_file):
#     device = 'png16m'  # 24-bit
#     if all(image['comp'] == 1 for image in pageinfo['images']):
#         if all(image['bpc'] == 1 for image in pageinfo['images']):
#             device = 'pngmono'
#         elif not any(image['color'] == 'color'
#                      for image in pageinfo['images']):
#             device = 'pnggray'

#     args_gs = [
#         'gs',
#         '-dBATCH', '-dNOPAUSE',
#         '-dFirstPage=%i' % pageno,
#         '-dLastPage=%i' % pageno,
#         '-sDEVICE=%s' % device,
#         '-o', output_file,
#         '-r{0}x{1}'.format(
#             str(pageinfo['xres_render']), str(pageinfo['yres_render'])),
#         input_file
#     ]

#     p = Popen(args_gs, close_fds=True, stdout=PIPE, stderr=PIPE,
#               universal_newlines=True)
#     stdout, stderr = p.communicate()
#     if stdout:
#         log.info(stdout)
#     if stderr:
#         log.error(stderr)

#     try:
#         f = open(output_file)
#     except FileNotFoundError:
#         raise
#     else:
#         f.close()


# @active_if(ocr_required)
# @active_if(options.preprocess_deskew != 0
#            and options.deskew_provider == 'imagemagick')
# @transform(convert_to_png, suffix(".png"), ".deskewed.png")
# def deskew_imagemagick(input_file, output_file):
#     args_convert = [
#         'convert',
#         input_file,
#         '-deskew', '40%',
#         '-gravity', 'center',
#         '-extent', '{width_pixels}x{height_pixels}'.format(**pageinfo),
#         '+repage',
#         output_file
#     ]

#     p = Popen(args_convert, close_fds=True, stdout=PIPE, stderr=PIPE,
#               universal_newlines=True)
#     stdout, stderr = p.communicate()

#     if stdout:
#         log.info(stdout)
#     if stderr:
#         log.error(stderr)

#     if p.returncode != 0:
#         raise CalledProcessError(p.returncode, args_convert)


# @active_if(ocr_required)
# @active_if(options.preprocess_deskew != 0
#            and options.deskew_provider == 'leptonica')
# @transform(convert_to_png, suffix(".png"), ".deskewed.png")
# def deskew_leptonica(input_file, output_file):
#     from .leptonica import deskew
#     deskew(input_file, output_file,
#            min(pageinfo['xres'], pageinfo['yres']))


# @active_if(ocr_required)
# @active_if(options.preprocess_clean != 0)
# @merge([unpack_with_pdftoppm, unpack_with_ghostscript,
#         deskew_imagemagick, deskew_leptonica],
#        os.path.join(options.temp_folder, "%04i.for_clean.pnm" % pageno))
# def select_image_for_cleaning(infiles, output_file):
#     input_file = infiles[-1]
#     args_convert = [
#         'convert',
#         input_file,
#         output_file
#     ]
#     check_call(args_convert)


# @active_if(ocr_required)
# @active_if(options.preprocess_clean != 0)
# @transform(select_image_for_cleaning, suffix(".pnm"), ".cleaned.pnm")
# def clean_unpaper(input_file, output_file):
#     args_unpaper = [
#         'unpaper',
#         '--dpi', str(int(round((pageinfo['xres'] * pageinfo['yres']) ** 0.5))),
#         '--mask-scan-size', '100',
#         '--no-deskew',
#         '--no-grayfilter',
#         '--no-blackfilter',
#         '--no-mask-center',
#         '--no-border-align',
#         input_file,
#         output_file
#     ]

#     p = Popen(args_unpaper, close_fds=True, stdout=PIPE, stderr=PIPE,
#               universal_newlines=True)
#     stdout, stderr = p.communicate()

#     if stdout:
#         log.info(stdout)
#     if stderr:
#         log.error(stderr)

#     if p.returncode != 0:
#         raise CalledProcessError(p.returncode, args_unpaper)


# @active_if(ocr_required)
# @transform(clean_unpaper, suffix(".cleaned.pnm"), ".cleaned.png")
# def cleaned_to_png(input_file, output_file):
#     args_convert = [
#         'convert',
#         input_file,
#         output_file
#     ]
#     check_call(args_convert)


# @active_if(ocr_required)
# @merge([unpack_with_ghostscript, convert_to_png, deskew_imagemagick,
#         deskew_leptonica, cleaned_to_png],
#        os.path.join(options.temp_folder, "%04i.for_ocr.png" % pageno))
# def select_ocr_image(infiles, output_file):
#     re_symlink(infiles[-1], output_file)


# hocr_template = ''' # # #
## # #
#