From a8b676ade080fe678ce4fe270d24a48af0b51f58 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Fri, 4 Jul 2025 18:05:58 +0800
Subject: [PATCH] r2

---
 .../datasets/rag_pipeline/datasource_auth.py  |  4 +-
 .../datasource/datasource_file_manager.py     |  3 +
 .../nodes/datasource/datasource_node.py       | 73 ++++++++++++++++++-
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/api/controllers/console/datasets/rag_pipeline/datasource_auth.py b/api/controllers/console/datasets/rag_pipeline/datasource_auth.py
index 124d45f513..95cd080d8f 100644
--- a/api/controllers/console/datasets/rag_pipeline/datasource_auth.py
+++ b/api/controllers/console/datasets/rag_pipeline/datasource_auth.py
@@ -1,3 +1,5 @@
+import random
+
 from flask import redirect, request
 from flask_login import current_user  # type: ignore
 from flask_restful import (  # type: ignore
@@ -109,7 +111,7 @@ class DatasourceAuth(Resource):
                 provider=args["provider"],
                 plugin_id=args["plugin_id"],
                 credentials=args["credentials"],
-                name=args["name"],
+                name="test" + str(random.randint(1, 1000000)),
             )
         except CredentialsValidateFailedError as ex:
             raise ValueError(str(ex))
diff --git a/api/core/datasource/datasource_file_manager.py b/api/core/datasource/datasource_file_manager.py
index 51296b64d2..858bb79a9b 100644
--- a/api/core/datasource/datasource_file_manager.py
+++ b/api/core/datasource/datasource_file_manager.py
@@ -1,4 +1,5 @@
 import base64
+from datetime import datetime
 import hashlib
 import hmac
 import logging
@@ -91,6 +92,7 @@ class DatasourceFileManager:
             used=False,
             hash=hashlib.sha3_256(file_binary).hexdigest(),
             source_url="",
+            created_at=datetime.now(),
         )
 
         db.session.add(upload_file)
@@ -138,6 +140,7 @@ class DatasourceFileManager:
             used=False,
             hash=hashlib.sha3_256(blob).hexdigest(),
             source_url=file_url,
+            created_at=datetime.now(),
         )
 
         db.session.add(upload_file)
diff --git a/api/core/workflow/nodes/datasource/datasource_node.py b/api/core/workflow/nodes/datasource/datasource_node.py
index 01f6f51648..d81e6a99e4 100644
--- a/api/core/workflow/nodes/datasource/datasource_node.py
+++ b/api/core/workflow/nodes/datasource/datasource_node.py
@@ -147,10 +147,12 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
                         provider_type=datasource_type,
                     )
                 )
-                yield from self._transform_message(
+                yield from self._transform_datasource_file_message(
                     messages=online_drive_result,
                     parameters_for_log=parameters_for_log,
                     datasource_info=datasource_info,
+                    variable_pool=variable_pool,
+                    datasource_type=datasource_type,
                 )
             case DatasourceProviderType.WEBSITE_CRAWL:
                 yield RunCompletedEvent(
@@ -466,3 +468,72 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
     @classmethod
     def version(cls) -> str:
         return "1"
+
+    def _transform_datasource_file_message(
+        self,
+        messages: Generator[DatasourceMessage, None, None],
+        parameters_for_log: dict[str, Any],
+        datasource_info: dict[str, Any],
+        variable_pool: VariablePool,
+        datasource_type: DatasourceProviderType,
+    ) -> Generator:
+        """
+        Transform datasource file messages into stored files and yield a RunCompletedEvent.
+        """
+        # transform message and handle file storage
+        message_stream = DatasourceFileMessageTransformer.transform_datasource_invoke_messages(
+            messages=messages,
+            user_id=self.user_id,
+            tenant_id=self.tenant_id,
+            conversation_id=None,
+        )
+        file = None
+        for message in message_stream:
+            if message.type == DatasourceMessage.MessageType.BINARY_LINK:
+                assert isinstance(message.message, DatasourceMessage.TextMessage)
+
+                url = message.message.text
+                if message.meta:
+                    transfer_method = message.meta.get("transfer_method", FileTransferMethod.DATASOURCE_FILE)
+                else:
+                    transfer_method = FileTransferMethod.DATASOURCE_FILE
+
+                datasource_file_id = str(url).split("/")[-1].split(".")[0]
+
+                with Session(db.engine) as session:
+                    stmt = select(UploadFile).where(UploadFile.id == datasource_file_id)
+                    datasource_file = session.scalar(stmt)
+                    if datasource_file is None:
+                        raise ToolFileError(f"Datasource file {datasource_file_id} does not exist")
+
+                    mapping = {
+                        "datasource_file_id": datasource_file_id,
+                        "type": file_factory.get_file_type_by_mime_type(datasource_file.mime_type),
+                        "transfer_method": transfer_method,
+                        "url": url,
+                    }
+                    file = file_factory.build_from_mapping(
+                        mapping=mapping,
+                        tenant_id=self.tenant_id,
+                    )
+        variable_pool.add([self.node_id, "file"], [file])
+        for key, value in datasource_info.items():
+            # construct new key list
+            new_key_list = ["file", key]
+            self._append_variables_recursively(
+                variable_pool=variable_pool,
+                node_id=self.node_id,
+                variable_key_list=new_key_list,
+                variable_value=value,
+            )
+        yield RunCompletedEvent(
+            run_result=NodeRunResult(
+                status=WorkflowNodeExecutionStatus.SUCCEEDED,
+                inputs=parameters_for_log,
+                metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
+                outputs={
+                    "file_info": datasource_info,
+                    "datasource_type": datasource_type,
+                },
+            )
+        )
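
Notes (reviewer commentary, not part of the applied patch):

_transform_datasource_file_message assumes each BINARY_LINK message carries
a storage URL whose last path segment is "<upload_file_id>.<extension>", and
recovers the UploadFile primary key by string splitting. Below is a minimal
sketch of that parsing and of the transfer_method fallback; the url and meta
values are hypothetical, and a plain string stands in for the
FileTransferMethod.DATASOURCE_FILE enum member:

    # Hypothetical inputs; only the "<id>.<ext>" tail format matters.
    url = "https://storage.example.com/files/a1b2c3d4.pdf"
    meta: dict | None = None  # message.meta may be absent

    # Same expression the patch uses to recover the UploadFile id.
    datasource_file_id = str(url).split("/")[-1].split(".")[0]
    assert datasource_file_id == "a1b2c3d4"

    # Fallback mirrors the patch: default to the datasource-file method.
    transfer_method = (meta or {}).get("transfer_method", "datasource_file")
    assert transfer_method == "datasource_file"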
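After the message loop, the node publishes its results to the variable pool:
the built File under [node_id, "file"], and each datasource_info entry nested
below it via _append_variables_recursively. A sketch with a plain dict
standing in for VariablePool; node_id and datasource_info are made-up example
values:

    # Stand-in pool keyed by selector tuples (illustrative only).
    pool: dict[tuple[str, ...], object] = {}
    node_id = "ds_node"                  # hypothetical node id
    datasource_info = {"bucket": "b1"}   # hypothetical datasource info

    pool[(node_id, "file")] = ["<File object>"]
    for key, value in datasource_info.items():
        pool[(node_id, "file", key)] = value

    assert pool[(node_id, "file", "bucket")] == "b1"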