From f4dd22b9cb6eff4b7bfc09c17b81b5f52f45eeb5 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Fri, 25 Jul 2025 15:17:03 +0800 Subject: [PATCH] r2 transform --- .../app/apps/pipeline/pipeline_generator.py | 65 ++++++++++++++++--- api/core/app/apps/pipeline/pipeline_runner.py | 12 +++- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index 271abbfe2f..b37b2efdc6 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -11,7 +11,8 @@ from typing import Any, Literal, Optional, Union, cast, overload from flask import Flask, current_app from pydantic import ValidationError -from sqlalchemy.orm import sessionmaker +from sqlalchemy import select +from sqlalchemy.orm import Session, sessionmaker import contexts from configs import dify_config @@ -38,6 +39,7 @@ from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchem from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader from extensions.ext_database import db from libs.flask_utils import preserve_flask_contexts from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom @@ -46,6 +48,7 @@ from models.enums import WorkflowRunTriggeredFrom from models.model import AppMode from services.dataset_service import DocumentService from services.datasource_provider_service import DatasourceProviderService +from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService logger = logging.getLogger(__name__) @@ -275,6 +278,7 @@ class PipelineGenerator(BaseAppGenerator): workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, streaming: bool = True, + variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER, workflow_thread_pool_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: """ @@ -312,6 +316,7 @@ class PipelineGenerator(BaseAppGenerator): "queue_manager": queue_manager, "application_generate_entity": application_generate_entity, "workflow_thread_pool_id": workflow_thread_pool_id, + "variable_loader": variable_loader, }, ) @@ -404,6 +409,13 @@ class PipelineGenerator(BaseAppGenerator): app_id=application_generate_entity.app_config.app_id, triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) + draft_var_srv = WorkflowDraftVariableService(db.session()) + draft_var_srv.prefill_conversation_variable_default_values(workflow) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=application_generate_entity.app_config.app_id, + tenant_id=application_generate_entity.app_config.tenant_id, + ) return self._generate( flask_app=current_app._get_current_object(), # type: ignore @@ -415,6 +427,7 @@ class PipelineGenerator(BaseAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, + variable_loader=var_loader, ) def single_loop_generate( @@ -489,6 +502,13 @@ class PipelineGenerator(BaseAppGenerator): app_id=application_generate_entity.app_config.app_id, triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) + draft_var_srv = WorkflowDraftVariableService(db.session()) + draft_var_srv.prefill_conversation_variable_default_values(workflow) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=application_generate_entity.app_config.app_id, + tenant_id=application_generate_entity.app_config.tenant_id, + ) return self._generate( flask_app=current_app._get_current_object(), # type: ignore @@ -500,6 +520,7 @@ class PipelineGenerator(BaseAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, + variable_loader=var_loader, ) def _generate_worker( @@ -508,6 +529,7 @@ class PipelineGenerator(BaseAppGenerator): application_generate_entity: RagPipelineGenerateEntity, queue_manager: AppQueueManager, context: contextvars.Context, + variable_loader: VariableLoader, workflow_thread_pool_id: Optional[str] = None, ) -> None: """ @@ -521,14 +543,41 @@ class PipelineGenerator(BaseAppGenerator): with preserve_flask_contexts(flask_app, context_vars=context): try: - # workflow app - runner = PipelineRunner( - application_generate_entity=application_generate_entity, - queue_manager=queue_manager, - workflow_thread_pool_id=workflow_thread_pool_id, - ) + with Session(db.engine, expire_on_commit=False) as session: + workflow = session.scalar( + select(Workflow).where( + Workflow.tenant_id == application_generate_entity.app_config.tenant_id, + Workflow.app_id == application_generate_entity.app_config.app_id, + Workflow.id == application_generate_entity.app_config.workflow_id, + ) + ) + if workflow is None: + raise ValueError("Workflow not found") - runner.run() + # Determine system_user_id based on invocation source + is_external_api_call = application_generate_entity.invoke_from in { + InvokeFrom.WEB_APP, + InvokeFrom.SERVICE_API, + } + + if is_external_api_call: + # For external API calls, use end user's session ID + end_user = session.scalar(select(EndUser).where(EndUser.id == application_generate_entity.user_id)) + system_user_id = end_user.session_id if end_user else "" + else: + # For internal calls, use the original user ID + system_user_id = application_generate_entity.user_id + # workflow app + runner = PipelineRunner( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + workflow_thread_pool_id=workflow_thread_pool_id, + variable_loader=variable_loader, + workflow=workflow, + system_user_id=system_user_id, + ) + + runner.run() except GenerateTaskStoppedError: pass except InvokeAuthorizationError: diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 6cdf633ca6..51c4865ad1 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -17,6 +17,7 @@ from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent from core.workflow.graph_engine.entities.graph import Graph from core.workflow.system_variable import SystemVariable +from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from models.dataset import Document, Pipeline @@ -36,6 +37,9 @@ class PipelineRunner(WorkflowBasedAppRunner): self, application_generate_entity: RagPipelineGenerateEntity, queue_manager: AppQueueManager, + variable_loader: VariableLoader, + workflow: Workflow, + system_user_id: str, workflow_thread_pool_id: Optional[str] = None, ) -> None: """ @@ -43,9 +47,15 @@ class PipelineRunner(WorkflowBasedAppRunner): :param queue_manager: application queue manager :param workflow_thread_pool_id: workflow thread pool id """ + super().__init__( + queue_manager=queue_manager, + variable_loader=variable_loader, + app_id=application_generate_entity.app_config.app_id, + ) self.application_generate_entity = application_generate_entity - self.queue_manager = queue_manager self.workflow_thread_pool_id = workflow_thread_pool_id + self._workflow = workflow + self._sys_user_id = system_user_id def _get_app_id(self) -> str: return self.application_generate_entity.app_config.app_id