mirror of
				https://github.com/langgenius/dify.git
				synced 2025-10-31 02:42:59 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			219 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import dataclasses
 | |
| import datetime
 | |
| from collections import defaultdict, deque
 | |
| from collections.abc import Callable
 | |
| from decimal import Decimal
 | |
| from enum import Enum
 | |
| from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
 | |
| from pathlib import Path, PurePath
 | |
| from re import Pattern
 | |
| from types import GeneratorType
 | |
| from typing import Any, Literal, Optional, Union
 | |
| from uuid import UUID
 | |
| 
 | |
| from pydantic import BaseModel
 | |
| from pydantic.networks import AnyUrl, NameEmail
 | |
| from pydantic.types import SecretBytes, SecretStr
 | |
| from pydantic_core import Url
 | |
| from pydantic_extra_types.color import Color
 | |
| 
 | |
| 
 | |
| def _model_dump(model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any) -> Any:
 | |
|     return model.model_dump(mode=mode, **kwargs)
 | |
| 
 | |
| 
 | |
| # Taken from Pydantic v1 as is
 | |
| def isoformat(o: Union[datetime.date, datetime.time]) -> str:
 | |
|     return o.isoformat()
 | |
| 
 | |
| 
 | |
| # Taken from Pydantic v1 as is
 | |
| # TODO: pv2 should this return strings instead?
 | |
| def decimal_encoder(dec_value: Decimal) -> Union[int, float]:
 | |
|     """
 | |
|     Encodes a Decimal as int of there's no exponent, otherwise float
 | |
| 
 | |
|     This is useful when we use ConstrainedDecimal to represent Numeric(x,0)
 | |
|     where a integer (but not int typed) is used. Encoding this as a float
 | |
|     results in failed round-tripping between encode and parse.
 | |
|     Our Id type is a prime example of this.
 | |
| 
 | |
|     >>> decimal_encoder(Decimal("1.0"))
 | |
|     1.0
 | |
| 
 | |
|     >>> decimal_encoder(Decimal("1"))
 | |
|     1
 | |
|     """
 | |
|     if dec_value.as_tuple().exponent >= 0:  # type: ignore[operator]
 | |
|         return int(dec_value)
 | |
|     else:
 | |
|         return float(dec_value)
 | |
| 
 | |
| 
 | |
| ENCODERS_BY_TYPE: dict[type[Any], Callable[[Any], Any]] = {
 | |
|     bytes: lambda o: o.decode(),
 | |
|     Color: str,
 | |
|     datetime.date: isoformat,
 | |
|     datetime.datetime: isoformat,
 | |
|     datetime.time: isoformat,
 | |
|     datetime.timedelta: lambda td: td.total_seconds(),
 | |
|     Decimal: decimal_encoder,
 | |
|     Enum: lambda o: o.value,
 | |
|     frozenset: list,
 | |
|     deque: list,
 | |
|     GeneratorType: list,
 | |
|     IPv4Address: str,
 | |
|     IPv4Interface: str,
 | |
|     IPv4Network: str,
 | |
|     IPv6Address: str,
 | |
|     IPv6Interface: str,
 | |
|     IPv6Network: str,
 | |
|     NameEmail: str,
 | |
|     Path: str,
 | |
|     Pattern: lambda o: o.pattern,
 | |
|     SecretBytes: str,
 | |
|     SecretStr: str,
 | |
|     set: list,
 | |
|     UUID: str,
 | |
|     Url: str,
 | |
|     AnyUrl: str,
 | |
| }
 | |
| 
 | |
| 
 | |
| def generate_encoders_by_class_tuples(
 | |
|     type_encoder_map: dict[Any, Callable[[Any], Any]],
 | |
| ) -> dict[Callable[[Any], Any], tuple[Any, ...]]:
 | |
|     encoders_by_class_tuples: dict[Callable[[Any], Any], tuple[Any, ...]] = defaultdict(tuple)
 | |
|     for type_, encoder in type_encoder_map.items():
 | |
|         encoders_by_class_tuples[encoder] += (type_,)
 | |
|     return encoders_by_class_tuples
 | |
| 
 | |
| 
 | |
| encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE)
 | |
| 
 | |
| 
 | |
| def jsonable_encoder(
 | |
|     obj: Any,
 | |
|     by_alias: bool = True,
 | |
|     exclude_unset: bool = False,
 | |
|     exclude_defaults: bool = False,
 | |
|     exclude_none: bool = False,
 | |
|     custom_encoder: Optional[dict[Any, Callable[[Any], Any]]] = None,
 | |
|     sqlalchemy_safe: bool = True,
 | |
| ) -> Any:
 | |
|     custom_encoder = custom_encoder or {}
 | |
|     if custom_encoder:
 | |
|         if type(obj) in custom_encoder:
 | |
|             return custom_encoder[type(obj)](obj)
 | |
|         else:
 | |
|             for encoder_type, encoder_instance in custom_encoder.items():
 | |
|                 if isinstance(obj, encoder_type):
 | |
|                     return encoder_instance(obj)
 | |
|     if isinstance(obj, BaseModel):
 | |
|         obj_dict = _model_dump(
 | |
|             obj,
 | |
|             mode="json",
 | |
|             include=None,
 | |
|             exclude=None,
 | |
|             by_alias=by_alias,
 | |
|             exclude_unset=exclude_unset,
 | |
|             exclude_none=exclude_none,
 | |
|             exclude_defaults=exclude_defaults,
 | |
|         )
 | |
|         if "__root__" in obj_dict:
 | |
|             obj_dict = obj_dict["__root__"]
 | |
|         return jsonable_encoder(
 | |
|             obj_dict,
 | |
|             exclude_none=exclude_none,
 | |
|             exclude_defaults=exclude_defaults,
 | |
|             sqlalchemy_safe=sqlalchemy_safe,
 | |
|         )
 | |
|     if dataclasses.is_dataclass(obj):
 | |
|         # FIXME: mypy error, try to fix it instead of using type: ignore
 | |
|         obj_dict = dataclasses.asdict(obj)  # type: ignore
 | |
|         return jsonable_encoder(
 | |
|             obj_dict,
 | |
|             by_alias=by_alias,
 | |
|             exclude_unset=exclude_unset,
 | |
|             exclude_defaults=exclude_defaults,
 | |
|             exclude_none=exclude_none,
 | |
|             custom_encoder=custom_encoder,
 | |
|             sqlalchemy_safe=sqlalchemy_safe,
 | |
|         )
 | |
|     if isinstance(obj, Enum):
 | |
|         return obj.value
 | |
|     if isinstance(obj, PurePath):
 | |
|         return str(obj)
 | |
|     if isinstance(obj, str | int | float | type(None)):
 | |
|         return obj
 | |
|     if isinstance(obj, Decimal):
 | |
|         return format(obj, "f")
 | |
|     if isinstance(obj, dict):
 | |
|         encoded_dict = {}
 | |
|         allowed_keys = set(obj.keys())
 | |
|         for key, value in obj.items():
 | |
|             if (
 | |
|                 (not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa")))
 | |
|                 and (value is not None or not exclude_none)
 | |
|                 and key in allowed_keys
 | |
|             ):
 | |
|                 encoded_key = jsonable_encoder(
 | |
|                     key,
 | |
|                     by_alias=by_alias,
 | |
|                     exclude_unset=exclude_unset,
 | |
|                     exclude_none=exclude_none,
 | |
|                     custom_encoder=custom_encoder,
 | |
|                     sqlalchemy_safe=sqlalchemy_safe,
 | |
|                 )
 | |
|                 encoded_value = jsonable_encoder(
 | |
|                     value,
 | |
|                     by_alias=by_alias,
 | |
|                     exclude_unset=exclude_unset,
 | |
|                     exclude_none=exclude_none,
 | |
|                     custom_encoder=custom_encoder,
 | |
|                     sqlalchemy_safe=sqlalchemy_safe,
 | |
|                 )
 | |
|                 encoded_dict[encoded_key] = encoded_value
 | |
|         return encoded_dict
 | |
|     if isinstance(obj, list | set | frozenset | GeneratorType | tuple | deque):
 | |
|         encoded_list = []
 | |
|         for item in obj:
 | |
|             encoded_list.append(
 | |
|                 jsonable_encoder(
 | |
|                     item,
 | |
|                     by_alias=by_alias,
 | |
|                     exclude_unset=exclude_unset,
 | |
|                     exclude_defaults=exclude_defaults,
 | |
|                     exclude_none=exclude_none,
 | |
|                     custom_encoder=custom_encoder,
 | |
|                     sqlalchemy_safe=sqlalchemy_safe,
 | |
|                 )
 | |
|             )
 | |
|         return encoded_list
 | |
| 
 | |
|     if type(obj) in ENCODERS_BY_TYPE:
 | |
|         return ENCODERS_BY_TYPE[type(obj)](obj)
 | |
|     for encoder, classes_tuple in encoders_by_class_tuples.items():
 | |
|         if isinstance(obj, classes_tuple):
 | |
|             return encoder(obj)
 | |
| 
 | |
|     try:
 | |
|         data = dict(obj)
 | |
|     except Exception as e:
 | |
|         errors: list[Exception] = []
 | |
|         errors.append(e)
 | |
|         try:
 | |
|             data = vars(obj)
 | |
|         except Exception as e:
 | |
|             errors.append(e)
 | |
|             raise ValueError(errors) from e
 | |
|     return jsonable_encoder(
 | |
|         data,
 | |
|         by_alias=by_alias,
 | |
|         exclude_unset=exclude_unset,
 | |
|         exclude_defaults=exclude_defaults,
 | |
|         exclude_none=exclude_none,
 | |
|         custom_encoder=custom_encoder,
 | |
|         sqlalchemy_safe=sqlalchemy_safe,
 | |
|     )
 | 
