Guangdong Liu 3ff52f1809
feat: Enhance response validation and parsing in tool.py (#23456)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-07 09:04:51 +08:00

413 lines
18 KiB
Python

import json
from collections.abc import Generator
from dataclasses import dataclass
from os import getenv
from typing import Any, Optional, Union
from urllib.parse import urlencode
import httpx
from core.file.file_manager import download
from core.helper import ssrf_proxy
from core.tools.__base.tool import Tool
from core.tools.__base.tool_runtime import ToolRuntime
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import ToolEntity, ToolInvokeMessage, ToolProviderType
from core.tools.errors import ToolInvokeError, ToolParameterValidationError, ToolProviderCredentialValidationError
# Default (connect, read) timeout pair passed as ``timeout=`` to every outbound
# API-tool request; each half can be overridden through its environment variable.
# NOTE(review): units are presumably seconds (httpx convention) — confirm in ssrf_proxy.
API_TOOL_DEFAULT_TIMEOUT: tuple[int, int] = (
    int(getenv("API_TOOL_DEFAULT_CONNECT_TIMEOUT", default="10")),
    int(getenv("API_TOOL_DEFAULT_READ_TIMEOUT", default="60")),
)
@dataclass
class ParsedResponse:
    """Represents a parsed HTTP response with type information.

    Attributes:
        content: The response payload. When ``is_json`` is True this is the
            decoded JSON value (dict, list, str, int, float, bool or None);
            otherwise it is plain text.
        is_json: Whether ``content`` was decoded from a JSON response body.
    """

    content: Union[str, dict[str, Any], list[Any], int, float, bool, None]
    is_json: bool

    def to_string(self) -> str:
        """Convert the response to a string, e.g. for credential validation.

        Text content is returned unchanged. Decoded JSON values (and dicts) are
        re-serialized with ``json.dumps`` so arrays, booleans and null keep JSON
        syntax instead of Python ``str()`` output (``'a'``, ``True``, ``None``).
        """
        if isinstance(self.content, str):
            return self.content
        if self.is_json or isinstance(self.content, dict):
            return json.dumps(self.content, ensure_ascii=False)
        return str(self.content)
class ApiTool(Tool):
    """
    Api tool

    Calls the HTTP operation described by an ``ApiToolBundle`` (an OpenAPI
    operation), authenticating with the credentials stored on the tool runtime.
    """

    def __init__(self, entity: ToolEntity, api_bundle: ApiToolBundle, runtime: ToolRuntime, provider_id: str):
        super().__init__(entity, runtime)
        self.api_bundle = api_bundle
        self.provider_id = provider_id

    def fork_tool_runtime(self, runtime: ToolRuntime):
        """
        fork a new tool with metadata

        :param runtime: the runtime to attach to the forked tool
        :return: the new tool, sharing entity and provider id, with a copy of the api bundle
        :raises ValueError: if ``api_bundle`` is not set
        """
        if self.api_bundle is None:
            raise ValueError("api_bundle is required")
        return self.__class__(
            entity=self.entity,
            api_bundle=self.api_bundle.model_copy(),
            runtime=runtime,
            provider_id=self.provider_id,
        )

    def validate_credentials(
        self, credentials: dict[str, Any], parameters: dict[str, Any], format_only: bool = False
    ) -> str:
        """
        validate the credentials for Api tool

        Performs a real request against the API using the runtime credentials.

        :param credentials: not read here; the credentials used come from ``self.runtime``
        :param parameters: request parameters; missing required ones with defaults are filled in place
        :param format_only: when True, only assemble the request (which checks the credential
            format) and skip the HTTP call
        :return: the response rendered as a string, or "" when ``format_only`` is True
        """
        # assemble validate request and request parameters
        headers = self.assembling_request(parameters)
        if format_only:
            return ""

        response = self.do_http_request(self.api_bundle.server_url, self.api_bundle.method, headers, parameters)
        # validate response
        parsed_response = self.validate_and_parse_response(response)
        # For credential validation, always return as string
        return parsed_response.to_string()

    def tool_provider_type(self) -> ToolProviderType:
        """Return the provider type of this tool (always ``ToolProviderType.API``)."""
        return ToolProviderType.API

    def assembling_request(self, parameters: dict[str, Any]) -> dict[str, Any]:
        """
        Build the authentication headers and fill in required parameter defaults.

        The runtime credentials are only read, never modified, so repeated calls on the
        same runtime do not re-apply the ``Basic``/``Bearer`` prefix to the stored key.

        :param parameters: tool parameters; missing required parameters that declare a
            default are added to this dict in place
        :return: headers to send (empty for query-parameter or other auth types)
        :raises ToolProviderCredentialValidationError: if the runtime is missing or the
            credentials are incomplete or malformed
        :raises ToolParameterValidationError: if a required parameter is missing and has no default
        """
        if self.runtime is None:
            raise ToolProviderCredentialValidationError("runtime not initialized")

        headers: dict[str, Any] = {}
        credentials = self.runtime.credentials or {}

        if "auth_type" not in credentials:
            raise ToolProviderCredentialValidationError("Missing auth_type")

        if credentials["auth_type"] in ("api_key_header", "api_key"):  # "api_key" kept for backward compatibility
            # Fall back to the standard header when none (or an empty one) is configured.
            api_key_header = credentials.get("api_key_header") or "Authorization"

            if "api_key_value" not in credentials:
                raise ToolProviderCredentialValidationError("Missing api_key_value")
            api_key_value = credentials["api_key_value"]
            if not isinstance(api_key_value, str):
                raise ToolProviderCredentialValidationError("api_key_value must be a string")

            # Prefix a local copy only: writing back into the runtime credentials would
            # stack prefixes ("Bearer Bearer ...") on every subsequent call.
            api_key_header_prefix = credentials.get("api_key_header_prefix")
            if api_key_value and api_key_header_prefix == "basic":
                api_key_value = f"Basic {api_key_value}"
            elif api_key_value and api_key_header_prefix == "bearer":
                api_key_value = f"Bearer {api_key_value}"
            # "custom" (or no prefix) sends the value unchanged.

            headers[api_key_header] = api_key_value
        elif credentials["auth_type"] == "api_key_query":
            # For query parameter authentication nothing goes into the headers;
            # do_http_request adds the key to the query string.
            pass

        for parameter in self.api_bundle.parameters or []:
            if not parameter.required or parameter.name in parameters:
                continue
            if parameter.default is None:
                raise ToolParameterValidationError(f"Missing required parameter {parameter.name}")
            parameters[parameter.name] = parameter.default

        return headers

    def validate_and_parse_response(self, response: httpx.Response) -> ParsedResponse:
        """
        validate the response and return parsed content with type information

        The body is decoded as JSON only when the ``Content-Type`` header contains
        ``application/json``; any other body, or JSON that fails to decode, is returned as text.

        :param response: the HTTP response to check
        :return: ParsedResponse with content and is_json flag
        :raises ToolInvokeError: if the status code is 400 or above
        :raises ValueError: if ``response`` is not an ``httpx.Response``
        """
        if not isinstance(response, httpx.Response):
            raise ValueError(f"Invalid response type {type(response)}")

        if response.status_code >= 400:
            raise ToolInvokeError(f"Request failed with status code {response.status_code} and {response.text}")
        if not response.content:
            return ParsedResponse("Empty response from the tool, please check your parameters and try again.", False)

        content_type = response.headers.get("content-type", "").lower()
        if "application/json" not in content_type:
            # If content-type doesn't indicate JSON, treat as text regardless of content
            return ParsedResponse(response.text, False)

        try:
            return ParsedResponse(response.json(), True)
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError; RecursionError
            # is raised by the json module for pathologically nested documents.
            return ParsedResponse(response.text, False)

    @staticmethod
    def get_parameter_value(parameter, parameters):
        """
        Resolve the value of an OpenAPI parameter from the tool parameters.

        :param parameter: OpenAPI parameter object (dict with ``name`` and optionally
            ``required`` and ``schema``)
        :param parameters: tool parameter values keyed by name
        :return: the provided value, else the schema default, else ""
        :raises ToolParameterValidationError: if a required parameter was not provided
        """
        if parameter["name"] in parameters:
            return parameters[parameter["name"]]
        if parameter.get("required", False):
            raise ToolParameterValidationError(f"Missing required parameter {parameter['name']}")
        return (parameter.get("schema", {}) or {}).get("default", "")

    def do_http_request(
        self, url: str, method: str, headers: dict[str, Any], parameters: dict[str, Any]
    ) -> httpx.Response:
        """
        do http request depending on api bundle

        Routes each OpenAPI parameter to the path, query string, cookies or headers,
        builds the body from the operation's ``requestBody``, and sends the request
        through ``ssrf_proxy``.

        :param url: server URL, possibly containing ``{name}`` path placeholders
        :param method: HTTP method name (case-insensitive)
        :param headers: headers to send; updated in place with header parameters and Content-Type
        :param parameters: tool parameter values keyed by name
        :return: the HTTP response
        :raises ToolParameterValidationError: if a required parameter is missing
        :raises ValueError: if ``method`` is not a supported HTTP method
        """
        method = method.lower()

        params: dict[str, Any] = {}
        path_params: dict[str, Any] = {}
        cookies: dict[str, Any] = {}

        # Add API key to query parameters if auth_type is api_key_query
        credentials = self.runtime.credentials if self.runtime else None
        if credentials and credentials.get("auth_type") == "api_key_query":
            api_key_value = credentials.get("api_key_value")
            if api_key_value:
                params[credentials.get("api_key_query_param", "key")] = api_key_value

        # route each declared parameter to its location
        for parameter in self.api_bundle.openapi.get("parameters", []):
            value = self.get_parameter_value(parameter, parameters)
            location = parameter["in"]
            if location == "path":
                path_params[parameter["name"]] = value
            elif location == "query":
                # optional query parameters that resolved to "" are left out of the URL
                if value != "":
                    params[parameter["name"]] = value
            elif location == "cookie":
                cookies[parameter["name"]] = value
            elif location == "header":
                headers[parameter["name"]] = str(value)

        # FIXME: body starts as a dict[str, Any] but may be serialized to a str below
        body: Any
        body, files = self._build_request_body(parameters, headers)

        # replace path parameters
        # NOTE(review): values are substituted verbatim (not percent-encoded); confirm this is intended.
        for name, value in path_params.items():
            url = url.replace(f"{{{name}}}", f"{value}")

        # Serialize the body by media type. Media-type parameters such as "; charset=utf-8"
        # are ignored, and "+json" suffix types (RFC 6839) are JSON too; otherwise the dict
        # would be form-encoded by httpx under a JSON Content-Type.
        media_type = str(headers.get("Content-Type") or "").split(";")[0].strip().lower()
        if media_type == "application/json" or media_type.endswith("+json"):
            body = json.dumps(body)
        elif media_type == "application/x-www-form-urlencoded":
            body = urlencode(body)

        # if there is a file upload, remove the Content-Type header
        # so that httpx can automatically generate the boundary header required for multipart/form-data.
        # issue: https://github.com/langgenius/dify/issues/13684
        # reference: https://stackoverflow.com/questions/39280438/fetch-missing-boundary-in-multipart-form-data-post
        if files:
            headers.pop("Content-Type", None)

        if method not in {"get", "head", "post", "put", "delete", "patch", "options"}:
            raise ValueError(f"Invalid http method {method}")

        response: httpx.Response = getattr(ssrf_proxy, method)(
            url,
            params=params,
            headers=headers,
            cookies=cookies,
            data=body,
            files=files,
            timeout=API_TOOL_DEFAULT_TIMEOUT,
            follow_redirects=True,
        )
        return response

    def _build_request_body(
        self, parameters: dict[str, Any], headers: dict[str, Any]
    ) -> tuple[dict[str, Any], list[Any]]:
        """
        Collect request body fields and file uploads from the operation's ``requestBody``.

        Only the first media type listed under ``requestBody.content`` is used; it is
        written to ``headers["Content-Type"]``, overriding any existing value.

        :param parameters: tool parameter values keyed by name
        :param headers: request headers, updated in place with the Content-Type
        :return: ``(body, files)``; ``files`` holds ``(field, (filename, data, mime_type))`` tuples
        :raises ToolParameterValidationError: if a required body property was not provided
        """
        body: dict[str, Any] = {}
        files: list[Any] = []

        openapi = self.api_bundle.openapi
        request_body = openapi.get("requestBody")
        if not request_body or "content" not in request_body:
            return body, files

        media_types = request_body["content"] or {}
        content_type = next(iter(media_types), None)
        if content_type is None:
            return body, files
        headers["Content-Type"] = content_type

        # A media type object may omit "schema"; treat that as an empty schema.
        body_schema = (media_types[content_type] or {}).get("schema") or {}

        # handle ref schema: resolve it by the last path segment under components.schemas
        if "$ref" in body_schema:
            ref_name = body_schema["$ref"].split("/")[-1]
            schemas = (openapi.get("components") or {}).get("schemas") or {}
            if ref_name in schemas:
                body_schema = schemas[ref_name]

        required = body_schema.get("required", [])
        for name, prop_schema in body_schema.get("properties", {}).items():
            if name not in parameters:
                if name in required:
                    raise ToolParameterValidationError(
                        f"Missing required parameter {name} in operation {self.api_bundle.operation_id}"
                    )
                if "default" in prop_schema:
                    body[name] = prop_schema["default"]
                # otherwise omit optional parameters that weren't provided, instead of setting them to None
                continue

            value = parameters[name]
            if prop_schema.get("type") == "array" and prop_schema.get("items", {}).get("format") == "binary":
                # multiple file upload: the value should be a list of file objects
                for file_obj in value:
                    files.append((name, (file_obj.filename, download(file_obj), file_obj.mime_type)))
            elif prop_schema.get("format") == "binary":
                files.append((name, (value.filename, download(value), value.mime_type)))
            elif "$ref" in prop_schema:
                # referenced schemas are passed through without type conversion
                body[name] = value
            else:
                body[name] = self._convert_body_property_type(prop_schema, value)

        return body, files

    def _convert_body_property_any_of(
        self, property: dict[str, Any], value: Any, any_of: list[dict[str, Any]], max_recursive=10
    ) -> Any:
        """
        Convert ``value`` to the first ``anyOf`` option it fits.

        Options are tried in order; nested ``anyOf`` lists are followed recursively.
        If no option accepts the value it is returned unchanged.

        :raises Exception: if nested ``anyOf`` lists exceed ``max_recursive`` levels
        """
        if max_recursive <= 0:
            raise Exception("Max recursion depth reached")

        for option in any_of or []:
            try:
                if "type" in option:
                    option_type = option["type"]
                    if option_type in ("integer", "int"):
                        return int(value)
                    if option_type == "number":
                        return float(value) if "." in str(value) else int(value)
                    if option_type == "string":
                        return str(value)
                    if option_type == "boolean":
                        lowered = str(value).strip().lower()
                        if lowered in {"true", "1"}:
                            return True
                        if lowered in {"false", "0"}:
                            return False
                        continue  # Not a boolean, try next option
                    # Only a missing/empty value means null; 0 and False are real values.
                    if option_type == "null" and (value is None or value == ""):
                        return None
                    continue  # Unsupported type (or non-null value), try next option
                elif "anyOf" in option and isinstance(option["anyOf"], list):
                    # Recursive call to handle nested anyOf
                    return self._convert_body_property_any_of(property, value, option["anyOf"], max_recursive - 1)
            except (TypeError, ValueError):
                # TypeError covers e.g. int(None) / float([])
                continue  # Conversion failed, try next option
        # If no option succeeded, return the value as is
        return value

    def _convert_body_property_type(self, property: dict[str, Any], value: Any) -> Any:
        """
        Convert ``value`` to the type declared by the property schema.

        Values that cannot be converted, and schemas without a recognised ``type`` or
        ``anyOf`` (e.g. ``oneOf``/``allOf`` or untyped schemas), yield ``value`` unchanged.
        """
        try:
            if "type" in property:
                prop_type = property["type"]
                if prop_type in ("integer", "int"):
                    return int(value)
                if prop_type == "number":
                    # check if it is a float
                    return float(value) if "." in str(value) else int(value)
                if prop_type == "string":
                    return str(value)
                if prop_type == "boolean":
                    # bool("false") is True, so interpret common string spellings explicitly
                    if isinstance(value, str):
                        lowered = value.strip().lower()
                        if lowered in {"false", "0"}:
                            return False
                        if lowered in {"true", "1"}:
                            return True
                    return bool(value)
                if prop_type == "null":
                    # null is the only value this type allows
                    return None
                if prop_type in ("object", "array"):
                    if isinstance(value, str):
                        try:
                            return json.loads(value)
                        except ValueError:
                            return value
                    return value
                # Unknown type: send the value unchanged.
                return value
            if "anyOf" in property and isinstance(property["anyOf"], list):
                return self._convert_body_property_any_of(property, value, property["anyOf"])
        except (TypeError, ValueError):
            return value
        # No "type"/"anyOf": keep the provided value instead of silently replacing it with None.
        return value

    def _invoke(
        self,
        user_id: str,
        tool_parameters: dict[str, Any],
        conversation_id: Optional[str] = None,
        app_id: Optional[str] = None,
        message_id: Optional[str] = None,
    ) -> Generator[ToolInvokeMessage, None, None]:
        """
        invoke http request

        Yields a JSON message when the API returned a JSON object, otherwise a single
        text message. Other decoded JSON values (arrays, numbers, booleans, null) are
        serialized as JSON text rather than rendered with Python's ``str()``.
        """
        # assemble request
        headers = self.assembling_request(tool_parameters)
        # do http request
        response = self.do_http_request(self.api_bundle.server_url, self.api_bundle.method, headers, tool_parameters)
        # validate response
        parsed_response = self.validate_and_parse_response(response)

        # assemble invoke message based on response type
        content = parsed_response.content
        if parsed_response.is_json and isinstance(content, dict):
            yield self.create_json_message(content)
        elif isinstance(content, str):
            yield self.create_text_message(content)
        elif parsed_response.is_json:
            yield self.create_text_message(json.dumps(content, ensure_ascii=False))
        else:
            yield self.create_text_message(str(content))