diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index bb7e27a4bc..903bd30286 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -471,38 +471,6 @@ class RagPipelineDraftDatasourceNodeRunApi(Resource): ) -class RagPipelinePublishedNodeRunApi(Resource): - @setup_required - @login_required - @account_initialization_required - @get_rag_pipeline - def post(self, pipeline: Pipeline, node_id: str): - """ - Run rag pipeline datasource - """ - # The role of the current user in the ta table must be admin, owner, or editor - if not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): - raise Forbidden() - - parser = reqparse.RequestParser() - parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json") - args = parser.parse_args() - - inputs = args.get("inputs") - if inputs == None: - raise ValueError("missing inputs") - - rag_pipeline_service = RagPipelineService() - workflow_node_execution = rag_pipeline_service.run_published_workflow_node( - pipeline=pipeline, node_id=node_id, user_inputs=inputs, account=current_user - ) - - return workflow_node_execution - - class RagPipelineDraftNodeRunApi(Resource): @setup_required @login_required @@ -1027,11 +995,6 @@ api.add_resource( "/rag/pipelines//workflows/draft/iteration/nodes//run", ) -api.add_resource( - RagPipelinePublishedNodeRunApi, - "/rag/pipelines//workflows/published/nodes//run", -) - api.add_resource( RagPipelineDraftRunLoopNodeApi, "/rag/pipelines//workflows/draft/loop/nodes//run", diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index 1e7cc4c582..4c7712ae33 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -36,6 +36,7 @@ from core.rag.entities.event import ( from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository from core.variables.variables import Variable from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import ( WorkflowNodeExecution, WorkflowNodeExecutionStatus, @@ -50,6 +51,7 @@ from core.workflow.nodes.event.types import NodeEvent from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.repositories.workflow_node_execution_repository import OrderConfig from core.workflow.workflow_entry import WorkflowEntry +from core.workflow.workflow_loader import VariableLoader from extensions.ext_database import db from libs.infinite_scroll_pagination import InfiniteScrollPagination from models.account import Account @@ -71,6 +73,7 @@ from services.entities.knowledge_entities.rag_pipeline_entities import ( ) from services.errors.app import WorkflowHashNotEqualError from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory +from services.workflow_draft_variable_service import DraftVarLoader logger = logging.getLogger(__name__) @@ -405,6 +408,18 @@ class RagPipelineService: node_id=node_id, user_inputs=user_inputs, user_id=account.id, + variable_pool=VariablePool( + system_variables={}, + user_inputs=user_inputs, + environment_variables=[], + conversation_variables=[], + rag_pipeline_variables=[], + ), + variable_loader=DraftVarLoader( + engine=db.engine, + app_id=pipeline.id, + tenant_id=pipeline.tenant_id, + ), ), start_at=start_at, tenant_id=pipeline.tenant_id, @@ -417,39 +432,6 @@ class RagPipelineService: return workflow_node_execution - def run_published_workflow_node( - self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account - ) -> WorkflowNodeExecution: - """ - Run published workflow node - """ - # fetch published workflow by app_model - published_workflow = self.get_published_workflow(pipeline=pipeline) - if not published_workflow: - raise ValueError("Workflow not initialized") - - # run draft workflow node - start_at = time.perf_counter() - - workflow_node_execution = self._handle_node_run_result( - getter=lambda: WorkflowEntry.single_step_run( - workflow=published_workflow, - node_id=node_id, - user_inputs=user_inputs, - user_id=account.id, - ), - start_at=start_at, - tenant_id=pipeline.tenant_id, - node_id=node_id, - ) - - workflow_node_execution.workflow_id = published_workflow.id - - db.session.add(workflow_node_execution) - db.session.commit() - - return workflow_node_execution - def run_datasource_workflow_node( self, pipeline: Pipeline,