diff --git a/pdelfin/filter/filter.py b/pdelfin/filter/filter.py
index dee224a..f420495 100644
--- a/pdelfin/filter/filter.py
+++ b/pdelfin/filter/filter.py
@@ -60,6 +60,10 @@ class PdfFilter:
         word_counts = Counter(clean_text.split())
         total_words = len(clean_text.split())
 
+        # Avoid a ZeroDivisionError below when the PDF has no extractable text
+        if total_words == 0:
+            return False
+
         seo_score = sum(word_counts[word] for word in seo_words if word in word_counts)
 
         return (seo_score / total_words) > self.download_spam_threshold
@@ -107,3 +111,89 @@ class PdfFilter:
             return True  # Filter out
 
         return False  # Keep the PDF
+
+
+if __name__ == "__main__":
+    import tempfile
+    import boto3
+    from pdelfin.s3_utils import parse_s3_path
+    from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError  # as_completed raises this class, distinct from the builtin on Python < 3.11
+    from tqdm import tqdm
+
+    # Quiet logs from pypdf
+    logging.getLogger("pypdf").setLevel(logging.ERROR)
+
+    def process_pdf(s3_path):
+        """
+        Process a single PDF file to determine if it should be kept or removed.
+        """
+        s3_bucket, s3_key = parse_s3_path(s3_path)
+        pdf_s3 = boto3.client("s3")
+
+        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp_file:
+            pdf_s3.download_fileobj(s3_bucket, s3_key, tmp_file)
+            tmp_file.flush()
+
+            # Run the filtering logic while the temporary file still exists
+            if pdf_filter.filter_out_pdf(tmp_file.name):
+                return s3_path, "remove"
+            else:
+                return s3_path, "keep"
+
+    # Load the list of S3 paths with a progress bar
+    with open("/home/ubuntu/s2pdf_paths_1M.txt", "r") as f:
+        s3_work_paths = list(filter(None, (line.strip() for line in tqdm(f, desc="Loading paths"))))
+
+    # Initialize the PDF filter (named pdf_filter so it does not shadow the builtin filter used above)
+    pdf_filter = PdfFilter(
+        languages_to_keep={Language.ENGLISH, None},  # None means the language could not be detected; keep those PDFs, as they may simply need OCR
+        apply_download_spam_check=True,
+        apply_form_check=True,
+    )
+
+    # Output files
+    keep_path = "/home/ubuntu/s2pdf_paths_filter_keep.txt"
+    remove_path = "/home/ubuntu/s2pdf_paths_filter_remove.txt"
+
+    # Maximum number of in-flight futures (also used as the worker pool size)
+    max_concurrent_futures = 1000
+
+    # Process the PDFs in parallel, bounding the number of in-flight futures
+    with open(keep_path, "w") as fkeep, open(remove_path, "w") as fremove:
+        with ProcessPoolExecutor(max_workers=max_concurrent_futures) as executor:
+            futures = {}
+            with tqdm(total=len(s3_work_paths), desc="Processing PDFs") as pbar:
+                for s3_path in s3_work_paths:
+                    # Submit unconditionally, then drain completed futures whenever the backlog is full
+                    future = executor.submit(process_pdf, s3_path)
+                    futures[future] = s3_path
+
+                    try:
+                        while len(futures) >= max_concurrent_futures:
+                            for completed_future in as_completed(futures, timeout=0.1):
+                                s3_path = futures.pop(completed_future)
+                                try:
+                                    s3_path, result = completed_future.result()
+                                    if result == "keep":
+                                        fkeep.write(s3_path + "\n")
+                                    elif result == "remove":
+                                        fremove.write(s3_path + "\n")
+                                except Exception as e:
+                                    print(f"Error processing {s3_path}: {e}")
+                                pbar.update(1)
+                    except TimeoutError:
+                        # Nothing finished within the polling window; go submit the next path
+                        pass
+
+                # Drain the remaining futures after all paths have been submitted
+                for completed_future in as_completed(futures):
+                    s3_path = futures.pop(completed_future)
+                    try:
+                        s3_path, result = completed_future.result()
+                        if result == "keep":
+                            fkeep.write(s3_path + "\n")
+                        elif result == "remove":
+                            fremove.write(s3_path + "\n")
+                    except Exception as e:
+                        print(f"Error processing {s3_path}: {e}")
+                    pbar.update(1)
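
A note on the submission loop above: because the try/except wraps the whole draining while-loop, a TimeoutError exits it even when the backlog is still at max_concurrent_futures, so the bound on in-flight futures is only approximate. Below is a minimal sketch of a hard-bounded alternative using concurrent.futures.wait; the run_bounded helper and its window_size parameter are illustrative names, not part of pdelfin.

    from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait

    def run_bounded(paths, worker, window_size=1000):
        """Hypothetical helper: run worker(path) with at most window_size futures in flight."""
        results = []
        with ProcessPoolExecutor() as executor:
            path_iter = iter(paths)
            pending = set()

            # Fill the initial window of in-flight futures
            for path in path_iter:
                pending.add(executor.submit(worker, path))
                if len(pending) >= window_size:
                    break

            while pending:
                # Block until at least one future finishes; no polling timeout needed
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    results.append(future.result())
                    # Top the window back up with the next path, if any remain
                    next_path = next(path_iter, None)
                    if next_path is not None:
                        pending.add(executor.submit(worker, next_path))
        return results

Unlike the script above, this sketch lets worker exceptions propagate out of future.result(); wrapping that call in try/except would be needed to reproduce the per-path error reporting and the keep/remove bookkeeping.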