mirror of
https://github.com/langgenius/dify.git
synced 2025-12-05 23:36:09 +00:00
89 lines
3.0 KiB
Python
89 lines
3.0 KiB
Python
|
|
import logging
|
||
|
|
import uuid
|
||
|
|
from typing import ClassVar
|
||
|
|
|
||
|
|
from apscheduler.schedulers.background import BackgroundScheduler # type: ignore
|
||
|
|
|
||
|
|
from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand
|
||
|
|
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||
|
|
from core.workflow.graph_events.base import GraphEngineEvent
|
||
|
|
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
|
||
|
|
from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class TimeSliceLayer(GraphEngineLayer):
|
||
|
|
"""
|
||
|
|
CFS plan scheduler to control the timeslice of the workflow.
|
||
|
|
"""
|
||
|
|
|
||
|
|
scheduler: ClassVar[BackgroundScheduler] = BackgroundScheduler()
|
||
|
|
|
||
|
|
def __init__(self, cfs_plan_scheduler: CFSPlanScheduler) -> None:
|
||
|
|
"""
|
||
|
|
CFS plan scheduler allows to control the timeslice of the workflow.
|
||
|
|
"""
|
||
|
|
|
||
|
|
if not TimeSliceLayer.scheduler.running:
|
||
|
|
TimeSliceLayer.scheduler.start()
|
||
|
|
|
||
|
|
super().__init__()
|
||
|
|
self.cfs_plan_scheduler = cfs_plan_scheduler
|
||
|
|
self.stopped = False
|
||
|
|
self.schedule_id = ""
|
||
|
|
|
||
|
|
def _checker_job(self, schedule_id: str):
|
||
|
|
"""
|
||
|
|
Check if the workflow need to be suspended.
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
if self.stopped:
|
||
|
|
self.scheduler.remove_job(schedule_id)
|
||
|
|
return
|
||
|
|
|
||
|
|
if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED:
|
||
|
|
# remove the job
|
||
|
|
self.scheduler.remove_job(schedule_id)
|
||
|
|
|
||
|
|
if not self.command_channel:
|
||
|
|
logger.exception("No command channel to stop the workflow")
|
||
|
|
return
|
||
|
|
|
||
|
|
# send command to pause the workflow
|
||
|
|
self.command_channel.send_command(
|
||
|
|
GraphEngineCommand(
|
||
|
|
command_type=CommandType.PAUSE,
|
||
|
|
payload={
|
||
|
|
"reason": SchedulerCommand.RESOURCE_LIMIT_REACHED,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
except Exception:
|
||
|
|
logger.exception("scheduler error during check if the workflow need to be suspended")
|
||
|
|
|
||
|
|
def on_graph_start(self):
|
||
|
|
"""
|
||
|
|
Start timer to check if the workflow need to be suspended.
|
||
|
|
"""
|
||
|
|
|
||
|
|
if self.cfs_plan_scheduler.plan.schedule_strategy == WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice:
|
||
|
|
self.schedule_id = uuid.uuid4().hex
|
||
|
|
|
||
|
|
self.scheduler.add_job(
|
||
|
|
lambda: self._checker_job(self.schedule_id),
|
||
|
|
"interval",
|
||
|
|
seconds=self.cfs_plan_scheduler.plan.granularity,
|
||
|
|
id=self.schedule_id,
|
||
|
|
)
|
||
|
|
|
||
|
|
def on_event(self, event: GraphEngineEvent):
|
||
|
|
pass
|
||
|
|
|
||
|
|
def on_graph_end(self, error: Exception | None) -> None:
|
||
|
|
self.stopped = True
|
||
|
|
# remove the scheduler
|
||
|
|
if self.schedule_id:
|
||
|
|
self.scheduler.remove_job(self.schedule_id)
|