diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 10fb62b41..1e4551943 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -21,8 +21,6 @@ import sys import threading import time -from valkey import RedisError - from api.utils.log_utils import initRootLogger, get_project_base_directory from graphrag.general.index import run_graphrag from graphrag.utils import get_llm_cache, set_llm_cache, get_tags_from_cache, set_tags_to_cache @@ -188,45 +186,20 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing... async def collect(): global CONSUMER_NAME, DONE_TASKS, FAILED_TASKS global UNACKED_ITERATOR - svr_queue_names = get_svr_queue_names() - redis_msg = None + svr_queue_names = get_svr_queue_names() try: if not UNACKED_ITERATOR: - UNACKED_ITERATOR = None - logging.debug("Rebuilding UNACKED_ITERATOR due to it is None") - try: - UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME) - logging.debug("UNACKED_ITERATOR rebuilt successfully") - except RedisError as e: - UNACKED_ITERATOR = None - logging.warning(f"Failed to rebuild UNACKED_ITERATOR: {e}") - - if UNACKED_ITERATOR: - try: - redis_msg = next(UNACKED_ITERATOR) - except StopIteration: - UNACKED_ITERATOR = None - logging.debug("UNACKED_ITERATOR exhausted, clearing") - - except Exception as e: - UNACKED_ITERATOR = None - logging.warning(f"UNACKED_ITERATOR raised exception: {e}") - - if not redis_msg: + UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME) + try: + redis_msg = next(UNACKED_ITERATOR) + except StopIteration: for svr_queue_name in svr_queue_names: - try: - redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME) - if redis_msg: - break - except RedisError as e: - logging.warning(f"queue_consumer failed for {svr_queue_name}: {e}") - continue - - except Exception as e: - logging.exception(f"collect task encountered unexpected exception: {e}") - UNACKED_ITERATOR = None - await trio.sleep(1) + redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME) + if redis_msg: + break + except Exception: + logging.exception("collect got exception") return None, None if not redis_msg: