| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | import dataclasses | 
					
						
							|  |  |  | import datetime | 
					
						
							|  |  |  | from collections import defaultdict, deque | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  | from collections.abc import Callable | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | from decimal import Decimal | 
					
						
							|  |  |  | from enum import Enum | 
					
						
							| 
									
										
										
										
											2024-01-12 12:34:01 +08:00
										 |  |  | from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | from pathlib import Path, PurePath | 
					
						
							|  |  |  | from re import Pattern | 
					
						
							|  |  |  | from types import GeneratorType | 
					
						
							| 
									
										
										
										
											2024-07-04 18:18:26 +08:00
										 |  |  | from typing import Any, Literal, Optional, Union | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | from uuid import UUID | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from pydantic import BaseModel | 
					
						
							|  |  |  | from pydantic.networks import AnyUrl, NameEmail | 
					
						
							|  |  |  | from pydantic.types import SecretBytes, SecretStr | 
					
						
							| 
									
										
										
										
											2024-07-04 18:18:26 +08:00
										 |  |  | from pydantic_core import Url | 
					
						
							| 
									
										
										
										
											2024-06-14 01:05:37 +08:00
										 |  |  | from pydantic_extra_types.color import Color | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  | def _model_dump(model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any) -> Any: | 
					
						
							| 
									
										
										
										
											2024-07-04 18:18:26 +08:00
										 |  |  |     return model.model_dump(mode=mode, **kwargs) | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  | # Taken from Pydantic v1 as is | 
					
						
							|  |  |  | def isoformat(o: Union[datetime.date, datetime.time]) -> str: | 
					
						
							|  |  |  |     return o.isoformat() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Taken from Pydantic v1 as is | 
					
						
							|  |  |  | # TODO: pv2 should this return strings instead? | 
					
						
							|  |  |  | def decimal_encoder(dec_value: Decimal) -> Union[int, float]: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Encodes a Decimal as int of there's no exponent, otherwise float | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is useful when we use ConstrainedDecimal to represent Numeric(x,0) | 
					
						
							|  |  |  |     where a integer (but not int typed) is used. Encoding this as a float | 
					
						
							|  |  |  |     results in failed round-tripping between encode and parse. | 
					
						
							|  |  |  |     Our Id type is a prime example of this. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     >>> decimal_encoder(Decimal("1.0")) | 
					
						
							|  |  |  |     1.0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     >>> decimal_encoder(Decimal("1")) | 
					
						
							|  |  |  |     1 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if dec_value.as_tuple().exponent >= 0:  # type: ignore[operator] | 
					
						
							|  |  |  |         return int(dec_value) | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         return float(dec_value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  | ENCODERS_BY_TYPE: dict[type[Any], Callable[[Any], Any]] = { | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |     bytes: lambda o: o.decode(), | 
					
						
							|  |  |  |     Color: str, | 
					
						
							|  |  |  |     datetime.date: isoformat, | 
					
						
							|  |  |  |     datetime.datetime: isoformat, | 
					
						
							|  |  |  |     datetime.time: isoformat, | 
					
						
							|  |  |  |     datetime.timedelta: lambda td: td.total_seconds(), | 
					
						
							|  |  |  |     Decimal: decimal_encoder, | 
					
						
							|  |  |  |     Enum: lambda o: o.value, | 
					
						
							|  |  |  |     frozenset: list, | 
					
						
							|  |  |  |     deque: list, | 
					
						
							|  |  |  |     GeneratorType: list, | 
					
						
							|  |  |  |     IPv4Address: str, | 
					
						
							|  |  |  |     IPv4Interface: str, | 
					
						
							|  |  |  |     IPv4Network: str, | 
					
						
							|  |  |  |     IPv6Address: str, | 
					
						
							|  |  |  |     IPv6Interface: str, | 
					
						
							|  |  |  |     IPv6Network: str, | 
					
						
							|  |  |  |     NameEmail: str, | 
					
						
							|  |  |  |     Path: str, | 
					
						
							|  |  |  |     Pattern: lambda o: o.pattern, | 
					
						
							|  |  |  |     SecretBytes: str, | 
					
						
							|  |  |  |     SecretStr: str, | 
					
						
							|  |  |  |     set: list, | 
					
						
							|  |  |  |     UUID: str, | 
					
						
							|  |  |  |     Url: str, | 
					
						
							|  |  |  |     AnyUrl: str, | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def generate_encoders_by_class_tuples( | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  |     type_encoder_map: dict[Any, Callable[[Any], Any]], | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  | ) -> dict[Callable[[Any], Any], tuple[Any, ...]]: | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  |     encoders_by_class_tuples: dict[Callable[[Any], Any], tuple[Any, ...]] = defaultdict(tuple) | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |     for type_, encoder in type_encoder_map.items(): | 
					
						
							|  |  |  |         encoders_by_class_tuples[encoder] += (type_,) | 
					
						
							|  |  |  |     return encoders_by_class_tuples | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def jsonable_encoder( | 
					
						
							|  |  |  |     obj: Any, | 
					
						
							|  |  |  |     by_alias: bool = True, | 
					
						
							|  |  |  |     exclude_unset: bool = False, | 
					
						
							|  |  |  |     exclude_defaults: bool = False, | 
					
						
							|  |  |  |     exclude_none: bool = False, | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  |     custom_encoder: Optional[dict[Any, Callable[[Any], Any]]] = None, | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |     sqlalchemy_safe: bool = True, | 
					
						
							|  |  |  | ) -> Any: | 
					
						
							|  |  |  |     custom_encoder = custom_encoder or {} | 
					
						
							|  |  |  |     if custom_encoder: | 
					
						
							|  |  |  |         if type(obj) in custom_encoder: | 
					
						
							|  |  |  |             return custom_encoder[type(obj)](obj) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             for encoder_type, encoder_instance in custom_encoder.items(): | 
					
						
							|  |  |  |                 if isinstance(obj, encoder_type): | 
					
						
							|  |  |  |                     return encoder_instance(obj) | 
					
						
							|  |  |  |     if isinstance(obj, BaseModel): | 
					
						
							|  |  |  |         obj_dict = _model_dump( | 
					
						
							|  |  |  |             obj, | 
					
						
							|  |  |  |             mode="json", | 
					
						
							|  |  |  |             include=None, | 
					
						
							|  |  |  |             exclude=None, | 
					
						
							|  |  |  |             by_alias=by_alias, | 
					
						
							|  |  |  |             exclude_unset=exclude_unset, | 
					
						
							|  |  |  |             exclude_none=exclude_none, | 
					
						
							|  |  |  |             exclude_defaults=exclude_defaults, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         if "__root__" in obj_dict: | 
					
						
							|  |  |  |             obj_dict = obj_dict["__root__"] | 
					
						
							|  |  |  |         return jsonable_encoder( | 
					
						
							|  |  |  |             obj_dict, | 
					
						
							|  |  |  |             exclude_none=exclude_none, | 
					
						
							|  |  |  |             exclude_defaults=exclude_defaults, | 
					
						
							|  |  |  |             sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     if dataclasses.is_dataclass(obj): | 
					
						
							| 
									
										
										
										
											2024-12-24 18:38:51 +08:00
										 |  |  |         # FIXME: mypy error, try to fix it instead of using type: ignore | 
					
						
							|  |  |  |         obj_dict = dataclasses.asdict(obj)  # type: ignore | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |         return jsonable_encoder( | 
					
						
							|  |  |  |             obj_dict, | 
					
						
							|  |  |  |             by_alias=by_alias, | 
					
						
							|  |  |  |             exclude_unset=exclude_unset, | 
					
						
							|  |  |  |             exclude_defaults=exclude_defaults, | 
					
						
							|  |  |  |             exclude_none=exclude_none, | 
					
						
							|  |  |  |             custom_encoder=custom_encoder, | 
					
						
							|  |  |  |             sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     if isinstance(obj, Enum): | 
					
						
							|  |  |  |         return obj.value | 
					
						
							|  |  |  |     if isinstance(obj, PurePath): | 
					
						
							|  |  |  |         return str(obj) | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  |     if isinstance(obj, str | int | float | type(None)): | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |         return obj | 
					
						
							| 
									
										
										
										
											2024-01-17 22:39:47 +08:00
										 |  |  |     if isinstance(obj, Decimal): | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  |         return format(obj, "f") | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |     if isinstance(obj, dict): | 
					
						
							|  |  |  |         encoded_dict = {} | 
					
						
							|  |  |  |         allowed_keys = set(obj.keys()) | 
					
						
							|  |  |  |         for key, value in obj.items(): | 
					
						
							|  |  |  |             if ( | 
					
						
							| 
									
										
										
										
											2024-09-10 17:00:20 +08:00
										 |  |  |                 (not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa"))) | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |                 and (value is not None or not exclude_none) | 
					
						
							|  |  |  |                 and key in allowed_keys | 
					
						
							|  |  |  |             ): | 
					
						
							|  |  |  |                 encoded_key = jsonable_encoder( | 
					
						
							|  |  |  |                     key, | 
					
						
							|  |  |  |                     by_alias=by_alias, | 
					
						
							|  |  |  |                     exclude_unset=exclude_unset, | 
					
						
							|  |  |  |                     exclude_none=exclude_none, | 
					
						
							|  |  |  |                     custom_encoder=custom_encoder, | 
					
						
							|  |  |  |                     sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 encoded_value = jsonable_encoder( | 
					
						
							|  |  |  |                     value, | 
					
						
							|  |  |  |                     by_alias=by_alias, | 
					
						
							|  |  |  |                     exclude_unset=exclude_unset, | 
					
						
							|  |  |  |                     exclude_none=exclude_none, | 
					
						
							|  |  |  |                     custom_encoder=custom_encoder, | 
					
						
							|  |  |  |                     sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 encoded_dict[encoded_key] = encoded_value | 
					
						
							|  |  |  |         return encoded_dict | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  |     if isinstance(obj, list | set | frozenset | GeneratorType | tuple | deque): | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |         encoded_list = [] | 
					
						
							|  |  |  |         for item in obj: | 
					
						
							|  |  |  |             encoded_list.append( | 
					
						
							|  |  |  |                 jsonable_encoder( | 
					
						
							|  |  |  |                     item, | 
					
						
							|  |  |  |                     by_alias=by_alias, | 
					
						
							|  |  |  |                     exclude_unset=exclude_unset, | 
					
						
							|  |  |  |                     exclude_defaults=exclude_defaults, | 
					
						
							|  |  |  |                     exclude_none=exclude_none, | 
					
						
							|  |  |  |                     custom_encoder=custom_encoder, | 
					
						
							|  |  |  |                     sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         return encoded_list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if type(obj) in ENCODERS_BY_TYPE: | 
					
						
							|  |  |  |         return ENCODERS_BY_TYPE[type(obj)](obj) | 
					
						
							|  |  |  |     for encoder, classes_tuple in encoders_by_class_tuples.items(): | 
					
						
							|  |  |  |         if isinstance(obj, classes_tuple): | 
					
						
							|  |  |  |             return encoder(obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         data = dict(obj) | 
					
						
							|  |  |  |     except Exception as e: | 
					
						
							| 
									
										
										
										
											2024-02-09 15:21:33 +08:00
										 |  |  |         errors: list[Exception] = [] | 
					
						
							| 
									
										
										
										
											2024-01-02 23:42:00 +08:00
										 |  |  |         errors.append(e) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             data = vars(obj) | 
					
						
							|  |  |  |         except Exception as e: | 
					
						
							|  |  |  |             errors.append(e) | 
					
						
							|  |  |  |             raise ValueError(errors) from e | 
					
						
							|  |  |  |     return jsonable_encoder( | 
					
						
							|  |  |  |         data, | 
					
						
							|  |  |  |         by_alias=by_alias, | 
					
						
							|  |  |  |         exclude_unset=exclude_unset, | 
					
						
							|  |  |  |         exclude_defaults=exclude_defaults, | 
					
						
							|  |  |  |         exclude_none=exclude_none, | 
					
						
							|  |  |  |         custom_encoder=custom_encoder, | 
					
						
							|  |  |  |         sqlalchemy_safe=sqlalchemy_safe, | 
					
						
							|  |  |  |     ) |