226 lines
7.3 KiB
Rust
Raw Normal View History

2021-06-25 23:53:13 +08:00
use crate::{
data::container::DataContainer,
error::SystemError,
module::ModuleData,
request::FromRequest,
response::Responder,
service::{BoxService, Handler, Service, ServiceFactory, ServiceRequest, ServiceResponse},
};
2021-06-26 23:52:03 +08:00
use crate::{
2021-06-27 22:07:33 +08:00
error::InternalError,
2021-06-27 15:11:41 +08:00
request::{payload::Payload, EventRequest},
2021-06-28 14:27:16 +08:00
response::EventResponse,
2021-06-26 23:52:03 +08:00
service::{factory, BoxServiceFactory, HandlerService},
};
2021-06-25 23:53:13 +08:00
use futures_core::{future::LocalBoxFuture, ready};
2021-06-26 23:52:03 +08:00
use pin_project::pin_project;
2021-06-25 23:53:13 +08:00
use std::{
collections::HashMap,
fmt::{Debug, Display},
2021-06-25 23:53:13 +08:00
future::Future,
hash::Hash,
2021-06-25 23:53:13 +08:00
pin::Pin,
2021-06-27 22:07:33 +08:00
rc::Rc,
2021-06-25 23:53:13 +08:00
task::{Context, Poll},
};
2021-06-27 15:11:41 +08:00
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2021-06-25 23:53:13 +08:00
/// Identifier of an event that a module can handle.
///
/// The identifier is the `Display` rendering of whatever value it was built
/// from, so two values that print the same text yield equal `Event`s.
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub struct Event(String);

/// Builds an [`Event`] from any displayable key by taking its `Display` text.
impl<T: Display + Eq + Hash + Debug + Clone> From<T> for Event {
    fn from(t: T) -> Self { Event(t.to_string()) }
}
2021-06-27 15:11:41 +08:00
/// Boxed service factory stored per event: configured with `()`, it produces a
/// service from `ServiceRequest` to `ServiceResponse` that fails with `SystemError`.
pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
2021-06-25 23:53:13 +08:00
pub struct Module {
name: String,
data: DataContainer,
2021-06-27 22:07:33 +08:00
service_map: Rc<HashMap<Event, EventServiceFactory>>,
2021-06-27 15:11:41 +08:00
req_tx: UnboundedSender<EventRequest>,
req_rx: UnboundedReceiver<EventRequest>,
2021-06-25 23:53:13 +08:00
}
impl Module {
2021-06-28 14:27:16 +08:00
pub fn new() -> Self {
2021-06-27 15:11:41 +08:00
let (req_tx, req_rx) = unbounded_channel::<EventRequest>();
2021-06-25 23:53:13 +08:00
Self {
name: "".to_owned(),
data: DataContainer::new(),
2021-06-27 22:07:33 +08:00
service_map: Rc::new(HashMap::new()),
2021-06-26 23:52:03 +08:00
req_tx,
req_rx,
2021-06-25 23:53:13 +08:00
}
}
pub fn name(mut self, s: &str) -> Self {
self.name = s.to_owned();
self
}
pub fn data<D: 'static>(mut self, data: D) -> Self {
2021-06-27 15:11:41 +08:00
self.data.insert(ModuleData::new(data));
2021-06-25 23:53:13 +08:00
self
}
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
2021-06-25 23:53:13 +08:00
where
H: Handler<T, R>,
T: FromRequest + 'static,
R: Future + 'static,
R::Output: Responder + 'static,
E: Eq + Hash + Debug + Clone + Display,
2021-06-25 23:53:13 +08:00
{
let event: Event = event.into();
2021-06-27 15:11:41 +08:00
if self.service_map.contains_key(&event) {
log::error!("Duplicate Event: {:?}", &event);
2021-06-27 15:11:41 +08:00
}
2021-06-27 22:07:33 +08:00
Rc::get_mut(&mut self.service_map)
.unwrap()
.insert(event, factory(HandlerService::new(handler)));
2021-06-25 23:53:13 +08:00
self
}
2021-06-26 23:52:03 +08:00
2021-06-27 15:11:41 +08:00
pub fn req_tx(&self) -> UnboundedSender<EventRequest> { self.req_tx.clone() }
2021-06-26 23:52:03 +08:00
2021-06-27 15:11:41 +08:00
pub fn handle(&self, request: EventRequest) {
2021-06-27 22:07:33 +08:00
log::debug!("Module: {} receive request: {:?}", self.name, request);
2021-06-26 23:52:03 +08:00
match self.req_tx.send(request) {
Ok(_) => {},
Err(e) => {
2021-06-27 15:11:41 +08:00
log::error!("Module: {} with error: {:?}", self.name, e);
2021-06-26 23:52:03 +08:00
},
}
}
2021-06-27 01:24:00 +08:00
2021-06-27 15:11:41 +08:00
pub fn forward_map(&self) -> HashMap<Event, UnboundedSender<EventRequest>> {
2021-06-27 01:24:00 +08:00
self.service_map
.keys()
.map(|key| (key.clone(), self.req_tx()))
.collect::<HashMap<_, _>>()
}
2021-06-28 14:27:16 +08:00
pub fn events(&self) -> Vec<Event> { self.service_map.keys().map(|key| key.clone()).collect::<Vec<_>>() }
2021-06-25 23:53:13 +08:00
}
impl Future for Module {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
2021-06-26 23:52:03 +08:00
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
2021-06-25 23:53:13 +08:00
None => return Poll::Ready(()),
2021-06-27 22:07:33 +08:00
Some(request) => {
let mut service = self.new_service(request.get_id().to_string());
if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) {
log::trace!("Spawn module service for request {}", request.get_id());
2021-06-26 23:52:03 +08:00
tokio::task::spawn_local(async move {
2021-06-27 22:07:33 +08:00
let _ = service.call(request).await;
2021-06-25 23:53:13 +08:00
});
2021-06-27 22:07:33 +08:00
}
2021-06-25 23:53:13 +08:00
},
}
}
}
}
2021-06-27 22:07:33 +08:00
impl ServiceFactory<EventRequest> for Module {
2021-06-28 14:27:16 +08:00
type Response = EventResponse;
2021-06-27 22:07:33 +08:00
type Error = SystemError;
type Service = BoxService<EventRequest, Self::Response, Self::Error>;
type Config = String;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Self::Config) -> Self::Future {
log::trace!("Create module service for request {}", cfg);
let service_map = self.service_map.clone();
Box::pin(async move {
2021-06-28 14:27:16 +08:00
let service = ModuleService { service_map };
2021-06-27 22:07:33 +08:00
let module_service = Box::new(service) as Self::Service;
Ok(module_service)
})
}
}
/// Service created by `Module`'s `ServiceFactory` implementation.
///
/// It holds a shared handle to the module's event-to-factory table and uses it
/// to route each `EventRequest` to the handler registered for that event.
pub struct ModuleService {
// Shared (reference-counted) table of handler factories, keyed by event.
service_map: Rc<HashMap<Event, EventServiceFactory>>,
}
impl Service<EventRequest> for ModuleService {
2021-06-28 14:27:16 +08:00
type Response = EventResponse;
2021-06-27 22:07:33 +08:00
type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, request: EventRequest) -> Self::Future {
log::trace!("Call module service for request {}", request.get_id());
match self.service_map.get(request.get_event()) {
Some(factory) => {
let fut = ModuleServiceFuture {
request,
fut: factory.new_service(()),
};
2021-06-28 14:27:16 +08:00
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
2021-06-27 22:07:33 +08:00
},
None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }),
}
}
}
2021-06-26 23:52:03 +08:00
type BoxModuleService = BoxService<ServiceRequest, ServiceResponse, SystemError>;
2021-06-27 22:07:33 +08:00
2021-06-26 23:52:03 +08:00
#[pin_project]
pub struct ModuleServiceFuture {
2021-06-27 15:11:41 +08:00
request: EventRequest,
2021-06-25 23:53:13 +08:00
#[pin]
2021-06-26 23:52:03 +08:00
fut: LocalBoxFuture<'static, Result<BoxModuleService, SystemError>>,
2021-06-25 23:53:13 +08:00
}
2021-06-26 23:52:03 +08:00
impl Future for ModuleServiceFuture {
2021-06-27 15:11:41 +08:00
type Output = Result<EventResponse, SystemError>;
2021-06-25 23:53:13 +08:00
2021-06-26 23:52:03 +08:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let service = ready!(self.as_mut().project().fut.poll(cx))?;
let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None);
2021-06-27 22:07:33 +08:00
log::debug!("Call service to handle request {:?}", self.request);
2021-06-26 23:52:03 +08:00
let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
return Poll::Ready(Ok(resp));
}
}
2021-06-25 23:53:13 +08:00
}
2021-06-28 14:27:16 +08:00
// #[cfg(test)]
// mod tests {
// use super::*;
// use crate::rt::Runtime;
// use futures_util::{future, pin_mut};
// use tokio::sync::mpsc::unbounded_channel;
// pub async fn hello_service() -> String { "hello".to_string() }
// #[test]
// fn test() {
// let runtime = Runtime::new().unwrap();
// runtime.block_on(async {
// let (sys_tx, mut sys_rx) = unbounded_channel::<SystemCommand>();
// let event = "hello".to_string();
// let module = Module::new(sys_tx).event(event.clone(), hello_service);
// let req_tx = module.req_tx();
// let event = async move {
// let request = EventRequest::new(event.clone());
// req_tx.send(request).unwrap();
//
// match sys_rx.recv().await {
// Some(cmd) => {
// log::info!("{:?}", cmd);
// },
// None => panic!(""),
// }
// };
//
// pin_mut!(module, event);
// future::select(module, event).await;
// });
// }
// }