use crate::entities::{SearchFilterPB, SearchResponsePB, SearchStatePB}; use allo_isolate::Isolate; use flowy_error::FlowyResult; use lib_infra::async_trait::async_trait; use lib_infra::isolate_stream::{IsolateSink, SinkExt}; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use tokio_stream::{self, Stream, StreamExt}; use tracing::{error, trace}; #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub enum SearchType { Folder, Document, } #[async_trait] pub trait SearchHandler: Send + Sync + 'static { /// returns the type of search this handler is responsible for fn search_type(&self) -> SearchType; /// performs a search and returns a stream of results async fn perform_search( &self, query: String, filter: Option, ) -> Pin> + Send + 'static>>; } /// The [SearchManager] is used to inject multiple [SearchHandler]'s /// to delegate a search to all relevant handlers, and stream the result /// to the client until the query has been fully completed. /// pub struct SearchManager { pub handlers: HashMap>, current_search: Arc>>, } impl SearchManager { pub fn new(handlers: Vec>) -> Self { let handlers: HashMap> = handlers .into_iter() .map(|handler| (handler.search_type(), handler)) .collect(); Self { handlers, current_search: Arc::new(tokio::sync::Mutex::new(None)), } } pub fn get_handler(&self, search_type: SearchType) -> Option<&Arc> { self.handlers.get(&search_type) } pub async fn perform_search( &self, query: String, stream_port: i64, filter: Option, search_id: String, ) { // Cancel previous search by updating current_search *self.current_search.lock().await = Some(search_id.clone()); let handlers = self.handlers.clone(); let sink = IsolateSink::new(Isolate::new(stream_port)); let mut join_handles = vec![]; let current_search = self.current_search.clone(); tracing::info!("[Search] perform search: {}", query); for (_, handler) in handlers { let mut clone_sink = sink.clone(); let query = query.clone(); let filter = filter.clone(); let search_id = search_id.clone(); let current_search = current_search.clone(); let handle = tokio::spawn(async move { if !is_current_search(¤t_search, &search_id).await { trace!("[Search] cancel search: {}", query); return; } let mut stream = handler.perform_search(query.clone(), filter).await; while let Some(Ok(search_result)) = stream.next().await { if !is_current_search(¤t_search, &search_id).await { trace!("[Search] discard search stream: {}", query); return; } let resp = SearchStatePB { response: Some(search_result), search_id: search_id.clone(), }; if let Ok::, _>(data) = resp.try_into() { if let Err(err) = clone_sink.send(data).await { error!("Failed to send search result: {}", err); break; } } } if !is_current_search(¤t_search, &search_id).await { trace!("[Search] discard search result: {}", query); return; } let resp = SearchStatePB { response: None, search_id: search_id.clone(), }; if let Ok::, _>(data) = resp.try_into() { let _ = clone_sink.send(data).await; } }); join_handles.push(handle); } futures::future::join_all(join_handles).await; } } async fn is_current_search( current_search: &Arc>>, search_id: &str, ) -> bool { let current = current_search.lock().await; current.as_ref().map_or(false, |id| id == search_id) }