Feat/monitor task (#11116)

### What problem does this PR solve?

Show task executor status (latest heartbeats) in the admin service monitor.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Lynn committed on 2025-11-10 12:51:39 +08:00 (via GitHub)
commit d016a06fd5, parent 7423a5806e
7 changed files with 74 additions and 3 deletions

View File

@@ -392,6 +392,21 @@ class AdminCLI(Cmd):
print(str(e))
print(f"Can't access {self.host}, port: {self.port}")
def _format_service_detail_table(self, data):
if not any([isinstance(v, list) for v in data.values()]):
# normal table
return data
# handle the task_executor heartbeats map, e.g. {'name': [{'done': 2, 'now': timestamp1}, {'done': 3, 'now': timestamp2}]}
task_executor_list = []
for k, v in data.items():
# display latest status
heartbeats = sorted(v, key=lambda x: x["now"], reverse=True)
task_executor_list.append({
"task_executor_name": k,
**heartbeats[0],
})
return task_executor_list
def _print_table_simple(self, data):
if not data:
print("No data to print")
@@ -595,7 +610,8 @@ class AdminCLI(Cmd):
if isinstance(res_data['message'], str):
print(res_data['message'])
else:
self._print_table_simple(res_data['message'])
data = self._format_service_detail_table(res_data['message'])
self._print_table_simple(data)
else:
print(f"Service {res_data['service_name']} is down, {res_data['message']}")
else:
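For illustration, a minimal standalone sketch of what the new `_format_service_detail_table` does to a heartbeat map before it reaches `_print_table_simple`; the executor name, counters, and timestamps are made-up values:

```python
# Sketch of the flattening performed by _format_service_detail_table: keep only the
# newest heartbeat per executor and prepend the executor name. Values are illustrative.
heartbeats_by_executor = {
    "task_executor_0": [
        {"done": 2, "failed": 0, "now": "2025-11-10T12:50:00.000+08:00"},
        {"done": 3, "failed": 0, "now": "2025-11-10T12:51:00.000+08:00"},
    ],
}

rows = []
for name, beats in heartbeats_by_executor.items():
    latest = sorted(beats, key=lambda b: b["now"], reverse=True)[0]  # newest heartbeat first
    rows.append({"task_executor_name": name, **latest})

print(rows)
# [{'task_executor_name': 'task_executor_0', 'done': 3, 'failed': 0,
#   'now': '2025-11-10T12:51:00.000+08:00'}]
```

Each executor then becomes one row in the table the CLI prints.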

View File

@@ -1,6 +1,6 @@
[project]
name = "ragflow-cli"
version = "0.21.1"
version = "0.22.0"
description = "Admin Service's client of [RAGFlow](https://github.com/infiniflow/ragflow). The Admin Service provides user management and system monitoring. "
authors = [{ name = "Lynn", email = "lynn_inf@hotmail.com" }]
license = { text = "Apache License, Version 2.0" }

View File

@@ -183,11 +183,13 @@ class RAGFlowServerConfig(BaseConfig):
class TaskExecutorConfig(BaseConfig):
message_queue_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
result['extra']['message_queue_type'] = self.message_queue_type
return result
@@ -299,6 +301,15 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
id_count += 1
case "admin":
pass
case "task_executor":
name: str = 'task_executor'
host: str = v.get('host', '')
port: int = v.get('port', 0)
message_queue_type: str = v.get('message_queue_type')
config = TaskExecutorConfig(id=id_count, name=name, host=host, port=port, message_queue_type=message_queue_type,
service_type="task_executor", detail_func_name="check_task_executor_alive")
configurations.append(config)
id_count += 1
case _:
logging.warning(f"Unknown configuration key: {k}")
continue
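As a rough sketch, this is the dictionary shape produced for a `task_executor` entry that only sets `message_queue_type: 'redis'` (as in the service config later in this diff); the stand-in for `BaseConfig.to_dict()` is a hypothetical simplification, since that base class is not part of this change:

```python
# Hypothetical stand-in for BaseConfig.to_dict(); assume it returns the plain field values.
base_fields = {
    "id": 3,                      # whatever id_count is when the entry is parsed
    "name": "task_executor",
    "host": "",                   # no host/port in the YAML, so the defaults apply
    "port": 0,
    "service_type": "task_executor",
    "detail_func_name": "check_task_executor_alive",
}

# TaskExecutorConfig.to_dict() then nests message_queue_type under an 'extra' sub-dict.
result = dict(base_fields)
result.setdefault("extra", {})
result["extra"]["message_queue_type"] = "redis"

print(result["extra"])  # {'message_queue_type': 'redis'}
```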

View File

@@ -192,6 +192,10 @@ class ServiceMgr:
config_dict['status'] = 'timeout'
except Exception:
config_dict['status'] = 'timeout'
if not config_dict['host']:
config_dict['host'] = '-'
if not config_dict['port']:
config_dict['port'] = '-'
result.append(config_dict)
return result

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
import json
import os
import requests
from timeit import default_timer as timer
@@ -148,7 +150,7 @@ def check_ragflow_server_alive():
try:
url = f'http://{settings.HOST_IP}:{settings.HOST_PORT}/v1/system/ping'
if '0.0.0.0' in url:
url.replace('0.0.0.0', '127.0.0.1')
url = url.replace('0.0.0.0', '127.0.0.1')
response = requests.get(url)
if response.status_code == 200:
return {"status": "alive", "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."}
@@ -161,6 +163,26 @@ def check_ragflow_server_alive():
}
def check_task_executor_alive():
task_executor_heartbeats = {}
try:
task_executors = REDIS_CONN.smembers("TASKEXE")
now = datetime.now().timestamp()
for task_executor_id in task_executors:
heartbeats = REDIS_CONN.zrangebyscore(task_executor_id, now - 60 * 30, now)
heartbeats = [json.loads(heartbeat) for heartbeat in heartbeats]
task_executor_heartbeats[task_executor_id] = heartbeats
if task_executor_heartbeats:
return {"status": "alive", "message": task_executor_heartbeats}
else:
return {"status": "timeout", "message": "Not found any task executor."}
except Exception as e:
return {
"status": "timeout",
"message": f"error: {str(e)}"
}
def run_health_checks() -> tuple[dict, bool]:
result: dict[str, str | dict] = {}
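The Redis access pattern behind `check_task_executor_alive` can be sketched against plain redis-py as follows. The client setup is an assumption (the real code goes through the project's `REDIS_CONN` wrapper, with connection details mirroring the service config later in this diff); the `TASKEXE` set and the 30-minute score window come straight from the change:

```python
import json
from datetime import datetime

import redis  # assumption: plain redis-py instead of the project's REDIS_CONN wrapper

# Connection details mirror the service config later in this diff.
r = redis.Redis(host="localhost", port=6379, db=1,
                password="infini_rag_flow", decode_responses=True)

now = datetime.now().timestamp()
heartbeats_by_executor = {}
for executor_id in r.smembers("TASKEXE"):                   # consumer names registered by report_status()
    raw = r.zrangebyscore(executor_id, now - 60 * 30, now)  # heartbeats scored within the last 30 minutes
    heartbeats_by_executor[executor_id] = [json.loads(h) for h in raw]

status = "alive" if heartbeats_by_executor else "timeout"
print(status, heartbeats_by_executor)
```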

View File

@@ -32,6 +32,8 @@ redis:
db: 1
password: 'infini_rag_flow'
host: 'localhost:6379'
task_executor:
message_queue_type: 'redis'
user_default_llm:
default_models:
embedding_model:

View File

@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import concurrent
# from beartype import BeartypeConf
# from beartype.claw import beartype_all # <-- you didn't sign up for this
@@ -963,6 +964,17 @@ async def handle_task():
redis_msg.ack()
async def get_server_ip() -> str:
# get the outbound interface IP via a UDP "connect": no packet is actually sent, the OS just picks a route and local address
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception as e:
logging.error(str(e))
return 'Unknown'
async def report_status():
global CONSUMER_NAME, BOOT_AT, PENDING_TASKS, LAG_TASKS, DONE_TASKS, FAILED_TASKS
REDIS_CONN.sadd("TASKEXE", CONSUMER_NAME)
@@ -975,8 +987,12 @@ async def report_status():
PENDING_TASKS = int(group_info.get("pending", 0))
LAG_TASKS = int(group_info.get("lag", 0))
pid = os.getpid()
ip_address = await get_server_ip()
current = copy.deepcopy(CURRENT_TASKS)
heartbeat = json.dumps({
"ip_address": ip_address,
"pid": pid,
"name": CONSUMER_NAME,
"now": now.astimezone().isoformat(timespec="milliseconds"),
"boot_at": BOOT_AT,