From 51a11fbebda5e174ef809e48ad14b20b6ee30ccd Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:21:47 +0800 Subject: [PATCH] chore: fix upload file limit (#6825) * chore: fix file upload limit * chore: update test --- .../file_storage/file_storage_listener.dart | 3 + .../sidebar/billing/sidebar_plan_bloc.dart | 7 + .../menu/sidebar/footer/sidebar_toast.dart | 19 + frontend/resources/translations/en.json | 3 +- .../flowy-document/tests/document/util.rs | 3 +- frontend/rust-lib/flowy-error/src/code.rs | 2 +- frontend/rust-lib/flowy-error/src/errors.rs | 12 +- .../flowy-storage-pub/src/chunked_byte.rs | 540 ++++++++++++------ .../rust-lib/flowy-storage-pub/src/storage.rs | 3 +- .../rust-lib/flowy-storage/src/file_cache.rs | 1 + .../rust-lib/flowy-storage/src/manager.rs | 308 +++++----- .../flowy-storage/src/notification.rs | 2 + .../rust-lib/flowy-storage/src/uploader.rs | 71 +-- .../tests/multiple_part_upload_test.rs | 15 +- 14 files changed, 626 insertions(+), 363 deletions(-) diff --git a/frontend/appflowy_flutter/lib/workspace/application/settings/file_storage/file_storage_listener.dart b/frontend/appflowy_flutter/lib/workspace/application/settings/file_storage/file_storage_listener.dart index 551d97fa64..58560bae03 100644 --- a/frontend/appflowy_flutter/lib/workspace/application/settings/file_storage/file_storage_listener.dart +++ b/frontend/appflowy_flutter/lib/workspace/application/settings/file_storage/file_storage_listener.dart @@ -36,6 +36,9 @@ class StoreageNotificationListener { case StorageNotification.FileStorageLimitExceeded: onError?.call(FlowyError.fromBuffer(data)); break; + case StorageNotification.SingleFileLimitExceeded: + onError?.call(FlowyError.fromBuffer(data)); + break; } } catch (e) { Log.error( diff --git a/frontend/appflowy_flutter/lib/workspace/application/sidebar/billing/sidebar_plan_bloc.dart b/frontend/appflowy_flutter/lib/workspace/application/sidebar/billing/sidebar_plan_bloc.dart index 6b3c6c29ee..7af2a547dd 100644 --- a/frontend/appflowy_flutter/lib/workspace/application/sidebar/billing/sidebar_plan_bloc.dart +++ b/frontend/appflowy_flutter/lib/workspace/application/sidebar/billing/sidebar_plan_bloc.dart @@ -110,6 +110,12 @@ class SidebarPlanBloc extends Bloc { tierIndicator: const SidebarToastTierIndicator.storageLimitHit(), ), ); + } else if (error.code == ErrorCode.SingleUploadLimitExceeded) { + emit( + state.copyWith( + tierIndicator: const SidebarToastTierIndicator.singleFileLimitHit(), + ), + ); } else { Log.error("Unhandle Unexpected error: $error"); } @@ -225,6 +231,7 @@ class SidebarPlanState with _$SidebarPlanState { @freezed class SidebarToastTierIndicator with _$SidebarToastTierIndicator { const factory SidebarToastTierIndicator.storageLimitHit() = _StorageLimitHit; + const factory SidebarToastTierIndicator.singleFileLimitHit() = _SingleFileLimitHit; const factory SidebarToastTierIndicator.aiMaxiLimitHit() = _aiMaxLimitHit; const factory SidebarToastTierIndicator.loading() = _Loading; } diff --git a/frontend/appflowy_flutter/lib/workspace/presentation/home/menu/sidebar/footer/sidebar_toast.dart b/frontend/appflowy_flutter/lib/workspace/presentation/home/menu/sidebar/footer/sidebar_toast.dart index aec8ceeb2f..37c1e068d3 100644 --- a/frontend/appflowy_flutter/lib/workspace/presentation/home/menu/sidebar/footer/sidebar_toast.dart +++ b/frontend/appflowy_flutter/lib/workspace/presentation/home/menu/sidebar/footer/sidebar_toast.dart @@ -30,6 +30,10 @@ class SidebarToast extends StatelessWidget { storageLimitHit: () => WidgetsBinding.instance.addPostFrameCallback( (_) => _showStorageLimitDialog(context), ), + singleFileLimitHit: () => + WidgetsBinding.instance.addPostFrameCallback( + (_) => _showSingleFileLimitDialog(context), + ), orElse: () {}, ); }, @@ -48,6 +52,7 @@ class SidebarToast extends StatelessWidget { onTap: () => _handleOnTap(context, SubscriptionPlanPB.AiMax), reason: LocaleKeys.sideBar_aiResponseLimitTitle.tr(), ), + singleFileLimitHit: () => const SizedBox.shrink(), ); }, ); @@ -66,6 +71,20 @@ class SidebarToast extends StatelessWidget { }, ); + void _showSingleFileLimitDialog(BuildContext context) => showConfirmDialog( + context: context, + title: LocaleKeys.sideBar_upgradeToPro.tr(), + description: + LocaleKeys.sideBar_singleFileProPlanLimitationDescription.tr(), + confirmLabel: + LocaleKeys.settings_comparePlanDialog_actions_upgrade.tr(), + onConfirm: () { + WidgetsBinding.instance.addPostFrameCallback( + (_) => _handleOnTap(context, SubscriptionPlanPB.Pro), + ); + }, + ); + void _handleOnTap(BuildContext context, SubscriptionPlanPB plan) { final userProfile = context.read().state.userProfile; if (userProfile == null) { diff --git a/frontend/resources/translations/en.json b/frontend/resources/translations/en.json index 7f62a25bda..43a73cb95d 100644 --- a/frontend/resources/translations/en.json +++ b/frontend/resources/translations/en.json @@ -338,6 +338,7 @@ "askOwnerToUpgradeToAIMax": "Your workspace has ran out of free AI responses. Please ask your workspace owner to upgrade the plan or purchase AI add-ons", "askOwnerToUpgradeToAIMaxIOS": "Your workspace is running out of free AI responses.", "purchaseStorageSpace": "Purchase Storage Space", + "singleFileProPlanLimitationDescription": "You need to upgrade to Pro Plan to upload large file", "purchaseAIResponse": "Purchase ", "askOwnerToUpgradeToLocalAI": "Ask workspace owner to enable AI On-device", "upgradeToAILocal": "Run local models on your device for ultimate privacy", @@ -2864,4 +2865,4 @@ "unfavorite": "Unfavorite", "favoriteDisabledHint": "Cannot favorite this view" } -} +} \ No newline at end of file diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index 70ce0cf967..288233164b 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -21,7 +21,6 @@ use flowy_document::entities::{DocumentSnapshotData, DocumentSnapshotMeta}; use flowy_document::manager::{DocumentManager, DocumentSnapshotService, DocumentUserService}; use flowy_document_pub::cloud::*; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; -use flowy_storage_pub::chunked_byte::ChunkedBytes; use flowy_storage_pub::storage::{CreatedUpload, FileProgressReceiver, StorageService}; use lib_infra::async_trait::async_trait; use lib_infra::box_any::BoxAny; @@ -205,7 +204,7 @@ impl StorageService for DocumentTestFileStorageService { todo!() } - async fn start_upload(&self, _chunks: ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> { + async fn start_upload(&self, _record: &BoxAny) -> Result<(), FlowyError> { todo!() } diff --git a/frontend/rust-lib/flowy-error/src/code.rs b/frontend/rust-lib/flowy-error/src/code.rs index 9b13f235fa..ecdaaa0336 100644 --- a/frontend/rust-lib/flowy-error/src/code.rs +++ b/frontend/rust-lib/flowy-error/src/code.rs @@ -287,7 +287,7 @@ pub enum ErrorCode { #[error("Local AI unavailable")] LocalAIUnavailable = 99, - #[error("File storage limit exceeded")] + #[error("Storage limit exceeded")] FileStorageLimitExceeded = 100, #[error("AI Response limit exceeded")] diff --git a/frontend/rust-lib/flowy-error/src/errors.rs b/frontend/rust-lib/flowy-error/src/errors.rs index 581eb884fb..f176f0fccd 100644 --- a/frontend/rust-lib/flowy-error/src/errors.rs +++ b/frontend/rust-lib/flowy-error/src/errors.rs @@ -74,7 +74,17 @@ impl FlowyError { pub fn is_file_limit_exceeded(&self) -> bool { self.code == ErrorCode::FileStorageLimitExceeded - || self.code == ErrorCode::SingleUploadLimitExceeded + } + + pub fn is_single_file_limit_exceeded(&self) -> bool { + self.code == ErrorCode::SingleUploadLimitExceeded + } + + pub fn should_retry_upload(&self) -> bool { + !matches!( + self.code, + ErrorCode::FileStorageLimitExceeded | ErrorCode::SingleUploadLimitExceeded + ) } pub fn is_ai_response_limit_exceeded(&self) -> bool { diff --git a/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs b/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs index 8614fb4489..a7ab12d49f 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs @@ -1,27 +1,95 @@ use anyhow::anyhow; use bytes::Bytes; use std::fmt::Display; -use std::ops::Deref; use std::path::Path; - +use tokio::fs::File; use tokio::io::AsyncReadExt; +use tokio::io::SeekFrom; +use tokio::io::{self, AsyncSeekExt}; /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB,except for the last part, /// which can be smaller.(https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html) pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Minimum Chunk Size 5 MB -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ChunkedBytes { - pub data: Bytes, - pub chunk_size: i32, - pub offsets: Vec<(usize, usize)>, - pub current_offset: i32, + file: File, + chunk_size: usize, + file_size: u64, + current_offset: u64, } -impl Deref for ChunkedBytes { - type Target = Bytes; +impl ChunkedBytes { + /// Create a `ChunkedBytes` instance from a file. + pub async fn from_file>( + file_path: P, + chunk_size: usize, + ) -> Result { + if chunk_size < MIN_CHUNK_SIZE { + return Err(anyhow!( + "Chunk size should be greater than or equal to {} bytes", + MIN_CHUNK_SIZE + )); + } - fn deref(&self) -> &Self::Target { - &self.data + let file = File::open(file_path).await?; + let file_size = file.metadata().await?.len(); + + Ok(ChunkedBytes { + file, + chunk_size, + file_size, + current_offset: 0, + }) + } + + /// Read the next chunk from the file. + pub async fn next_chunk(&mut self) -> Option> { + if self.current_offset >= self.file_size { + return None; // End of file + } + + let mut buffer = vec![0u8; self.chunk_size]; + let mut total_bytes_read = 0; + + // Loop to ensure the buffer is filled or EOF is reached + while total_bytes_read < self.chunk_size { + let read_result = self.file.read(&mut buffer[total_bytes_read..]).await; + match read_result { + Ok(0) => break, // EOF + Ok(n) => total_bytes_read += n, + Err(e) => return Some(Err(e)), + } + } + + if total_bytes_read == 0 { + return None; // EOF + } + + self.current_offset += total_bytes_read as u64; + Some(Ok(Bytes::from(buffer[..total_bytes_read].to_vec()))) + } + + /// Set the offset for the next chunk to be read. + pub async fn set_offset(&mut self, offset: u64) -> Result<(), io::Error> { + if offset > self.file_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Offset out of range", + )); + } + self.current_offset = offset; + self.file.seek(SeekFrom::Start(offset)).await?; + Ok(()) + } + + /// Get the total number of chunks in the file. + pub fn total_chunks(&self) -> usize { + ((self.file_size + self.chunk_size as u64 - 1) / self.chunk_size as u64) as usize + } + + /// Get the current offset in the file. + pub fn current_offset(&self) -> u64 { + self.current_offset } } @@ -29,215 +97,321 @@ impl Display for ChunkedBytes { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "data:{}, chunk_size:{}, num chunk:{}, offset:{}", - self.data.len(), + "file_size: {}, chunk_size: {}, total_chunks: {}, current_offset: {}", + self.file_size, self.chunk_size, - self.offsets.len(), + self.total_chunks(), self.current_offset ) } } -impl ChunkedBytes { - pub fn from_bytes_with_chunk_size(data: Bytes, chunk_size: i32) -> Result { - if chunk_size < MIN_CHUNK_SIZE as i32 { - return Err(anyhow!( - "Chunk size should be greater than or equal to {}", - MIN_CHUNK_SIZE - )); - } - - let offsets = split_into_chunks(&data, chunk_size as usize); - Ok(ChunkedBytes { - data, - offsets, - chunk_size, - current_offset: 0, - }) - } - - /// Used to create a `ChunkedBytes` from a `Bytes` object. The default chunk size is 5 MB. - pub fn from_bytes(data: Bytes) -> Self { - let chunk_size = MIN_CHUNK_SIZE as i32; - let offsets = split_into_chunks(&data, MIN_CHUNK_SIZE); - ChunkedBytes { - data, - offsets, - chunk_size, - current_offset: 0, - } - } - - pub async fn from_file>( - file_path: P, - chunk_size: i32, - ) -> Result { - let mut file = tokio::fs::File::open(file_path).await?; - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).await?; - let data = Bytes::from(buffer); - - let offsets = split_into_chunks(&data, chunk_size as usize); - Ok(ChunkedBytes { - data, - offsets, - chunk_size, - current_offset: 0, - }) - } - - pub fn set_current_offset(&mut self, offset: i32) { - self.current_offset = offset; - } - - pub fn iter(&self) -> ChunkedBytesIterator { - ChunkedBytesIterator { - chunked_data: self, - current_index: self.current_offset as usize, - } - } -} - -pub struct ChunkedBytesIterator<'a> { - chunked_data: &'a ChunkedBytes, - current_index: usize, -} -impl<'a> Iterator for ChunkedBytesIterator<'a> { - type Item = Bytes; - - fn next(&mut self) -> Option { - if self.current_index >= self.chunked_data.offsets.len() { - None - } else { - let (start, end) = self.chunked_data.offsets[self.current_index]; - self.current_index += 1; - Some(self.chunked_data.data.slice(start..end)) - } - } -} // Function to split input bytes into several chunks and return offsets pub fn split_into_chunks(data: &Bytes, chunk_size: usize) -> Vec<(usize, usize)> { + calculate_offsets(data.len(), chunk_size) +} + +pub fn calculate_offsets(data_len: usize, chunk_size: usize) -> Vec<(usize, usize)> { let mut offsets = Vec::new(); let mut start = 0; - while start < data.len() { - let end = std::cmp::min(start + chunk_size, data.len()); + while start < data_len { + let end = std::cmp::min(start + chunk_size, data_len); offsets.push((start, end)); start = end; } + offsets } -// Function to get chunk data using chunk number -pub async fn get_chunk( - data: Bytes, - chunk_number: usize, - offsets: &[(usize, usize)], -) -> Result { - if chunk_number >= offsets.len() { - return Err(anyhow!("Chunk number out of range")); - } - - let (start, end) = offsets[chunk_number]; - let chunk = data.slice(start..end); - - Ok(chunk) -} - #[cfg(test)] mod tests { - use crate::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE}; - use bytes::Bytes; + use super::*; use std::env::temp_dir; use tokio::io::AsyncWriteExt; #[tokio::test] - async fn test_chunked_bytes_less_than_chunk_size() { - let data = Bytes::from(vec![0; 1024 * 1024]); // 1 MB of zeroes - let chunked_data = - ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap(); - - // Check if the offsets are correct - assert_eq!(chunked_data.offsets.len(), 1); // Should have 1 chunk - assert_eq!(chunked_data.offsets[0], (0, 1024 * 1024)); - - // Check if the data can be iterated correctly - let mut iter = chunked_data.iter(); - assert_eq!(iter.next().unwrap().len(), 1024 * 1024); - assert!(iter.next().is_none()); - } - - #[tokio::test] - async fn test_chunked_bytes_from_bytes() { - let data = Bytes::from(vec![0; 15 * 1024 * 1024]); // 15 MB of zeroes - let chunked_data = - ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap(); - - // Check if the offsets are correct - assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks - assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024)); - assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024)); - assert_eq!( - chunked_data.offsets[2], - (10 * 1024 * 1024, 15 * 1024 * 1024) - ); - - // Check if the data can be iterated correctly - let mut iter = chunked_data.iter(); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert!(iter.next().is_none()); - } - - #[tokio::test] - async fn test_chunked_bytes_from_file() { - // Create a temporary file with 15 MB of zeroes + async fn test_chunked_bytes_small_file() { + // Create a small file of 1 MB let mut file_path = temp_dir(); - file_path.push("test_file"); + file_path.push("test_small_file"); - let mut file = tokio::fs::File::create(&file_path).await.unwrap(); - file.write_all(&vec![0; 15 * 1024 * 1024]).await.unwrap(); + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 1 * 1024 * 1024]).await.unwrap(); // 1 MB file.flush().await.unwrap(); - // Read the file into ChunkedBytes - let chunked_data = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE as i32) + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) .await .unwrap(); - // Check if the offsets are correct - assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks - assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024)); - assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024)); - assert_eq!( - chunked_data.offsets[2], - (10 * 1024 * 1024, 15 * 1024 * 1024) - ); + // Validate total chunks and read the data + assert_eq!(chunked_bytes.total_chunks(), 1); // Only 1 chunk due to file size + let chunk = chunked_bytes.next_chunk().await.unwrap().unwrap(); + assert_eq!(chunk.len(), 1 * 1024 * 1024); // The full 1 MB - // Check if the data can be iterated correctly - let mut iter = chunked_data.iter(); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); - assert!(iter.next().is_none()); + // Ensure no more chunks are available + assert!(chunked_bytes.next_chunk().await.is_none()); - // Clean up the temporary file tokio::fs::remove_file(file_path).await.unwrap(); } #[tokio::test] - async fn test_chunked_bytes_with_current_offset() { - let data = Bytes::from(vec![0; 15 * 1024 * 1024]); // 15 MB of zeroes - let mut chunked_data = - ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap(); + async fn test_chunked_bytes_large_file() { + // Create a large file of 15 MB + let mut file_path = temp_dir(); + file_path.push("test_large_file"); - // Set the current offset to the second chunk - chunked_data.set_current_offset(1); + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 15 * 1024 * 1024]).await.unwrap(); // 15 MB + file.flush().await.unwrap(); - // Check if the iterator starts from the second chunk - let mut iter = chunked_data.iter(); - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); // Second chunk - assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); // Third chunk - assert!(iter.next().is_none()); + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + assert_eq!(chunked_bytes.total_chunks(), 3); // 15 MB split into 3 chunks of 5 MB + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!( + chunk_sizes, + vec![5 * 1024 * 1024, 5 * 1024 * 1024, 5 * 1024 * 1024] + ); + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_set_offset() { + // Create a file of 10 MB + let mut file_path = temp_dir(); + file_path.push("test_offset_file"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 10 * 1024 * 1024]).await.unwrap(); // 10 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Set the offset to 5 MB and read the next chunk + chunked_bytes.set_offset(5 * 1024 * 1024).await.unwrap(); + let chunk = chunked_bytes.next_chunk().await.unwrap().unwrap(); + assert_eq!(chunk.len(), 5 * 1024 * 1024); // Read the second chunk + + // Ensure no more chunks are available + assert!(chunked_bytes.next_chunk().await.is_none()); + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_partial_chunk() { + // Create a file of 6 MB (one full chunk and one partial chunk) + let mut file_path = temp_dir(); + file_path.push("test_partial_chunk_file"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 6 * 1024 * 1024]).await.unwrap(); // 6 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + assert_eq!(chunked_bytes.total_chunks(), 2); // 6 MB split into 1 full chunk and 1 partial chunk + + // Read the first chunk + let chunk1 = chunked_bytes.next_chunk().await.unwrap().unwrap(); + assert_eq!(chunk1.len(), 5 * 1024 * 1024); // Full chunk + + // Read the second chunk + let chunk2 = chunked_bytes.next_chunk().await.unwrap().unwrap(); + assert_eq!(chunk2.len(), 1 * 1024 * 1024); // Partial chunk + + // Ensure no more chunks are available + assert!(chunked_bytes.next_chunk().await.is_none()); + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_invalid_offset() { + // Create a file of 5 MB + let mut file_path = temp_dir(); + file_path.push("test_invalid_offset_file"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 5 * 1024 * 1024]).await.unwrap(); // 5 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Try setting an invalid offset + let result = chunked_bytes.set_offset(10 * 1024 * 1024).await; + assert!(result.is_err()); // Offset out of range + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_exact_multiple_chunk_file() { + // Create a file of 10 MB (exact multiple of 5 MB) + let mut file_path = temp_dir(); + file_path.push("test_exact_multiple_chunk_file"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 10 * 1024 * 1024]).await.unwrap(); // 10 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + let expected_offsets = calculate_offsets(10 * 1024 * 1024, MIN_CHUNK_SIZE); + assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 2 chunks + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!(chunk_sizes, vec![5 * 1024 * 1024, 5 * 1024 * 1024]); // 2 full chunks + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_small_file_less_than_chunk_size() { + // Create a file of 2 MB (smaller than 5 MB) + let mut file_path = temp_dir(); + file_path.push("test_small_file_less_than_chunk_size"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 2 * 1024 * 1024]).await.unwrap(); // 2 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + let expected_offsets = calculate_offsets(2 * 1024 * 1024, MIN_CHUNK_SIZE); + assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 1 chunk + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!(chunk_sizes, vec![2 * 1024 * 1024]); // 1 partial chunk + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_file_slightly_larger_than_chunk_size() { + // Create a file of 5.5 MB (slightly larger than 1 chunk) + let mut file_path = temp_dir(); + file_path.push("test_file_slightly_larger_than_chunk_size"); + + let mut file = File::create(&file_path).await.unwrap(); + file + .write_all(&vec![0; 5 * 1024 * 1024 + 512 * 1024]) + .await + .unwrap(); // 5.5 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + let expected_offsets = calculate_offsets(5 * 1024 * 1024 + 512 * 1024, MIN_CHUNK_SIZE); + assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 2 chunks + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!(chunk_sizes, vec![5 * 1024 * 1024, 512 * 1024]); // 1 full chunk, 1 partial chunk + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_large_file_with_many_chunks() { + // Create a file of 50 MB (10 chunks of 5 MB) + let mut file_path = temp_dir(); + file_path.push("test_large_file_with_many_chunks"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 50 * 1024 * 1024]).await.unwrap(); // 50 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + let expected_offsets = calculate_offsets(50 * 1024 * 1024, MIN_CHUNK_SIZE); + assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 10 chunks + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!(chunk_sizes, vec![5 * 1024 * 1024; 10]); // 10 full chunks + + tokio::fs::remove_file(file_path).await.unwrap(); + } + + #[tokio::test] + async fn test_file_with_exact_chunk_size() { + // Create a file of exactly 5 MB + let mut file_path = temp_dir(); + file_path.push("test_file_with_exact_chunk_size"); + + let mut file = File::create(&file_path).await.unwrap(); + file.write_all(&vec![0; 5 * 1024 * 1024]).await.unwrap(); // 5 MB + file.flush().await.unwrap(); + + // Create ChunkedBytes instance + let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE) + .await + .unwrap(); + + // Validate total chunks + let expected_offsets = calculate_offsets(5 * 1024 * 1024, MIN_CHUNK_SIZE); + assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 1 chunk + + // Read and validate all chunks + let mut chunk_sizes = vec![]; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + chunk_sizes.push(chunk_result.unwrap().len()); + } + assert_eq!(chunk_sizes, vec![5 * 1024 * 1024]); // 1 full chunk + + tokio::fs::remove_file(file_path).await.unwrap(); } } diff --git a/frontend/rust-lib/flowy-storage-pub/src/storage.rs b/frontend/rust-lib/flowy-storage-pub/src/storage.rs index 25083535c7..3d344e957e 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/storage.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/storage.rs @@ -1,4 +1,3 @@ -use crate::chunked_byte::ChunkedBytes; use async_trait::async_trait; pub use client_api_entity::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse}; use flowy_error::{FlowyError, FlowyResult}; @@ -22,7 +21,7 @@ pub trait StorageService: Send + Sync { upload_immediately: bool, ) -> Result<(CreatedUpload, Option), FlowyError>; - async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>; + async fn start_upload(&self, record: &BoxAny) -> Result<(), FlowyError>; async fn resume_upload( &self, diff --git a/frontend/rust-lib/flowy-storage/src/file_cache.rs b/frontend/rust-lib/flowy-storage/src/file_cache.rs index 106d2824ee..2a71c5f72f 100644 --- a/frontend/rust-lib/flowy-storage/src/file_cache.rs +++ b/frontend/rust-lib/flowy-storage/src/file_cache.rs @@ -27,6 +27,7 @@ impl FileTempStorage { } /// Creates a temporary file from an existing local file path. + #[allow(dead_code)] pub async fn create_temp_file_from_existing( &self, existing_file_path: &Path, diff --git a/frontend/rust-lib/flowy-storage/src/manager.rs b/frontend/rust-lib/flowy-storage/src/manager.rs index 46be1a4d66..9ee50352a3 100644 --- a/frontend/rust-lib/flowy-storage/src/manager.rs +++ b/frontend/rust-lib/flowy-storage/src/manager.rs @@ -13,7 +13,7 @@ use collab_importer::util::FileId; use dashmap::DashMap; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_sqlite::DBConnection; -use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE}; +use flowy_storage_pub::chunked_byte::{calculate_offsets, ChunkedBytes, MIN_CHUNK_SIZE}; use flowy_storage_pub::cloud::StorageCloudService; use flowy_storage_pub::storage::{ CompletedPartRequest, CreatedUpload, FileProgress, FileProgressReceiver, FileUploadState, @@ -22,7 +22,6 @@ use flowy_storage_pub::storage::{ use lib_infra::box_any::BoxAny; use lib_infra::isolate_stream::{IsolateSink, SinkExt}; use lib_infra::util::timestamp; -use std::io::ErrorKind; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -331,18 +330,20 @@ impl StorageService for StorageServiceImpl { return Err(FlowyError::file_storage_limit()); } - let local_file_path = self - .temp_storage - .create_temp_file_from_existing(Path::new(&file_path)) - .await - .map_err(|err| { - error!("[File] create temp file failed: {}", err); - FlowyError::internal() - .with_context(format!("create temp file for upload file failed: {}", err)) - })?; + let local_file_path = file_path; + // skip copy the file, upload from source + // let local_file_path = self + // .temp_storage + // .create_temp_file_from_existing(Path::new(&file_path)) + // .await + // .map_err(|err| { + // error!("[File] create temp file failed: {}", err); + // FlowyError::internal() + // .with_context(format!("create temp file for upload file failed: {}", err)) + // })?; // 1. create a file record and chunk the file - let (chunks, record) = create_upload_record(workspace_id, parent_dir, local_file_path).await?; + let record = create_upload_record(workspace_id, parent_dir, local_file_path.clone()).await?; // 2. save the record to sqlite let conn = self .user_service @@ -359,7 +360,7 @@ impl StorageService for StorageServiceImpl { self .task_queue .queue_task(UploadTask::ImmediateTask { - chunks, + local_file_path, record, retry_count: 3, }) @@ -368,7 +369,7 @@ impl StorageService for StorageServiceImpl { self .task_queue .queue_task(UploadTask::Task { - chunks, + local_file_path, record, retry_count: 0, }) @@ -393,7 +394,7 @@ impl StorageService for StorageServiceImpl { } } - async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> { + async fn start_upload(&self, record: &BoxAny) -> Result<(), FlowyError> { let file_record = record.downcast_ref::().ok_or_else(|| { FlowyError::internal().with_context("failed to downcast record to UploadFileTable") })?; @@ -402,7 +403,6 @@ impl StorageService for StorageServiceImpl { &self.cloud_service, &self.user_service, &self.temp_storage, - chunks, file_record, self.global_notifier.clone(), ) @@ -468,18 +468,18 @@ async fn create_upload_record( workspace_id: String, parent_dir: String, local_file_path: String, -) -> FlowyResult<(ChunkedBytes, UploadFileTable)> { - // read file and chunk it base on CHUNK_SIZE. We use MIN_CHUNK_SIZE as the minimum chunk size - let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32).await?; - let ext = Path::new(&local_file_path) - .extension() - .and_then(std::ffi::OsStr::to_str) - .unwrap_or("") - .to_owned(); - let content_type = mime_guess::from_path(&local_file_path) +) -> FlowyResult { + let file_path = Path::new(&local_file_path); + let file = tokio::fs::File::open(&file_path).await?; + let metadata = file.metadata().await?; + let file_size = metadata.len() as usize; + + // Calculate the total number of chunks + let num_chunk = calculate_offsets(file_size, MIN_CHUNK_SIZE).len(); + let content_type = mime_guess::from_path(&file_path) .first_or_octet_stream() .to_string(); - let file_id = FileId::from_bytes(&chunked_bytes.data, ext); + let file_id = FileId::from_path(&file_path.to_path_buf()).await?; let record = UploadFileTable { workspace_id, file_id, @@ -488,12 +488,12 @@ async fn create_upload_record( parent_dir, local_file_path, content_type, - chunk_size: chunked_bytes.chunk_size, - num_chunk: chunked_bytes.offsets.len() as i32, + chunk_size: MIN_CHUNK_SIZE as i32, + num_chunk: num_chunk as i32, created_at: timestamp(), is_finish: false, }; - Ok((chunked_bytes, record)) + Ok(record) } #[instrument(level = "debug", skip_all, err)] @@ -501,7 +501,6 @@ async fn start_upload( cloud_service: &Arc, user_service: &Arc, temp_storage: &Arc, - mut chunked_bytes: ChunkedBytes, upload_file: &UploadFileTable, global_notifier: GlobalNotifier, ) -> FlowyResult<()> { @@ -515,10 +514,32 @@ async fn start_upload( part_number: part.part_num, }) .collect::>(); + let upload_offset = completed_parts.len() as u64; - let upload_offset = completed_parts.len() as i32; - let total_parts = chunked_bytes.iter().count(); - chunked_bytes.set_current_offset(upload_offset); + let file_path = Path::new(&upload_file.local_file_path); + if !file_path.exists() { + error!("[File] file not found: {}", upload_file.local_file_path); + if let Ok(uid) = user_service.user_id() { + if let Ok(conn) = user_service.sqlite_connection(uid) { + delete_upload_file(conn, &upload_file.upload_id)?; + } + } + } + + let mut chunked_bytes = + ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE).await?; + let total_parts = chunked_bytes.total_chunks(); + if let Err(err) = chunked_bytes.set_offset(upload_offset).await { + error!( + "[File] set offset failed: {} for file: {}", + err, upload_file.local_file_path + ); + if let Ok(uid) = user_service.user_id() { + if let Ok(conn) = user_service.sqlite_connection(uid) { + delete_upload_file(conn, &upload_file.upload_id)?; + } + } + } info!( "[File] start upload: workspace: {}, parent_dir: {}, file_id: {}, chunk: {}", @@ -543,11 +564,7 @@ async fn start_upload( ) .await; if let Err(err) = create_upload_resp_result.as_ref() { - if err.is_file_limit_exceeded() { - make_notification(StorageNotification::FileStorageLimitExceeded) - .payload(err.clone()) - .send(); - } + handle_upload_error(user_service, &err, &upload_file.upload_id); } let create_upload_resp = create_upload_resp_result?; @@ -572,74 +589,90 @@ async fn start_upload( info!( "[File] {} start uploading parts:{}, offset:{}", upload_file.file_id, - chunked_bytes.iter().count(), + chunked_bytes.total_chunks(), upload_offset, ); - let iter = chunked_bytes.iter().enumerate(); - for (index, chunk_bytes) in iter { - let part_number = upload_offset + index as i32 + 1; - info!( - "[File] {} uploading {}th part, size:{}KB", - upload_file.file_id, - part_number, - chunk_bytes.len() / 1000, - ); - let file_url = cloud_service - .get_object_url_v1( - &upload_file.workspace_id, - &upload_file.parent_dir, - &upload_file.file_id, - ) - .await?; - // start uploading parts - match upload_part( - cloud_service, - user_service, - &upload_file.workspace_id, - &upload_file.parent_dir, - &upload_file.upload_id, - &upload_file.file_id, - part_number as i32, - chunk_bytes.to_vec(), - ) - .await - { - Ok(resp) => { - let mut progress_value = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0); - // The 0.1 is reserved for the complete_upload progress - if progress_value >= 0.9 { - progress_value = 0.9; + let mut part_number = upload_offset + 1; + while let Some(chunk_result) = chunked_bytes.next_chunk().await { + match chunk_result { + Ok(chunk_bytes) => { + info!( + "[File] {} uploading {}th part, size:{}KB", + upload_file.file_id, + part_number, + chunk_bytes.len() / 1000, + ); + + let file_url = cloud_service + .get_object_url_v1( + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.file_id, + ) + .await?; + // start uploading parts + match upload_part( + cloud_service, + user_service, + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.upload_id, + &upload_file.file_id, + part_number as i32, + chunk_bytes.to_vec(), + ) + .await + { + Ok(resp) => { + trace!( + "[File] {} part {} uploaded", + upload_file.file_id, + part_number + ); + let mut progress_value = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0); + // The 0.1 is reserved for the complete_upload progress + if progress_value >= 0.9 { + progress_value = 0.9; + } + let progress = + FileProgress::new_progress(file_url, upload_file.file_id.clone(), progress_value); + trace!("[File] upload progress: {}", progress); + + if let Err(err) = global_notifier.send(progress) { + error!("[File] send global notifier failed: {}", err); + } + + // gather completed part + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + }, + Err(err) => { + error!( + "[File] {} failed to upload part: {}", + upload_file.file_id, err + ); + handle_upload_error(user_service, &err, &upload_file.upload_id); + if let Err(err) = global_notifier.send(FileProgress::new_error( + file_url, + upload_file.file_id.clone(), + err.msg.clone(), + )) { + error!("[File] send global notifier failed: {}", err); + } + return Err(err); + }, } - let progress = - FileProgress::new_progress(file_url, upload_file.file_id.clone(), progress_value); - trace!("[File] upload progress: {}", progress); - - if let Err(err) = global_notifier.send(progress) { - error!("[File] send global notifier failed: {}", err); - } - - // gather completed part - completed_parts.push(CompletedPartRequest { - e_tag: resp.e_tag, - part_number: resp.part_num, - }); + part_number += 1; // Increment part number }, - Err(err) => { - if err.is_file_limit_exceeded() { - make_notification(StorageNotification::FileStorageLimitExceeded) - .payload(err.clone()) - .send(); - } - - if let Err(err) = global_notifier.send(FileProgress::new_error( - file_url, - upload_file.file_id.clone(), - err.msg.clone(), - )) { - error!("[File] send global notifier failed: {}", err); - } - return Err(err); + Err(e) => { + error!( + "[File] {} failed to read chunk: {:?}", + upload_file.file_id, e + ); + break; }, } } @@ -655,18 +688,40 @@ async fn start_upload( ) .await; if let Err(err) = complete_upload_result { - if err.is_file_limit_exceeded() { - make_notification(StorageNotification::FileStorageLimitExceeded) - .payload(err.clone()) - .send(); - } - + handle_upload_error(user_service, &err, &upload_file.upload_id); return Err(err); } Ok(()) } +fn handle_upload_error( + user_service: &Arc, + err: &FlowyError, + upload_id: &str, +) { + if err.is_file_limit_exceeded() { + make_notification(StorageNotification::FileStorageLimitExceeded) + .payload(err.clone()) + .send(); + } + + if err.is_single_file_limit_exceeded() { + info!("[File] file exceed limit:{}", upload_id); + if let Ok(user_id) = user_service.user_id() { + if let Ok(db_conn) = user_service.sqlite_connection(user_id) { + if let Err(err) = delete_upload_file(db_conn, upload_id) { + error!("[File] delete upload file:{} error:{}", upload_id, err); + } + } + } + + make_notification(StorageNotification::SingleFileLimitExceeded) + .payload(err.clone()) + .send(); + } +} + #[instrument(level = "debug", skip_all, err)] async fn resume_upload( cloud_service: &Arc, @@ -683,33 +738,15 @@ async fn resume_upload( upload_file.local_file_path ); - match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await { - Ok(chunked_bytes) => { - // When there were any parts already uploaded, skip those parts by setting the current offset. - start_upload( - cloud_service, - user_service, - temp_storage, - chunked_bytes, - &upload_file, - global_notifier, - ) - .await?; - }, - Err(err) => match err.kind() { - ErrorKind::NotFound => { - error!("[File] file not found: {}", upload_file.local_file_path); - if let Ok(uid) = user_service.user_id() { - if let Ok(conn) = user_service.sqlite_connection(uid) { - delete_upload_file(conn, &upload_file.upload_id)?; - } - } - }, - _ => { - error!("[File] read file failed: {}", err); - }, - }, - } + start_upload( + cloud_service, + user_service, + temp_storage, + &upload_file, + global_notifier, + ) + .await?; + Ok(()) } @@ -796,11 +833,12 @@ async fn complete_upload( let conn = user_service.sqlite_connection(user_service.user_id()?)?; update_upload_file_completed(conn, &upload_file.upload_id)?; + if let Err(err) = temp_storage .delete_temp_file(&upload_file.local_file_path) .await { - error!("[File] delete temp file failed: {}", err); + trace!("[File] delete temp file failed: {}", err); } }, Err(err) => { diff --git a/frontend/rust-lib/flowy-storage/src/notification.rs b/frontend/rust-lib/flowy-storage/src/notification.rs index a1b990bbd1..86af2d222c 100644 --- a/frontend/rust-lib/flowy-storage/src/notification.rs +++ b/frontend/rust-lib/flowy-storage/src/notification.rs @@ -7,6 +7,8 @@ const OBSERVABLE_SOURCE: &str = "storage"; pub(crate) enum StorageNotification { #[default] FileStorageLimitExceeded = 0, + + SingleFileLimitExceeded = 1, } impl std::convert::From for i32 { diff --git a/frontend/rust-lib/flowy-storage/src/uploader.rs b/frontend/rust-lib/flowy-storage/src/uploader.rs index e4d609bbe4..6c59ff521f 100644 --- a/frontend/rust-lib/flowy-storage/src/uploader.rs +++ b/frontend/rust-lib/flowy-storage/src/uploader.rs @@ -1,6 +1,5 @@ use crate::sqlite_sql::UploadFileTable; use crate::uploader::UploadTask::BackgroundTask; -use flowy_storage_pub::chunked_byte::ChunkedBytes; use flowy_storage_pub::storage::StorageService; use lib_infra::box_any::BoxAny; use std::cmp::Ordering; @@ -154,38 +153,34 @@ impl FileUploader { match task { UploadTask::ImmediateTask { - chunks, + local_file_path, record, mut retry_count, } | UploadTask::Task { - chunks, + local_file_path, record, mut retry_count, } => { let record = BoxAny::new(record); - if let Err(err) = self - .storage_service - .start_upload(chunks.clone(), &record) - .await - { + if let Err(err) = self.storage_service.start_upload(&record).await { if err.is_file_limit_exceeded() { - error!("[File] Failed to upload file: {}", err); self.disable_storage_write(); } - info!( - "[File] Failed to upload file: {}, retry_count:{}", - err, retry_count - ); - - let record = record.unbox_or_error().unwrap(); - retry_count += 1; - self.queue.tasks.write().await.push(UploadTask::Task { - chunks, - record, - retry_count, - }); + if err.should_retry_upload() { + info!( + "[File] Failed to upload file: {}, retry_count:{}", + err, retry_count + ); + let record = record.unbox_or_error().unwrap(); + retry_count += 1; + self.queue.tasks.write().await.push(UploadTask::Task { + local_file_path, + record, + retry_count, + }); + } } }, UploadTask::BackgroundTask { @@ -205,18 +200,20 @@ impl FileUploader { self.disable_storage_write(); } - info!( - "[File] failed to resume upload file: {}, retry_count:{}", - err, retry_count - ); - retry_count += 1; - self.queue.tasks.write().await.push(BackgroundTask { - workspace_id, - parent_dir, - file_id, - created_at, - retry_count, - }); + if err.should_retry_upload() { + info!( + "[File] failed to resume upload file: {}, retry_count:{}", + err, retry_count + ); + retry_count += 1; + self.queue.tasks.write().await.push(BackgroundTask { + workspace_id, + parent_dir, + file_id, + created_at, + retry_count, + }); + } } }, } @@ -246,6 +243,10 @@ impl FileUploaderRunner { if let Some(uploader) = weak_uploader.upgrade() { let value = notifier.borrow().clone(); + trace!( + "[File]: Uploader runner received signal, thread_id: {:?}", + std::thread::current().id() + ); match value { Signal::Stop => { info!("[File]:Uploader runner stopped, stop signal received"); @@ -273,12 +274,12 @@ impl FileUploaderRunner { pub enum UploadTask { ImmediateTask { - chunks: ChunkedBytes, + local_file_path: String, record: UploadFileTable, retry_count: u8, }, Task { - chunks: ChunkedBytes, + local_file_path: String, record: UploadFileTable, retry_count: u8, }, diff --git a/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs b/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs index af69fc65e6..b02f64fd97 100644 --- a/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs +++ b/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs @@ -153,7 +153,7 @@ pub async fn create_upload_file_record( local_file_path: String, ) -> UploadFileTable { // Create ChunkedBytes from file - let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32) + let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE) .await .unwrap(); @@ -162,8 +162,17 @@ pub async fn create_upload_file_record( .first_or_octet_stream() .to_string(); + // let mut file_path = temp_dir(); + // file_path.push("test_large_file_with_many_chunks"); + // let mut file = File::create(&file_path).await.unwrap(); + // file.write_all(&vec![0; 50 * 1024 * 1024]).await.unwrap(); // 50 MB + // file.flush().await.unwrap(); + // Calculate file ID - let file_id = FileId::from_bytes(&chunked_bytes.data, "b".to_string()); + let file_id = FileId::from_path(&PathBuf::from(&local_file_path)) + .await + .unwrap(); + let num_chunk = chunked_bytes.total_chunks(); // Create UploadFileTable record UploadFileTable { @@ -174,7 +183,7 @@ pub async fn create_upload_file_record( local_file_path, content_type, chunk_size: MIN_CHUNK_SIZE as i32, - num_chunk: chunked_bytes.offsets.len() as i32, + num_chunk: num_chunk as i32, created_at: chrono::Utc::now().timestamp(), is_finish: false, }