mirror of
https://github.com/langgenius/dify.git
synced 2025-12-05 07:14:13 +00:00
Signed-off-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: Stream <Stream_2@qq.com> Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com> Co-authored-by: zhsama <torvalds@linux.do> Co-authored-by: Harry <xh001x@hotmail.com> Co-authored-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: yessenia <yessenia.contact@gmail.com> Co-authored-by: hjlarry <hjlarry@163.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: WTW0313 <twwu@dify.ai> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
89 lines
3.0 KiB
Python
89 lines
3.0 KiB
Python
import logging
|
|
import uuid
|
|
from typing import ClassVar
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler # type: ignore
|
|
|
|
from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand
|
|
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
|
from core.workflow.graph_events.base import GraphEngineEvent
|
|
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
|
|
from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TimeSliceLayer(GraphEngineLayer):
|
|
"""
|
|
CFS plan scheduler to control the timeslice of the workflow.
|
|
"""
|
|
|
|
scheduler: ClassVar[BackgroundScheduler] = BackgroundScheduler()
|
|
|
|
def __init__(self, cfs_plan_scheduler: CFSPlanScheduler) -> None:
|
|
"""
|
|
CFS plan scheduler allows to control the timeslice of the workflow.
|
|
"""
|
|
|
|
if not TimeSliceLayer.scheduler.running:
|
|
TimeSliceLayer.scheduler.start()
|
|
|
|
super().__init__()
|
|
self.cfs_plan_scheduler = cfs_plan_scheduler
|
|
self.stopped = False
|
|
self.schedule_id = ""
|
|
|
|
def _checker_job(self, schedule_id: str):
|
|
"""
|
|
Check if the workflow need to be suspended.
|
|
"""
|
|
try:
|
|
if self.stopped:
|
|
self.scheduler.remove_job(schedule_id)
|
|
return
|
|
|
|
if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED:
|
|
# remove the job
|
|
self.scheduler.remove_job(schedule_id)
|
|
|
|
if not self.command_channel:
|
|
logger.exception("No command channel to stop the workflow")
|
|
return
|
|
|
|
# send command to pause the workflow
|
|
self.command_channel.send_command(
|
|
GraphEngineCommand(
|
|
command_type=CommandType.PAUSE,
|
|
payload={
|
|
"reason": SchedulerCommand.RESOURCE_LIMIT_REACHED,
|
|
},
|
|
)
|
|
)
|
|
|
|
except Exception:
|
|
logger.exception("scheduler error during check if the workflow need to be suspended")
|
|
|
|
def on_graph_start(self):
|
|
"""
|
|
Start timer to check if the workflow need to be suspended.
|
|
"""
|
|
|
|
if self.cfs_plan_scheduler.plan.schedule_strategy == WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice:
|
|
self.schedule_id = uuid.uuid4().hex
|
|
|
|
self.scheduler.add_job(
|
|
lambda: self._checker_job(self.schedule_id),
|
|
"interval",
|
|
seconds=self.cfs_plan_scheduler.plan.granularity,
|
|
id=self.schedule_id,
|
|
)
|
|
|
|
def on_event(self, event: GraphEngineEvent):
|
|
pass
|
|
|
|
def on_graph_end(self, error: Exception | None) -> None:
|
|
self.stopped = True
|
|
# remove the scheduler
|
|
if self.schedule_id:
|
|
self.scheduler.remove_job(self.schedule_id)
|