mirror of
				https://github.com/AppFlowy-IO/AppFlowy.git
				synced 2025-11-04 12:03:28 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			217 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
use anyhow::Error;
 | 
						|
use flowy_task::{
 | 
						|
  Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,
 | 
						|
};
 | 
						|
use futures::stream::FuturesUnordered;
 | 
						|
use futures::StreamExt;
 | 
						|
use lib_infra::async_trait::async_trait;
 | 
						|
use lib_infra::future::BoxResultFuture;
 | 
						|
use lib_infra::ref_map::RefCountValue;
 | 
						|
use rand::Rng;
 | 
						|
use std::sync::Arc;
 | 
						|
use std::time::Duration;
 | 
						|
use tokio::sync::oneshot::Receiver;
 | 
						|
use tokio::sync::RwLock;
 | 
						|
 | 
						|
pub enum SearchScript {
 | 
						|
  AddTask {
 | 
						|
    task: Task,
 | 
						|
  },
 | 
						|
  AddTasks {
 | 
						|
    tasks: Vec<Task>,
 | 
						|
  },
 | 
						|
  #[allow(dead_code)]
 | 
						|
  Wait {
 | 
						|
    millisecond: u64,
 | 
						|
  },
 | 
						|
  CancelTask {
 | 
						|
    task_id: TaskId,
 | 
						|
  },
 | 
						|
  UnregisterHandler {
 | 
						|
    handler_id: String,
 | 
						|
  },
 | 
						|
  AssertTaskStatus {
 | 
						|
    task_id: TaskId,
 | 
						|
    expected_status: TaskState,
 | 
						|
  },
 | 
						|
  AssertExecuteOrder {
 | 
						|
    execute_order: Vec<u32>,
 | 
						|
    rets: Vec<Receiver<TaskResult>>,
 | 
						|
  },
 | 
						|
}
 | 
						|
 | 
						|
pub struct SearchTest {
 | 
						|
  scheduler: Arc<RwLock<TaskDispatcher>>,
 | 
						|
}
 | 
						|
 | 
						|
impl SearchTest {
 | 
						|
  pub async fn new() -> Self {
 | 
						|
    let duration = Duration::from_millis(1000);
 | 
						|
    let mut scheduler = TaskDispatcher::new(duration);
 | 
						|
    scheduler.register_handler(Arc::new(MockTextTaskHandler()));
 | 
						|
    scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
 | 
						|
    scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
 | 
						|
 | 
						|
    let scheduler = Arc::new(RwLock::new(scheduler));
 | 
						|
    tokio::spawn(TaskRunner::run(scheduler.clone()));
 | 
						|
 | 
						|
    Self { scheduler }
 | 
						|
  }
 | 
						|
 | 
						|
  pub async fn next_task_id(&self) -> TaskId {
 | 
						|
    self.scheduler.read().await.next_task_id()
 | 
						|
  }
 | 
						|
 | 
						|
  pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
 | 
						|
    for script in scripts {
 | 
						|
      self.run_script(script).await;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  pub async fn run_script(&self, script: SearchScript) {
 | 
						|
    match script {
 | 
						|
      SearchScript::AddTask { task } => {
 | 
						|
        self.scheduler.write().await.add_task(task);
 | 
						|
      },
 | 
						|
      SearchScript::CancelTask { task_id } => {
 | 
						|
        self.scheduler.write().await.cancel_task(task_id);
 | 
						|
      },
 | 
						|
      SearchScript::AddTasks { tasks } => {
 | 
						|
        let mut scheduler = self.scheduler.write().await;
 | 
						|
        for task in tasks {
 | 
						|
          scheduler.add_task(task);
 | 
						|
        }
 | 
						|
      },
 | 
						|
      SearchScript::Wait { millisecond } => {
 | 
						|
        tokio::time::sleep(Duration::from_millis(millisecond)).await;
 | 
						|
      },
 | 
						|
      SearchScript::UnregisterHandler { handler_id } => {
 | 
						|
        self
 | 
						|
          .scheduler
 | 
						|
          .write()
 | 
						|
          .await
 | 
						|
          .unregister_handler(handler_id)
 | 
						|
          .await;
 | 
						|
      },
 | 
						|
      SearchScript::AssertTaskStatus {
 | 
						|
        task_id,
 | 
						|
        expected_status,
 | 
						|
      } => {
 | 
						|
        let status = self
 | 
						|
          .scheduler
 | 
						|
          .read()
 | 
						|
          .await
 | 
						|
          .read_task(&task_id)
 | 
						|
          .unwrap()
 | 
						|
          .state()
 | 
						|
          .clone();
 | 
						|
        assert_eq!(status, expected_status);
 | 
						|
      },
 | 
						|
      SearchScript::AssertExecuteOrder {
 | 
						|
        execute_order,
 | 
						|
        rets,
 | 
						|
      } => {
 | 
						|
        let mut futures = FuturesUnordered::new();
 | 
						|
        for ret in rets {
 | 
						|
          futures.push(ret);
 | 
						|
        }
 | 
						|
        let mut orders = vec![];
 | 
						|
        while let Some(Ok(result)) = futures.next().await {
 | 
						|
          orders.push(result.id);
 | 
						|
          assert!(result.state.is_done());
 | 
						|
        }
 | 
						|
        assert_eq!(execute_order, orders);
 | 
						|
      },
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
pub struct MockTextTaskHandler();
 | 
						|
#[async_trait]
 | 
						|
impl RefCountValue for MockTextTaskHandler {
 | 
						|
  async fn did_remove(&self) {}
 | 
						|
}
 | 
						|
 | 
						|
impl TaskHandler for MockTextTaskHandler {
 | 
						|
  fn handler_id(&self) -> &str {
 | 
						|
    "1"
 | 
						|
  }
 | 
						|
 | 
						|
  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
						|
    let mut rng = rand::thread_rng();
 | 
						|
    let millisecond = rng.gen_range(1..50);
 | 
						|
    Box::pin(async move {
 | 
						|
      match content {
 | 
						|
        TaskContent::Text(_s) => {
 | 
						|
          tokio::time::sleep(Duration::from_millis(millisecond)).await;
 | 
						|
        },
 | 
						|
        TaskContent::Blob(_) => panic!("Only support text"),
 | 
						|
      }
 | 
						|
      Ok(())
 | 
						|
    })
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 | 
						|
  let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
 | 
						|
  let recv = task.recv.take().unwrap();
 | 
						|
  (task, recv)
 | 
						|
}
 | 
						|
 | 
						|
pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 | 
						|
  let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
 | 
						|
  let recv = task.recv.take().unwrap();
 | 
						|
  (task, recv)
 | 
						|
}
 | 
						|
 | 
						|
pub struct MockBlobTaskHandler();
 | 
						|
#[async_trait]
 | 
						|
impl RefCountValue for MockBlobTaskHandler {
 | 
						|
  async fn did_remove(&self) {}
 | 
						|
}
 | 
						|
 | 
						|
impl TaskHandler for MockBlobTaskHandler {
 | 
						|
  fn handler_id(&self) -> &str {
 | 
						|
    "2"
 | 
						|
  }
 | 
						|
 | 
						|
  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
						|
    Box::pin(async move {
 | 
						|
      match content {
 | 
						|
        TaskContent::Text(_) => panic!("Only support blob"),
 | 
						|
        TaskContent::Blob(bytes) => {
 | 
						|
          let _msg = String::from_utf8(bytes).unwrap();
 | 
						|
          tokio::time::sleep(Duration::from_millis(20)).await;
 | 
						|
        },
 | 
						|
      }
 | 
						|
      Ok(())
 | 
						|
    })
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
pub struct MockTimeoutTaskHandler();
 | 
						|
 | 
						|
impl TaskHandler for MockTimeoutTaskHandler {
 | 
						|
  fn handler_id(&self) -> &str {
 | 
						|
    "3"
 | 
						|
  }
 | 
						|
 | 
						|
  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 | 
						|
    Box::pin(async move {
 | 
						|
      match content {
 | 
						|
        TaskContent::Text(_) => panic!("Only support blob"),
 | 
						|
        TaskContent::Blob(_bytes) => {
 | 
						|
          tokio::time::sleep(Duration::from_millis(2000)).await;
 | 
						|
        },
 | 
						|
      }
 | 
						|
      Ok(())
 | 
						|
    })
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
 | 
						|
  let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
 | 
						|
  let recv = task.recv.take().unwrap();
 | 
						|
  (task, recv)
 | 
						|
}
 |