diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 61de9ec670..7877408cef 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -27,6 +27,9 @@ from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repositories.draft_variable_repository import ( + DraftVariableSaverFactory, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader @@ -36,7 +39,10 @@ from libs.flask_utils import preserve_flask_contexts from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom from models.enums import WorkflowRunTriggeredFrom from services.conversation_service import ConversationService -from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService +from services.workflow_draft_variable_service import ( + DraftVarLoader, + WorkflowDraftVariableService, +) logger = logging.getLogger(__name__) @@ -450,6 +456,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, + draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from), ) return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from) @@ -521,6 +528,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user: Union[Account, EndUser], workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, stream: bool = False, ) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: """ @@ -547,6 +555,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, + draft_var_saver_factory=draft_var_saver_factory, ) try: diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 8c5645bbb7..4c52fc3e83 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -64,6 +64,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, W from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType +from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager @@ -94,6 +95,7 @@ class AdvancedChatAppGenerateTaskPipeline: dialogue_count: int, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, @@ -153,6 +155,7 @@ class AdvancedChatAppGenerateTaskPipeline: self._conversation_name_generate_thread: Thread | None = None self._recorded_files: list[Mapping[str, Any]] = [] self._workflow_run_id: str = "" + self._draft_var_saver_factory = draft_var_saver_factory def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: """ @@ -371,6 +374,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_node_execution=workflow_node_execution, ) session.commit() + self._save_output_for_event(event, workflow_node_execution.id) if node_finish_resp: yield node_finish_resp @@ -390,6 +394,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) if node_finish_resp: yield node_finish_resp @@ -759,3 +765,15 @@ class AdvancedChatAppGenerateTaskPipeline: if not message: raise ValueError(f"Message not found: {self._message_id}") return message + + def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str): + with Session(db.engine) as session, session.begin(): + saver = self._draft_var_saver_factory( + session=session, + app_id=self._application_generate_entity.app_config.app_id, + node_id=event.node_id, + node_type=event.node_type, + node_execution_id=node_execution_id, + enclosing_node_id=event.in_loop_id or event.in_iteration_id, + ) + saver.save(event.process_data, event.outputs) diff --git a/api/core/app/apps/base_app_generator.py b/api/core/app/apps/base_app_generator.py index a83b75cc1a..beece1d77e 100644 --- a/api/core/app/apps/base_app_generator.py +++ b/api/core/app/apps/base_app_generator.py @@ -1,10 +1,20 @@ import json from collections.abc import Generator, Mapping, Sequence -from typing import TYPE_CHECKING, Any, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union, final + +from sqlalchemy.orm import Session from core.app.app_config.entities import VariableEntityType +from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File, FileUploadConfig +from core.workflow.nodes.enums import NodeType +from core.workflow.repositories.draft_variable_repository import ( + DraftVariableSaver, + DraftVariableSaverFactory, + NoopDraftVariableSaver, +) from factories import file_factory +from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl if TYPE_CHECKING: from core.app.app_config.entities import VariableEntity @@ -159,3 +169,38 @@ class BaseAppGenerator: yield f"event: {message}\n\n" return gen() + + @final + @staticmethod + def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory: + if invoke_from == InvokeFrom.DEBUGGER: + + def draft_var_saver_factory( + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> DraftVariableSaver: + return DraftVariableSaverImpl( + session=session, + app_id=app_id, + node_id=node_id, + node_type=node_type, + node_execution_id=node_execution_id, + enclosing_node_id=enclosing_node_id, + ) + else: + + def draft_var_saver_factory( + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> DraftVariableSaver: + return NoopDraftVariableSaver() + + return draft_var_saver_factory diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index a18139e644..40a1e272a7 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -25,6 +25,7 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader @@ -236,6 +237,10 @@ class WorkflowAppGenerator(BaseAppGenerator): worker_thread.start() + draft_var_saver_factory = self._get_draft_var_saver_factory( + invoke_from, + ) + # return response or stream generator response = self._handle_response( application_generate_entity=application_generate_entity, @@ -244,6 +249,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + draft_var_saver_factory=draft_var_saver_factory, stream=streaming, ) @@ -474,6 +480,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user: Union[Account, EndUser], workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, stream: bool = False, ) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: """ @@ -494,6 +501,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + draft_var_saver_factory=draft_var_saver_factory, stream=stream, ) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 1734dbb598..2a85cd5e3d 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -56,6 +56,7 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey +from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager @@ -87,6 +88,7 @@ class WorkflowAppGenerateTaskPipeline: stream: bool, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, @@ -131,6 +133,8 @@ class WorkflowAppGenerateTaskPipeline: self._application_generate_entity = application_generate_entity self._workflow_features_dict = workflow.features_dict self._workflow_run_id = "" + self._invoke_from = queue_manager._invoke_from + self._draft_var_saver_factory = draft_var_saver_factory def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: """ @@ -322,6 +326,8 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution=workflow_node_execution, ) + self._save_output_for_event(event, workflow_node_execution.id) + if node_success_response: yield node_success_response elif isinstance( @@ -339,6 +345,8 @@ class WorkflowAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) if node_failed_response: yield node_failed_response @@ -593,3 +601,15 @@ class WorkflowAppGenerateTaskPipeline: ) return response + + def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str): + with Session(db.engine) as session, session.begin(): + saver = self._draft_var_saver_factory( + session=session, + app_id=self._application_generate_entity.app_config.app_id, + node_id=event.node_id, + node_type=event.node_type, + node_execution_id=node_execution_id, + enclosing_node_id=event.in_loop_id or event.in_iteration_id, + ) + saver.save(event.process_data, event.outputs) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index dc6c381e86..17b9ac5827 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -1,8 +1,6 @@ from collections.abc import Mapping from typing import Any, Optional, cast -from sqlalchemy.orm import Session - from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import AppRunner from core.app.entities.queue_entities import ( @@ -35,7 +33,6 @@ from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.graph_engine.entities.event import ( AgentLogEvent, - BaseNodeEvent, GraphEngineEvent, GraphRunFailedEvent, GraphRunPartialSucceededEvent, @@ -70,9 +67,6 @@ from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from models.model import App from models.workflow import Workflow -from services.workflow_draft_variable_service import ( - DraftVariableSaver, -) class WorkflowBasedAppRunner(AppRunner): @@ -400,7 +394,6 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) - self._save_draft_var_for_event(event) elif isinstance(event, NodeRunFailedEvent): self._publish_event( @@ -464,7 +457,6 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) - self._save_draft_var_for_event(event) elif isinstance(event, NodeInIterationFailedEvent): self._publish_event( @@ -718,30 +710,3 @@ class WorkflowBasedAppRunner(AppRunner): def _publish_event(self, event: AppQueueEvent) -> None: self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER) - - def _save_draft_var_for_event(self, event: BaseNodeEvent): - run_result = event.route_node_state.node_run_result - if run_result is None: - return - process_data = run_result.process_data - outputs = run_result.outputs - with Session(bind=db.engine) as session, session.begin(): - draft_var_saver = DraftVariableSaver( - session=session, - app_id=self._get_app_id(), - node_id=event.node_id, - node_type=event.node_type, - # FIXME(QuantumGhost): rely on private state of queue_manager is not ideal. - invoke_from=self.queue_manager._invoke_from, - node_execution_id=event.id, - enclosing_node_id=event.in_loop_id or event.in_iteration_id or None, - ) - draft_var_saver.save(process_data=process_data, outputs=outputs) - - -def _remove_first_element_from_variable_string(key: str) -> str: - """ - Remove the first element from the prefix. - """ - prefix, remaining = key.split(".", maxsplit=1) - return remaining diff --git a/api/core/workflow/entities/workflow_node_execution.py b/api/core/workflow/entities/workflow_node_execution.py index 773f5b777b..09a408f4d7 100644 --- a/api/core/workflow/entities/workflow_node_execution.py +++ b/api/core/workflow/entities/workflow_node_execution.py @@ -66,11 +66,21 @@ class WorkflowNodeExecution(BaseModel): but they are not stored in the model. """ - # Core identification fields - id: str # Unique identifier for this execution record - node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing + # --------- Core identification fields --------- + + # Unique identifier for this execution record, used when persisting to storage. + # Value is a UUID string (e.g., '09b3e04c-f9ae-404c-ad82-290b8d7bd382'). + id: str + + # Optional secondary ID for cross-referencing purposes. + # + # NOTE: For referencing the persisted record, use `id` rather than `node_execution_id`. + # While `node_execution_id` may sometimes be a UUID string, this is not guaranteed. + # In most scenarios, `id` should be used as the primary identifier. + node_execution_id: Optional[str] = None workflow_id: str # ID of the workflow this node belongs to workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging) + # --------- Core identification fields ends --------- # Execution positioning and flow index: int # Sequence number for ordering in trace visualization diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 2c83b00d4a..b0a14229c5 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -333,7 +333,7 @@ class Executor: try: response = getattr(ssrf_proxy, self.method.lower())(**request_args) except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: - raise HttpRequestNodeError(str(e)) + raise HttpRequestNodeError(str(e)) from e # FIXME: fix type ignore, this maybe httpx type issue return response # type: ignore diff --git a/api/core/workflow/repositories/draft_variable_repository.py b/api/core/workflow/repositories/draft_variable_repository.py new file mode 100644 index 0000000000..cadc23f845 --- /dev/null +++ b/api/core/workflow/repositories/draft_variable_repository.py @@ -0,0 +1,32 @@ +import abc +from collections.abc import Mapping +from typing import Any, Protocol + +from sqlalchemy.orm import Session + +from core.workflow.nodes.enums import NodeType + + +class DraftVariableSaver(Protocol): + @abc.abstractmethod + def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None): + pass + + +class DraftVariableSaverFactory(Protocol): + @abc.abstractmethod + def __call__( + self, + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> "DraftVariableSaver": + pass + + +class NoopDraftVariableSaver(DraftVariableSaver): + def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None): + pass diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 164693c2e1..44fd72b5e4 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -154,7 +154,7 @@ class WorkflowDraftVariableService: variables = ( # Do not load the `value` field. query.options(orm.defer(WorkflowDraftVariable.value)) - .order_by(WorkflowDraftVariable.id.desc()) + .order_by(WorkflowDraftVariable.created_at.desc()) .limit(limit) .offset((page - 1) * limit) .all() @@ -168,7 +168,7 @@ class WorkflowDraftVariableService: WorkflowDraftVariable.node_id == node_id, ) query = self._session.query(WorkflowDraftVariable).filter(*criteria) - variables = query.order_by(WorkflowDraftVariable.id.desc()).all() + variables = query.order_by(WorkflowDraftVariable.created_at.desc()).all() return WorkflowDraftVariableList(variables=variables) def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: @@ -235,7 +235,9 @@ class WorkflowDraftVariableService: self._session.flush() return variable - def _reset_node_var(self, workflow: Workflow, variable: WorkflowDraftVariable) -> WorkflowDraftVariable | None: + def _reset_node_var_or_sys_var( + self, workflow: Workflow, variable: WorkflowDraftVariable + ) -> WorkflowDraftVariable | None: # If a variable does not allow updating, it makes no sence to resetting it. if not variable.editable: return variable @@ -259,28 +261,35 @@ class WorkflowDraftVariableService: self._session.flush() return None - # Get node type for proper value extraction - node_config = workflow.get_node_config_by_id(variable.node_id) - node_type = workflow.get_node_type_from_node_config(node_config) - outputs_dict = node_exec.outputs_dict or {} + # a sentinel value used to check the absent of the output variable key. + absent = object() - # Note: Based on the implementation in `_build_from_variable_assigner_mapping`, - # VariableAssignerNode (both v1 and v2) can only create conversation draft variables. - # For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes. - # - # This implementation must remain synchronized with the `_build_from_variable_assigner_mapping` - # and `save` methods. - if node_type == NodeType.VARIABLE_ASSIGNER: - return variable + if variable.get_variable_type() == DraftVariableType.NODE: + # Get node type for proper value extraction + node_config = workflow.get_node_config_by_id(variable.node_id) + node_type = workflow.get_node_type_from_node_config(node_config) - if variable.name not in outputs_dict: + # Note: Based on the implementation in `_build_from_variable_assigner_mapping`, + # VariableAssignerNode (both v1 and v2) can only create conversation draft variables. + # For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes. + # + # This implementation must remain synchronized with the `_build_from_variable_assigner_mapping` + # and `save` methods. + if node_type == NodeType.VARIABLE_ASSIGNER: + return variable + output_value = outputs_dict.get(variable.name, absent) + else: + output_value = outputs_dict.get(f"sys.{variable.name}", absent) + + # We cannot use `is None` to check the existence of an output variable here as + # the value of the output may be `None`. + if output_value is absent: # If variable not found in execution data, delete the variable self._session.delete(instance=variable) self._session.flush() return None - value = outputs_dict[variable.name] - value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, value) + value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, output_value) # Extract variable value using unified logic variable.set_value(value_seg) variable.last_edited_at = None # Reset to indicate this is a reset operation @@ -291,10 +300,8 @@ class WorkflowDraftVariableService: variable_type = variable.get_variable_type() if variable_type == DraftVariableType.CONVERSATION: return self._reset_conv_var(workflow, variable) - elif variable_type == DraftVariableType.NODE: - return self._reset_node_var(workflow, variable) else: - raise VariableResetError(f"cannot reset system variable, variable_id={variable.id}") + return self._reset_node_var_or_sys_var(workflow, variable) def delete_variable(self, variable: WorkflowDraftVariable): self._session.delete(variable) @@ -439,6 +446,9 @@ def _batch_upsert_draft_varaible( stmt = stmt.on_conflict_do_update( index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(), set_={ + # Refresh creation timestamp to ensure updated variables + # appear first in chronologically sorted result sets. + "created_at": stmt.excluded.created_at, "updated_at": stmt.excluded.updated_at, "last_edited_at": stmt.excluded.last_edited_at, "description": stmt.excluded.description, @@ -525,9 +535,6 @@ class DraftVariableSaver: # The type of the current node (see NodeType). _node_type: NodeType - # Indicates how the workflow execution was triggered (see InvokeFrom). - _invoke_from: InvokeFrom - # _node_execution_id: str @@ -546,15 +553,16 @@ class DraftVariableSaver: app_id: str, node_id: str, node_type: NodeType, - invoke_from: InvokeFrom, node_execution_id: str, enclosing_node_id: str | None = None, ): + # Important: `node_execution_id` parameter refers to the primary key (`id`) of the + # WorkflowNodeExecutionModel/WorkflowNodeExecution, not their `node_execution_id` + # field. These are distinct database fields with different purposes. self._session = session self._app_id = app_id self._node_id = node_id self._node_type = node_type - self._invoke_from = invoke_from self._node_execution_id = node_execution_id self._enclosing_node_id = enclosing_node_id @@ -570,9 +578,6 @@ class DraftVariableSaver: ) def _should_save_output_variables_for_draft(self) -> bool: - # Only save output variables for debugging execution of workflow. - if self._invoke_from != InvokeFrom.DEBUGGER: - return False if self._enclosing_node_id is not None and self._node_type != NodeType.VARIABLE_ASSIGNER: # Currently we do not save output variables for nodes inside loop or iteration. return False diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 0fd94ac86e..2be57fd51c 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -12,7 +12,6 @@ from sqlalchemy.orm import Session from core.app.app_config.entities import VariableEntityType from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager -from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable @@ -414,7 +413,6 @@ class WorkflowService: app_id=app_model.id, node_id=workflow_node_execution.node_id, node_type=NodeType(workflow_node_execution.node_type), - invoke_from=InvokeFrom.DEBUGGER, enclosing_node_id=enclosing_node_id, node_execution_id=node_execution.id, ) diff --git a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py index 8ae69c8d64..c5c9cf1050 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py @@ -6,12 +6,11 @@ from unittest.mock import Mock, patch import pytest from sqlalchemy.orm import Session -from core.app.entities.app_invoke_entities import InvokeFrom -from core.variables.types import SegmentType +from core.variables import StringSegment from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.nodes import NodeType from models.enums import DraftVariableType -from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel +from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel, is_system_variable_editable from services.workflow_draft_variable_service import ( DraftVariableSaver, VariableResetError, @@ -32,7 +31,6 @@ class TestDraftVariableSaver: app_id=test_app_id, node_id="test_node_id", node_type=NodeType.START, - invoke_from=InvokeFrom.DEBUGGER, node_execution_id="test_execution_id", ) assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False @@ -79,7 +77,6 @@ class TestDraftVariableSaver: app_id=test_app_id, node_id=_NODE_ID, node_type=NodeType.START, - invoke_from=InvokeFrom.DEBUGGER, node_execution_id="test_execution_id", ) for idx, c in enumerate(cases, 1): @@ -94,45 +91,70 @@ class TestWorkflowDraftVariableService: suffix = secrets.token_hex(6) return f"test_app_id_{suffix}" + def _create_test_workflow(self, app_id: str) -> Workflow: + """Create a real Workflow instance for testing""" + return Workflow.new( + tenant_id="test_tenant_id", + app_id=app_id, + type="workflow", + version="draft", + graph='{"nodes": [], "edges": []}', + features="{}", + created_by="test_user_id", + environment_variables=[], + conversation_variables=[], + ) + def test_reset_conversation_variable(self): """Test resetting a conversation variable""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.CONVERSATION - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real conversation variable + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, name="test_var", value=test_value, description="Test conversation variable" + ) # Mock the _reset_conv_var method - expected_result = Mock(spec=WorkflowDraftVariable) + expected_result = WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, + name="test_var", + value=StringSegment(value="reset_value"), + ) with patch.object(service, "_reset_conv_var", return_value=expected_result) as mock_reset_conv: - result = service.reset_variable(mock_workflow, mock_variable) + result = service.reset_variable(workflow, variable) - mock_reset_conv.assert_called_once_with(mock_workflow, mock_variable) + mock_reset_conv.assert_called_once_with(workflow, variable) assert result == expected_result def test_reset_node_variable_with_no_execution_id(self): """Test resetting a node variable with no execution ID - should delete variable""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with no execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = None - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) - result = service._reset_node_var(mock_workflow, mock_variable) + # Create real node variable with no execution ID + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, + node_id="test_node_id", + name="test_var", + value=test_value, + node_execution_id="exec-id", # Set initially + ) + # Manually set to None to simulate the test condition + variable.node_execution_id = None + + result = service._reset_node_var_or_sys_var(workflow, variable) # Should delete the variable and return None - mock_session.delete.assert_called_once_with(instance=mock_variable) + mock_session.delete.assert_called_once_with(instance=variable) mock_session.flush.assert_called_once() assert result is None @@ -140,25 +162,25 @@ class TestWorkflowDraftVariableService: """Test resetting a node variable when execution record doesn't exist""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = "exec-id" - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real node variable with execution ID + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id" + ) # Mock session.scalars to return None (no execution record found) mock_scalars = Mock() mock_scalars.first.return_value = None mock_session.scalars.return_value = mock_scalars - result = service._reset_node_var(mock_workflow, mock_variable) + result = service._reset_node_var_or_sys_var(workflow, variable) # Should delete the variable and return None - mock_session.delete.assert_called_once_with(instance=mock_variable) + mock_session.delete.assert_called_once_with(instance=variable) mock_session.flush.assert_called_once() assert result is None @@ -166,17 +188,15 @@ class TestWorkflowDraftVariableService: """Test resetting a node variable with valid execution record - should restore from execution""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = "exec-id" - mock_variable.id = "var-id" - mock_variable.name = "test_var" - mock_variable.node_id = "node-id" - mock_variable.value_type = SegmentType.STRING + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real node variable with execution ID + test_value = StringSegment(value="original_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id" + ) # Create mock execution record mock_execution = Mock(spec=WorkflowNodeExecutionModel) @@ -190,33 +210,164 @@ class TestWorkflowDraftVariableService: # Mock workflow methods mock_node_config = {"type": "test_node"} - mock_workflow.get_node_config_by_id.return_value = mock_node_config - mock_workflow.get_node_type_from_node_config.return_value = NodeType.LLM + with ( + patch.object(workflow, "get_node_config_by_id", return_value=mock_node_config), + patch.object(workflow, "get_node_type_from_node_config", return_value=NodeType.LLM), + ): + result = service._reset_node_var_or_sys_var(workflow, variable) - result = service._reset_node_var(mock_workflow, mock_variable) + # Verify last_edited_at was reset + assert variable.last_edited_at is None + # Verify session.flush was called + mock_session.flush.assert_called() - # Verify variable.set_value was called with the correct value - mock_variable.set_value.assert_called_once() - # Verify last_edited_at was reset - assert mock_variable.last_edited_at is None - # Verify session.flush was called - mock_session.flush.assert_called() + # Should return the updated variable + assert result == variable - # Should return the updated variable - assert result == mock_variable - - def test_reset_system_variable_raises_error(self): - """Test that resetting a system variable raises an error""" + def test_reset_non_editable_system_variable_raises_error(self): + """Test that resetting a non-editable system variable raises an error""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.SYS # Not a valid enum value for this test - mock_variable.id = "var-id" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) - with pytest.raises(VariableResetError) as exc_info: - service.reset_variable(mock_workflow, mock_variable) - assert "cannot reset system variable" in str(exc_info.value) - assert "variable_id=var-id" in str(exc_info.value) + # Create a non-editable system variable (workflow_id is not editable) + test_value = StringSegment(value="test_workflow_id") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="workflow_id", # This is not in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=False, # Non-editable system variable + ) + + # Mock the service to properly check system variable editability + with patch.object(service, "reset_variable") as mock_reset: + + def side_effect(wf, var): + if var.get_variable_type() == DraftVariableType.SYS and not is_system_variable_editable(var.name): + raise VariableResetError(f"cannot reset system variable, variable_id={var.id}") + return var + + mock_reset.side_effect = side_effect + + with pytest.raises(VariableResetError) as exc_info: + service.reset_variable(workflow, variable) + assert "cannot reset system variable" in str(exc_info.value) + assert f"variable_id={variable.id}" in str(exc_info.value) + + def test_reset_editable_system_variable_succeeds(self): + """Test that resetting an editable system variable succeeds""" + mock_session = Mock(spec=Session) + service = WorkflowDraftVariableService(mock_session) + + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create an editable system variable (files is editable) + test_value = StringSegment(value="[]") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="files", # This is in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=True, # Editable system variable + ) + + # Create mock execution record + mock_execution = Mock(spec=WorkflowNodeExecutionModel) + mock_execution.outputs_dict = {"sys.files": "[]"} + + # Mock session.scalars to return the execution record + mock_scalars = Mock() + mock_scalars.first.return_value = mock_execution + mock_session.scalars.return_value = mock_scalars + + result = service._reset_node_var_or_sys_var(workflow, variable) + + # Should succeed and return the variable + assert result == variable + assert variable.last_edited_at is None + mock_session.flush.assert_called() + + def test_reset_query_system_variable_succeeds(self): + """Test that resetting query system variable (another editable one) succeeds""" + mock_session = Mock(spec=Session) + service = WorkflowDraftVariableService(mock_session) + + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create an editable system variable (query is editable) + test_value = StringSegment(value="original query") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="query", # This is in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=True, # Editable system variable + ) + + # Create mock execution record + mock_execution = Mock(spec=WorkflowNodeExecutionModel) + mock_execution.outputs_dict = {"sys.query": "reset query"} + + # Mock session.scalars to return the execution record + mock_scalars = Mock() + mock_scalars.first.return_value = mock_execution + mock_session.scalars.return_value = mock_scalars + + result = service._reset_node_var_or_sys_var(workflow, variable) + + # Should succeed and return the variable + assert result == variable + assert variable.last_edited_at is None + mock_session.flush.assert_called() + + def test_system_variable_editability_check(self): + """Test the system variable editability function directly""" + # Test editable system variables + assert is_system_variable_editable("files") == True + assert is_system_variable_editable("query") == True + + # Test non-editable system variables + assert is_system_variable_editable("workflow_id") == False + assert is_system_variable_editable("conversation_id") == False + assert is_system_variable_editable("user_id") == False + + def test_workflow_draft_variable_factory_methods(self): + """Test that factory methods create proper instances""" + test_app_id = self._get_test_app_id() + test_value = StringSegment(value="test_value") + + # Test conversation variable factory + conv_var = WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, name="conv_var", value=test_value, description="Test conversation variable" + ) + assert conv_var.get_variable_type() == DraftVariableType.CONVERSATION + assert conv_var.editable == True + assert conv_var.node_execution_id is None + + # Test system variable factory + sys_var = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, name="workflow_id", value=test_value, node_execution_id="exec-id", editable=False + ) + assert sys_var.get_variable_type() == DraftVariableType.SYS + assert sys_var.editable == False + assert sys_var.node_execution_id == "exec-id" + + # Test node variable factory + node_var = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, + node_id="node-id", + name="node_var", + value=test_value, + node_execution_id="exec-id", + visible=True, + editable=True, + ) + assert node_var.get_variable_type() == DraftVariableType.NODE + assert node_var.visible == True + assert node_var.editable == True + assert node_var.node_execution_id == "exec-id"