use appflowy_plugin::error::PluginError; use bytes::Bytes; use flowy_ai_pub::cloud::QuestionStreamValue; use flowy_error::FlowyError; use futures::{ready, Stream}; use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; #[pin_project] pub struct LocalAIStreamAdaptor { stream: Pin> + Send>>, buffer: Vec, } impl LocalAIStreamAdaptor { pub fn new(stream: S) -> Self where S: Stream> + Send + 'static, { LocalAIStreamAdaptor { stream: Box::pin(stream), buffer: Vec::new(), } } } impl Stream for LocalAIStreamAdaptor { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); return match ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => match String::from_utf8(bytes.to_vec()) { Ok(s) => Poll::Ready(Some(Ok(QuestionStreamValue::Answer { value: s }))), Err(err) => Poll::Ready(Some(Err(FlowyError::internal().with_context(err)))), }, Some(Err(err)) => Poll::Ready(Some(Err(FlowyError::local_ai().with_context(err)))), None => Poll::Ready(None), }; } }