chore: index document even content is empty (#7996)

Nathan.fooo 2025-05-28 10:11:45 +08:00 committed by GitHub
parent 837e754946
commit 8fcbf78714
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 293 additions and 120 deletions
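In short: the indexing pipeline now carries Option<UnindexedData> instead of UnindexedData, so a document with empty content still produces an index record and keeps its metadata (title, icon) searchable instead of being skipped. Below is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins for illustration, not the actual AppFlowy definitions:

// Sketch only: simplified stand-ins for UnindexedData / UnindexedCollab.
#[derive(Clone)]
enum UnindexedData {
  Text(String),
  Paragraphs(Vec<String>),
}

impl UnindexedData {
  // Stand-in hash; the real implementation hashes the content.
  fn content_hash(&self) -> String {
    match self {
      UnindexedData::Text(t) => format!("text:{}", t.len()),
      UnindexedData::Paragraphs(p) => format!("paragraphs:{}", p.len()),
    }
  }
}

struct UnindexedCollab {
  object_id: String,
  data: Option<UnindexedData>, // was `UnindexedData` before this commit
}

// Empty content no longer short-circuits indexing: `None` maps to a
// default hash, so the record (and its metadata) is still written.
fn index_record(record: &UnindexedCollab) -> (String, String) {
  let hash = record
    .data
    .as_ref()
    .map(|d| d.content_hash())
    .unwrap_or_default();
  (record.object_id.clone(), hash)
}

fn main() {
  let empty = UnindexedCollab { object_id: "doc-1".into(), data: None };
  let full = UnindexedCollab {
    object_id: "doc-2".into(),
    data: Some(UnindexedData::Text("hello".into())),
  };
  println!("{:?}", index_record(&empty)); // ("doc-1", "")
  println!("{:?}", index_record(&full)); // ("doc-2", "text:5")
}

The same None-to-default fallback appears in the FullIndexedDataWriter change below, where content_hash falls back to unwrap_or_default().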

View File

@ -889,7 +889,7 @@ dependencies = [
"parking_lot 0.12.1",
"percent-encoding",
"pin-project",
"prost 0.13.3",
"prost 0.13.5",
"rayon",
"reqwest 0.12.15",
"scraper 0.17.1",
@ -1063,7 +1063,7 @@ dependencies = [
"bytes",
"collab",
"getrandom 0.2.10",
"prost 0.13.3",
"prost 0.13.5",
"prost-build",
"protoc-bin-vendored",
"serde",
@ -1209,7 +1209,7 @@ dependencies = [
"collab-entity",
"collab-rt-protocol",
"database-entity",
"prost 0.13.3",
"prost 0.13.5",
"prost-build",
"protoc-bin-vendored",
"serde",
@ -1655,7 +1655,7 @@ dependencies = [
"chrono",
"collab-entity",
"infra",
"prost 0.13.3",
"prost 0.13.5",
"serde",
"serde_json",
"serde_repr",
@ -2162,9 +2162,9 @@ checksum = "25c7df09945d65ea8d70b3321547ed414bbc540aad5bac6883d021b970f35b04"
[[package]]
name = "fastrand"
version = "2.0.1"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
dependencies = [
"getrandom 0.2.10",
]
@ -2337,6 +2337,7 @@ dependencies = [
"flowy-storage-pub",
"flowy-user",
"flowy-user-pub",
"futures",
"futures-core",
"lib-dispatch",
"lib-infra",
@ -5454,12 +5455,12 @@ dependencies = [
[[package]]
name = "prost"
version = "0.13.3"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive 0.13.3",
"prost-derive 0.13.5",
]
[[package]]
@ -5499,12 +5500,12 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.13.3"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.13.0",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.94",
@ -6986,9 +6987,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.13.2"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
[[package]]
name = "socket2"

View File

@ -100,53 +100,51 @@ impl InstantIndexedDataWriter {
match guard.get(&id) {
Some(wo) => {
if let Some(collab_rc) = wo.collab.upgrade() {
-if let Some(data) = collab_rc
+let data = collab_rc
.get_unindexed_data(&wo.collab_object.collab_type)
-.await
-{
-// Snapshot consumers
-let consumers_guard = consumers.read().await;
-for consumer in consumers_guard.iter() {
-let workspace_id = match Uuid::parse_str(&wo.collab_object.workspace_id) {
-Ok(id) => id,
-Err(err) => {
-error!(
-"Invalid workspace_id {}: {}",
-wo.collab_object.workspace_id, err
-);
-continue;
-},
-};
-let object_id = match Uuid::parse_str(&wo.collab_object.object_id) {
-Ok(id) => id,
-Err(err) => {
-error!("Invalid object_id {}: {}", wo.collab_object.object_id, err);
-continue;
-},
-};
-match consumer
-.consume_collab(
-&workspace_id,
-data.clone(),
-&object_id,
-wo.collab_object.collab_type,
-)
-.await
-{
-Ok(is_indexed) => {
-if is_indexed {
-trace!("[Indexing] {} consumed {}", consumer.consumer_id(), id);
-}
-},
-Err(err) => {
-error!(
-"Consumer {} failed on {}: {}",
-consumer.consumer_id(),
-id,
-err
-);
-},
-}
+.await;
+let consumers_guard = consumers.read().await;
+for consumer in consumers_guard.iter() {
+let workspace_id = match Uuid::parse_str(&wo.collab_object.workspace_id) {
+Ok(id) => id,
+Err(err) => {
+error!(
+"Invalid workspace_id {}: {}",
+wo.collab_object.workspace_id, err
+);
+continue;
+},
+};
+let object_id = match Uuid::parse_str(&wo.collab_object.object_id) {
+Ok(id) => id,
+Err(err) => {
+error!("Invalid object_id {}: {}", wo.collab_object.object_id, err);
+continue;
+},
+};
+match consumer
+.consume_collab(
+&workspace_id,
+data.clone(),
+&object_id,
+wo.collab_object.collab_type,
+)
+.await
+{
+Ok(is_indexed) => {
+if is_indexed {
+trace!("[Indexing] {} consumed {}", consumer.consumer_id(), id);
+}
+},
+Err(err) => {
+error!(
+"Consumer {} failed on {}: {}",
+consumer.consumer_id(),
+id,
+err
+);
+},
+}
}
} else {
@ -281,7 +279,7 @@ pub fn unindexed_collab_from_encoded_collab(
workspace_id,
object_id,
collab_type,
-data,
+data: Some(data),
metadata: UnindexedCollabMetadata::default(), // default means do not update metadata
})
},
@ -313,7 +311,7 @@ pub trait InstantIndexedDataConsumer: Send + Sync + 'static {
async fn consume_collab(
&self,
workspace_id: &Uuid,
-data: UnindexedData,
+data: Option<UnindexedData>,
object_id: &Uuid,
collab_type: CollabType,
) -> Result<bool, FlowyError>;

View File

@ -1,14 +1,59 @@
use crate::util::unzip;
+use bytes::Bytes;
use event_integration_test::user_event::use_localhost_af_cloud;
use event_integration_test::EventIntegrationTest;
use flowy_core::DEFAULT_NAME;
-use flowy_search::entities::SearchResponsePB;
+use flowy_search::entities::{SearchResponsePB, SearchStatePB};
use flowy_search::services::manager::SearchType;
use flowy_user::errors::FlowyResult;
use flowy_user_pub::entities::WorkspaceType;
-use futures::StreamExt;
+use futures::{Sink, StreamExt};
use lib_infra::util::timestamp;
+use std::convert::TryFrom;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use uuid::Uuid;
+#[derive(Clone)]
+struct CollectingSink {
+results: Arc<Mutex<Vec<Vec<u8>>>>,
+}
+impl CollectingSink {
+fn new() -> Self {
+Self {
+results: Arc::new(Mutex::new(Vec::new())),
+}
+}
+fn get_results(&self) -> Vec<Vec<u8>> {
+self.results.lock().unwrap().clone()
+}
+}
+impl Sink<Vec<u8>> for CollectingSink {
+type Error = String;
+fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+Poll::Ready(Ok(()))
+}
+fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
+self.results.lock().unwrap().push(item);
+Ok(())
+}
+fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+Poll::Ready(Ok(()))
+}
+fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+Poll::Ready(Ok(()))
+}
+}
// Helper function to wait for search indexing to complete
async fn wait_for_indexing(test: &EventIntegrationTest) {
let mut rx = test
@ -22,7 +67,7 @@ async fn wait_for_indexing(test: &EventIntegrationTest) {
}
// Helper function to perform search and collect results
-async fn perform_search(
+async fn perform_search_with_workspace(
test: &EventIntegrationTest,
query: &str,
workspace_id: &Uuid,
@ -39,6 +84,31 @@ async fn perform_search(
stream.collect().await
}
+async fn perform_search(
+test: &EventIntegrationTest,
+query: &str,
+) -> Vec<FlowyResult<SearchResponsePB>> {
+let sink = CollectingSink::new();
+let search_id = timestamp();
+test
+.search_manager
+.perform_search_with_sink(query.to_string(), sink.clone(), search_id)
+.await;
+// Parse the collected results
+let mut results = Vec::new();
+for data in sink.get_results() {
+if let Ok(search_state) = SearchStatePB::try_from(Bytes::from(data)) {
+if let Some(response) = search_state.response {
+results.push(Ok(response));
+}
+}
+}
+results
+}
// Helper function to wait for document to be indexed with a specified query
async fn wait_for_document_indexing(
test: &EventIntegrationTest,
@ -52,7 +122,7 @@ async fn wait_for_document_indexing(
let mut result = Vec::new();
while start_time.elapsed() < timeout {
-result = perform_search(test, query, workspace_id).await;
+result = perform_search_with_workspace(test, query, workspace_id).await;
if let Some(Ok(search_result)) = result.first() {
if let Some(local) = &search_result.local_search_result {
@ -83,7 +153,7 @@ async fn anon_user_multiple_workspace_search_test() {
// Wait for initial indexing to complete
wait_for_indexing(&test).await;
// TEST CASE 1: Search by page title
let result = perform_search(&test, "japan", &first_workspace_id).await;
let result = perform_search_with_workspace(&test, "japan", &first_workspace_id).await;
let local = result[0]
.as_ref()
.unwrap()
@ -106,7 +176,7 @@ async fn anon_user_multiple_workspace_search_test() {
);
// TEST CASE 2: Search by page content
let result = perform_search(&test, "Niseko", &first_workspace_id).await;
let result = perform_search_with_workspace(&test, "Niseko", &first_workspace_id).await;
let local = result[0]
.as_ref()
.unwrap()
@ -188,7 +258,7 @@ async fn anon_user_multiple_workspace_search_test() {
wait_for_indexing(&test).await;
// Search in second workspace
let result = perform_search(&test, "japan", &second_workspace_id).await;
let result = perform_search_with_workspace(&test, "japan", &second_workspace_id).await;
assert!(
result[0].as_ref().unwrap().local_search_result.is_none(),
"Empty workspace should not have results for 'japan'"
@ -199,9 +269,20 @@ async fn anon_user_multiple_workspace_search_test() {
.open_workspace(&first_workspace_id.to_string(), WorkspaceType::Local.into())
.await;
wait_for_indexing(&test).await;
let result = perform_search(&test, "japan", &first_workspace_id).await;
let result = perform_search_with_workspace(&test, "japan", &first_workspace_id).await;
assert!(
!result.is_empty(),
"First workspace should still have search results after switching workspaces"
);
}
+#[tokio::test]
+async fn search_with_empty_query_test() {
+use_localhost_af_cloud().await;
+let test = EventIntegrationTest::new().await;
+let _ = test.af_cloud_sign_up().await;
+let _ = test.get_workspace_id().await;
+let result = perform_search(&test, "").await;
+dbg!(&result);
+assert!(result.is_empty());
+}

View File

@ -24,7 +24,7 @@ pub struct UnindexedCollab {
pub workspace_id: Uuid,
pub object_id: Uuid,
pub collab_type: CollabType,
-pub data: UnindexedData,
+pub data: Option<UnindexedData>,
pub metadata: UnindexedCollabMetadata,
}

View File

@ -58,7 +58,7 @@ pulldown-cmark = { version = "0.13.0", optional = true }
[dev-dependencies]
uuid.workspace = true
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tempfile = "3.8.1"

View File

@ -284,9 +284,13 @@ async fn spawn_generate_embeddings(
for record in records {
if let Some(indexer) = indexer_provider.indexer_for(record.collab_type) {
let paragraphs = match record.data {
-UnindexedData::Paragraphs(paragraphs) => paragraphs,
-UnindexedData::Text(text) => text.split('\n').map(|s| s.to_string()).collect(),
-};
+None => continue,
+Some(data) => {
+match data {
+UnindexedData::Paragraphs(paragraphs) => paragraphs,
+UnindexedData::Text(text) => text.split('\n').map(|s| s.to_string()).collect(),
+}
+},
+};
let embedder = embedder.clone();
match indexer.create_embedded_chunks_from_text(
record.object_id,

View File

@ -42,6 +42,7 @@ flowy-ai-pub = { workspace = true }
tracing.workspace = true
futures-core = { version = "0.3", default-features = false }
futures = "0.3"
bytes.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true, features = ["sync"] }
@ -84,5 +85,5 @@ openssl_vendored = ["flowy-sqlite/openssl_vendored"]
verbose_log = [
"flowy-document/verbose_log",
"flowy-database2/verbose_log",
"client-api/sync_verbose_log"
"client-api/sync_verbose_log",
]

View File

@ -6,7 +6,7 @@ use collab_folder::{View, ViewLayout};
use collab_integrate::instant_indexed_data_provider::unindexed_data_form_collab;
use collab_plugins::local_storage::kv::doc::CollabKVAction;
use collab_plugins::local_storage::kv::KVTransactionDB;
-use flowy_ai_pub::entities::{UnindexedCollab, UnindexedCollabMetadata, UnindexedData};
+use flowy_ai_pub::entities::{UnindexedCollab, UnindexedCollabMetadata};
use flowy_ai_pub::persistence::{
batch_upsert_index_collab, select_indexed_collab_ids, IndexCollabRecordTable,
};
@ -122,8 +122,7 @@ impl FullIndexedDataWriter {
return Ok(());
}
// chunk the unindex_ids into smaller chunks
-let chunk_size = 20;
+let chunk_size = 50;
info!(
"[Indexing] {} consumers start indexing {} unindexed documents",
self.consumers.read().await.len(),
@ -145,23 +144,37 @@ impl FullIndexedDataWriter {
info!("[Indexing] cancelled");
break;
}
info!(
"[Indexing] {} unindexed documents found in chunk",
unindexed.len()
);
-for consumer in self.consumers.read().await.iter() {
-for data in &unindexed {
-trace!(
-"[Indexing] {} consume {} unindexed data",
-consumer.consumer_id(),
-data.object_id
-);
-if let Err(err) = consumer.consume_indexed_data(uid, data).await {
-error!(
-"[Indexing] Failed to consume unindexed data: {}: {:?}",
-consumer.consumer_id(),
-err
-);
-}
-}
+let consumers = self.consumers.read().await;
+for consumer in consumers.iter() {
+let consumer_tasks: Vec<_> = unindexed
+.iter()
+.map(|data| {
+let consumer_id = consumer.consumer_id();
+let object_id = data.object_id;
+async move {
+trace!(
+"[Indexing] {} consume {} unindexed data",
+consumer_id,
+object_id
+);
+if let Err(err) = consumer.consume_indexed_data(uid, data).await {
+error!(
+"[Indexing] Failed to consume unindexed data: {}: {:?}",
+consumer_id, err
+);
+}
+}
+})
+.collect();
+futures::future::join_all(consumer_tasks).await;
}
if let Some(mut db) = self
.logged_user
.upgrade()
@ -172,7 +185,7 @@ impl FullIndexedDataWriter {
.map(|v| IndexCollabRecordTable {
oid: v.object_id.to_string(),
workspace_id: v.workspace_id.to_string(),
-content_hash: v.data.content_hash(),
+content_hash: v.data.map(|v| v.content_hash()).unwrap_or_default(),
})
.collect::<Vec<_>>();
@ -269,15 +282,22 @@ impl FullIndexedDataWriter {
};
if load_success {
-if let Some(data) = unindexed_data_form_collab(&collab, &collab_type) {
-results.push(UnindexedCollab {
-workspace_id,
-object_id,
-collab_type,
-data,
-metadata,
-});
-}
+let data = unindexed_data_form_collab(&collab, &collab_type);
+results.push(UnindexedCollab {
+workspace_id,
+object_id,
+collab_type,
+data,
+metadata,
+});
+} else {
+results.push(UnindexedCollab {
+workspace_id,
+object_id,
+collab_type,
+data: None,
+metadata,
+});
+}
},
CollabType::Database => {
@ -285,7 +305,7 @@ impl FullIndexedDataWriter {
workspace_id,
object_id,
collab_type: CollabType::Database,
-data: UnindexedData::Text(String::new()),
+data: None,
metadata,
});
},

View File

@ -34,8 +34,15 @@ impl FullIndexedDataConsumer for EmbeddingFullIndexConsumer {
return Ok(());
}
-if data.data.is_empty() {
-return Ok(());
+match data.data.as_ref() {
+None => {
+return Ok(());
+},
+Some(data) => {
+if data.is_empty() {
+return Ok(());
+}
+},
+}
let scheduler = flowy_ai::embeddings::context::EmbedContext::shared().get_scheduler()?;
@ -65,10 +72,15 @@ impl InstantIndexedDataConsumer for EmbeddingsInstantConsumerImpl {
async fn consume_collab(
&self,
workspace_id: &Uuid,
-data: UnindexedData,
+data: Option<UnindexedData>,
object_id: &Uuid,
collab_type: CollabType,
) -> Result<bool, FlowyError> {
+if data.is_none() {
+return Ok(false);
+}
+let data = data.unwrap();
if data.is_empty() {
return Ok(false);
}
@ -95,7 +107,7 @@ impl InstantIndexedDataConsumer for EmbeddingsInstantConsumerImpl {
workspace_id: *workspace_id,
object_id: *object_id,
collab_type,
-data,
+data: Some(data),
metadata: UnindexedCollabMetadata::default(),
};
@ -161,7 +173,7 @@ impl FullIndexedDataConsumer for SearchFullIndexConsumer {
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Tantivy state dropped"))?;
let object_id = data.object_id.to_string();
-let content = data.data.clone().into_string();
+let content = data.data.clone().map(|v| v.into_string());
strong.write().await.add_document(
&object_id,
@ -282,7 +294,7 @@ impl InstantIndexedDataConsumer for SearchInstantIndexImpl {
async fn consume_collab(
&self,
workspace_id: &Uuid,
-data: UnindexedData,
+data: Option<UnindexedData>,
object_id: &Uuid,
_collab_type: CollabType,
) -> Result<bool, FlowyError> {
@ -302,7 +314,11 @@ impl InstantIndexedDataConsumer for SearchInstantIndexImpl {
let view = folder_manager.get_view(&object_id.to_string()).await?;
// Create a combined hash that includes content + view name + icon
-let content_hash = data.content_hash();
+let content_hash = match &data {
+None => "".to_string(),
+Some(data) => data.content_hash(),
+};
let name_hash = format!("{}:{}", content_hash, view.name);
let combined_hash = if let Some(icon) = &view.icon {
format!("{}:{}:{}", name_hash, icon.ty.clone() as u8, icon.value)
@ -328,7 +344,7 @@ impl InstantIndexedDataConsumer for SearchInstantIndexImpl {
self.consume_history.insert(*object_id, combined_hash);
state.write().await.add_document(
&object_id.to_string(),
-data.into_string(),
+data.map(|v| v.into_string()),
Some(view.name.clone()),
view.icon.clone().map(|v| ViewIcon {
ty: IconType::from(v.ty as u8),

View File

@ -40,7 +40,7 @@ getrandom = { version = "0.2", features = ["js"] }
[dev-dependencies]
tempfile = "3.4.0"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
[build-dependencies]

View File

@ -87,6 +87,24 @@ impl DocumentTantivyState {
}
pub fn add_document(
&mut self,
id: &str,
+content: Option<String>,
+name: Option<String>,
+icon: Option<ViewIcon>,
+) -> FlowyResult<()> {
+match content {
+None => {
+self.add_document_metadata(id, name, icon)?;
+},
+Some(content) => {
+self.add_document_content(id, content, name, icon)?;
+},
+}
+Ok(())
+}
+fn add_document_content(
+&mut self,
+id: &str,
content: String,

View File

@ -79,10 +79,17 @@ impl SearchHandler for DocumentCloudSearchHandler {
// Prepare input for search summary generation.
let summary_input: Vec<SearchResult> = result_items
.iter()
-.map(|v| SearchResult {
-object_id: v.object_id,
-content: v.content.clone(),
-})
+.flat_map(|v| {
+if v.content.is_empty() {
+warn!("Search result item with empty content: {:?}", v);
+None
+} else {
+Some(SearchResult {
+object_id: v.object_id,
+content: v.content.clone(),
+})
+}
+})
.collect();
// Build search response items.

View File

@ -22,6 +22,12 @@ pub(crate) async fn stream_search_handler(
let query = data.into_inner();
let manager = upgrade_manager(manager)?;
let search_id = query.search_id.parse::<i64>().unwrap_or(timestamp());
+if query.search.is_empty() {
+tracing::trace!("Received empty search query, skipping search");
+return Ok(());
+}
manager
.perform_search(query.search, query.stream_port, search_id)
.await;

View File

@ -5,6 +5,7 @@ use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use flowy_error::FlowyResult;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
+use futures::Sink;
use lib_infra::async_trait::async_trait;
use lib_infra::isolate_stream::{IsolateSink, SinkExt};
use std::pin::Pin;
@ -111,7 +112,15 @@ impl SearchManager {
}
pub async fn perform_search(&self, query: String, stream_port: i64, search_id: i64) {
// Check workspace_id before acquiring lock
+let sink = IsolateSink::new(Isolate::new(stream_port));
+self.perform_search_with_sink(query, sink, search_id).await;
+}
+pub async fn perform_search_with_sink<S>(&self, query: String, mut sink: S, search_id: i64)
+where
+S: Sink<Vec<u8>> + Clone + Send + Unpin + 'static,
+S::Error: std::fmt::Display,
+{
let workspace_id = match self.workspace_id.load_full() {
Some(id) => id,
None => {
@ -130,8 +139,20 @@ impl SearchManager {
}
info!("[Search] perform search: {}", query);
+if query.is_empty() {
+let resp = SearchStatePB {
+response: None,
+search_id: search_id.to_string(),
+};
+if let Ok::<Vec<u8>, _>(data) = resp.try_into() {
+if let Err(err) = sink.send(data).await {
+error!("Failed to send empty search result: {}", err);
+}
+}
+return;
+}
let handlers = self.handlers.clone();
-let sink = IsolateSink::new(Isolate::new(stream_port));
let current_search = self.current_search.clone();
let mut join_handles = vec![];

View File

@ -91,7 +91,7 @@ impl Builder {
let subscriber = tracing_subscriber::fmt()
.with_timer(CustomTime)
.with_max_level(tracing::Level::TRACE)
-.with_ansi(self.platform.is_not_ios())
+.with_ansi(self.platform.is_desktop())
.with_writer(StreamLog {
sender: stream_log_sender.clone(),
})
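A design note on the search half of the change: perform_search now delegates to perform_search_with_sink, which is generic over any Sink<Vec<u8>>, so production keeps streaming over an IsolateSink bound to the Dart stream port while tests inject an in-memory collector (the CollectingSink in the test file above). A runnable sketch of that pattern, assuming only the futures crate; emit_results is a hypothetical stand-in for the search pipeline, not an AppFlowy API:

use futures::executor::block_on;
use futures::{Sink, SinkExt};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

// In-memory sink mirroring the test helper: every frame sent by the
// producer is captured for later assertions.
#[derive(Clone, Default)]
struct CollectingSink {
  frames: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl Sink<Vec<u8>> for CollectingSink {
  type Error = String;

  fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
  }

  fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
    self.frames.lock().unwrap().push(item);
    Ok(())
  }

  fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
  }

  fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
  }
}

// Same bounds as perform_search_with_sink: any sink satisfying them works,
// whether an isolate port in production or a test collector.
async fn emit_results<S>(mut sink: S)
where
  S: Sink<Vec<u8>> + Clone + Send + Unpin + 'static,
  S::Error: std::fmt::Display,
{
  for frame in [b"result-1".to_vec(), b"result-2".to_vec()] {
    if let Err(err) = sink.send(frame).await {
      eprintln!("failed to send search frame: {}", err);
    }
  }
}

fn main() {
  let sink = CollectingSink::default();
  block_on(emit_results(sink.clone()));
  assert_eq!(sink.frames.lock().unwrap().len(), 2);
}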