mirror of
https://github.com/langgenius/dify.git
synced 2025-07-16 21:58:49 +00:00
30 lines
1.1 KiB
Python
30 lines
1.1 KiB
Python
![]() |
from collections.abc import Generator, Mapping
|
||
|
from typing import Generic, Optional, TypeVar
|
||
|
|
||
|
from pydantic import BaseModel
|
||
|
|
||
|
|
||
|
class BaseBackwardsInvocation:
|
||
|
@classmethod
|
||
|
def convert_to_event_stream(cls, response: Generator[BaseModel | Mapping | str, None, None] | BaseModel | Mapping):
|
||
|
if isinstance(response, Generator):
|
||
|
try:
|
||
|
for chunk in response:
|
||
|
if isinstance(chunk, BaseModel | dict):
|
||
|
yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n"
|
||
|
elif isinstance(chunk, str):
|
||
|
yield f"event: {chunk}\n\n".encode()
|
||
|
except Exception as e:
|
||
|
error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json()
|
||
|
yield f"{error_message}\n\n".encode()
|
||
|
else:
|
||
|
yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode() + b"\n\n"
|
||
|
|
||
|
|
||
|
T = TypeVar("T", bound=dict | Mapping | str | bool | int | BaseModel)
|
||
|
|
||
|
|
||
|
class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
|
||
|
data: Optional[T] = None
|
||
|
error: str = ""
|