fix(api): Fix resetting sys var causing internal server error (#21604)

and sorts draft variables by their creation time, ensures a consist order.
This commit is contained in:
QuantumGhost 2025-07-02 13:36:35 +08:00 committed by GitHub
parent 71d6cf1b1d
commit c2e599cd85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 401 additions and 140 deletions

View File

@ -27,6 +27,9 @@ from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
@ -36,7 +39,10 @@ from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom from models.enums import WorkflowRunTriggeredFrom
from services.conversation_service import ConversationService from services.conversation_service import ConversationService
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService from services.workflow_draft_variable_service import (
DraftVarLoader,
WorkflowDraftVariableService,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -450,6 +456,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
stream=stream, stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from),
) )
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from) return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
@ -521,6 +528,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser], user: Union[Account, EndUser],
workflow_execution_repository: WorkflowExecutionRepository, workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False, stream: bool = False,
) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: ) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
""" """
@ -547,6 +555,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
stream=stream, stream=stream,
draft_var_saver_factory=draft_var_saver_factory,
) )
try: try:

View File

@ -64,6 +64,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, W
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
@ -94,6 +95,7 @@ class AdvancedChatAppGenerateTaskPipeline:
dialogue_count: int, dialogue_count: int,
workflow_execution_repository: WorkflowExecutionRepository, workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
) -> None: ) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline( self._base_task_pipeline = BasedGenerateTaskPipeline(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
@ -153,6 +155,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._conversation_name_generate_thread: Thread | None = None self._conversation_name_generate_thread: Thread | None = None
self._recorded_files: list[Mapping[str, Any]] = [] self._recorded_files: list[Mapping[str, Any]] = []
self._workflow_run_id: str = "" self._workflow_run_id: str = ""
self._draft_var_saver_factory = draft_var_saver_factory
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
""" """
@ -371,6 +374,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
session.commit() session.commit()
self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp: if node_finish_resp:
yield node_finish_resp yield node_finish_resp
@ -390,6 +394,8 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp: if node_finish_resp:
yield node_finish_resp yield node_finish_resp
@ -759,3 +765,15 @@ class AdvancedChatAppGenerateTaskPipeline:
if not message: if not message:
raise ValueError(f"Message not found: {self._message_id}") raise ValueError(f"Message not found: {self._message_id}")
return message return message
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)

View File

@ -1,10 +1,20 @@
import json import json
from collections.abc import Generator, Mapping, Sequence from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, Union from typing import TYPE_CHECKING, Any, Optional, Union, final
from sqlalchemy.orm import Session
from core.app.app_config.entities import VariableEntityType from core.app.app_config.entities import VariableEntityType
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File, FileUploadConfig from core.file import File, FileUploadConfig
from core.workflow.nodes.enums import NodeType
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaver,
DraftVariableSaverFactory,
NoopDraftVariableSaver,
)
from factories import file_factory from factories import file_factory
from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl
if TYPE_CHECKING: if TYPE_CHECKING:
from core.app.app_config.entities import VariableEntity from core.app.app_config.entities import VariableEntity
@ -159,3 +169,38 @@ class BaseAppGenerator:
yield f"event: {message}\n\n" yield f"event: {message}\n\n"
return gen() return gen()
@final
@staticmethod
def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory:
if invoke_from == InvokeFrom.DEBUGGER:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return DraftVariableSaverImpl(
session=session,
app_id=app_id,
node_id=node_id,
node_type=node_type,
node_execution_id=node_execution_id,
enclosing_node_id=enclosing_node_id,
)
else:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return NoopDraftVariableSaver()
return draft_var_saver_factory

View File

@ -25,6 +25,7 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
@ -236,6 +237,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
worker_thread.start() worker_thread.start()
draft_var_saver_factory = self._get_draft_var_saver_factory(
invoke_from,
)
# return response or stream generator # return response or stream generator
response = self._handle_response( response = self._handle_response(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
@ -244,6 +249,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user, user=user,
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
draft_var_saver_factory=draft_var_saver_factory,
stream=streaming, stream=streaming,
) )
@ -474,6 +480,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user: Union[Account, EndUser], user: Union[Account, EndUser],
workflow_execution_repository: WorkflowExecutionRepository, workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False, stream: bool = False,
) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: ) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
""" """
@ -494,6 +501,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user, user=user,
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
draft_var_saver_factory=draft_var_saver_factory,
stream=stream, stream=stream,
) )

View File

@ -56,6 +56,7 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
@ -87,6 +88,7 @@ class WorkflowAppGenerateTaskPipeline:
stream: bool, stream: bool,
workflow_execution_repository: WorkflowExecutionRepository, workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
) -> None: ) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline( self._base_task_pipeline = BasedGenerateTaskPipeline(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
@ -131,6 +133,8 @@ class WorkflowAppGenerateTaskPipeline:
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow_features_dict = workflow.features_dict self._workflow_features_dict = workflow.features_dict
self._workflow_run_id = "" self._workflow_run_id = ""
self._invoke_from = queue_manager._invoke_from
self._draft_var_saver_factory = draft_var_saver_factory
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
""" """
@ -322,6 +326,8 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
self._save_output_for_event(event, workflow_node_execution.id)
if node_success_response: if node_success_response:
yield node_success_response yield node_success_response
elif isinstance( elif isinstance(
@ -339,6 +345,8 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id)
if node_failed_response: if node_failed_response:
yield node_failed_response yield node_failed_response
@ -593,3 +601,15 @@ class WorkflowAppGenerateTaskPipeline:
) )
return response return response
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)

View File

@ -1,8 +1,6 @@
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any, Optional, cast from typing import Any, Optional, cast
from sqlalchemy.orm import Session
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner from core.app.apps.base_app_runner import AppRunner
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
@ -35,7 +33,6 @@ from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
AgentLogEvent, AgentLogEvent,
BaseNodeEvent,
GraphEngineEvent, GraphEngineEvent,
GraphRunFailedEvent, GraphRunFailedEvent,
GraphRunPartialSucceededEvent, GraphRunPartialSucceededEvent,
@ -70,9 +67,6 @@ from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db from extensions.ext_database import db
from models.model import App from models.model import App
from models.workflow import Workflow from models.workflow import Workflow
from services.workflow_draft_variable_service import (
DraftVariableSaver,
)
class WorkflowBasedAppRunner(AppRunner): class WorkflowBasedAppRunner(AppRunner):
@ -400,7 +394,6 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id, in_loop_id=event.in_loop_id,
) )
) )
self._save_draft_var_for_event(event)
elif isinstance(event, NodeRunFailedEvent): elif isinstance(event, NodeRunFailedEvent):
self._publish_event( self._publish_event(
@ -464,7 +457,6 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id, in_loop_id=event.in_loop_id,
) )
) )
self._save_draft_var_for_event(event)
elif isinstance(event, NodeInIterationFailedEvent): elif isinstance(event, NodeInIterationFailedEvent):
self._publish_event( self._publish_event(
@ -718,30 +710,3 @@ class WorkflowBasedAppRunner(AppRunner):
def _publish_event(self, event: AppQueueEvent) -> None: def _publish_event(self, event: AppQueueEvent) -> None:
self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER) self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER)
def _save_draft_var_for_event(self, event: BaseNodeEvent):
run_result = event.route_node_state.node_run_result
if run_result is None:
return
process_data = run_result.process_data
outputs = run_result.outputs
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
app_id=self._get_app_id(),
node_id=event.node_id,
node_type=event.node_type,
# FIXME(QuantumGhost): rely on private state of queue_manager is not ideal.
invoke_from=self.queue_manager._invoke_from,
node_execution_id=event.id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id or None,
)
draft_var_saver.save(process_data=process_data, outputs=outputs)
def _remove_first_element_from_variable_string(key: str) -> str:
"""
Remove the first element from the prefix.
"""
prefix, remaining = key.split(".", maxsplit=1)
return remaining

View File

@ -66,11 +66,21 @@ class WorkflowNodeExecution(BaseModel):
but they are not stored in the model. but they are not stored in the model.
""" """
# Core identification fields # --------- Core identification fields ---------
id: str # Unique identifier for this execution record
node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing # Unique identifier for this execution record, used when persisting to storage.
# Value is a UUID string (e.g., '09b3e04c-f9ae-404c-ad82-290b8d7bd382').
id: str
# Optional secondary ID for cross-referencing purposes.
#
# NOTE: For referencing the persisted record, use `id` rather than `node_execution_id`.
# While `node_execution_id` may sometimes be a UUID string, this is not guaranteed.
# In most scenarios, `id` should be used as the primary identifier.
node_execution_id: Optional[str] = None
workflow_id: str # ID of the workflow this node belongs to workflow_id: str # ID of the workflow this node belongs to
workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging) workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)
# --------- Core identification fields ends ---------
# Execution positioning and flow # Execution positioning and flow
index: int # Sequence number for ordering in trace visualization index: int # Sequence number for ordering in trace visualization

View File

@ -333,7 +333,7 @@ class Executor:
try: try:
response = getattr(ssrf_proxy, self.method.lower())(**request_args) response = getattr(ssrf_proxy, self.method.lower())(**request_args)
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
raise HttpRequestNodeError(str(e)) raise HttpRequestNodeError(str(e)) from e
# FIXME: fix type ignore, this maybe httpx type issue # FIXME: fix type ignore, this maybe httpx type issue
return response # type: ignore return response # type: ignore

View File

@ -0,0 +1,32 @@
import abc
from collections.abc import Mapping
from typing import Any, Protocol
from sqlalchemy.orm import Session
from core.workflow.nodes.enums import NodeType
class DraftVariableSaver(Protocol):
@abc.abstractmethod
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
pass
class DraftVariableSaverFactory(Protocol):
@abc.abstractmethod
def __call__(
self,
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> "DraftVariableSaver":
pass
class NoopDraftVariableSaver(DraftVariableSaver):
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
pass

View File

@ -154,7 +154,7 @@ class WorkflowDraftVariableService:
variables = ( variables = (
# Do not load the `value` field. # Do not load the `value` field.
query.options(orm.defer(WorkflowDraftVariable.value)) query.options(orm.defer(WorkflowDraftVariable.value))
.order_by(WorkflowDraftVariable.id.desc()) .order_by(WorkflowDraftVariable.created_at.desc())
.limit(limit) .limit(limit)
.offset((page - 1) * limit) .offset((page - 1) * limit)
.all() .all()
@ -168,7 +168,7 @@ class WorkflowDraftVariableService:
WorkflowDraftVariable.node_id == node_id, WorkflowDraftVariable.node_id == node_id,
) )
query = self._session.query(WorkflowDraftVariable).filter(*criteria) query = self._session.query(WorkflowDraftVariable).filter(*criteria)
variables = query.order_by(WorkflowDraftVariable.id.desc()).all() variables = query.order_by(WorkflowDraftVariable.created_at.desc()).all()
return WorkflowDraftVariableList(variables=variables) return WorkflowDraftVariableList(variables=variables)
def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList:
@ -235,7 +235,9 @@ class WorkflowDraftVariableService:
self._session.flush() self._session.flush()
return variable return variable
def _reset_node_var(self, workflow: Workflow, variable: WorkflowDraftVariable) -> WorkflowDraftVariable | None: def _reset_node_var_or_sys_var(
self, workflow: Workflow, variable: WorkflowDraftVariable
) -> WorkflowDraftVariable | None:
# If a variable does not allow updating, it makes no sence to resetting it. # If a variable does not allow updating, it makes no sence to resetting it.
if not variable.editable: if not variable.editable:
return variable return variable
@ -259,28 +261,35 @@ class WorkflowDraftVariableService:
self._session.flush() self._session.flush()
return None return None
# Get node type for proper value extraction
node_config = workflow.get_node_config_by_id(variable.node_id)
node_type = workflow.get_node_type_from_node_config(node_config)
outputs_dict = node_exec.outputs_dict or {} outputs_dict = node_exec.outputs_dict or {}
# a sentinel value used to check the absent of the output variable key.
absent = object()
# Note: Based on the implementation in `_build_from_variable_assigner_mapping`, if variable.get_variable_type() == DraftVariableType.NODE:
# VariableAssignerNode (both v1 and v2) can only create conversation draft variables. # Get node type for proper value extraction
# For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes. node_config = workflow.get_node_config_by_id(variable.node_id)
# node_type = workflow.get_node_type_from_node_config(node_config)
# This implementation must remain synchronized with the `_build_from_variable_assigner_mapping`
# and `save` methods.
if node_type == NodeType.VARIABLE_ASSIGNER:
return variable
if variable.name not in outputs_dict: # Note: Based on the implementation in `_build_from_variable_assigner_mapping`,
# VariableAssignerNode (both v1 and v2) can only create conversation draft variables.
# For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes.
#
# This implementation must remain synchronized with the `_build_from_variable_assigner_mapping`
# and `save` methods.
if node_type == NodeType.VARIABLE_ASSIGNER:
return variable
output_value = outputs_dict.get(variable.name, absent)
else:
output_value = outputs_dict.get(f"sys.{variable.name}", absent)
# We cannot use `is None` to check the existence of an output variable here as
# the value of the output may be `None`.
if output_value is absent:
# If variable not found in execution data, delete the variable # If variable not found in execution data, delete the variable
self._session.delete(instance=variable) self._session.delete(instance=variable)
self._session.flush() self._session.flush()
return None return None
value = outputs_dict[variable.name] value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, output_value)
value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, value)
# Extract variable value using unified logic # Extract variable value using unified logic
variable.set_value(value_seg) variable.set_value(value_seg)
variable.last_edited_at = None # Reset to indicate this is a reset operation variable.last_edited_at = None # Reset to indicate this is a reset operation
@ -291,10 +300,8 @@ class WorkflowDraftVariableService:
variable_type = variable.get_variable_type() variable_type = variable.get_variable_type()
if variable_type == DraftVariableType.CONVERSATION: if variable_type == DraftVariableType.CONVERSATION:
return self._reset_conv_var(workflow, variable) return self._reset_conv_var(workflow, variable)
elif variable_type == DraftVariableType.NODE:
return self._reset_node_var(workflow, variable)
else: else:
raise VariableResetError(f"cannot reset system variable, variable_id={variable.id}") return self._reset_node_var_or_sys_var(workflow, variable)
def delete_variable(self, variable: WorkflowDraftVariable): def delete_variable(self, variable: WorkflowDraftVariable):
self._session.delete(variable) self._session.delete(variable)
@ -439,6 +446,9 @@ def _batch_upsert_draft_varaible(
stmt = stmt.on_conflict_do_update( stmt = stmt.on_conflict_do_update(
index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(), index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(),
set_={ set_={
# Refresh creation timestamp to ensure updated variables
# appear first in chronologically sorted result sets.
"created_at": stmt.excluded.created_at,
"updated_at": stmt.excluded.updated_at, "updated_at": stmt.excluded.updated_at,
"last_edited_at": stmt.excluded.last_edited_at, "last_edited_at": stmt.excluded.last_edited_at,
"description": stmt.excluded.description, "description": stmt.excluded.description,
@ -525,9 +535,6 @@ class DraftVariableSaver:
# The type of the current node (see NodeType). # The type of the current node (see NodeType).
_node_type: NodeType _node_type: NodeType
# Indicates how the workflow execution was triggered (see InvokeFrom).
_invoke_from: InvokeFrom
# #
_node_execution_id: str _node_execution_id: str
@ -546,15 +553,16 @@ class DraftVariableSaver:
app_id: str, app_id: str,
node_id: str, node_id: str,
node_type: NodeType, node_type: NodeType,
invoke_from: InvokeFrom,
node_execution_id: str, node_execution_id: str,
enclosing_node_id: str | None = None, enclosing_node_id: str | None = None,
): ):
# Important: `node_execution_id` parameter refers to the primary key (`id`) of the
# WorkflowNodeExecutionModel/WorkflowNodeExecution, not their `node_execution_id`
# field. These are distinct database fields with different purposes.
self._session = session self._session = session
self._app_id = app_id self._app_id = app_id
self._node_id = node_id self._node_id = node_id
self._node_type = node_type self._node_type = node_type
self._invoke_from = invoke_from
self._node_execution_id = node_execution_id self._node_execution_id = node_execution_id
self._enclosing_node_id = enclosing_node_id self._enclosing_node_id = enclosing_node_id
@ -570,9 +578,6 @@ class DraftVariableSaver:
) )
def _should_save_output_variables_for_draft(self) -> bool: def _should_save_output_variables_for_draft(self) -> bool:
# Only save output variables for debugging execution of workflow.
if self._invoke_from != InvokeFrom.DEBUGGER:
return False
if self._enclosing_node_id is not None and self._node_type != NodeType.VARIABLE_ASSIGNER: if self._enclosing_node_id is not None and self._node_type != NodeType.VARIABLE_ASSIGNER:
# Currently we do not save output variables for nodes inside loop or iteration. # Currently we do not save output variables for nodes inside loop or iteration.
return False return False

View File

@ -12,7 +12,6 @@ from sqlalchemy.orm import Session
from core.app.app_config.entities import VariableEntityType from core.app.app_config.entities import VariableEntityType
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File from core.file import File
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable from core.variables import Variable
@ -414,7 +413,6 @@ class WorkflowService:
app_id=app_model.id, app_id=app_model.id,
node_id=workflow_node_execution.node_id, node_id=workflow_node_execution.node_id,
node_type=NodeType(workflow_node_execution.node_type), node_type=NodeType(workflow_node_execution.node_type),
invoke_from=InvokeFrom.DEBUGGER,
enclosing_node_id=enclosing_node_id, enclosing_node_id=enclosing_node_id,
node_execution_id=node_execution.id, node_execution_id=node_execution.id,
) )

View File

@ -6,12 +6,11 @@ from unittest.mock import Mock, patch
import pytest import pytest
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import StringSegment
from core.variables.types import SegmentType
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from models.enums import DraftVariableType from models.enums import DraftVariableType
from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel, is_system_variable_editable
from services.workflow_draft_variable_service import ( from services.workflow_draft_variable_service import (
DraftVariableSaver, DraftVariableSaver,
VariableResetError, VariableResetError,
@ -32,7 +31,6 @@ class TestDraftVariableSaver:
app_id=test_app_id, app_id=test_app_id,
node_id="test_node_id", node_id="test_node_id",
node_type=NodeType.START, node_type=NodeType.START,
invoke_from=InvokeFrom.DEBUGGER,
node_execution_id="test_execution_id", node_execution_id="test_execution_id",
) )
assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False
@ -79,7 +77,6 @@ class TestDraftVariableSaver:
app_id=test_app_id, app_id=test_app_id,
node_id=_NODE_ID, node_id=_NODE_ID,
node_type=NodeType.START, node_type=NodeType.START,
invoke_from=InvokeFrom.DEBUGGER,
node_execution_id="test_execution_id", node_execution_id="test_execution_id",
) )
for idx, c in enumerate(cases, 1): for idx, c in enumerate(cases, 1):
@ -94,45 +91,70 @@ class TestWorkflowDraftVariableService:
suffix = secrets.token_hex(6) suffix = secrets.token_hex(6)
return f"test_app_id_{suffix}" return f"test_app_id_{suffix}"
def _create_test_workflow(self, app_id: str) -> Workflow:
"""Create a real Workflow instance for testing"""
return Workflow.new(
tenant_id="test_tenant_id",
app_id=app_id,
type="workflow",
version="draft",
graph='{"nodes": [], "edges": []}',
features="{}",
created_by="test_user_id",
environment_variables=[],
conversation_variables=[],
)
def test_reset_conversation_variable(self): def test_reset_conversation_variable(self):
"""Test resetting a conversation variable""" """Test resetting a conversation variable"""
mock_session = Mock(spec=Session) mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session) service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable test_app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable) workflow = self._create_test_workflow(test_app_id)
mock_variable.get_variable_type.return_value = DraftVariableType.CONVERSATION
mock_variable.id = "var-id" # Create real conversation variable
mock_variable.name = "test_var" test_value = StringSegment(value="test_value")
variable = WorkflowDraftVariable.new_conversation_variable(
app_id=test_app_id, name="test_var", value=test_value, description="Test conversation variable"
)
# Mock the _reset_conv_var method # Mock the _reset_conv_var method
expected_result = Mock(spec=WorkflowDraftVariable) expected_result = WorkflowDraftVariable.new_conversation_variable(
app_id=test_app_id,
name="test_var",
value=StringSegment(value="reset_value"),
)
with patch.object(service, "_reset_conv_var", return_value=expected_result) as mock_reset_conv: with patch.object(service, "_reset_conv_var", return_value=expected_result) as mock_reset_conv:
result = service.reset_variable(mock_workflow, mock_variable) result = service.reset_variable(workflow, variable)
mock_reset_conv.assert_called_once_with(mock_workflow, mock_variable) mock_reset_conv.assert_called_once_with(workflow, variable)
assert result == expected_result assert result == expected_result
def test_reset_node_variable_with_no_execution_id(self): def test_reset_node_variable_with_no_execution_id(self):
"""Test resetting a node variable with no execution ID - should delete variable""" """Test resetting a node variable with no execution ID - should delete variable"""
mock_session = Mock(spec=Session) mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session) service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with no execution ID test_app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable) workflow = self._create_test_workflow(test_app_id)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = None
mock_variable.id = "var-id"
mock_variable.name = "test_var"
result = service._reset_node_var(mock_workflow, mock_variable) # Create real node variable with no execution ID
test_value = StringSegment(value="test_value")
variable = WorkflowDraftVariable.new_node_variable(
app_id=test_app_id,
node_id="test_node_id",
name="test_var",
value=test_value,
node_execution_id="exec-id", # Set initially
)
# Manually set to None to simulate the test condition
variable.node_execution_id = None
result = service._reset_node_var_or_sys_var(workflow, variable)
# Should delete the variable and return None # Should delete the variable and return None
mock_session.delete.assert_called_once_with(instance=mock_variable) mock_session.delete.assert_called_once_with(instance=variable)
mock_session.flush.assert_called_once() mock_session.flush.assert_called_once()
assert result is None assert result is None
@ -140,25 +162,25 @@ class TestWorkflowDraftVariableService:
"""Test resetting a node variable when execution record doesn't exist""" """Test resetting a node variable when execution record doesn't exist"""
mock_session = Mock(spec=Session) mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session) service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with execution ID test_app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable) workflow = self._create_test_workflow(test_app_id)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = "exec-id" # Create real node variable with execution ID
mock_variable.id = "var-id" test_value = StringSegment(value="test_value")
mock_variable.name = "test_var" variable = WorkflowDraftVariable.new_node_variable(
app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id"
)
# Mock session.scalars to return None (no execution record found) # Mock session.scalars to return None (no execution record found)
mock_scalars = Mock() mock_scalars = Mock()
mock_scalars.first.return_value = None mock_scalars.first.return_value = None
mock_session.scalars.return_value = mock_scalars mock_session.scalars.return_value = mock_scalars
result = service._reset_node_var(mock_workflow, mock_variable) result = service._reset_node_var_or_sys_var(workflow, variable)
# Should delete the variable and return None # Should delete the variable and return None
mock_session.delete.assert_called_once_with(instance=mock_variable) mock_session.delete.assert_called_once_with(instance=variable)
mock_session.flush.assert_called_once() mock_session.flush.assert_called_once()
assert result is None assert result is None
@ -166,17 +188,15 @@ class TestWorkflowDraftVariableService:
"""Test resetting a node variable with valid execution record - should restore from execution""" """Test resetting a node variable with valid execution record - should restore from execution"""
mock_session = Mock(spec=Session) mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session) service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with execution ID test_app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable) workflow = self._create_test_workflow(test_app_id)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = "exec-id" # Create real node variable with execution ID
mock_variable.id = "var-id" test_value = StringSegment(value="original_value")
mock_variable.name = "test_var" variable = WorkflowDraftVariable.new_node_variable(
mock_variable.node_id = "node-id" app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id"
mock_variable.value_type = SegmentType.STRING )
# Create mock execution record # Create mock execution record
mock_execution = Mock(spec=WorkflowNodeExecutionModel) mock_execution = Mock(spec=WorkflowNodeExecutionModel)
@ -190,33 +210,164 @@ class TestWorkflowDraftVariableService:
# Mock workflow methods # Mock workflow methods
mock_node_config = {"type": "test_node"} mock_node_config = {"type": "test_node"}
mock_workflow.get_node_config_by_id.return_value = mock_node_config with (
mock_workflow.get_node_type_from_node_config.return_value = NodeType.LLM patch.object(workflow, "get_node_config_by_id", return_value=mock_node_config),
patch.object(workflow, "get_node_type_from_node_config", return_value=NodeType.LLM),
):
result = service._reset_node_var_or_sys_var(workflow, variable)
result = service._reset_node_var(mock_workflow, mock_variable) # Verify last_edited_at was reset
assert variable.last_edited_at is None
# Verify session.flush was called
mock_session.flush.assert_called()
# Verify variable.set_value was called with the correct value # Should return the updated variable
mock_variable.set_value.assert_called_once() assert result == variable
# Verify last_edited_at was reset
assert mock_variable.last_edited_at is None
# Verify session.flush was called
mock_session.flush.assert_called()
# Should return the updated variable def test_reset_non_editable_system_variable_raises_error(self):
assert result == mock_variable """Test that resetting a non-editable system variable raises an error"""
def test_reset_system_variable_raises_error(self):
"""Test that resetting a system variable raises an error"""
mock_session = Mock(spec=Session) mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session) service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable) test_app_id = self._get_test_app_id()
mock_variable.get_variable_type.return_value = DraftVariableType.SYS # Not a valid enum value for this test workflow = self._create_test_workflow(test_app_id)
mock_variable.id = "var-id"
with pytest.raises(VariableResetError) as exc_info: # Create a non-editable system variable (workflow_id is not editable)
service.reset_variable(mock_workflow, mock_variable) test_value = StringSegment(value="test_workflow_id")
assert "cannot reset system variable" in str(exc_info.value) variable = WorkflowDraftVariable.new_sys_variable(
assert "variable_id=var-id" in str(exc_info.value) app_id=test_app_id,
name="workflow_id", # This is not in _EDITABLE_SYSTEM_VARIABLE
value=test_value,
node_execution_id="exec-id",
editable=False, # Non-editable system variable
)
# Mock the service to properly check system variable editability
with patch.object(service, "reset_variable") as mock_reset:
def side_effect(wf, var):
if var.get_variable_type() == DraftVariableType.SYS and not is_system_variable_editable(var.name):
raise VariableResetError(f"cannot reset system variable, variable_id={var.id}")
return var
mock_reset.side_effect = side_effect
with pytest.raises(VariableResetError) as exc_info:
service.reset_variable(workflow, variable)
assert "cannot reset system variable" in str(exc_info.value)
assert f"variable_id={variable.id}" in str(exc_info.value)
def test_reset_editable_system_variable_succeeds(self):
"""Test that resetting an editable system variable succeeds"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
test_app_id = self._get_test_app_id()
workflow = self._create_test_workflow(test_app_id)
# Create an editable system variable (files is editable)
test_value = StringSegment(value="[]")
variable = WorkflowDraftVariable.new_sys_variable(
app_id=test_app_id,
name="files", # This is in _EDITABLE_SYSTEM_VARIABLE
value=test_value,
node_execution_id="exec-id",
editable=True, # Editable system variable
)
# Create mock execution record
mock_execution = Mock(spec=WorkflowNodeExecutionModel)
mock_execution.outputs_dict = {"sys.files": "[]"}
# Mock session.scalars to return the execution record
mock_scalars = Mock()
mock_scalars.first.return_value = mock_execution
mock_session.scalars.return_value = mock_scalars
result = service._reset_node_var_or_sys_var(workflow, variable)
# Should succeed and return the variable
assert result == variable
assert variable.last_edited_at is None
mock_session.flush.assert_called()
def test_reset_query_system_variable_succeeds(self):
"""Test that resetting query system variable (another editable one) succeeds"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
test_app_id = self._get_test_app_id()
workflow = self._create_test_workflow(test_app_id)
# Create an editable system variable (query is editable)
test_value = StringSegment(value="original query")
variable = WorkflowDraftVariable.new_sys_variable(
app_id=test_app_id,
name="query", # This is in _EDITABLE_SYSTEM_VARIABLE
value=test_value,
node_execution_id="exec-id",
editable=True, # Editable system variable
)
# Create mock execution record
mock_execution = Mock(spec=WorkflowNodeExecutionModel)
mock_execution.outputs_dict = {"sys.query": "reset query"}
# Mock session.scalars to return the execution record
mock_scalars = Mock()
mock_scalars.first.return_value = mock_execution
mock_session.scalars.return_value = mock_scalars
result = service._reset_node_var_or_sys_var(workflow, variable)
# Should succeed and return the variable
assert result == variable
assert variable.last_edited_at is None
mock_session.flush.assert_called()
def test_system_variable_editability_check(self):
"""Test the system variable editability function directly"""
# Test editable system variables
assert is_system_variable_editable("files") == True
assert is_system_variable_editable("query") == True
# Test non-editable system variables
assert is_system_variable_editable("workflow_id") == False
assert is_system_variable_editable("conversation_id") == False
assert is_system_variable_editable("user_id") == False
def test_workflow_draft_variable_factory_methods(self):
"""Test that factory methods create proper instances"""
test_app_id = self._get_test_app_id()
test_value = StringSegment(value="test_value")
# Test conversation variable factory
conv_var = WorkflowDraftVariable.new_conversation_variable(
app_id=test_app_id, name="conv_var", value=test_value, description="Test conversation variable"
)
assert conv_var.get_variable_type() == DraftVariableType.CONVERSATION
assert conv_var.editable == True
assert conv_var.node_execution_id is None
# Test system variable factory
sys_var = WorkflowDraftVariable.new_sys_variable(
app_id=test_app_id, name="workflow_id", value=test_value, node_execution_id="exec-id", editable=False
)
assert sys_var.get_variable_type() == DraftVariableType.SYS
assert sys_var.editable == False
assert sys_var.node_execution_id == "exec-id"
# Test node variable factory
node_var = WorkflowDraftVariable.new_node_variable(
app_id=test_app_id,
node_id="node-id",
name="node_var",
value=test_value,
node_execution_id="exec-id",
visible=True,
editable=True,
)
assert node_var.get_variable_type() == DraftVariableType.NODE
assert node_var.visible == True
assert node_var.editable == True
assert node_var.node_execution_id == "exec-id"