use appflowy_plugin::error::PluginError; use flowy_ai_pub::cloud::QuestionStreamValue; use flowy_error::FlowyError; use futures::{ready, Stream}; use pin_project::pin_project; use serde_json::Value; use std::pin::Pin; use std::task::{Context, Poll}; use tracing::error; pub const STEAM_METADATA_KEY: &str = "0"; pub const STEAM_ANSWER_KEY: &str = "1"; #[pin_project] pub struct QuestionStream { stream: Pin> + Send>>, buffer: Vec, } impl QuestionStream { pub fn new(stream: S) -> Self where S: Stream> + Send + 'static, { QuestionStream { stream: Box::pin(stream), buffer: Vec::new(), } } } impl Stream for QuestionStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); match ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(value)) => match value { Value::Object(mut value) => { if let Some(metadata) = value.remove(STEAM_METADATA_KEY) { return Poll::Ready(Some(Ok(QuestionStreamValue::Metadata { value: metadata }))); } if let Some(answer) = value .remove(STEAM_ANSWER_KEY) .and_then(|s| s.as_str().map(ToString::to_string)) { return Poll::Ready(Some(Ok(QuestionStreamValue::Answer { value: answer }))); } error!("Invalid streaming value: {:?}", value); Poll::Ready(None) }, _ => { error!("Unexpected JSON value type: {:?}", value); Poll::Ready(None) }, }, Some(Err(err)) => Poll::Ready(Some(Err(FlowyError::local_ai().with_context(err)))), None => Poll::Ready(None), } } }