diff --git a/api/controllers/console/app/workflow_app_log.py b/api/controllers/console/app/workflow_app_log.py index c475aea9fc..b9579e2120 100644 --- a/api/controllers/console/app/workflow_app_log.py +++ b/api/controllers/console/app/workflow_app_log.py @@ -6,12 +6,12 @@ from sqlalchemy.orm import Session from controllers.console import api from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, setup_required +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from extensions.ext_database import db from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from libs.login import login_required from models import App from models.model import AppMode -from models.workflow import WorkflowRunStatus from services.workflow_app_service import WorkflowAppService @@ -38,7 +38,7 @@ class WorkflowAppLogApi(Resource): parser.add_argument("limit", type=int_range(1, 100), default=20, location="args") args = parser.parse_args() - args.status = WorkflowRunStatus(args.status) if args.status else None + args.status = WorkflowExecutionStatus(args.status) if args.status else None if args.created_at__before: args.created_at__before = isoparse(args.created_at__before) diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py index e9bb2b046a..df52b49424 100644 --- a/api/controllers/service_api/app/workflow.py +++ b/api/controllers/service_api/app/workflow.py @@ -24,12 +24,13 @@ from core.errors.error import ( QuotaExceededError, ) from core.model_runtime.errors.invoke import InvokeError +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from extensions.ext_database import db from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from libs import helper from libs.helper import TimestampField from models.model import App, AppMode, EndUser -from models.workflow import WorkflowRun, WorkflowRunStatus +from models.workflow import WorkflowRun from services.app_generate_service import AppGenerateService from services.errors.llm import InvokeRateLimitError from services.workflow_app_service import WorkflowAppService @@ -138,7 +139,7 @@ class WorkflowAppLogApi(Resource): parser.add_argument("limit", type=int_range(1, 100), default=20, location="args") args = parser.parse_args() - args.status = WorkflowRunStatus(args.status) if args.status else None + args.status = WorkflowExecutionStatus(args.status) if args.status else None if args.created_at__before: args.created_at__before = isoparse(args.created_at__before) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index fdd1a776f8..8c85f91d7e 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -27,8 +27,8 @@ from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from extensions.ext_database import db from factories import file_factory from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 0a2401f953..ffce11187b 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -62,21 +62,19 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.utils.encoders import jsonable_encoder from core.ops.ops_trace_manager import TraceQueueManager +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import WorkflowCycleManager +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from events.message_event import message_was_created from extensions.ext_database import db from models import Conversation, EndUser, Message, MessageFile from models.account import Account from models.enums import CreatorUserRole -from models.workflow import ( - Workflow, - WorkflowRunStatus, -) +from models.workflow import Workflow logger = logging.getLogger(__name__) @@ -128,6 +126,12 @@ class AdvancedChatAppGenerateTaskPipeline: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, }, + workflow_info=CycleManagerWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), + version=workflow.version, + graph_data=workflow.graph_dict, + ), workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, ) @@ -302,15 +306,12 @@ class AdvancedChatAppGenerateTaskPipeline: with Session(db.engine, expire_on_commit=False) as session: # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( - session=session, - workflow_id=self._workflow_id, - ) - self._workflow_run_id = workflow_execution.id + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ message = self._get_message(session=session) if not message: raise ValueError(f"Message not found: {self._message_id}") - message.workflow_run_id = workflow_execution.id + message.workflow_run_id = workflow_execution.id_ workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -550,7 +551,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, - status=WorkflowRunStatus.FAILED, + status=WorkflowExecutionStatus.FAILED, error_message=event.error, conversation_id=self._conversation_id, trace_manager=trace_manager, @@ -576,7 +577,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, - status=WorkflowRunStatus.STOPPED, + status=WorkflowExecutionStatus.STOPPED, error_message=event.get_stop_reason(), conversation_id=self._conversation_id, trace_manager=trace_manager, diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 7669bf74bb..6df25b20e3 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -44,15 +44,14 @@ from core.app.entities.task_entities import ( ) from core.file import FILE_MODEL_IDENTITY, File from core.tools.tool_manager import ToolManager -from core.workflow.entities.node_execution_entities import NodeExecution -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution +from core.workflow.entities.workflow_node_execution import NodeExecution, WorkflowNodeExecutionStatus from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData from models import ( Account, CreatorUserRole, EndUser, - WorkflowNodeExecutionStatus, WorkflowRun, ) @@ -73,11 +72,10 @@ class WorkflowResponseConverter: ) -> WorkflowStartStreamResponse: return WorkflowStartStreamResponse( task_id=task_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, data=WorkflowStartStreamResponse.Data( - id=workflow_execution.id, + id=workflow_execution.id_, workflow_id=workflow_execution.workflow_id, - sequence_number=workflow_execution.sequence_number, inputs=workflow_execution.inputs, created_at=int(workflow_execution.started_at.timestamp()), ), @@ -91,7 +89,7 @@ class WorkflowResponseConverter: workflow_execution: WorkflowExecution, ) -> WorkflowFinishStreamResponse: created_by = None - workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_)) assert workflow_run is not None if workflow_run.created_by_role == CreatorUserRole.ACCOUNT: stmt = select(Account).where(Account.id == workflow_run.created_by) @@ -122,11 +120,10 @@ class WorkflowResponseConverter: return WorkflowFinishStreamResponse( task_id=task_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, data=WorkflowFinishStreamResponse.Data( - id=workflow_execution.id, + id=workflow_execution.id_, workflow_id=workflow_execution.workflow_id, - sequence_number=workflow_execution.sequence_number, status=workflow_execution.status, outputs=workflow_execution.outputs, error=workflow_execution.error_message, diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 6ea90e5a3d..f4aec3479b 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -25,8 +25,8 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from extensions.ext_database import db from factories import file_factory from models import Account, App, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom @@ -132,7 +132,7 @@ class WorkflowAppGenerator(BaseAppGenerator): invoke_from=invoke_from, call_depth=call_depth, trace_manager=trace_manager, - workflow_run_id=workflow_run_id, + workflow_execution_id=workflow_run_id, ) contexts.plugin_tool_providers.set({}) @@ -279,7 +279,7 @@ class WorkflowAppGenerator(BaseAppGenerator): single_iteration_run=WorkflowAppGenerateEntity.SingleIterationRunEntity( node_id=node_id, inputs=args["inputs"] ), - workflow_run_id=str(uuid.uuid4()), + workflow_execution_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) @@ -355,7 +355,7 @@ class WorkflowAppGenerator(BaseAppGenerator): invoke_from=InvokeFrom.DEBUGGER, extras={"auto_generate_conversation_name": False}, single_loop_run=WorkflowAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]), - workflow_run_id=str(uuid.uuid4()), + workflow_execution_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index b38ee18ac4..c93a49b7e4 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -95,7 +95,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): SystemVariableKey.USER_ID: user_id, SystemVariableKey.APP_ID: app_config.app_id, SystemVariableKey.WORKFLOW_ID: app_config.workflow_id, - SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_run_id, + SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_execution_id, } variable_pool = VariablePool( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 0291f49cac..e678774fae 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -55,11 +55,11 @@ from core.app.entities.task_entities import ( from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import WorkflowCycleManager +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from extensions.ext_database import db from models.account import Account from models.enums import CreatorUserRole @@ -69,7 +69,6 @@ from models.workflow import ( WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun, - WorkflowRunStatus, ) logger = logging.getLogger(__name__) @@ -114,8 +113,14 @@ class WorkflowAppGenerateTaskPipeline: SystemVariableKey.USER_ID: user_session_id, SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id, SystemVariableKey.WORKFLOW_ID: workflow.id, - SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, + SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_execution_id, }, + workflow_info=CycleManagerWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), + version=workflow.version, + graph_data=workflow.graph_dict, + ), workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, ) @@ -266,17 +271,13 @@ class WorkflowAppGenerateTaskPipeline: # override graph runtime state graph_runtime_state = event.graph_runtime_state - with Session(db.engine, expire_on_commit=False) as session: - # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( - session=session, - workflow_id=self._workflow_id, - ) - self._workflow_run_id = workflow_execution.id - start_resp = self._workflow_response_converter.workflow_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) + # init workflow run + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ + start_resp = self._workflow_response_converter.workflow_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) yield start_resp elif isinstance( @@ -511,9 +512,9 @@ class WorkflowAppGenerateTaskPipeline: workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, - status=WorkflowRunStatus.FAILED + status=WorkflowExecutionStatus.FAILED if isinstance(event, QueueWorkflowFailedEvent) - else WorkflowRunStatus.STOPPED, + else WorkflowExecutionStatus.STOPPED, error_message=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(), @@ -557,7 +558,7 @@ class WorkflowAppGenerateTaskPipeline: tts_publisher.publish(None) def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None: - workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_)) assert workflow_run is not None invoke_from = self._application_generate_entity.invoke_from if invoke_from == InvokeFrom.SERVICE_API: diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 0884fac4a9..613bd8e8fc 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -29,8 +29,8 @@ from core.app.entities.queue_entities import ( QueueWorkflowStartedEvent, QueueWorkflowSucceededEvent, ) -from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey from core.workflow.graph_engine.entities.event import ( AgentLogEvent, GraphEngineEvent, diff --git a/api/core/app/entities/app_invoke_entities.py b/api/core/app/entities/app_invoke_entities.py index 56e6b46a60..c0d99693b0 100644 --- a/api/core/app/entities/app_invoke_entities.py +++ b/api/core/app/entities/app_invoke_entities.py @@ -76,6 +76,8 @@ class AppGenerateEntity(BaseModel): App Generate Entity. """ + model_config = ConfigDict(arbitrary_types_allowed=True) + task_id: str # app config @@ -99,9 +101,6 @@ class AppGenerateEntity(BaseModel): # tracing instance trace_manager: Optional[TraceQueueManager] = None - class Config: - arbitrary_types_allowed = True - class EasyUIBasedAppGenerateEntity(AppGenerateEntity): """ @@ -205,7 +204,7 @@ class WorkflowAppGenerateEntity(AppGenerateEntity): # app config app_config: WorkflowUIBasedAppConfig - workflow_run_id: str + workflow_execution_id: str class SingleIterationRunEntity(BaseModel): """ diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 7228020e9b..e4ff123134 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -6,7 +6,8 @@ from typing import Any, Optional from pydantic import BaseModel from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk -from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunMetadataKey +from core.workflow.entities.node_entities import AgentNodeStrategyInit +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType from core.workflow.nodes.base import BaseNodeData diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 9b2bfcbf61..39b530cdfe 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -6,8 +6,8 @@ from pydantic import BaseModel, ConfigDict from core.model_runtime.entities.llm_entities import LLMResult from core.model_runtime.utils.encoders import jsonable_encoder -from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunMetadataKey -from models.workflow import WorkflowNodeExecutionStatus +from core.workflow.entities.node_entities import AgentNodeStrategyInit +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus class TaskState(BaseModel): @@ -189,7 +189,6 @@ class WorkflowStartStreamResponse(StreamResponse): id: str workflow_id: str - sequence_number: int inputs: Mapping[str, Any] created_at: int @@ -210,7 +209,6 @@ class WorkflowFinishStreamResponse(StreamResponse): id: str workflow_id: str - sequence_number: int status: str outputs: Optional[Mapping[str, Any]] = None error: Optional[str] = None diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index f0e34c0cd7..151fa2aaf4 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ -3,7 +3,7 @@ from datetime import datetime from enum import StrEnum from typing import Any, Optional, Union -from pydantic import BaseModel, ConfigDict, field_validator +from pydantic import BaseModel, ConfigDict, field_serializer, field_validator class BaseTraceInfo(BaseModel): @@ -24,10 +24,13 @@ class BaseTraceInfo(BaseModel): return v return "" - class Config: - json_encoders = { - datetime: lambda v: v.isoformat(), - } + model_config = ConfigDict(protected_namespaces=()) + + @field_serializer("start_time", "end_time") + def serialize_datetime(self, dt: datetime | None) -> str | None: + if dt is None: + return None + return dt.isoformat() class WorkflowTraceInfo(BaseTraceInfo): diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index 6631727c79..43b866e1af 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import ( ) from core.ops.utils import filter_none_values, generate_dotted_order from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.node_entities import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index 6c159a4831..7d68dca831 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -22,7 +22,7 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.node_entities import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 32301e11e7..addf164e6f 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -30,7 +30,7 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.ops.utils import get_message_data -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution from extensions.ext_database import db from extensions.ext_storage import storage from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig @@ -386,7 +386,7 @@ class TraceTask: ): self.trace_type = trace_type self.message_id = message_id - self.workflow_run_id = workflow_execution.id if workflow_execution else None + self.workflow_run_id = workflow_execution.id_ if workflow_execution else None self.conversation_id = conversation_id self.user_id = user_id self.timer = timer diff --git a/api/core/ops/weave_trace/weave_trace.py b/api/core/ops/weave_trace/weave_trace.py index a4f38dfbba..d1bd97176e 100644 --- a/api/core/ops/weave_trace/weave_trace.py +++ b/api/core/ops/weave_trace/weave_trace.py @@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import ( ) from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.node_entities import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom diff --git a/api/core/rag/extractor/entity/extract_setting.py b/api/core/rag/extractor/entity/extract_setting.py index 7c00c668dd..1593ad1475 100644 --- a/api/core/rag/extractor/entity/extract_setting.py +++ b/api/core/rag/extractor/entity/extract_setting.py @@ -27,6 +27,8 @@ class WebsiteInfo(BaseModel): website import info. """ + model_config = ConfigDict(arbitrary_types_allowed=True) + provider: str job_id: str url: str @@ -34,12 +36,6 @@ class WebsiteInfo(BaseModel): tenant_id: str only_main_content: bool = False - class Config: - arbitrary_types_allowed = True - - def __init__(self, **data) -> None: - super().__init__(**data) - class ExtractSetting(BaseModel): """ diff --git a/api/core/rag/models/document.py b/api/core/rag/models/document.py index 421cdc05df..04a3428ad8 100644 --- a/api/core/rag/models/document.py +++ b/api/core/rag/models/document.py @@ -45,13 +45,12 @@ class BaseDocumentTransformer(ABC): .. code-block:: python class EmbeddingsRedundantFilter(BaseDocumentTransformer, BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + embeddings: Embeddings similarity_fn: Callable = cosine_similarity similarity_threshold: float = 0.95 - class Config: - arbitrary_types_allowed = True - def transform_documents( self, documents: Sequence[Document], **kwargs: Any ) -> Sequence[Document]: diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index c1a71b45d0..19086cffff 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -10,12 +10,12 @@ from sqlalchemy import select from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker -from core.workflow.entities.workflow_execution_entities import ( +from core.workflow.entities.workflow_execution import ( WorkflowExecution, WorkflowExecutionStatus, WorkflowType, ) -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from models import ( Account, CreatorUserRole, @@ -104,10 +104,9 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): status = WorkflowExecutionStatus(db_model.status) return WorkflowExecution( - id=db_model.id, + id_=db_model.id, workflow_id=db_model.workflow_id, - sequence_number=db_model.sequence_number, - type=WorkflowType(db_model.type), + workflow_type=WorkflowType(db_model.type), workflow_version=db_model.version, graph=graph, inputs=inputs, @@ -140,14 +139,29 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): raise ValueError("created_by_role is required in repository constructor") db_model = WorkflowRun() - db_model.id = domain_model.id + db_model.id = domain_model.id_ db_model.tenant_id = self._tenant_id if self._app_id is not None: db_model.app_id = self._app_id db_model.workflow_id = domain_model.workflow_id db_model.triggered_from = self._triggered_from - db_model.sequence_number = domain_model.sequence_number - db_model.type = domain_model.type + + # Check if this is a new record + with self._session_factory() as session: + existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id_)) + if not existing: + # For new records, get the next sequence number + stmt = select(WorkflowRun.sequence_number).where( + WorkflowRun.app_id == self._app_id, + WorkflowRun.tenant_id == self._tenant_id, + ) + max_sequence = session.scalar(stmt.order_by(WorkflowRun.sequence_number.desc())) + db_model.sequence_number = (max_sequence or 0) + 1 + else: + # For updates, keep the existing sequence number + db_model.sequence_number = existing.sequence_number + + db_model.type = domain_model.workflow_type db_model.version = domain_model.workflow_version db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None diff --git a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py index 8d916a19db..ee4465db5d 100644 --- a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py @@ -12,19 +12,18 @@ from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker from core.model_runtime.utils.encoders import jsonable_encoder -from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import ( +from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeExecutionStatus, + NodeRunMetadataKey, + WorkflowNodeExecutionStatus, ) from core.workflow.nodes.enums import NodeType -from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository from models import ( Account, CreatorUserRole, EndUser, WorkflowNodeExecution, - WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom, ) @@ -106,7 +105,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()} # Convert status to domain enum - status = NodeExecutionStatus(db_model.status) + status = WorkflowNodeExecutionStatus(db_model.status) return NodeExecution( id=db_model.id, diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 82fd6cdc30..6d01028ffc 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -1,36 +1,10 @@ from collections.abc import Mapping -from enum import StrEnum from typing import Any, Optional from pydantic import BaseModel from core.model_runtime.entities.llm_entities import LLMUsage -from models.workflow import WorkflowNodeExecutionStatus - - -class NodeRunMetadataKey(StrEnum): - """ - Node Run Metadata Key. - """ - - TOTAL_TOKENS = "total_tokens" - TOTAL_PRICE = "total_price" - CURRENCY = "currency" - TOOL_INFO = "tool_info" - AGENT_LOG = "agent_log" - ITERATION_ID = "iteration_id" - ITERATION_INDEX = "iteration_index" - LOOP_ID = "loop_id" - LOOP_INDEX = "loop_index" - PARALLEL_ID = "parallel_id" - PARALLEL_START_NODE_ID = "parallel_start_node_id" - PARENT_PARALLEL_ID = "parent_parallel_id" - PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id" - PARALLEL_MODE_RUN_ID = "parallel_mode_run_id" - ITERATION_DURATION_MAP = "iteration_duration_map" # single iteration duration if iteration node runs - LOOP_DURATION_MAP = "loop_duration_map" # single loop duration if loop node runs - ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field - LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus class NodeRunResult(BaseModel): diff --git a/api/core/workflow/entities/workflow_execution_entities.py b/api/core/workflow/entities/workflow_execution.py similarity index 89% rename from api/core/workflow/entities/workflow_execution_entities.py rename to api/core/workflow/entities/workflow_execution.py index 200d4697b5..781be4b3c6 100644 --- a/api/core/workflow/entities/workflow_execution_entities.py +++ b/api/core/workflow/entities/workflow_execution.py @@ -36,12 +36,10 @@ class WorkflowExecution(BaseModel): user, tenant, and app attributes. """ - id: str = Field(...) + id_: str = Field(...) workflow_id: str = Field(...) workflow_version: str = Field(...) - sequence_number: int = Field(...) - - type: WorkflowType = Field(...) + workflow_type: WorkflowType = Field(...) graph: Mapping[str, Any] = Field(...) inputs: Mapping[str, Any] = Field(...) @@ -69,20 +67,18 @@ class WorkflowExecution(BaseModel): def new( cls, *, - id: str, + id_: str, workflow_id: str, - sequence_number: int, - type: WorkflowType, + workflow_type: WorkflowType, workflow_version: str, graph: Mapping[str, Any], inputs: Mapping[str, Any], started_at: datetime, ) -> "WorkflowExecution": return WorkflowExecution( - id=id, + id_=id_, workflow_id=workflow_id, - sequence_number=sequence_number, - type=type, + workflow_type=workflow_type, workflow_version=workflow_version, graph=graph, inputs=inputs, diff --git a/api/core/workflow/entities/node_execution_entities.py b/api/core/workflow/entities/workflow_node_execution.py similarity index 75% rename from api/core/workflow/entities/node_execution_entities.py rename to api/core/workflow/entities/workflow_node_execution.py index 5e5ead062f..dccb6b1539 100644 --- a/api/core/workflow/entities/node_execution_entities.py +++ b/api/core/workflow/entities/workflow_node_execution.py @@ -13,11 +13,35 @@ from typing import Any, Optional from pydantic import BaseModel, Field -from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.nodes.enums import NodeType -class NodeExecutionStatus(StrEnum): +class NodeRunMetadataKey(StrEnum): + """ + Node Run Metadata Key. + """ + + TOTAL_TOKENS = "total_tokens" + TOTAL_PRICE = "total_price" + CURRENCY = "currency" + TOOL_INFO = "tool_info" + AGENT_LOG = "agent_log" + ITERATION_ID = "iteration_id" + ITERATION_INDEX = "iteration_index" + LOOP_ID = "loop_id" + LOOP_INDEX = "loop_index" + PARALLEL_ID = "parallel_id" + PARALLEL_START_NODE_ID = "parallel_start_node_id" + PARENT_PARALLEL_ID = "parent_parallel_id" + PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id" + PARALLEL_MODE_RUN_ID = "parallel_mode_run_id" + ITERATION_DURATION_MAP = "iteration_duration_map" # single iteration duration if iteration node runs + LOOP_DURATION_MAP = "loop_duration_map" # single loop duration if loop node runs + ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field + LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output + + +class WorkflowNodeExecutionStatus(StrEnum): """ Node Execution Status Enum. """ @@ -61,7 +85,7 @@ class NodeExecution(BaseModel): outputs: Optional[Mapping[str, Any]] = None # Output variables produced by this node # Execution state - status: NodeExecutionStatus = NodeExecutionStatus.RUNNING # Current execution status + status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.RUNNING # Current execution status error: Optional[str] = None # Error message if execution failed elapsed_time: float = Field(default=0.0) # Time taken for execution in seconds diff --git a/api/core/workflow/graph_engine/entities/runtime_route_state.py b/api/core/workflow/graph_engine/entities/runtime_route_state.py index 7683dcc9dc..f2d9c98936 100644 --- a/api/core/workflow/graph_engine/entities/runtime_route_state.py +++ b/api/core/workflow/graph_engine/entities/runtime_route_state.py @@ -6,7 +6,7 @@ from typing import Optional from pydantic import BaseModel, Field from core.workflow.entities.node_entities import NodeRunResult -from models.workflow import WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus class RouteNodeState(BaseModel): diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index f61965e07e..0d71a70971 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -14,8 +14,9 @@ from flask import Flask, current_app, has_request_context from configs import dify_config from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom -from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunResult from core.workflow.entities.variable_pool import VariablePool, VariableValue +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.condition_handlers.condition_manager import ConditionManager from core.workflow.graph_engine.entities.event import ( BaseAgentEvent, @@ -54,7 +55,7 @@ from core.workflow.nodes.event import RunCompletedEvent, RunRetrieverResourceEve from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from extensions.ext_database import db from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType logger = logging.getLogger(__name__) diff --git a/api/core/workflow/nodes/agent/agent_node.py b/api/core/workflow/nodes/agent/agent_node.py index 9bed8862fc..30b17cbd84 100644 --- a/api/core/workflow/nodes/agent/agent_node.py +++ b/api/core/workflow/nodes/agent/agent_node.py @@ -15,6 +15,7 @@ from core.tools.tool_manager import ToolManager from core.variables.segments import StringSegment from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.nodes.agent.entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated from core.workflow.nodes.base.entities import BaseNodeData @@ -25,7 +26,6 @@ from core.workflow.utils.variable_template_parser import VariableTemplateParser from extensions.ext_database import db from factories.agent_factory import get_plugin_agent_strategy from models.model import Conversation -from models.workflow import WorkflowNodeExecutionStatus class AgentNode(ToolNode): diff --git a/api/core/workflow/nodes/answer/answer_node.py b/api/core/workflow/nodes/answer/answer_node.py index 520cbdbb60..aa030870e2 100644 --- a/api/core/workflow/nodes/answer/answer_node.py +++ b/api/core/workflow/nodes/answer/answer_node.py @@ -3,6 +3,7 @@ from typing import Any, cast from core.variables import ArrayFileSegment, FileSegment from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.answer.answer_stream_generate_router import AnswerStreamGeneratorRouter from core.workflow.nodes.answer.entities import ( AnswerNodeData, @@ -13,7 +14,6 @@ from core.workflow.nodes.answer.entities import ( from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.utils.variable_template_parser import VariableTemplateParser -from models.workflow import WorkflowNodeExecutionStatus class AnswerNode(BaseNode[AnswerNodeData]): diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index e566770870..7da0c19740 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -4,9 +4,9 @@ from collections.abc import Generator, Mapping, Sequence from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, Union, cast from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.enums import CONTINUE_ON_ERROR_NODE_TYPE, RETRY_ON_ERROR_NODE_TYPE, NodeType from core.workflow.nodes.event import NodeEvent, RunCompletedEvent -from models.workflow import WorkflowNodeExecutionStatus from .entities import BaseNodeData diff --git a/api/core/workflow/nodes/code/code_node.py b/api/core/workflow/nodes/code/code_node.py index 59dae792e8..61c08a7d71 100644 --- a/api/core/workflow/nodes/code/code_node.py +++ b/api/core/workflow/nodes/code/code_node.py @@ -8,10 +8,10 @@ from core.helper.code_executor.javascript.javascript_code_provider import Javasc from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider from core.variables.segments import ArrayFileSegment from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.code.entities import CodeNodeData from core.workflow.nodes.enums import NodeType -from models.workflow import WorkflowNodeExecutionStatus from .exc import ( CodeNodeError, diff --git a/api/core/workflow/nodes/document_extractor/node.py b/api/core/workflow/nodes/document_extractor/node.py index 65b5623a2e..d39eb9c932 100644 --- a/api/core/workflow/nodes/document_extractor/node.py +++ b/api/core/workflow/nodes/document_extractor/node.py @@ -26,9 +26,9 @@ from core.helper import ssrf_proxy from core.variables import ArrayFileSegment from core.variables.segments import FileSegment from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType -from models.workflow import WorkflowNodeExecutionStatus from .entities import DocumentExtractorNodeData from .exc import DocumentExtractorError, FileDownloadError, TextExtractionError, UnsupportedFileTypeError diff --git a/api/core/workflow/nodes/end/end_node.py b/api/core/workflow/nodes/end/end_node.py index 6acc915ab5..0e9756b243 100644 --- a/api/core/workflow/nodes/end/end_node.py +++ b/api/core/workflow/nodes/end/end_node.py @@ -1,8 +1,8 @@ from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.end.entities import EndNodeData from core.workflow.nodes.enums import NodeType -from models.workflow import WorkflowNodeExecutionStatus class EndNode(BaseNode[EndNodeData]): diff --git a/api/core/workflow/nodes/event/event.py b/api/core/workflow/nodes/event/event.py index 9fea3fbda3..f45919caf5 100644 --- a/api/core/workflow/nodes/event/event.py +++ b/api/core/workflow/nodes/event/event.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, Field from core.model_runtime.entities.llm_entities import LLMUsage from core.workflow.entities.node_entities import NodeRunResult -from models.workflow import WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus class RunCompletedEvent(BaseModel): diff --git a/api/core/workflow/nodes/http_request/node.py b/api/core/workflow/nodes/http_request/node.py index 1c82637974..6b1ac57c06 100644 --- a/api/core/workflow/nodes/http_request/node.py +++ b/api/core/workflow/nodes/http_request/node.py @@ -8,12 +8,12 @@ from core.file import File, FileTransferMethod from core.tools.tool_file_manager import ToolFileManager from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_entities import VariableSelector +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.http_request.executor import Executor from core.workflow.utils import variable_template_parser from factories import file_factory -from models.workflow import WorkflowNodeExecutionStatus from .entities import ( HttpRequestNodeData, diff --git a/api/core/workflow/nodes/if_else/if_else_node.py b/api/core/workflow/nodes/if_else/if_else_node.py index cb51b1ddd5..976922f75d 100644 --- a/api/core/workflow/nodes/if_else/if_else_node.py +++ b/api/core/workflow/nodes/if_else/if_else_node.py @@ -4,12 +4,12 @@ from typing_extensions import deprecated from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.if_else.entities import IfElseNodeData from core.workflow.utils.condition.entities import Condition from core.workflow.utils.condition.processor import ConditionProcessor -from models.workflow import WorkflowNodeExecutionStatus class IfElseNode(BaseNode[IfElseNodeData]): diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index ea0b6863c9..7d22a78895 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -12,10 +12,10 @@ from flask import Flask, current_app, has_request_context from configs import dify_config from core.variables import ArrayVariable, IntegerVariable, NoneVariable from core.workflow.entities.node_entities import ( - NodeRunMetadataKey, NodeRunResult, ) from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.entities.event import ( BaseGraphEvent, BaseNodeEvent, @@ -37,7 +37,6 @@ from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event import NodeEvent, RunCompletedEvent from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData -from models.workflow import WorkflowNodeExecutionStatus from .exc import ( InvalidIteratorValueError, diff --git a/api/core/workflow/nodes/iteration/iteration_start_node.py b/api/core/workflow/nodes/iteration/iteration_start_node.py index fe955e47d1..bee481ebdb 100644 --- a/api/core/workflow/nodes/iteration/iteration_start_node.py +++ b/api/core/workflow/nodes/iteration/iteration_start_node.py @@ -1,8 +1,8 @@ from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.iteration.entities import IterationStartNodeData -from models.workflow import WorkflowNodeExecutionStatus class IterationStartNode(BaseNode[IterationStartNodeData]): diff --git a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py index 5955022e5f..2ddb4f8a0b 100644 --- a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py +++ b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py @@ -24,6 +24,7 @@ from core.rag.retrieval.dataset_retrieval import DatasetRetrieval from core.rag.retrieval.retrieval_methods import RetrievalMethod from core.variables import StringSegment from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event.event import ModelInvokeCompletedEvent from core.workflow.nodes.knowledge_retrieval.template_prompts import ( @@ -41,7 +42,6 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from libs.json_in_md_parser import parse_and_check_json_markdown from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog -from models.workflow import WorkflowNodeExecutionStatus from services.feature_service import FeatureService from .entities import KnowledgeRetrievalNodeData, ModelConfig diff --git a/api/core/workflow/nodes/list_operator/node.py b/api/core/workflow/nodes/list_operator/node.py index 04ccfc5405..e698d3f5d8 100644 --- a/api/core/workflow/nodes/list_operator/node.py +++ b/api/core/workflow/nodes/list_operator/node.py @@ -4,9 +4,9 @@ from typing import Any, Literal, Union from core.file import File from core.variables import ArrayFileSegment, ArrayNumberSegment, ArrayStringSegment from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType -from models.workflow import WorkflowNodeExecutionStatus from .entities import ListOperatorNodeData from .exc import InvalidConditionError, InvalidFilterValueError, InvalidKeyError, ListOperatorError diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index eeb44601ec..ceda0287fd 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -53,9 +53,10 @@ from core.variables import ( StringSegment, ) from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_entities import VariableSelector from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.nodes.base import BaseNode @@ -77,7 +78,6 @@ from core.workflow.utils.variable_template_parser import VariableTemplateParser from extensions.ext_database import db from models.model import Conversation from models.provider import Provider, ProviderType -from models.workflow import WorkflowNodeExecutionStatus from .entities import ( LLMNodeChatModelMessage, diff --git a/api/core/workflow/nodes/loop/loop_end_node.py b/api/core/workflow/nodes/loop/loop_end_node.py index 5d4ce0ccbe..327b9e234b 100644 --- a/api/core/workflow/nodes/loop/loop_end_node.py +++ b/api/core/workflow/nodes/loop/loop_end_node.py @@ -1,8 +1,8 @@ from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.loop.entities import LoopEndNodeData -from models.workflow import WorkflowNodeExecutionStatus class LoopEndNode(BaseNode[LoopEndNodeData]): diff --git a/api/core/workflow/nodes/loop/loop_node.py b/api/core/workflow/nodes/loop/loop_node.py index bad3e2b928..eef63c5a92 100644 --- a/api/core/workflow/nodes/loop/loop_node.py +++ b/api/core/workflow/nodes/loop/loop_node.py @@ -15,7 +15,8 @@ from core.variables import ( SegmentType, StringSegment, ) -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.entities.event import ( BaseGraphEvent, BaseNodeEvent, @@ -37,7 +38,6 @@ from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event import NodeEvent, RunCompletedEvent from core.workflow.nodes.loop.entities import LoopNodeData from core.workflow.utils.condition.processor import ConditionProcessor -from models.workflow import WorkflowNodeExecutionStatus if TYPE_CHECKING: from core.workflow.entities.variable_pool import VariablePool diff --git a/api/core/workflow/nodes/loop/loop_start_node.py b/api/core/workflow/nodes/loop/loop_start_node.py index 7cf145e4e5..5a15f36044 100644 --- a/api/core/workflow/nodes/loop/loop_start_node.py +++ b/api/core/workflow/nodes/loop/loop_start_node.py @@ -1,8 +1,8 @@ from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.loop.entities import LoopStartNodeData -from models.workflow import WorkflowNodeExecutionStatus class LoopStartNode(BaseNode[LoopStartNodeData]): diff --git a/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py b/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py index 8db1e432fc..244b15594e 100644 --- a/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py +++ b/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py @@ -25,13 +25,13 @@ from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate from core.prompt.simple_prompt_transform import ModelMode from core.prompt.utils.prompt_message_util import PromptMessageUtil -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.nodes.llm import LLMNode, ModelConfig from core.workflow.utils import variable_template_parser from extensions.ext_database import db -from models.workflow import WorkflowNodeExecutionStatus from .entities import ParameterExtractorNodeData from .exc import ( diff --git a/api/core/workflow/nodes/question_classifier/question_classifier_node.py b/api/core/workflow/nodes/question_classifier/question_classifier_node.py index b4f34a3bef..47626e983d 100644 --- a/api/core/workflow/nodes/question_classifier/question_classifier_node.py +++ b/api/core/workflow/nodes/question_classifier/question_classifier_node.py @@ -10,7 +10,8 @@ from core.model_runtime.utils.encoders import jsonable_encoder from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.simple_prompt_transform import ModelMode from core.prompt.utils.prompt_message_util import PromptMessageUtil -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event import ModelInvokeCompletedEvent from core.workflow.nodes.llm import ( @@ -20,7 +21,6 @@ from core.workflow.nodes.llm import ( ) from core.workflow.utils.variable_template_parser import VariableTemplateParser from libs.json_in_md_parser import parse_and_check_json_markdown -from models.workflow import WorkflowNodeExecutionStatus from .entities import QuestionClassifierNodeData from .exc import InvalidModelTypeError diff --git a/api/core/workflow/nodes/start/start_node.py b/api/core/workflow/nodes/start/start_node.py index 1b47b81517..8839aec9d6 100644 --- a/api/core/workflow/nodes/start/start_node.py +++ b/api/core/workflow/nodes/start/start_node.py @@ -1,9 +1,9 @@ from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.start.entities import StartNodeData -from models.workflow import WorkflowNodeExecutionStatus class StartNode(BaseNode[StartNodeData]): diff --git a/api/core/workflow/nodes/template_transform/template_transform_node.py b/api/core/workflow/nodes/template_transform/template_transform_node.py index 22a1b21888..476cf7eee4 100644 --- a/api/core/workflow/nodes/template_transform/template_transform_node.py +++ b/api/core/workflow/nodes/template_transform/template_transform_node.py @@ -4,10 +4,10 @@ from typing import Any, Optional from core.helper.code_executor.code_executor import CodeExecutionError, CodeExecutor, CodeLanguage from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.template_transform.entities import TemplateTransformNodeData -from models.workflow import WorkflowNodeExecutionStatus MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH = int(os.environ.get("TEMPLATE_TRANSFORM_MAX_LENGTH", "80000")) diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index c72ae5b69b..077e21ade4 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -14,8 +14,9 @@ from core.tools.tool_engine import ToolEngine from core.tools.utils.message_transformer import ToolFileMessageTransformer from core.variables.segments import ArrayAnySegment from core.variables.variables import ArrayAnyVariable -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import AgentLogEvent from core.workflow.nodes.base import BaseNode @@ -25,7 +26,6 @@ from core.workflow.utils.variable_template_parser import VariableTemplateParser from extensions.ext_database import db from factories import file_factory from models import ToolFile -from models.workflow import WorkflowNodeExecutionStatus from services.tools.builtin_tools_manage_service import BuiltinToolManageService from .entities import ToolNodeData diff --git a/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py b/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py index 372496a8fa..db3e25b015 100644 --- a/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py +++ b/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py @@ -1,8 +1,8 @@ from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.variable_aggregator.entities import VariableAssignerNodeData -from models.workflow import WorkflowNodeExecutionStatus class VariableAggregatorNode(BaseNode[VariableAssignerNodeData]): diff --git a/api/core/workflow/nodes/variable_assigner/v1/node.py b/api/core/workflow/nodes/variable_assigner/v1/node.py index 7c7f14c0b8..835e1d77b5 100644 --- a/api/core/workflow/nodes/variable_assigner/v1/node.py +++ b/api/core/workflow/nodes/variable_assigner/v1/node.py @@ -1,11 +1,11 @@ from core.variables import SegmentType, Variable from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError from factories import variable_factory -from models.workflow import WorkflowNodeExecutionStatus from .node_data import VariableAssignerData, WriteMode diff --git a/api/core/workflow/nodes/variable_assigner/v2/node.py b/api/core/workflow/nodes/variable_assigner/v2/node.py index 6a7ad86b51..8759a55b34 100644 --- a/api/core/workflow/nodes/variable_assigner/v2/node.py +++ b/api/core/workflow/nodes/variable_assigner/v2/node.py @@ -6,11 +6,11 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import SegmentType, Variable from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.enums import NodeType from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError -from models.workflow import WorkflowNodeExecutionStatus from . import helpers from .constants import EMPTY_VALUE_MAPPING diff --git a/api/core/workflow/repository/__init__.py b/api/core/workflow/repositories/__init__.py similarity index 69% rename from api/core/workflow/repository/__init__.py rename to api/core/workflow/repositories/__init__.py index 672abb6583..a778151baa 100644 --- a/api/core/workflow/repository/__init__.py +++ b/api/core/workflow/repositories/__init__.py @@ -6,7 +6,7 @@ for accessing and manipulating data, regardless of the underlying storage mechanism. """ -from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository __all__ = [ "OrderConfig", diff --git a/api/core/workflow/repository/workflow_execution_repository.py b/api/core/workflow/repositories/workflow_execution_repository.py similarity index 94% rename from api/core/workflow/repository/workflow_execution_repository.py rename to api/core/workflow/repositories/workflow_execution_repository.py index a39a98ee33..5917310c8b 100644 --- a/api/core/workflow/repository/workflow_execution_repository.py +++ b/api/core/workflow/repositories/workflow_execution_repository.py @@ -1,6 +1,6 @@ from typing import Optional, Protocol -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution class WorkflowExecutionRepository(Protocol): diff --git a/api/core/workflow/repository/workflow_node_execution_repository.py b/api/core/workflow/repositories/workflow_node_execution_repository.py similarity index 97% rename from api/core/workflow/repository/workflow_node_execution_repository.py rename to api/core/workflow/repositories/workflow_node_execution_repository.py index 3ca9e2ecab..8c83b5ea6c 100644 --- a/api/core/workflow/repository/workflow_node_execution_repository.py +++ b/api/core/workflow/repositories/workflow_node_execution_repository.py @@ -2,7 +2,7 @@ from collections.abc import Sequence from dataclasses import dataclass from typing import Literal, Optional, Protocol -from core.workflow.entities.node_execution_entities import NodeExecution +from core.workflow.entities.workflow_node_execution import NodeExecution @dataclass diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 24e23af093..6e3c2f3f78 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -1,11 +1,9 @@ from collections.abc import Mapping +from dataclasses import dataclass from datetime import UTC, datetime from typing import Any, Optional, Union from uuid import uuid4 -from sqlalchemy import func, select -from sqlalchemy.orm import Session - from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.app.entities.queue_entities import ( QueueNodeExceptionEvent, @@ -19,21 +17,24 @@ from core.app.entities.queue_entities import ( from core.app.task_pipeline.exc import WorkflowRunNotFoundError from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask -from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import ( +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeExecutionStatus, + NodeRunMetadataKey, + WorkflowNodeExecutionStatus, ) -from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_entry import WorkflowEntry -from models import ( - Workflow, - WorkflowRun, - WorkflowRunStatus, -) + + +@dataclass +class CycleManagerWorkflowInfo: + workflow_id: str + workflow_type: WorkflowType + version: str + graph_data: Mapping[str, Any] class WorkflowCycleManager: @@ -42,32 +43,17 @@ class WorkflowCycleManager: *, application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], workflow_system_variables: dict[SystemVariableKey, Any], + workflow_info: CycleManagerWorkflowInfo, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._application_generate_entity = application_generate_entity self._workflow_system_variables = workflow_system_variables + self._workflow_info = workflow_info self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository - def handle_workflow_run_start( - self, - *, - session: Session, - workflow_id: str, - ) -> WorkflowExecution: - workflow_stmt = select(Workflow).where(Workflow.id == workflow_id) - workflow = session.scalar(workflow_stmt) - if not workflow: - raise ValueError(f"Workflow not found: {workflow_id}") - - max_sequence_stmt = select(func.max(WorkflowRun.sequence_number)).where( - WorkflowRun.tenant_id == workflow.tenant_id, - WorkflowRun.app_id == workflow.app_id, - ) - max_sequence = session.scalar(max_sequence_stmt) or 0 - new_sequence_number = max_sequence + 1 - + def handle_workflow_run_start(self) -> WorkflowExecution: inputs = {**self._application_generate_entity.inputs} for key, value in (self._workflow_system_variables or {}).items(): if key.value == "conversation": @@ -81,12 +67,11 @@ class WorkflowCycleManager: # TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this execution_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID) or uuid4()) execution = WorkflowExecution.new( - id=execution_id, - workflow_id=workflow.id, - sequence_number=new_sequence_number, - type=WorkflowType(workflow.type), - workflow_version=workflow.version, - graph=workflow.graph_dict, + id_=execution_id, + workflow_id=self._workflow_info.workflow_id, + workflow_type=self._workflow_info.workflow_type, + workflow_version=self._workflow_info.version, + graph=self._workflow_info.graph_data, inputs=inputs, started_at=datetime.now(UTC).replace(tzinfo=None), ) @@ -168,7 +153,7 @@ class WorkflowCycleManager: workflow_run_id: str, total_tokens: int, total_steps: int, - status: WorkflowRunStatus, + status: WorkflowExecutionStatus, error_message: str, conversation_id: Optional[str] = None, trace_manager: Optional[TraceQueueManager] = None, @@ -185,7 +170,7 @@ class WorkflowCycleManager: # Use the instance repository to find running executions for a workflow run running_node_executions = self._workflow_node_execution_repository.get_running_executions( - workflow_run_id=workflow_execution.id + workflow_run_id=workflow_execution.id_ ) # Update the domain models @@ -193,7 +178,7 @@ class WorkflowCycleManager: for node_execution in running_node_executions: if node_execution.node_execution_id: # Update the domain model - node_execution.status = NodeExecutionStatus.FAILED + node_execution.status = WorkflowNodeExecutionStatus.FAILED node_execution.error = error_message node_execution.finished_at = now node_execution.elapsed_time = (now - node_execution.created_at).total_seconds() @@ -233,14 +218,14 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, index=event.node_run_index, node_execution_id=event.node_execution_id, node_id=event.node_id, node_type=event.node_type, title=event.node_data.title, - status=NodeExecutionStatus.RUNNING, + status=WorkflowNodeExecutionStatus.RUNNING, metadata=metadata, created_at=created_at, ) @@ -271,7 +256,7 @@ class WorkflowCycleManager: elapsed_time = (finished_at - event.start_at).total_seconds() # Update domain model - domain_execution.status = NodeExecutionStatus.SUCCEEDED + domain_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED domain_execution.update_from_mapping( inputs=inputs, process_data=process_data, outputs=outputs, metadata=execution_metadata_dict ) @@ -317,9 +302,9 @@ class WorkflowCycleManager: # Update domain model domain_execution.status = ( - NodeExecutionStatus.FAILED + WorkflowNodeExecutionStatus.FAILED if not isinstance(event, QueueNodeExceptionEvent) - else NodeExecutionStatus.EXCEPTION + else WorkflowNodeExecutionStatus.EXCEPTION ) domain_execution.error = event.error domain_execution.update_from_mapping( @@ -362,13 +347,13 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, node_execution_id=event.node_execution_id, node_id=event.node_id, node_type=event.node_type, title=event.node_data.title, - status=NodeExecutionStatus.RETRY, + status=WorkflowNodeExecutionStatus.RETRY, created_at=created_at, finished_at=finished_at, elapsed_time=elapsed_time, diff --git a/api/models/__init__.py b/api/models/__init__.py index f652449e98..13eab226b7 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -85,10 +85,8 @@ from .workflow import ( WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowNodeExecution, - WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom, WorkflowRun, - WorkflowRunStatus, WorkflowType, ) @@ -100,14 +98,14 @@ __all__ = [ "AccountStatus", "ApiRequest", "ApiToken", - "ApiToolProvider", # Added + "ApiToolProvider", "App", "AppAnnotationHitHistory", "AppAnnotationSetting", "AppDatasetJoin", "AppMode", "AppModelConfig", - "BuiltinToolProvider", # Added + "BuiltinToolProvider", "CeleryTask", "CeleryTaskSet", "Conversation", @@ -172,10 +170,8 @@ __all__ = [ "WorkflowAppLog", "WorkflowAppLogCreatedFrom", "WorkflowNodeExecution", - "WorkflowNodeExecutionStatus", "WorkflowNodeExecutionTriggeredFrom", "WorkflowRun", - "WorkflowRunStatus", "WorkflowRunTriggeredFrom", "WorkflowToolProvider", "WorkflowType", diff --git a/api/models/model.py b/api/models/model.py index 92a5c0d121..229e77134e 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, cast from core.plugin.entities.plugin import GenericProviderID from core.tools.entities.tool_entities import ToolProviderType from core.tools.signature import sign_tool_file +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from services.plugin.plugin_service import PluginService if TYPE_CHECKING: @@ -31,7 +32,6 @@ from .base import Base from .engine import db from .enums import CreatorUserRole from .types import StringUUID -from .workflow import WorkflowRunStatus if TYPE_CHECKING: from .workflow import Workflow @@ -794,22 +794,22 @@ class Conversation(Base): def status_count(self): messages = db.session.query(Message).filter(Message.conversation_id == self.id).all() status_counts = { - WorkflowRunStatus.RUNNING: 0, - WorkflowRunStatus.SUCCEEDED: 0, - WorkflowRunStatus.FAILED: 0, - WorkflowRunStatus.STOPPED: 0, - WorkflowRunStatus.PARTIAL_SUCCEEDED: 0, + WorkflowExecutionStatus.RUNNING: 0, + WorkflowExecutionStatus.SUCCEEDED: 0, + WorkflowExecutionStatus.FAILED: 0, + WorkflowExecutionStatus.STOPPED: 0, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED: 0, } for message in messages: if message.workflow_run: - status_counts[WorkflowRunStatus(message.workflow_run.status)] += 1 + status_counts[WorkflowExecutionStatus(message.workflow_run.status)] += 1 return ( { - "success": status_counts[WorkflowRunStatus.SUCCEEDED], - "failed": status_counts[WorkflowRunStatus.FAILED], - "partial_success": status_counts[WorkflowRunStatus.PARTIAL_SUCCEEDED], + "success": status_counts[WorkflowExecutionStatus.SUCCEEDED], + "failed": status_counts[WorkflowExecutionStatus.FAILED], + "partial_success": status_counts[WorkflowExecutionStatus.PARTIAL_SUCCEEDED], } if messages else None diff --git a/api/models/workflow.py b/api/models/workflow.py index ae341dd1b5..6a162d52d4 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -377,18 +377,6 @@ class Workflow(Base): ) -class WorkflowRunStatus(StrEnum): - """ - Workflow Run Status Enum - """ - - RUNNING = "running" - SUCCEEDED = "succeeded" - FAILED = "failed" - STOPPED = "stopped" - PARTIAL_SUCCEEDED = "partial-succeeded" - - class WorkflowRun(Base): """ Workflow Run @@ -553,18 +541,6 @@ class WorkflowNodeExecutionTriggeredFrom(StrEnum): WORKFLOW_RUN = "workflow-run" -class WorkflowNodeExecutionStatus(StrEnum): - """ - Workflow Node Execution Status Enum - """ - - RUNNING = "running" - SUCCEEDED = "succeeded" - FAILED = "failed" - EXCEPTION = "exception" - RETRY = "retry" - - class WorkflowNodeExecution(Base): """ Workflow Node Execution diff --git a/api/pytest.ini b/api/pytest.ini index 618e921825..eb49619481 100644 --- a/api/pytest.ini +++ b/api/pytest.ini @@ -1,5 +1,4 @@ [pytest] -continue-on-collection-errors = true addopts = --cov=./api --cov-report=json --cov-report=xml env = ANTHROPIC_API_KEY = sk-ant-api11-IamNotARealKeyJustForMockTestKawaiiiiiiiiii-NotBaka-ASkksz diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index a899ebe278..6b30a70372 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -4,9 +4,9 @@ from datetime import datetime from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from models import App, EndUser, WorkflowAppLog, WorkflowRun from models.enums import CreatorUserRole -from models.workflow import WorkflowRunStatus class WorkflowAppService: @@ -16,7 +16,7 @@ class WorkflowAppService: session: Session, app_model: App, keyword: str | None = None, - status: WorkflowRunStatus | None = None, + status: WorkflowExecutionStatus | None = None, created_at_before: datetime | None = None, created_at_after: datetime | None = None, page: int = 1, diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 21366a4552..0ad900d758 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -4,7 +4,7 @@ from typing import Optional import contexts from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import OrderConfig +from core.workflow.repositories.workflow_node_execution_repository import OrderConfig from extensions.ext_database import db from libs.infinite_scroll_pagination import InfiniteScrollPagination from models import ( diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 50bb8f40ae..2460aa25a7 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -13,7 +13,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable from core.workflow.entities.node_entities import NodeRunResult -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus +from core.workflow.entities.workflow_node_execution import NodeExecution, WorkflowNodeExecutionStatus from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.nodes import NodeType @@ -31,7 +31,6 @@ from models.tools import WorkflowToolProvider from models.workflow import ( Workflow, WorkflowNodeExecution, - WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom, WorkflowType, ) @@ -404,13 +403,13 @@ class WorkflowService: # Map status from WorkflowNodeExecutionStatus to NodeExecutionStatus if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED: - node_execution.status = NodeExecutionStatus.SUCCEEDED + node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION: - node_execution.status = NodeExecutionStatus.EXCEPTION + node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION node_execution.error = node_run_result.error else: # Set failed status and error - node_execution.status = NodeExecutionStatus.FAILED + node_execution.status = WorkflowNodeExecutionStatus.FAILED node_execution.error = error return node_execution diff --git a/api/tests/integration_tests/workflow/nodes/test_code.py b/api/tests/integration_tests/workflow/nodes/test_code.py index 4de985ae7c..13d78c2d83 100644 --- a/api/tests/integration_tests/workflow/nodes/test_code.py +++ b/api/tests/integration_tests/workflow/nodes/test_code.py @@ -8,6 +8,7 @@ import pytest from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -15,7 +16,7 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.entities import CodeNodeData from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000")) diff --git a/api/tests/integration_tests/workflow/nodes/test_llm.py b/api/tests/integration_tests/workflow/nodes/test_llm.py index 777a04bd7f..5fbee266bd 100644 --- a/api/tests/integration_tests/workflow/nodes/test_llm.py +++ b/api/tests/integration_tests/workflow/nodes/test_llm.py @@ -9,6 +9,7 @@ import pytest from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -17,7 +18,7 @@ from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.llm.node import LLMNode from extensions.ext_database import db from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.model import get_mocked_fetch_model_config """FOR MOCK FIXTURES, DO NOT REMOVE""" diff --git a/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py b/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py index 5c6bb82024..e89e03ae86 100644 --- a/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py +++ b/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py @@ -7,6 +7,7 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.model_runtime.entities import AssistantPromptMessage from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -17,7 +18,7 @@ from models.enums import UserFrom from tests.integration_tests.workflow.nodes.__mock.model import get_mocked_fetch_model_config """FOR MOCK FIXTURES, DO NOT REMOVE""" -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType from tests.integration_tests.model_runtime.__mock.plugin_daemon import setup_model_mock diff --git a/api/tests/integration_tests/workflow/nodes/test_template_transform.py b/api/tests/integration_tests/workflow/nodes/test_template_transform.py index 51d61a95ea..a5f2677a59 100644 --- a/api/tests/integration_tests/workflow/nodes/test_template_transform.py +++ b/api/tests/integration_tests/workflow/nodes/test_template_transform.py @@ -5,13 +5,14 @@ import pytest from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock diff --git a/api/tests/integration_tests/workflow/nodes/test_tool.py b/api/tests/integration_tests/workflow/nodes/test_tool.py index 5a569a5983..039beedafe 100644 --- a/api/tests/integration_tests/workflow/nodes/test_tool.py +++ b/api/tests/integration_tests/workflow/nodes/test_tool.py @@ -5,6 +5,7 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.tools.utils.configuration import ToolParameterConfigurationManager from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -12,7 +13,7 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.nodes.event.event import RunCompletedEvent from core.workflow.nodes.tool.tool_node import ToolNode from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def init_tool_node(config: dict): diff --git a/api/tests/unit_tests/core/prompt/test_extract_thread_messages.py b/api/tests/unit_tests/core/prompt/test_extract_thread_messages.py index ba3c1eb5e0..e3e500e310 100644 --- a/api/tests/unit_tests/core/prompt/test_extract_thread_messages.py +++ b/api/tests/unit_tests/core/prompt/test_extract_thread_messages.py @@ -4,7 +4,7 @@ from constants import UUID_NIL from core.prompt.utils.extract_thread_messages import extract_thread_messages -class TestMessage: +class MockMessage: def __init__(self, id, parent_message_id): self.id = id self.parent_message_id = parent_message_id @@ -14,7 +14,7 @@ class TestMessage: def test_extract_thread_messages_single_message(): - messages = [TestMessage(str(uuid4()), UUID_NIL)] + messages = [MockMessage(str(uuid4()), UUID_NIL)] result = extract_thread_messages(messages) assert len(result) == 1 assert result[0] == messages[0] @@ -23,11 +23,11 @@ def test_extract_thread_messages_single_message(): def test_extract_thread_messages_linear_thread(): id1, id2, id3, id4, id5 = str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()) messages = [ - TestMessage(id5, id4), - TestMessage(id4, id3), - TestMessage(id3, id2), - TestMessage(id2, id1), - TestMessage(id1, UUID_NIL), + MockMessage(id5, id4), + MockMessage(id4, id3), + MockMessage(id3, id2), + MockMessage(id2, id1), + MockMessage(id1, UUID_NIL), ] result = extract_thread_messages(messages) assert len(result) == 5 @@ -37,10 +37,10 @@ def test_extract_thread_messages_linear_thread(): def test_extract_thread_messages_branched_thread(): id1, id2, id3, id4 = str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()) messages = [ - TestMessage(id4, id2), - TestMessage(id3, id2), - TestMessage(id2, id1), - TestMessage(id1, UUID_NIL), + MockMessage(id4, id2), + MockMessage(id3, id2), + MockMessage(id2, id1), + MockMessage(id1, UUID_NIL), ] result = extract_thread_messages(messages) assert len(result) == 3 @@ -56,9 +56,9 @@ def test_extract_thread_messages_empty_list(): def test_extract_thread_messages_partially_loaded(): id0, id1, id2, id3 = str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()) messages = [ - TestMessage(id3, id2), - TestMessage(id2, id1), - TestMessage(id1, id0), + MockMessage(id3, id2), + MockMessage(id2, id1), + MockMessage(id1, id0), ] result = extract_thread_messages(messages) assert len(result) == 3 @@ -68,9 +68,9 @@ def test_extract_thread_messages_partially_loaded(): def test_extract_thread_messages_legacy_messages(): id1, id2, id3 = str(uuid4()), str(uuid4()), str(uuid4()) messages = [ - TestMessage(id3, UUID_NIL), - TestMessage(id2, UUID_NIL), - TestMessage(id1, UUID_NIL), + MockMessage(id3, UUID_NIL), + MockMessage(id2, UUID_NIL), + MockMessage(id1, UUID_NIL), ] result = extract_thread_messages(messages) assert len(result) == 3 @@ -80,11 +80,11 @@ def test_extract_thread_messages_legacy_messages(): def test_extract_thread_messages_mixed_with_legacy_messages(): id1, id2, id3, id4, id5 = str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()) messages = [ - TestMessage(id5, id4), - TestMessage(id4, id2), - TestMessage(id3, id2), - TestMessage(id2, UUID_NIL), - TestMessage(id1, UUID_NIL), + MockMessage(id5, id4), + MockMessage(id4, id2), + MockMessage(id3, id2), + MockMessage(id2, UUID_NIL), + MockMessage(id1, UUID_NIL), ] result = extract_thread_messages(messages) assert len(result) == 4 diff --git a/api/tests/unit_tests/core/rag/datasource/vdb/milvus/test_milvus.py b/api/tests/unit_tests/core/rag/datasource/vdb/milvus/test_milvus.py index bd414c88f4..48cc8a7e1c 100644 --- a/api/tests/unit_tests/core/rag/datasource/vdb/milvus/test_milvus.py +++ b/api/tests/unit_tests/core/rag/datasource/vdb/milvus/test_milvus.py @@ -1,5 +1,5 @@ import pytest -from pydantic.error_wrappers import ValidationError +from pydantic import ValidationError from core.rag.datasource.vdb.milvus.milvus_vector import MilvusConfig diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index f3dbd1836b..34c64121af 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -6,6 +6,7 @@ from flask import Flask from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import ( BaseNodeEvent, @@ -25,7 +26,7 @@ from core.workflow.nodes.event import RunCompletedEvent, RunStreamChunkEvent from core.workflow.nodes.llm.node import LLMNode from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType @pytest.fixture diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 0369f3fa44..b7f78d91fa 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -11,7 +12,7 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.nodes.answer.answer_node import AnswerNode from extensions.ext_database import db from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def test_execute_answer(): diff --git a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py index 2073d355f0..7fd32a4826 100644 --- a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py @@ -4,6 +4,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File, FileTransferMethod, FileType from core.variables import ArrayFileVariable, FileVariable from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState from core.workflow.nodes.answer import AnswerStreamGenerateRoute from core.workflow.nodes.end import EndStreamParam @@ -15,7 +16,7 @@ from core.workflow.nodes.http_request import ( HttpRequestNodeData, ) from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def test_http_request_node_binary_file(monkeypatch): diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 29bd4d6c6c..6d854c950d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -5,6 +5,7 @@ from unittest.mock import patch from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -14,7 +15,7 @@ from core.workflow.nodes.iteration.entities import ErrorHandleMode from core.workflow.nodes.iteration.iteration_node import IterationNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def test_run(): diff --git a/api/tests/unit_tests/core/workflow/nodes/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/test_answer.py index 2f0aa28b48..abc822e98b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_answer.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -11,7 +12,7 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.nodes.answer.answer_node import AnswerNode from extensions.ext_database import db from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def test_execute_answer(): diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index 111c647d9c..c429ac7dd3 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -2,6 +2,7 @@ from unittest.mock import patch from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import ( GraphRunPartialSucceededEvent, @@ -14,7 +15,7 @@ from core.workflow.graph_engine.graph_engine import GraphEngine from core.workflow.nodes.event.event import RunCompletedEvent, RunStreamChunkEvent from core.workflow.nodes.llm.node import LLMNode from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType class ContinueOnErrorTestHelper: diff --git a/api/tests/unit_tests/core/workflow/nodes/test_document_extractor_node.py b/api/tests/unit_tests/core/workflow/nodes/test_document_extractor_node.py index 6d46ea9b89..35d83449c3 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_document_extractor_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_document_extractor_node.py @@ -7,6 +7,7 @@ from core.file import File, FileTransferMethod from core.variables import ArrayFileSegment from core.variables.variables import StringVariable from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.document_extractor import DocumentExtractorNode, DocumentExtractorNodeData from core.workflow.nodes.document_extractor.node import ( _extract_text_from_docx, @@ -15,7 +16,6 @@ from core.workflow.nodes.document_extractor.node import ( _extract_text_from_plain_text, ) from core.workflow.nodes.enums import NodeType -from models.workflow import WorkflowNodeExecutionStatus @pytest.fixture diff --git a/api/tests/unit_tests/core/workflow/nodes/test_if_else.py b/api/tests/unit_tests/core/workflow/nodes/test_if_else.py index 41e2c5d484..c4e411f9d6 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_if_else.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_if_else.py @@ -6,6 +6,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File, FileTransferMethod, FileType from core.variables import ArrayFileSegment from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams @@ -15,7 +16,7 @@ from core.workflow.nodes.if_else.if_else_node import IfElseNode from core.workflow.utils.condition.entities import Condition, SubCondition, SubVariableCondition from extensions.ext_database import db from models.enums import UserFrom -from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +from models.workflow import WorkflowType def test_execute_if_else_result_true(): diff --git a/api/tests/unit_tests/core/workflow/nodes/test_list_operator.py b/api/tests/unit_tests/core/workflow/nodes/test_list_operator.py index 36116d3540..77d42e2692 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_list_operator.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_list_operator.py @@ -4,6 +4,7 @@ import pytest from core.file import File, FileTransferMethod, FileType from core.variables import ArrayFileSegment +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.list_operator.entities import ( ExtractConfig, FilterBy, @@ -14,7 +15,6 @@ from core.workflow.nodes.list_operator.entities import ( ) from core.workflow.nodes.list_operator.exc import InvalidKeyError from core.workflow.nodes.list_operator.node import ListOperatorNode, _get_file_extract_string_func -from models.workflow import WorkflowNodeExecutionStatus @pytest.fixture diff --git a/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py b/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py index f593510830..e121f6338c 100644 --- a/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py @@ -7,6 +7,7 @@ from core.tools.entities.tool_entities import ToolInvokeMessage, ToolProviderTyp from core.tools.errors import ToolInvokeError from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState from core.workflow.nodes.answer import AnswerStreamGenerateRoute from core.workflow.nodes.end import EndStreamParam @@ -14,7 +15,7 @@ from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.tool import ToolNode from core.workflow.nodes.tool.entities import ToolNodeData -from models import UserFrom, WorkflowNodeExecutionStatus, WorkflowType +from models import UserFrom, WorkflowType def _create_tool_node(): diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 9c955fc086..a5574d309b 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -12,21 +12,20 @@ from core.app.entities.queue_entities import ( QueueNodeStartedEvent, QueueNodeSucceededEvent, ) -from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus -from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_node_execution import ( + NodeExecution, + NodeRunMetadataKey, + WorkflowNodeExecutionStatus, +) from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType -from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository -from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import WorkflowCycleManager +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from models.enums import CreatorUserRole from models.model import AppMode -from models.workflow import ( - Workflow, - WorkflowRun, - WorkflowRunStatus, -) +from models.workflow import Workflow, WorkflowRun @pytest.fixture @@ -93,16 +92,38 @@ def mock_workflow_execution_repository(): return repo +@pytest.fixture +def real_workflow_entity(): + return CycleManagerWorkflowInfo( + workflow_id="test-workflow-id", # Matches ID used in other fixtures + workflow_type=WorkflowType.CHAT, + version="1.0.0", + graph_data={ + "nodes": [ + { + "id": "node1", + "type": "chat", # NodeType is a string enum + "name": "Chat Node", + "data": {"model": "gpt-3.5-turbo", "prompt": "test prompt"}, + } + ], + "edges": [], + }, + ) + + @pytest.fixture def workflow_cycle_manager( real_app_generate_entity, real_workflow_system_variables, mock_workflow_execution_repository, mock_node_execution_repository, + real_workflow_entity, ): return WorkflowCycleManager( application_generate_entity=real_app_generate_entity, workflow_system_variables=real_workflow_system_variables, + workflow_info=real_workflow_entity, workflow_execution_repository=mock_workflow_execution_repository, workflow_node_execution_repository=mock_node_execution_repository, ) @@ -148,7 +169,7 @@ def real_workflow_run(): workflow_run.version = "1.0" workflow_run.graph = json.dumps({"nodes": [], "edges": []}) workflow_run.inputs = json.dumps({"query": "test query"}) - workflow_run.status = WorkflowRunStatus.RUNNING + workflow_run.status = WorkflowExecutionStatus.RUNNING workflow_run.outputs = json.dumps({"answer": "test answer"}) workflow_run.created_by_role = CreatorUserRole.ACCOUNT workflow_run.created_by = "test-user-id" @@ -171,20 +192,13 @@ def test_init( assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository -def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_workflow): +def test_handle_workflow_run_start(workflow_cycle_manager): """Test handle_workflow_run_start method""" - # Mock session.scalar to return the workflow and max sequence - mock_session.scalar.side_effect = [real_workflow, 5] - # Call the method - workflow_execution = workflow_cycle_manager.handle_workflow_run_start( - session=mock_session, - workflow_id="test-workflow-id", - ) + workflow_execution = workflow_cycle_manager.handle_workflow_run_start() # Verify the result - assert workflow_execution.workflow_id == real_workflow.id - assert workflow_execution.sequence_number == 6 # max_sequence + 1 + assert workflow_execution.workflow_id == "test-workflow-id" # Verify the workflow_execution_repository.save was called workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution) @@ -195,11 +209,10 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -230,11 +243,10 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -251,13 +263,13 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut workflow_run_id="test-workflow-run-id", total_tokens=50, total_steps=3, - status=WorkflowRunStatus.FAILED, + status=WorkflowExecutionStatus.FAILED, error_message="Test error message", ) # Verify the result assert result == workflow_execution - assert result.status == WorkflowExecutionStatus(WorkflowRunStatus.FAILED.value) + assert result.status == WorkflowExecutionStatus.FAILED assert result.error_message == "Test error message" assert result.total_tokens == 50 assert result.total_steps == 3 @@ -269,11 +281,10 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-execution-id", + id_="test-workflow-execution-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -301,18 +312,18 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu # Call the method result = workflow_cycle_manager.handle_node_execution_start( - workflow_execution_id=workflow_execution.id, + workflow_execution_id=workflow_execution.id_, event=event, ) # Verify the result assert result.workflow_id == workflow_execution.workflow_id - assert result.workflow_run_id == workflow_execution.id + assert result.workflow_run_id == workflow_execution.id_ assert result.node_execution_id == event.node_execution_id assert result.node_id == event.node_id assert result.node_type == event.node_type assert result.title == event.node_data.title - assert result.status == NodeExecutionStatus.RUNNING + assert result.status == WorkflowNodeExecutionStatus.RUNNING # Verify save was called workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(result) @@ -323,11 +334,10 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -385,7 +395,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): # Verify the result assert result == node_execution - assert result.status == NodeExecutionStatus.SUCCEEDED + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED # Verify save was called workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(node_execution) @@ -396,11 +406,10 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -464,7 +473,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): # Verify the result assert result == node_execution - assert result.status == NodeExecutionStatus.FAILED + assert result.status == WorkflowNodeExecutionStatus.FAILED assert result.error == "Test error message" # Verify save was called diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py index 7c5020db02..f3cdfd135b 100644 --- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py +++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py @@ -13,12 +13,15 @@ from sqlalchemy.orm import Session, sessionmaker from core.model_runtime.utils.encoders import jsonable_encoder from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus +from core.workflow.entities.workflow_node_execution import ( + NodeExecution, + NodeRunMetadataKey, + WorkflowNodeExecutionStatus, +) from core.workflow.nodes.enums import NodeType -from core.workflow.repository.workflow_node_execution_repository import OrderConfig +from core.workflow.repositories.workflow_node_execution_repository import OrderConfig from models.account import Account, Tenant -from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom +from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionTriggeredFrom def configure_mock_execution(mock_execution): @@ -297,7 +300,7 @@ def test_to_db_model(repository): inputs={"input_key": "input_value"}, process_data={"process_key": "process_value"}, outputs={"output_key": "output_value"}, - status=NodeExecutionStatus.RUNNING, + status=WorkflowNodeExecutionStatus.RUNNING, error=None, elapsed_time=1.5, metadata={NodeRunMetadataKey.TOTAL_TOKENS: 100, NodeRunMetadataKey.TOTAL_PRICE: Decimal("0.0")}, @@ -388,7 +391,7 @@ def test_to_domain_model(repository): assert domain_model.inputs == inputs_dict assert domain_model.process_data == process_data_dict assert domain_model.outputs == outputs_dict - assert domain_model.status == NodeExecutionStatus(db_model.status) + assert domain_model.status == WorkflowNodeExecutionStatus(db_model.status) assert domain_model.error == db_model.error assert domain_model.elapsed_time == db_model.elapsed_time assert domain_model.metadata == metadata_dict