diff --git a/pdelfin/eval/buildelo.py b/pdelfin/eval/buildelo.py
index 6e0af95..151c4c1 100644
--- a/pdelfin/eval/buildelo.py
+++ b/pdelfin/eval/buildelo.py
@@ -3,6 +3,8 @@ import boto3
 import dataclasses
 import random
 import re
+from concurrent.futures import ProcessPoolExecutor, as_completed
+import functools
 
 from tqdm import tqdm
 from itertools import combinations
@@ -13,18 +15,13 @@ from dolma_refine.evaluate.aligners import HirschbergAligner
 
 from pdelfin.eval.evalhtml import create_review_html
 
-s3_client = boto3.client('s3')
-
 @dataclasses.dataclass
 class Comparison:
     pdf_path: str
-
     comparison_a_path: str
     comparison_b_path: str
-
     comparison_a_str: str
     comparison_b_str: str
-
     alignment: float
 
     @property
@@ -35,7 +32,48 @@ class Comparison:
     def comparison_b_method(self):
         return re.search(r'page[0-9]+_(\w+)\.md$', self.comparison_b_path).group(1)
 
-def build_review_page(args, comparisons):
+def process_single_pdf(pdf_path, all_mds, comparisons, segmenter_name="spacy"):
+    """Process a single PDF and return its comparisons."""
+    # Create resources inside the worker process
+    s3_client = boto3.client('s3')
+    segmenter = SpacySegmenter(segmenter_name)
+    aligner = HirschbergAligner(match_score=1,
+                                mismatch_score=-1,
+                                indel_score=-1)
+    comparer = DocumentEditSimilarity(segmenter=segmenter, aligner=aligner)
+
+    pdf_comps = []
+    result_comps = []
+
+    # Get all comparison files for this PDF
+    for comp in comparisons:
+        comp_path = pdf_path.replace(".pdf", f"_{comp}.md")
+        if comp_path in all_mds:
+            pdf_comps.append(comp_path)
+
+    # Generate all possible combinations
+    for (compa, compb) in combinations(pdf_comps, 2):
+        if random.choice([True, False]):
+            compa, compb = compb, compa
+
+        # Get the text content
+        text_a = get_s3_bytes(s3_client, compa).decode("utf-8")
+        text_b = get_s3_bytes(s3_client, compb).decode("utf-8")
+
+        result_comps.append(
+            Comparison(
+                pdf_path=pdf_path,
+                comparison_a_path=compa,
+                comparison_b_path=compb,
+                comparison_a_str=text_a,
+                comparison_b_str=text_b,
+                alignment=comparer.compute(text_a, text_b)
+            )
+        )
+
+    return result_comps
+
+def build_review_page(args, comparisons, index=0):
     page_data = []
 
     for comp in comparisons:
@@ -50,7 +88,8 @@ def build_review_page(args, comparisons):
             "alignment": comp.alignment
         })
 
-    create_review_html(page_data, args.name + ".html")
+    report_name = f"{args.name}{f'_{index}' if args.num_copies > 1 else ''}.html"
+    create_review_html(page_data, report_name)
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@@ -67,11 +106,23 @@ if __name__ == "__main__":
         type=int,
         help="Number of entries to show on the generated review page",
     )
+    parser.add_argument(
+        '--max_workers',
+        type=int,
+        default=None,
+        help="Maximum number of worker processes to use for parallel processing",
+    )
     parser.add_argument(
         '--comparisons',
-        default=["pdelf", "gotocr", "gotocr_format", "mineru"],
+        default=["pdelf", "marker", "gotocr_format", "mineru"],
        help="Different variants to compare against"
     )
+    parser.add_argument(
+        '--num_copies',
+        default=1,
+        type=int,
+        help="Number of reports to generate, labeled _0, _1, etc. if greater than 1",
+    )
     parser.add_argument(
         's3_path',
         type=str,
@@ -80,47 +131,50 @@ if __name__ == "__main__":
 
     args = parser.parse_args()
 
-    segmenter = SpacySegmenter("spacy")
-    aligner = HirschbergAligner(match_score=1,
-                                mismatch_score=-1,
-                                indel_score=-1)
-    comparer = DocumentEditSimilarity(segmenter=segmenter, aligner=aligner)
-
-    all_comps = []
+    # Create S3 client only for initial file listing
+    s3_client = boto3.client('s3')
+
+    # Get all PDFs and MD files
     all_pdfs = set(expand_s3_glob(s3_client, args.s3_path + "/*.pdf"))
     all_mds = set(expand_s3_glob(s3_client, args.s3_path + "/*.md"))
 
-    for pdf_path in tqdm(all_pdfs):
-        pdf_comps = []
-        for comp in args.comparisons:
-            comp_path = pdf_path.replace(".pdf", f"_{comp}.md")
-            if comp_path in all_mds:
-                pdf_comps.append(comp_path)
-
-        for (compa, compb) in combinations(pdf_comps, 2):
-            if random.choice([True, False]):
-                compa, compb = compb, compa
-
-            text_a = get_s3_bytes(s3_client, compa).decode("utf-8")
-            text_b = get_s3_bytes(s3_client, compb).decode("utf-8")
-
-            all_comps.append(
-                Comparison(pdf_path=pdf_path,
-                           comparison_a_path=compa,
-                           comparison_b_path=compb,
-                           comparison_a_str=text_a,
-                           comparison_b_str=text_b,
-                           alignment=comparer.compute(text_a, text_b)
-                )
-            )
-
-        # if len(all_comps) > 1000:
-        #     break
-
-    # Sorting by alignment score is problemetic, because it only returns completely pathological parses
-    # And we miss cases where the parse is similar, but one thing hallucinated a word or two, etc.
-    #all_comps.sort(key=lambda c: c.alignment)
+    all_comps = []
+    # Create a partial function with all the common arguments
+    process_pdf = functools.partial(
+        process_single_pdf,
+        all_mds=all_mds,
+        comparisons=args.comparisons
+    )
+
+    # Use ProcessPoolExecutor for parallel processing
+    with ProcessPoolExecutor(max_workers=args.max_workers) as executor:
+        # Submit all PDF processing tasks
+        future_to_pdf = {
+            executor.submit(process_pdf, pdf_path): pdf_path
+            for pdf_path in all_pdfs
+        }
+
+        # Process results as they complete using tqdm for progress
+        for future in tqdm(as_completed(future_to_pdf), total=len(all_pdfs)):
+            pdf_path = future_to_pdf[future]
+            try:
+                pdf_results = future.result()
+                all_comps.extend(pdf_results)
+            except Exception as e:
+                print(f"Error processing {pdf_path}: {str(e)}")
+
+    # Shuffle the results
     random.shuffle(all_comps)
 
-    result = build_review_page(args, all_comps[0:args.review_size])
\ No newline at end of file
+    # Generate the specified number of copies of the report
+    for i in range(args.num_copies):
+        start_index = i * args.review_size
+        end_index = start_index + args.review_size
+
+        # Check if there is enough data for the next report
+        if start_index >= len(all_comps):
+            print(f"Not enough data to generate report {i}. Stopping early.")
+            break
+
+        build_review_page(args, all_comps[start_index:end_index], index=i)
\ No newline at end of file
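
Note for reviewers (not part of the diff): the new --max_workers and --num_copies flags compose with the existing options. A hypothetical invocation, where the bucket path is a placeholder:

    python pdelfin/eval/buildelo.py --name elo_review --review_size 50 --max_workers 8 --num_copies 3 s3://my-bucket/eval-data

The parallelization follows the standard ProcessPoolExecutor fan-out/collect pattern: a functools.partial binds the shared arguments, and the boto3 client, segmenter, and aligner are constructed inside each worker, since boto3 clients are not safely shareable across processes. A minimal standalone sketch of that pattern (toy names, not project code):

    import functools
    from concurrent.futures import ProcessPoolExecutor, as_completed

    def process_item(item, tag):
        # Heavy or non-picklable resources (e.g. a boto3 client) would be
        # created here, inside the worker process, as process_single_pdf does.
        return [f"{item}_{tag}"]

    if __name__ == "__main__":
        work = functools.partial(process_item, tag="demo")  # bind shared args
        results = []
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = {executor.submit(work, item): item for item in "abc"}
            for future in as_completed(futures):  # collect as tasks finish
                try:
                    results.extend(future.result())
                except Exception as e:
                    print(f"Error processing {futures[future]}: {e}")
        print(sorted(results))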