| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | use anyhow::Error;
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  | use flowy_task::{
 | 
					
						
							|  |  |  |   Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,
 | 
					
						
							|  |  |  | };
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | use futures::stream::FuturesUnordered;
 | 
					
						
							|  |  |  | use futures::StreamExt;
 | 
					
						
							| 
									
										
										
										
											2022-12-09 09:19:47 +08:00
										 |  |  | use lib_infra::async_trait::async_trait;
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | use lib_infra::future::BoxResultFuture;
 | 
					
						
							|  |  |  | use lib_infra::ref_map::RefCountValue;
 | 
					
						
							|  |  |  | use rand::Rng;
 | 
					
						
							|  |  |  | use std::sync::Arc;
 | 
					
						
							|  |  |  | use std::time::Duration;
 | 
					
						
							|  |  |  | use tokio::sync::oneshot::Receiver;
 | 
					
						
							|  |  |  | use tokio::sync::RwLock;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub enum SearchScript {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   AddTask {
 | 
					
						
							|  |  |  |     task: Task,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   AddTasks {
 | 
					
						
							|  |  |  |     tasks: Vec<Task>,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   #[allow(dead_code)]
 | 
					
						
							|  |  |  |   Wait {
 | 
					
						
							|  |  |  |     millisecond: u64,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   CancelTask {
 | 
					
						
							|  |  |  |     task_id: TaskId,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   UnregisterHandler {
 | 
					
						
							|  |  |  |     handler_id: String,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   AssertTaskStatus {
 | 
					
						
							|  |  |  |     task_id: TaskId,
 | 
					
						
							|  |  |  |     expected_status: TaskState,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							|  |  |  |   AssertExecuteOrder {
 | 
					
						
							|  |  |  |     execute_order: Vec<u32>,
 | 
					
						
							|  |  |  |     rets: Vec<Receiver<TaskResult>>,
 | 
					
						
							|  |  |  |   },
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub struct SearchTest {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   scheduler: Arc<RwLock<TaskDispatcher>>,
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl SearchTest {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   pub async fn new() -> Self {
 | 
					
						
							|  |  |  |     let duration = Duration::from_millis(1000);
 | 
					
						
							|  |  |  |     let mut scheduler = TaskDispatcher::new(duration);
 | 
					
						
							|  |  |  |     scheduler.register_handler(Arc::new(MockTextTaskHandler()));
 | 
					
						
							|  |  |  |     scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
 | 
					
						
							|  |  |  |     scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     let scheduler = Arc::new(RwLock::new(scheduler));
 | 
					
						
							|  |  |  |     tokio::spawn(TaskRunner::run(scheduler.clone()));
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Self { scheduler }
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   pub async fn next_task_id(&self) -> TaskId {
 | 
					
						
							|  |  |  |     self.scheduler.read().await.next_task_id()
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
 | 
					
						
							|  |  |  |     for script in scripts {
 | 
					
						
							|  |  |  |       self.run_script(script).await;
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   pub async fn run_script(&self, script: SearchScript) {
 | 
					
						
							|  |  |  |     match script {
 | 
					
						
							|  |  |  |       SearchScript::AddTask { task } => {
 | 
					
						
							|  |  |  |         self.scheduler.write().await.add_task(task);
 | 
					
						
							|  |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::CancelTask { task_id } => {
 | 
					
						
							|  |  |  |         self.scheduler.write().await.cancel_task(task_id);
 | 
					
						
							|  |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::AddTasks { tasks } => {
 | 
					
						
							|  |  |  |         let mut scheduler = self.scheduler.write().await;
 | 
					
						
							|  |  |  |         for task in tasks {
 | 
					
						
							|  |  |  |           scheduler.add_task(task);
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::Wait { millisecond } => {
 | 
					
						
							|  |  |  |         tokio::time::sleep(Duration::from_millis(millisecond)).await;
 | 
					
						
							|  |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::UnregisterHandler { handler_id } => {
 | 
					
						
							|  |  |  |         self
 | 
					
						
							|  |  |  |           .scheduler
 | 
					
						
							|  |  |  |           .write()
 | 
					
						
							|  |  |  |           .await
 | 
					
						
							|  |  |  |           .unregister_handler(handler_id)
 | 
					
						
							|  |  |  |           .await;
 | 
					
						
							|  |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::AssertTaskStatus {
 | 
					
						
							|  |  |  |         task_id,
 | 
					
						
							|  |  |  |         expected_status,
 | 
					
						
							|  |  |  |       } => {
 | 
					
						
							|  |  |  |         let status = self
 | 
					
						
							|  |  |  |           .scheduler
 | 
					
						
							|  |  |  |           .read()
 | 
					
						
							|  |  |  |           .await
 | 
					
						
							|  |  |  |           .read_task(&task_id)
 | 
					
						
							|  |  |  |           .unwrap()
 | 
					
						
							|  |  |  |           .state()
 | 
					
						
							|  |  |  |           .clone();
 | 
					
						
							|  |  |  |         assert_eq!(status, expected_status);
 | 
					
						
							|  |  |  |       },
 | 
					
						
							|  |  |  |       SearchScript::AssertExecuteOrder {
 | 
					
						
							|  |  |  |         execute_order,
 | 
					
						
							|  |  |  |         rets,
 | 
					
						
							|  |  |  |       } => {
 | 
					
						
							|  |  |  |         let mut futures = FuturesUnordered::new();
 | 
					
						
							|  |  |  |         for ret in rets {
 | 
					
						
							|  |  |  |           futures.push(ret);
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |         let mut orders = vec![];
 | 
					
						
							|  |  |  |         while let Some(Ok(result)) = futures.next().await {
 | 
					
						
							|  |  |  |           orders.push(result.id);
 | 
					
						
							|  |  |  |           assert!(result.state.is_done());
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         assert_eq!(execute_order, orders);
 | 
					
						
							|  |  |  |       },
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   }
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub struct MockTextTaskHandler();
 | 
					
						
							| 
									
										
										
										
											2022-12-09 09:19:47 +08:00
										 |  |  | #[async_trait]
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | impl RefCountValue for MockTextTaskHandler {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   async fn did_remove(&self) {}
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl TaskHandler for MockTextTaskHandler {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   fn handler_id(&self) -> &str {
 | 
					
						
							|  |  |  |     "1"
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
					
						
							|  |  |  |     let mut rng = rand::thread_rng();
 | 
					
						
							|  |  |  |     let millisecond = rng.gen_range(1..50);
 | 
					
						
							|  |  |  |     Box::pin(async move {
 | 
					
						
							|  |  |  |       match content {
 | 
					
						
							|  |  |  |         TaskContent::Text(_s) => {
 | 
					
						
							|  |  |  |           tokio::time::sleep(Duration::from_millis(millisecond)).await;
 | 
					
						
							|  |  |  |         },
 | 
					
						
							|  |  |  |         TaskContent::Blob(_) => panic!("Only support text"),
 | 
					
						
							|  |  |  |       }
 | 
					
						
							|  |  |  |       Ok(())
 | 
					
						
							|  |  |  |     })
 | 
					
						
							|  |  |  |   }
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
 | 
					
						
							|  |  |  |   let recv = task.recv.take().unwrap();
 | 
					
						
							|  |  |  |   (task, recv)
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
 | 
					
						
							|  |  |  |   let recv = task.recv.take().unwrap();
 | 
					
						
							|  |  |  |   (task, recv)
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub struct MockBlobTaskHandler();
 | 
					
						
							| 
									
										
										
										
											2022-12-09 09:19:47 +08:00
										 |  |  | #[async_trait]
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | impl RefCountValue for MockBlobTaskHandler {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   async fn did_remove(&self) {}
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl TaskHandler for MockBlobTaskHandler {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   fn handler_id(&self) -> &str {
 | 
					
						
							|  |  |  |     "2"
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
					
						
							|  |  |  |     Box::pin(async move {
 | 
					
						
							|  |  |  |       match content {
 | 
					
						
							|  |  |  |         TaskContent::Text(_) => panic!("Only support blob"),
 | 
					
						
							|  |  |  |         TaskContent::Blob(bytes) => {
 | 
					
						
							|  |  |  |           let _msg = String::from_utf8(bytes).unwrap();
 | 
					
						
							|  |  |  |           tokio::time::sleep(Duration::from_millis(20)).await;
 | 
					
						
							|  |  |  |         },
 | 
					
						
							|  |  |  |       }
 | 
					
						
							|  |  |  |       Ok(())
 | 
					
						
							|  |  |  |     })
 | 
					
						
							|  |  |  |   }
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub struct MockTimeoutTaskHandler();
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl TaskHandler for MockTimeoutTaskHandler {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   fn handler_id(&self) -> &str {
 | 
					
						
							|  |  |  |     "3"
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
					
						
							|  |  |  |     Box::pin(async move {
 | 
					
						
							|  |  |  |       match content {
 | 
					
						
							|  |  |  |         TaskContent::Text(_) => panic!("Only support blob"),
 | 
					
						
							|  |  |  |         TaskContent::Blob(_bytes) => {
 | 
					
						
							|  |  |  |           tokio::time::sleep(Duration::from_millis(2000)).await;
 | 
					
						
							|  |  |  |         },
 | 
					
						
							|  |  |  |       }
 | 
					
						
							|  |  |  |       Ok(())
 | 
					
						
							|  |  |  |     })
 | 
					
						
							|  |  |  |   }
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
 | 
					
						
							| 
									
										
										
										
											2023-02-13 09:29:49 +08:00
										 |  |  |   let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
 | 
					
						
							|  |  |  |   let recv = task.recv.take().unwrap();
 | 
					
						
							|  |  |  |   (task, recv)
 | 
					
						
							| 
									
										
										
										
											2022-11-13 22:23:57 +08:00
										 |  |  | }
 |