Refa: revert to original task message collection logic (#8251)

### What problem does this PR solve?

Get rid of 'RedisDB.get_unacked_iterator queue rag_flow_svr_queue_1
doesn't exist'

----

Edit: revert to original message collection logic.

### Type of change

- [x] Refactoring

---------

Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
Yongteng Lei 2025-06-13 16:38:53 +08:00 committed by GitHub
parent 65d5268439
commit 8f9e7a6f6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -21,8 +21,6 @@ import sys
import threading
import time
from valkey import RedisError
from api.utils.log_utils import initRootLogger, get_project_base_directory
from graphrag.general.index import run_graphrag
from graphrag.utils import get_llm_cache, set_llm_cache, get_tags_from_cache, set_tags_to_cache
@ -188,45 +186,20 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...
async def collect():
global CONSUMER_NAME, DONE_TASKS, FAILED_TASKS
global UNACKED_ITERATOR
svr_queue_names = get_svr_queue_names()
redis_msg = None
svr_queue_names = get_svr_queue_names()
try:
if not UNACKED_ITERATOR:
UNACKED_ITERATOR = None
logging.debug("Rebuilding UNACKED_ITERATOR due to it is None")
try:
UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
logging.debug("UNACKED_ITERATOR rebuilt successfully")
except RedisError as e:
UNACKED_ITERATOR = None
logging.warning(f"Failed to rebuild UNACKED_ITERATOR: {e}")
if UNACKED_ITERATOR:
try:
redis_msg = next(UNACKED_ITERATOR)
except StopIteration:
UNACKED_ITERATOR = None
logging.debug("UNACKED_ITERATOR exhausted, clearing")
except Exception as e:
UNACKED_ITERATOR = None
logging.warning(f"UNACKED_ITERATOR raised exception: {e}")
if not redis_msg:
UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
try:
redis_msg = next(UNACKED_ITERATOR)
except StopIteration:
for svr_queue_name in svr_queue_names:
try:
redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
if redis_msg:
break
except RedisError as e:
logging.warning(f"queue_consumer failed for {svr_queue_name}: {e}")
continue
except Exception as e:
logging.exception(f"collect task encountered unexpected exception: {e}")
UNACKED_ITERATOR = None
await trio.sleep(1)
redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
if redis_msg:
break
except Exception:
logging.exception("collect got exception")
return None, None
if not redis_msg: