This commit is contained in:
jyong 2025-07-14 17:33:08 +08:00
parent 3e5772c50c
commit a919e3e135
2 changed files with 15 additions and 70 deletions

View File

@ -471,38 +471,6 @@ class RagPipelineDraftDatasourceNodeRunApi(Resource):
)
class RagPipelinePublishedNodeRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_rag_pipeline
def post(self, pipeline: Pipeline, node_id: str):
"""
Run rag pipeline datasource
"""
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
if not isinstance(current_user, Account):
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
args = parser.parse_args()
inputs = args.get("inputs")
if inputs == None:
raise ValueError("missing inputs")
rag_pipeline_service = RagPipelineService()
workflow_node_execution = rag_pipeline_service.run_published_workflow_node(
pipeline=pipeline, node_id=node_id, user_inputs=inputs, account=current_user
)
return workflow_node_execution
class RagPipelineDraftNodeRunApi(Resource):
@setup_required
@login_required
@ -1027,11 +995,6 @@ api.add_resource(
"/rag/pipelines/<uuid:pipeline_id>/workflows/draft/iteration/nodes/<string:node_id>/run",
)
api.add_resource(
RagPipelinePublishedNodeRunApi,
"/rag/pipelines/<uuid:pipeline_id>/workflows/published/nodes/<string:node_id>/run",
)
api.add_resource(
RagPipelineDraftRunLoopNodeApi,
"/rag/pipelines/<uuid:pipeline_id>/workflows/draft/loop/nodes/<string:node_id>/run",

View File

@ -36,6 +36,7 @@ from core.rag.entities.event import (
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import (
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
@ -50,6 +51,7 @@ from core.workflow.nodes.event.types import NodeEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.repositories.workflow_node_execution_repository import OrderConfig
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_loader import VariableLoader
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
@ -71,6 +73,7 @@ from services.entities.knowledge_entities.rag_pipeline_entities import (
)
from services.errors.app import WorkflowHashNotEqualError
from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory
from services.workflow_draft_variable_service import DraftVarLoader
logger = logging.getLogger(__name__)
@ -405,6 +408,18 @@ class RagPipelineService:
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
variable_pool=VariablePool(
system_variables={},
user_inputs=user_inputs,
environment_variables=[],
conversation_variables=[],
rag_pipeline_variables=[],
),
variable_loader=DraftVarLoader(
engine=db.engine,
app_id=pipeline.id,
tenant_id=pipeline.tenant_id,
),
),
start_at=start_at,
tenant_id=pipeline.tenant_id,
@ -417,39 +432,6 @@ class RagPipelineService:
return workflow_node_execution
def run_published_workflow_node(
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account
) -> WorkflowNodeExecution:
"""
Run published workflow node
"""
# fetch published workflow by app_model
published_workflow = self.get_published_workflow(pipeline=pipeline)
if not published_workflow:
raise ValueError("Workflow not initialized")
# run draft workflow node
start_at = time.perf_counter()
workflow_node_execution = self._handle_node_run_result(
getter=lambda: WorkflowEntry.single_step_run(
workflow=published_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
),
start_at=start_at,
tenant_id=pipeline.tenant_id,
node_id=node_id,
)
workflow_node_execution.workflow_id = published_workflow.id
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
def run_datasource_workflow_node(
self,
pipeline: Pipeline,