use std::{ collections::HashMap, fmt::{Debug, Display}, future::Future, hash::Hash, pin::Pin, rc::Rc, task::{Context, Poll}, }; use futures_core::{future::LocalBoxFuture, ready}; use pin_project::pin_project; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ error::{InternalError, SystemError}, module::{container::DataContainer, ModuleData}, request::{payload::Payload, EventRequest, FromRequest}, response::{EventResponse, Responder}, service::{ factory, BoxService, BoxServiceFactory, Handler, HandlerService, Service, ServiceFactory, ServiceRequest, ServiceResponse, }, }; #[derive(PartialEq, Eq, Hash, Debug, Clone)] pub struct Event(String); impl std::convert::From for Event { fn from(t: T) -> Self { Event(format!("{}", t)) } } pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>; pub struct Module { name: String, data: DataContainer, service_map: Rc>, req_tx: UnboundedSender, req_rx: UnboundedReceiver, } impl Module { pub fn new() -> Self { let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), data: DataContainer::new(), service_map: Rc::new(HashMap::new()), req_tx, req_rx, } } pub fn name(mut self, s: &str) -> Self { self.name = s.to_owned(); self } pub fn data(mut self, data: D) -> Self { self.data.insert(ModuleData::new(data)); self } pub fn event(mut self, event: E, handler: H) -> Self where H: Handler, T: FromRequest + 'static, R: Future + 'static, R::Output: Responder + 'static, E: Eq + Hash + Debug + Clone + Display, { let event: Event = event.into(); if self.service_map.contains_key(&event) { log::error!("Duplicate Event: {:?}", &event); } Rc::get_mut(&mut self.service_map) .unwrap() .insert(event, factory(HandlerService::new(handler))); self } pub fn forward_map(&self) -> HashMap> { self.service_map .keys() .map(|key| (key.clone(), self.req_tx.clone())) .collect::>() } pub fn events(&self) -> Vec { self.service_map.keys().map(|key| key.clone()).collect::>() } } impl Future for Module { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { None => return Poll::Ready(()), Some(request) => { let mut service = self.new_service(()); if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) { log::trace!("Spawn module service for request {}", request.id()); tokio::task::spawn_local(async move { let _ = service.call(request).await; }); } }, } } } } #[derive(Debug)] pub struct ModuleRequest { inner: EventRequest, payload: Payload, } impl ModuleRequest { pub fn new(event: E) -> Self where E: Into, { Self { inner: EventRequest::new(event), payload: Payload::None, } } pub fn payload(mut self, payload: Payload) -> Self { self.payload = payload; self } pub(crate) fn into_parts(self) -> (EventRequest, Payload) { (self.inner, self.payload) } pub(crate) fn into_service_request(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) } pub(crate) fn id(&self) -> &str { &self.inner.id } pub(crate) fn event(&self) -> &Event { &self.inner.event } } impl ServiceFactory for Module { type Response = EventResponse; type Error = SystemError; type Service = BoxService; type Context = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: Self::Context) -> Self::Future { let service_map = self.service_map.clone(); Box::pin(async move { let service = ModuleService { service_map }; let module_service = Box::new(service) as Self::Service; Ok(module_service) }) } } pub struct ModuleService { service_map: Rc>, } impl Service for ModuleService { type Response = EventResponse; type Error = SystemError; type Future = LocalBoxFuture<'static, Result>; fn call(&self, request: ModuleRequest) -> Self::Future { log::trace!("Call module service for request {}", &request.id()); match self.service_map.get(&request.event()) { Some(factory) => { let service_fut = factory.new_service(()); let fut = ModuleServiceFuture { fut: Box::pin(async { let service = service_fut.await?; let request = request.into_service_request(); service.call(request).await }), }; Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) }, None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }), } } } // type BoxModuleService = BoxService; #[pin_project] pub struct ModuleServiceFuture { #[pin] fut: LocalBoxFuture<'static, Result>, } impl Future for ModuleServiceFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts(); return Poll::Ready(Ok(response)); } } } // #[cfg(test)] // mod tests { // use super::*; // use crate::rt::Runtime; // use futures_util::{future, pin_mut}; // use tokio::sync::mpsc::unbounded_channel; // pub async fn hello_service() -> String { "hello".to_string() } // #[test] // fn test() { // let runtime = Runtime::new().unwrap(); // runtime.block_on(async { // let (sys_tx, mut sys_rx) = unbounded_channel::(); // let event = "hello".to_string(); // let module = Module::new(sys_tx).event(event.clone(), // hello_service); let req_tx = module.req_tx(); // let event = async move { // let request = EventRequest::new(event.clone()); // req_tx.send(request).unwrap(); // // match sys_rx.recv().await { // Some(cmd) => { // log::info!("{:?}", cmd); // }, // None => panic!(""), // } // }; // // pin_mut!(module, event); // future::select(module, event).await; // }); // } // }