# olmocr/pdelfin/beakerpipeline.py
#
# (Source-viewer header, kept as a comment so the module parses:)
# 134 lines
# 4.7 KiB
# Python — Raw / Normal View / History

import argparse
import subprocess
import signal
import sys
import os
import time
import tempfile
import redis
import random
import boto3
import atexit # New import for atexit
from pdelfin.s3_utils import expand_s3_glob
# Module-level S3 clients, created as a side effect of importing this module.
# NOTE(review): workspace_s3 is not referenced anywhere in the visible code;
# presumably it is meant for workspace-bucket access — confirm or remove.
workspace_s3 = boto3.client('s3')
# Client passed to expand_s3_glob() by populate_queue_if_empty() to list PDFs.
pdf_s3 = boto3.client('s3')
def populate_queue_if_empty(queue, s3_glob_path, batch_size=1000):
    """Seed the ``work_queue`` list with paths matching *s3_glob_path*.

    Does nothing when ``work_queue`` already holds items.

    Args:
        queue: Redis client used for ``LLEN`` / ``RPUSH`` on ``work_queue``.
        s3_glob_path: S3 glob expanded via ``expand_s3_glob`` with the
            module-level ``pdf_s3`` client; each yielded path becomes one item.
        batch_size: Maximum number of paths sent in a single ``RPUSH``.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    if queue.llen("work_queue") != 0:
        return

    # list() keeps whatever iteration order expand_s3_glob's result has.
    paths = list(expand_s3_glob(pdf_s3, s3_glob_path))
    if not paths:
        # RPUSH with no values is a Redis error, and claiming the queue was
        # populated would be misleading.
        print(f"No work items matched {s3_glob_path}; queue left empty.")
        return

    # One round-trip per batch instead of one per path, while keeping each
    # command bounded in size.
    for start in range(0, len(paths), batch_size):
        queue.rpush("work_queue", *paths[start:start + batch_size])
    print("Queue populated with initial work items.")
def process(item, *, min_seconds=1, max_seconds=3):
    """Simulate processing one work item (stand-in for real work).

    Prints a start line, sleeps for a random whole number of seconds in
    ``[min_seconds, max_seconds]``, then prints a completion line.

    Args:
        item: The work item; only used in the printed messages.
        min_seconds: Inclusive lower bound of the simulated duration.
        max_seconds: Inclusive upper bound of the simulated duration.

    Raises:
        ValueError: If ``min_seconds > max_seconds`` (from ``random.randint``).
    """
    print(f"Processing item: {item}")
    # randint is inclusive on both ends; the defaults keep the original
    # 1-3 second range (the old "1 and 10 seconds" comment was stale).
    time.sleep(random.randint(min_seconds, max_seconds))
    print(f"Completed processing item: {item}")
def _resolve_leader(args, redis_port):
    """Return ``(leader_ip, leader_port)`` for this node.

    Replica 0 is the initial leader, so its leader endpoint is its own
    redis-server, which always listens on *redis_port*. Any other replica must
    be given ``--leader-ip``; the process exits with status 1 otherwise.
    """
    if args.replica == 0:
        if args.leader_port != redis_port:
            # Sentinel and the client must target the port the local server
            # actually listens on, or they would point at nothing.
            print(f'Warning: ignoring --leader-port {args.leader_port} for replica 0; '
                  f'its Redis listens on port {redis_port}', file=sys.stderr)
        return (args.leader_ip or '127.0.0.1'), redis_port

    if not args.leader_ip:
        print('Error: --leader-ip is required for replica nodes (replica_number >= 1)',
              file=sys.stderr)
        sys.exit(1)
    return args.leader_ip, args.leader_port


def _write_redis_conf(path, data_dir, replica_number, redis_port, leader_ip, leader_port):
    """Write this node's redis-server config to *path*.

    Replica 0 gets ``bind 0.0.0.0``; every other replica gets a ``replicaof``
    line pointing at ``leader_ip:leader_port``. Data/log file names carry the
    replica number so several replicas can share a host.
    """
    lines = [
        f'port {redis_port}',
        f'dbfilename dump-{replica_number}.rdb',
        f'appendfilename "appendonly-{replica_number}.aof"',
        f'logfile "redis-{replica_number}.log"',
        f'dir {data_dir}',
    ]
    if replica_number == 0:
        lines.append('bind 0.0.0.0')
    else:
        lines.append(f'replicaof {leader_ip} {leader_port}')
    with open(path, 'w') as f:
        f.write('\n'.join(lines) + '\n')


def _write_sentinel_conf(path, data_dir, sentinel_port, master_name, leader_ip, leader_port, quorum):
    """Write this node's redis-sentinel config, monitoring ``leader_ip:leader_port``."""
    lines = [
        f'port {sentinel_port}',
        f'dir {data_dir}',
        # quorum = number of Sentinels that must agree the master is down.
        f'sentinel monitor {master_name} {leader_ip} {leader_port} {quorum}',
        f'sentinel down-after-milliseconds {master_name} 5000',
        f'sentinel failover-timeout {master_name} 10000',
        f'sentinel parallel-syncs {master_name} 1',
    ]
    with open(path, 'w') as f:
        f.write('\n'.join(lines) + '\n')


def _wait_for_redis(client, local_server, timeout=30.0, interval=0.5):
    """Block until *client* answers PING.

    Exits with status 1 if *local_server* (this node's redis-server process)
    terminates first, or if no PING succeeds within *timeout* seconds. This
    closes the race between spawning redis-server and first using it.
    """
    deadline = time.monotonic() + timeout
    while True:
        if local_server.poll() is not None:
            print(f'Error: redis-server exited early with code {local_server.returncode}',
                  file=sys.stderr)
            sys.exit(1)
        try:
            client.ping()
            return
        except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
            if time.monotonic() >= deadline:
                print(f'Error: Redis did not become reachable within {timeout} seconds',
                      file=sys.stderr)
                sys.exit(1)
            time.sleep(interval)


def main():
    """Run a Redis + Sentinel node for this replica and report the work queue.

    Replica N runs redis-server on port ``6379 + N`` and redis-sentinel on
    ``26379 + N``, using configs written to a fresh temporary directory.
    Replica 0 is the initial leader and seeds ``work_queue`` from
    ``--add-pdfs``; other replicas replicate from ``--leader-ip`` /
    ``--leader-port``. Both child processes are stopped on normal interpreter
    exit, SIGINT and SIGTERM. After setup it loops forever, printing the
    queue length once per second.
    """
    parser = argparse.ArgumentParser(description='Set up Redis Sentinel-based worker queue.')
    parser.add_argument('--leader-ip', help='IP address of the initial leader node')
    parser.add_argument('--leader-port', type=int, default=6379, help='Port of the initial leader node')
    parser.add_argument('--replica', type=int, required=True, help='Replica number (0 to N-1)')
    parser.add_argument('--add-pdfs', required=True, help='S3 glob path for work items')
    args = parser.parse_args()

    replica_number = args.replica
    base_redis_port = 6379
    base_sentinel_port = 26379
    redis_port = base_redis_port + replica_number
    sentinel_port = base_sentinel_port + replica_number

    leader_ip, leader_port = _resolve_leader(args, redis_port)

    # NOTE(review): temp_dir (configs, RDB/AOF files) is never removed on exit.
    temp_dir = tempfile.mkdtemp()
    redis_conf_path = os.path.join(temp_dir, 'redis.conf')
    sentinel_conf_path = os.path.join(temp_dir, 'sentinel.conf')

    master_name = 'mymaster'
    quorum = 2
    _write_redis_conf(redis_conf_path, temp_dir, replica_number, redis_port, leader_ip, leader_port)
    _write_sentinel_conf(sentinel_conf_path, temp_dir, sentinel_port, master_name,
                         leader_ip, leader_port, quorum)

    # Children are tracked in a list so cleanup covers whatever was actually
    # started, even if a later Popen fails.
    children = []

    def terminate_processes():
        """Stop all children; later calls are no-ops (signal + atexit both call this)."""
        procs = children[:]
        children.clear()
        if not procs:
            return
        for proc in procs:
            if proc.poll() is None:
                proc.terminate()
        for proc in procs:
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # Don't hang shutdown forever on a child ignoring SIGTERM.
                proc.kill()
                proc.wait()
        print("Child processes terminated.")

    # Registered before spawning so a failure below still reaps what started.
    atexit.register(terminate_processes)

    redis_process = subprocess.Popen(['redis-server', redis_conf_path])
    children.append(redis_process)
    children.append(subprocess.Popen(['redis-sentinel', sentinel_conf_path]))

    def handle_signal(signum, frame):
        terminate_processes()
        sys.exit(0)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # Talk to the leader on the *leader's* port (previously this node's local
    # port was combined with the leader's host).
    # NOTE(review): this is a fixed endpoint; after a Sentinel failover it
    # keeps pointing at the old leader — consider redis.sentinel.Sentinel.
    r = redis.StrictRedis(host=leader_ip, port=leader_port, decode_responses=True)
    _wait_for_redis(r, redis_process)

    # Only the initial leader (replica 0) seeds the queue.
    if replica_number == 0:
        populate_queue_if_empty(r, args.add_pdfs)

    try:
        while True:
            # TODO: replace this status loop with the real work loop:
            # work_item = r.brpoplpush("work_queue", "processing_queue", 60)
            # if work_item:
            #     try:
            #         process(work_item)
            #         # Remove from the processing queue if processed successfully
            #         r.lrem("processing_queue", 1, work_item)
            #     except Exception as e:
            #         print(f"Error processing {work_item}: {e}")
            #         # If an error occurs, let it be requeued after timeout
            queue_length = r.llen("work_queue")
            print(f"Total work items in queue: {queue_length}")
            time.sleep(1)
    except KeyboardInterrupt:
        handle_signal(None, None)
if __name__ == "__main__":
    # Run the node only when executed as a script, not when imported.
    main()