from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
from core.tools.__base.tool import Tool
from core.tools.__base.tool_runtime import ToolRuntime
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.utils.model_invocation_utils import ModelInvocationUtils

_SUMMARY_PROMPT = """You are a professional language researcher. You can quickly identify the main points of a
webpage and reproduce them in your own words while retaining the original meaning and the key points.
However, the text you are given is too long and may be only a part of the full text.
Please summarize the text you are given.
"""


class BuiltinTool(Tool):
    """
    Builtin tool

    :param provider: the identifier of the builtin tool provider
    """

    provider: str

    def __init__(self, provider: str, **kwargs):
        super().__init__(**kwargs)
        self.provider = provider

    def fork_tool_runtime(self, runtime: ToolRuntime) -> "BuiltinTool":
        """
        fork a new tool instance bound to the given runtime

        :param runtime: the runtime for the forked tool
        :return: the new tool
        """
        return self.__class__(
            entity=self.entity.model_copy(),
            runtime=runtime,
            provider=self.provider,
        )

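    # Usage sketch (hypothetical wiring; `ToolRuntime`'s exact constructor
    # arguments depend on the caller's setup): forking lets one registered tool
    # instance serve many invocation contexts without mutating shared state, e.g.
    #     forked = tool.fork_tool_runtime(ToolRuntime(tenant_id="t-123"))
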
    def invoke_model(self, user_id: str, prompt_messages: list[PromptMessage], stop: list[str]) -> LLMResult:
        """
        invoke the model configured for the current tenant

        :param user_id: the user id
        :param prompt_messages: the prompt messages
        :param stop: the stop words (accepted for interface compatibility but not forwarded below)
        :return: the model result
        """
        return ModelInvocationUtils.invoke(
            user_id=user_id,
            tenant_id=self.runtime.tenant_id or "",
            tool_type="builtin",
            tool_name=self.entity.identity.name,
            prompt_messages=prompt_messages,
        )

    def tool_provider_type(self) -> ToolProviderType:
        return ToolProviderType.BUILT_IN

    def get_max_tokens(self) -> int:
        """
        get max tokens

        :return: the max tokens
        """
        if self.runtime is None:
            raise ValueError("runtime is required")

        return ModelInvocationUtils.get_max_llm_context_tokens(
            tenant_id=self.runtime.tenant_id or "",
        )

    def get_prompt_tokens(self, prompt_messages: list[PromptMessage]) -> int:
        """
        get prompt tokens

        :param prompt_messages: the prompt messages
        :return: the tokens
        """
        if self.runtime is None:
            raise ValueError("runtime is required")

        return ModelInvocationUtils.calculate_tokens(
            tenant_id=self.runtime.tenant_id or "", prompt_messages=prompt_messages
        )

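    # Budget sketch (hypothetical caller code): the two helpers above combine
    # into a pre-flight check before invoking the model, e.g.
    #     fits = tool.get_prompt_tokens([UserPromptMessage(content=text)]) < tool.get_max_tokens() * 0.6
    # `summary` below applies exactly this pattern, with 0.6/0.7 safety margins.
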
    def summary(self, user_id: str, content: str) -> str:
        """
        summarize content that may exceed the model's context window

        :param user_id: the user id
        :param content: the content to summarize
        :return: the summarized content
        """
        max_tokens = self.get_max_tokens()

        # content already fits comfortably, no summarization needed
        if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=content)]) < max_tokens * 0.6:
            return content

        def get_prompt_tokens(content: str) -> int:
            return self.get_prompt_tokens(
                prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)]
            )

        def summarize(content: str) -> str:
            summary = self.invoke_model(
                user_id=user_id,
                prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)],
                stop=[],
            )

            assert isinstance(summary.message.content, str)
            return summary.message.content

        lines = content.split("\n")
        new_lines = []
        # split overly long lines into chunks that fit the token budget;
        # the cheap character-length check avoids tokenizing short lines
        for line in lines:
            if not line.strip():
                continue
            if len(line) < max_tokens * 0.5:
                new_lines.append(line)
            elif get_prompt_tokens(line) > max_tokens * 0.7:
                while get_prompt_tokens(line) > max_tokens * 0.7:
                    new_lines.append(line[: int(max_tokens * 0.5)])
                    line = line[int(max_tokens * 0.5) :]
                new_lines.append(line)
            else:
                new_lines.append(line)

        # merge chunks into messages that stay within the token budget
        messages: list[str] = []
        for chunk in new_lines:
            if not messages:
                messages.append(chunk)
            elif len(messages[-1]) + len(chunk) < max_tokens * 0.5:
                # safely short by character count, skip tokenization
                messages[-1] += chunk
            elif get_prompt_tokens(messages[-1] + chunk) > max_tokens * 0.7:
                # appending would blow the budget, start a new message
                messages.append(chunk)
            else:
                messages[-1] += chunk

        # summarize each message independently, then stitch the results together
        summaries = [summarize(message) for message in messages]
        result = "\n".join(summaries)

        # if the combined summaries are still too long, summarize recursively
        if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=result)]) > max_tokens * 0.7:
            return self.summary(user_id=user_id, content=result)

        return result
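
# Usage sketch (hypothetical subclass and wiring; the real `entity` and `runtime`
# objects come from Dify's tool manager):
#
#     class EchoTool(BuiltinTool):
#         pass
#
#     tool = EchoTool(provider="my_provider", entity=tool_entity, runtime=tool_runtime)
#     condensed = tool.summary(user_id="user-1", content=very_long_webpage_text)
#
# `summary` returns short content unchanged; otherwise it chunks the text by
# lines, summarizes each chunk with _SUMMARY_PROMPT, and recurses until the
# result fits within the token budget.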