chore: fix upload file limit (#6825)

* chore: fix file upload limit
* chore: update test

Parent: cc7476c75b
Commit: 51a11fbebd
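This change reworks `ChunkedBytes` from an in-memory buffer into a file-backed reader and queues upload tasks by local file path, so an upload record's chunk count is now derived from the file size alone. Below is a minimal sketch of that chunk accounting; it restates the `calculate_offsets` helper and `MIN_CHUNK_SIZE` constant introduced in the hunks that follow, and the `main` wrapper is illustrative only, not part of the commit.

```rust
/// Mirrors the `calculate_offsets` helper and `MIN_CHUNK_SIZE` constant added below,
/// shown here only to illustrate how the number of upload parts is computed.
const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // 5 MB, the S3 multipart minimum

fn calculate_offsets(data_len: usize, chunk_size: usize) -> Vec<(usize, usize)> {
  let mut offsets = Vec::new();
  let mut start = 0;
  while start < data_len {
    let end = std::cmp::min(start + chunk_size, data_len);
    offsets.push((start, end));
    start = end;
  }
  offsets
}

fn main() {
  // A 12 MB file splits into 5 MB + 5 MB + 2 MB, i.e. 3 parts.
  let offsets = calculate_offsets(12 * 1024 * 1024, MIN_CHUNK_SIZE);
  assert_eq!(offsets.len(), 3);
  assert_eq!(offsets[2], (10 * 1024 * 1024, 12 * 1024 * 1024));
  println!("num_chunk = {}", offsets.len());
}
```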
@@ -36,6 +36,9 @@ class StoreageNotificationListener {
         case StorageNotification.FileStorageLimitExceeded:
           onError?.call(FlowyError.fromBuffer(data));
           break;
+        case StorageNotification.SingleFileLimitExceeded:
+          onError?.call(FlowyError.fromBuffer(data));
+          break;
       }
     } catch (e) {
       Log.error(
@@ -110,6 +110,12 @@ class SidebarPlanBloc extends Bloc<SidebarPlanEvent, SidebarPlanState> {
             tierIndicator: const SidebarToastTierIndicator.storageLimitHit(),
           ),
         );
+      } else if (error.code == ErrorCode.SingleUploadLimitExceeded) {
+        emit(
+          state.copyWith(
+            tierIndicator: const SidebarToastTierIndicator.singleFileLimitHit(),
+          ),
+        );
       } else {
         Log.error("Unhandle Unexpected error: $error");
       }
@@ -225,6 +231,7 @@ class SidebarPlanState with _$SidebarPlanState {
 @freezed
 class SidebarToastTierIndicator with _$SidebarToastTierIndicator {
   const factory SidebarToastTierIndicator.storageLimitHit() = _StorageLimitHit;
+  const factory SidebarToastTierIndicator.singleFileLimitHit() = _SingleFileLimitHit;
   const factory SidebarToastTierIndicator.aiMaxiLimitHit() = _aiMaxLimitHit;
   const factory SidebarToastTierIndicator.loading() = _Loading;
 }
@@ -30,6 +30,10 @@ class SidebarToast extends StatelessWidget {
           storageLimitHit: () => WidgetsBinding.instance.addPostFrameCallback(
             (_) => _showStorageLimitDialog(context),
           ),
+          singleFileLimitHit: () =>
+              WidgetsBinding.instance.addPostFrameCallback(
+            (_) => _showSingleFileLimitDialog(context),
+          ),
           orElse: () {},
         );
       },
@@ -48,6 +52,7 @@ class SidebarToast extends StatelessWidget {
             onTap: () => _handleOnTap(context, SubscriptionPlanPB.AiMax),
             reason: LocaleKeys.sideBar_aiResponseLimitTitle.tr(),
           ),
+          singleFileLimitHit: () => const SizedBox.shrink(),
         );
       },
     );
@@ -66,6 +71,20 @@ class SidebarToast extends StatelessWidget {
         },
       );
 
+  void _showSingleFileLimitDialog(BuildContext context) => showConfirmDialog(
+        context: context,
+        title: LocaleKeys.sideBar_upgradeToPro.tr(),
+        description:
+            LocaleKeys.sideBar_singleFileProPlanLimitationDescription.tr(),
+        confirmLabel:
+            LocaleKeys.settings_comparePlanDialog_actions_upgrade.tr(),
+        onConfirm: () {
+          WidgetsBinding.instance.addPostFrameCallback(
+            (_) => _handleOnTap(context, SubscriptionPlanPB.Pro),
+          );
+        },
+      );
+
   void _handleOnTap(BuildContext context, SubscriptionPlanPB plan) {
     final userProfile = context.read<SidebarPlanBloc>().state.userProfile;
     if (userProfile == null) {
@@ -338,6 +338,7 @@
     "askOwnerToUpgradeToAIMax": "Your workspace has ran out of free AI responses. Please ask your workspace owner to upgrade the plan or purchase AI add-ons",
     "askOwnerToUpgradeToAIMaxIOS": "Your workspace is running out of free AI responses.",
     "purchaseStorageSpace": "Purchase Storage Space",
+    "singleFileProPlanLimitationDescription": "You need to upgrade to Pro Plan to upload large file",
     "purchaseAIResponse": "Purchase ",
     "askOwnerToUpgradeToLocalAI": "Ask workspace owner to enable AI On-device",
     "upgradeToAILocal": "Run local models on your device for ultimate privacy",
@@ -21,7 +21,6 @@ use flowy_document::entities::{DocumentSnapshotData, DocumentSnapshotMeta};
 use flowy_document::manager::{DocumentManager, DocumentSnapshotService, DocumentUserService};
 use flowy_document_pub::cloud::*;
 use flowy_error::{ErrorCode, FlowyError, FlowyResult};
-use flowy_storage_pub::chunked_byte::ChunkedBytes;
 use flowy_storage_pub::storage::{CreatedUpload, FileProgressReceiver, StorageService};
 use lib_infra::async_trait::async_trait;
 use lib_infra::box_any::BoxAny;
@@ -205,7 +204,7 @@ impl StorageService for DocumentTestFileStorageService {
     todo!()
   }
 
-  async fn start_upload(&self, _chunks: ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, _record: &BoxAny) -> Result<(), FlowyError> {
     todo!()
   }
 
@@ -287,7 +287,7 @@ pub enum ErrorCode {
   #[error("Local AI unavailable")]
   LocalAIUnavailable = 99,
 
-  #[error("File storage limit exceeded")]
+  #[error("Storage limit exceeded")]
   FileStorageLimitExceeded = 100,
 
   #[error("AI Response limit exceeded")]
@@ -74,7 +74,17 @@ impl FlowyError {
 
   pub fn is_file_limit_exceeded(&self) -> bool {
     self.code == ErrorCode::FileStorageLimitExceeded
-      || self.code == ErrorCode::SingleUploadLimitExceeded
+  }
+
+  pub fn is_single_file_limit_exceeded(&self) -> bool {
+    self.code == ErrorCode::SingleUploadLimitExceeded
+  }
+
+  pub fn should_retry_upload(&self) -> bool {
+    !matches!(
+      self.code,
+      ErrorCode::FileStorageLimitExceeded | ErrorCode::SingleUploadLimitExceeded
+    )
   }
 
   pub fn is_ai_response_limit_exceeded(&self) -> bool {
@@ -1,27 +1,95 @@
 use anyhow::anyhow;
 use bytes::Bytes;
 use std::fmt::Display;
-use std::ops::Deref;
 use std::path::Path;
+use tokio::fs::File;
 use tokio::io::AsyncReadExt;
+use tokio::io::SeekFrom;
+use tokio::io::{self, AsyncSeekExt};
 
 /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB,except for the last part,
 /// which can be smaller.(https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
 pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Minimum Chunk Size 5 MB
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct ChunkedBytes {
-  pub data: Bytes,
-  pub chunk_size: i32,
-  pub offsets: Vec<(usize, usize)>,
-  pub current_offset: i32,
+  file: File,
+  chunk_size: usize,
+  file_size: u64,
+  current_offset: u64,
 }
 
-impl Deref for ChunkedBytes {
-  type Target = Bytes;
+impl ChunkedBytes {
+  /// Create a `ChunkedBytes` instance from a file.
+  pub async fn from_file<P: AsRef<Path>>(
+    file_path: P,
+    chunk_size: usize,
+  ) -> Result<Self, anyhow::Error> {
+    if chunk_size < MIN_CHUNK_SIZE {
+      return Err(anyhow!(
+        "Chunk size should be greater than or equal to {} bytes",
+        MIN_CHUNK_SIZE
+      ));
+    }
 
-  fn deref(&self) -> &Self::Target {
-    &self.data
+    let file = File::open(file_path).await?;
+    let file_size = file.metadata().await?.len();
+
+    Ok(ChunkedBytes {
+      file,
+      chunk_size,
+      file_size,
+      current_offset: 0,
+    })
+  }
+
+  /// Read the next chunk from the file.
+  pub async fn next_chunk(&mut self) -> Option<Result<Bytes, io::Error>> {
+    if self.current_offset >= self.file_size {
+      return None; // End of file
+    }
+
+    let mut buffer = vec![0u8; self.chunk_size];
+    let mut total_bytes_read = 0;
+
+    // Loop to ensure the buffer is filled or EOF is reached
+    while total_bytes_read < self.chunk_size {
+      let read_result = self.file.read(&mut buffer[total_bytes_read..]).await;
+      match read_result {
+        Ok(0) => break, // EOF
+        Ok(n) => total_bytes_read += n,
+        Err(e) => return Some(Err(e)),
+      }
+    }
+
+    if total_bytes_read == 0 {
+      return None; // EOF
+    }
+
+    self.current_offset += total_bytes_read as u64;
+    Some(Ok(Bytes::from(buffer[..total_bytes_read].to_vec())))
+  }
+
+  /// Set the offset for the next chunk to be read.
+  pub async fn set_offset(&mut self, offset: u64) -> Result<(), io::Error> {
+    if offset > self.file_size {
+      return Err(io::Error::new(
+        io::ErrorKind::InvalidInput,
+        "Offset out of range",
+      ));
+    }
+    self.current_offset = offset;
+    self.file.seek(SeekFrom::Start(offset)).await?;
+    Ok(())
+  }
+
+  /// Get the total number of chunks in the file.
+  pub fn total_chunks(&self) -> usize {
+    ((self.file_size + self.chunk_size as u64 - 1) / self.chunk_size as u64) as usize
+  }
+
+  /// Get the current offset in the file.
+  pub fn current_offset(&self) -> u64 {
+    self.current_offset
   }
 }
 
@@ -29,215 +97,321 @@ impl Display for ChunkedBytes {
   fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     write!(
       f,
-      "data:{}, chunk_size:{}, num chunk:{}, offset:{}",
-      self.data.len(),
+      "file_size: {}, chunk_size: {}, total_chunks: {}, current_offset: {}",
+      self.file_size,
       self.chunk_size,
-      self.offsets.len(),
+      self.total_chunks(),
       self.current_offset
     )
   }
 }
 
-impl ChunkedBytes {
-  pub fn from_bytes_with_chunk_size(data: Bytes, chunk_size: i32) -> Result<Self, anyhow::Error> {
-    if chunk_size < MIN_CHUNK_SIZE as i32 {
-      return Err(anyhow!(
-        "Chunk size should be greater than or equal to {}",
-        MIN_CHUNK_SIZE
-      ));
-    }
-
-    let offsets = split_into_chunks(&data, chunk_size as usize);
-    Ok(ChunkedBytes {
-      data,
-      offsets,
-      chunk_size,
-      current_offset: 0,
-    })
-  }
-
-  /// Used to create a `ChunkedBytes` from a `Bytes` object. The default chunk size is 5 MB.
-  pub fn from_bytes(data: Bytes) -> Self {
-    let chunk_size = MIN_CHUNK_SIZE as i32;
-    let offsets = split_into_chunks(&data, MIN_CHUNK_SIZE);
-    ChunkedBytes {
-      data,
-      offsets,
-      chunk_size,
-      current_offset: 0,
-    }
-  }
-
-  pub async fn from_file<P: AsRef<Path>>(
-    file_path: P,
-    chunk_size: i32,
-  ) -> Result<Self, tokio::io::Error> {
-    let mut file = tokio::fs::File::open(file_path).await?;
-    let mut buffer = Vec::new();
-    file.read_to_end(&mut buffer).await?;
-    let data = Bytes::from(buffer);
-
-    let offsets = split_into_chunks(&data, chunk_size as usize);
-    Ok(ChunkedBytes {
-      data,
-      offsets,
-      chunk_size,
-      current_offset: 0,
-    })
-  }
-
-  pub fn set_current_offset(&mut self, offset: i32) {
-    self.current_offset = offset;
-  }
-
-  pub fn iter(&self) -> ChunkedBytesIterator {
-    ChunkedBytesIterator {
-      chunked_data: self,
-      current_index: self.current_offset as usize,
-    }
-  }
-}
-
-pub struct ChunkedBytesIterator<'a> {
-  chunked_data: &'a ChunkedBytes,
-  current_index: usize,
-}
-impl<'a> Iterator for ChunkedBytesIterator<'a> {
-  type Item = Bytes;
-
-  fn next(&mut self) -> Option<Self::Item> {
-    if self.current_index >= self.chunked_data.offsets.len() {
-      None
-    } else {
-      let (start, end) = self.chunked_data.offsets[self.current_index];
-      self.current_index += 1;
-      Some(self.chunked_data.data.slice(start..end))
-    }
-  }
-}
 // Function to split input bytes into several chunks and return offsets
 pub fn split_into_chunks(data: &Bytes, chunk_size: usize) -> Vec<(usize, usize)> {
+  calculate_offsets(data.len(), chunk_size)
+}
+
+pub fn calculate_offsets(data_len: usize, chunk_size: usize) -> Vec<(usize, usize)> {
   let mut offsets = Vec::new();
   let mut start = 0;
 
-  while start < data.len() {
-    let end = std::cmp::min(start + chunk_size, data.len());
+  while start < data_len {
+    let end = std::cmp::min(start + chunk_size, data_len);
     offsets.push((start, end));
     start = end;
   }
 
   offsets
 }
 
-// Function to get chunk data using chunk number
-pub async fn get_chunk(
-  data: Bytes,
-  chunk_number: usize,
-  offsets: &[(usize, usize)],
-) -> Result<Bytes, anyhow::Error> {
-  if chunk_number >= offsets.len() {
-    return Err(anyhow!("Chunk number out of range"));
-  }
-
-  let (start, end) = offsets[chunk_number];
-  let chunk = data.slice(start..end);
-
-  Ok(chunk)
-}
-
 #[cfg(test)]
 mod tests {
-  use crate::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
-  use bytes::Bytes;
+  use super::*;
   use std::env::temp_dir;
   use tokio::io::AsyncWriteExt;
 
   #[tokio::test]
-  async fn test_chunked_bytes_less_than_chunk_size() {
-    let data = Bytes::from(vec![0; 1024 * 1024]); // 1 MB of zeroes
-    let chunked_data =
-      ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap();
-
-    // Check if the offsets are correct
-    assert_eq!(chunked_data.offsets.len(), 1); // Should have 1 chunk
-    assert_eq!(chunked_data.offsets[0], (0, 1024 * 1024));
-
-    // Check if the data can be iterated correctly
-    let mut iter = chunked_data.iter();
-    assert_eq!(iter.next().unwrap().len(), 1024 * 1024);
-    assert!(iter.next().is_none());
-  }
-
-  #[tokio::test]
-  async fn test_chunked_bytes_from_bytes() {
-    let data = Bytes::from(vec![0; 15 * 1024 * 1024]); // 15 MB of zeroes
-    let chunked_data =
-      ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap();
-
-    // Check if the offsets are correct
-    assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks
-    assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024));
-    assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024));
-    assert_eq!(
-      chunked_data.offsets[2],
-      (10 * 1024 * 1024, 15 * 1024 * 1024)
-    );
-
-    // Check if the data can be iterated correctly
-    let mut iter = chunked_data.iter();
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert!(iter.next().is_none());
-  }
-
-  #[tokio::test]
-  async fn test_chunked_bytes_from_file() {
-    // Create a temporary file with 15 MB of zeroes
+  async fn test_chunked_bytes_small_file() {
+    // Create a small file of 1 MB
     let mut file_path = temp_dir();
-    file_path.push("test_file");
+    file_path.push("test_small_file");
 
-    let mut file = tokio::fs::File::create(&file_path).await.unwrap();
-    file.write_all(&vec![0; 15 * 1024 * 1024]).await.unwrap();
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 1 * 1024 * 1024]).await.unwrap(); // 1 MB
     file.flush().await.unwrap();
 
-    // Read the file into ChunkedBytes
-    let chunked_data = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE as i32)
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
       .await
       .unwrap();
 
-    // Check if the offsets are correct
-    assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks
-    assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024));
-    assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024));
-    assert_eq!(
-      chunked_data.offsets[2],
-      (10 * 1024 * 1024, 15 * 1024 * 1024)
-    );
-
-    // Check if the data can be iterated correctly
-    let mut iter = chunked_data.iter();
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
-    assert!(iter.next().is_none());
-
-    // Clean up the temporary file
+    // Validate total chunks and read the data
+    assert_eq!(chunked_bytes.total_chunks(), 1); // Only 1 chunk due to file size
+    let chunk = chunked_bytes.next_chunk().await.unwrap().unwrap();
+    assert_eq!(chunk.len(), 1 * 1024 * 1024); // The full 1 MB
+
+    // Ensure no more chunks are available
+    assert!(chunked_bytes.next_chunk().await.is_none());
+
     tokio::fs::remove_file(file_path).await.unwrap();
   }
 
   #[tokio::test]
-  async fn test_chunked_bytes_with_current_offset() {
-    let data = Bytes::from(vec![0; 15 * 1024 * 1024]); // 15 MB of zeroes
-    let mut chunked_data =
-      ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap();
-
-    // Set the current offset to the second chunk
-    chunked_data.set_current_offset(1);
-
-    // Check if the iterator starts from the second chunk
-    let mut iter = chunked_data.iter();
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); // Second chunk
-    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024); // Third chunk
-    assert!(iter.next().is_none());
+  async fn test_chunked_bytes_large_file() {
+    // Create a large file of 15 MB
+    let mut file_path = temp_dir();
+    file_path.push("test_large_file");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 15 * 1024 * 1024]).await.unwrap(); // 15 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    assert_eq!(chunked_bytes.total_chunks(), 3); // 15 MB split into 3 chunks of 5 MB
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(
+      chunk_sizes,
+      vec![5 * 1024 * 1024, 5 * 1024 * 1024, 5 * 1024 * 1024]
+    );
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_set_offset() {
+    // Create a file of 10 MB
+    let mut file_path = temp_dir();
+    file_path.push("test_offset_file");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 10 * 1024 * 1024]).await.unwrap(); // 10 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Set the offset to 5 MB and read the next chunk
+    chunked_bytes.set_offset(5 * 1024 * 1024).await.unwrap();
+    let chunk = chunked_bytes.next_chunk().await.unwrap().unwrap();
+    assert_eq!(chunk.len(), 5 * 1024 * 1024); // Read the second chunk
+
+    // Ensure no more chunks are available
+    assert!(chunked_bytes.next_chunk().await.is_none());
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_partial_chunk() {
+    // Create a file of 6 MB (one full chunk and one partial chunk)
+    let mut file_path = temp_dir();
+    file_path.push("test_partial_chunk_file");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 6 * 1024 * 1024]).await.unwrap(); // 6 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    assert_eq!(chunked_bytes.total_chunks(), 2); // 6 MB split into 1 full chunk and 1 partial chunk
+
+    // Read the first chunk
+    let chunk1 = chunked_bytes.next_chunk().await.unwrap().unwrap();
+    assert_eq!(chunk1.len(), 5 * 1024 * 1024); // Full chunk
+
+    // Read the second chunk
+    let chunk2 = chunked_bytes.next_chunk().await.unwrap().unwrap();
+    assert_eq!(chunk2.len(), 1 * 1024 * 1024); // Partial chunk
+
+    // Ensure no more chunks are available
+    assert!(chunked_bytes.next_chunk().await.is_none());
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_invalid_offset() {
+    // Create a file of 5 MB
+    let mut file_path = temp_dir();
+    file_path.push("test_invalid_offset_file");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 5 * 1024 * 1024]).await.unwrap(); // 5 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Try setting an invalid offset
+    let result = chunked_bytes.set_offset(10 * 1024 * 1024).await;
+    assert!(result.is_err()); // Offset out of range
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_exact_multiple_chunk_file() {
+    // Create a file of 10 MB (exact multiple of 5 MB)
+    let mut file_path = temp_dir();
+    file_path.push("test_exact_multiple_chunk_file");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 10 * 1024 * 1024]).await.unwrap(); // 10 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    let expected_offsets = calculate_offsets(10 * 1024 * 1024, MIN_CHUNK_SIZE);
+    assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 2 chunks
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(chunk_sizes, vec![5 * 1024 * 1024, 5 * 1024 * 1024]); // 2 full chunks
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_small_file_less_than_chunk_size() {
+    // Create a file of 2 MB (smaller than 5 MB)
+    let mut file_path = temp_dir();
+    file_path.push("test_small_file_less_than_chunk_size");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 2 * 1024 * 1024]).await.unwrap(); // 2 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    let expected_offsets = calculate_offsets(2 * 1024 * 1024, MIN_CHUNK_SIZE);
+    assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 1 chunk
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(chunk_sizes, vec![2 * 1024 * 1024]); // 1 partial chunk
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_file_slightly_larger_than_chunk_size() {
+    // Create a file of 5.5 MB (slightly larger than 1 chunk)
+    let mut file_path = temp_dir();
+    file_path.push("test_file_slightly_larger_than_chunk_size");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file
+      .write_all(&vec![0; 5 * 1024 * 1024 + 512 * 1024])
+      .await
+      .unwrap(); // 5.5 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    let expected_offsets = calculate_offsets(5 * 1024 * 1024 + 512 * 1024, MIN_CHUNK_SIZE);
+    assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 2 chunks
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(chunk_sizes, vec![5 * 1024 * 1024, 512 * 1024]); // 1 full chunk, 1 partial chunk
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_large_file_with_many_chunks() {
+    // Create a file of 50 MB (10 chunks of 5 MB)
+    let mut file_path = temp_dir();
+    file_path.push("test_large_file_with_many_chunks");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 50 * 1024 * 1024]).await.unwrap(); // 50 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    let expected_offsets = calculate_offsets(50 * 1024 * 1024, MIN_CHUNK_SIZE);
+    assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 10 chunks
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(chunk_sizes, vec![5 * 1024 * 1024; 10]); // 10 full chunks
+
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_file_with_exact_chunk_size() {
+    // Create a file of exactly 5 MB
+    let mut file_path = temp_dir();
+    file_path.push("test_file_with_exact_chunk_size");
+
+    let mut file = File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 5 * 1024 * 1024]).await.unwrap(); // 5 MB
+    file.flush().await.unwrap();
+
+    // Create ChunkedBytes instance
+    let mut chunked_bytes = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE)
+      .await
+      .unwrap();
+
+    // Validate total chunks
+    let expected_offsets = calculate_offsets(5 * 1024 * 1024, MIN_CHUNK_SIZE);
+    assert_eq!(chunked_bytes.total_chunks(), expected_offsets.len()); // 1 chunk
+
+    // Read and validate all chunks
+    let mut chunk_sizes = vec![];
+    while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+      chunk_sizes.push(chunk_result.unwrap().len());
+    }
+    assert_eq!(chunk_sizes, vec![5 * 1024 * 1024]); // 1 full chunk
+
+    tokio::fs::remove_file(file_path).await.unwrap();
   }
 }
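The two hunks above replace the buffered `ChunkedBytes` with a file-backed reader that seeks to an offset and streams one chunk at a time. Below is a minimal sketch of how a caller might drive the new API, modeled on the tests above; it assumes `flowy_storage_pub`, `tokio` (with the `macros` and `fs` features), and `anyhow` are available as dependencies and is not part of the commit.

```rust
use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
  // Write a 6 MB scratch file so there is one full chunk and one partial chunk.
  let mut path = std::env::temp_dir();
  path.push("chunked_bytes_demo");
  let mut file = tokio::fs::File::create(&path).await?;
  file.write_all(&vec![0u8; 6 * 1024 * 1024]).await?;
  file.flush().await?;

  // Open the file lazily; only one chunk is held in memory at a time.
  let mut chunked = ChunkedBytes::from_file(&path, MIN_CHUNK_SIZE).await?;
  assert_eq!(chunked.total_chunks(), 2);

  // Resume case: skip bytes that were already uploaded by seeking first.
  chunked.set_offset(5 * 1024 * 1024).await?;
  while let Some(chunk) = chunked.next_chunk().await {
    println!("read {} bytes", chunk?.len());
  }

  tokio::fs::remove_file(path).await?;
  Ok(())
}
```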
@@ -1,4 +1,3 @@
-use crate::chunked_byte::ChunkedBytes;
 use async_trait::async_trait;
 pub use client_api_entity::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse};
 use flowy_error::{FlowyError, FlowyResult};
@@ -22,7 +21,7 @@ pub trait StorageService: Send + Sync {
     upload_immediately: bool,
   ) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError>;
 
-  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;
+  async fn start_upload(&self, record: &BoxAny) -> Result<(), FlowyError>;
 
   async fn resume_upload(
     &self,
@@ -27,6 +27,7 @@ impl FileTempStorage {
   }
 
   /// Creates a temporary file from an existing local file path.
+  #[allow(dead_code)]
   pub async fn create_temp_file_from_existing(
     &self,
     existing_file_path: &Path,
@@ -13,7 +13,7 @@ use collab_importer::util::FileId;
 use dashmap::DashMap;
 use flowy_error::{ErrorCode, FlowyError, FlowyResult};
 use flowy_sqlite::DBConnection;
-use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
+use flowy_storage_pub::chunked_byte::{calculate_offsets, ChunkedBytes, MIN_CHUNK_SIZE};
 use flowy_storage_pub::cloud::StorageCloudService;
 use flowy_storage_pub::storage::{
   CompletedPartRequest, CreatedUpload, FileProgress, FileProgressReceiver, FileUploadState,
@@ -22,7 +22,6 @@ use flowy_storage_pub::storage::{
 use lib_infra::box_any::BoxAny;
 use lib_infra::isolate_stream::{IsolateSink, SinkExt};
 use lib_infra::util::timestamp;
-use std::io::ErrorKind;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
@@ -331,18 +330,20 @@ impl StorageService for StorageServiceImpl {
       return Err(FlowyError::file_storage_limit());
     }
 
-    let local_file_path = self
-      .temp_storage
-      .create_temp_file_from_existing(Path::new(&file_path))
-      .await
-      .map_err(|err| {
-        error!("[File] create temp file failed: {}", err);
-        FlowyError::internal()
-          .with_context(format!("create temp file for upload file failed: {}", err))
-      })?;
+    let local_file_path = file_path;
+    // skip copy the file, upload from source
+    // let local_file_path = self
+    //   .temp_storage
+    //   .create_temp_file_from_existing(Path::new(&file_path))
+    //   .await
+    //   .map_err(|err| {
+    //     error!("[File] create temp file failed: {}", err);
+    //     FlowyError::internal()
+    //       .with_context(format!("create temp file for upload file failed: {}", err))
+    //   })?;
 
     // 1. create a file record and chunk the file
-    let (chunks, record) = create_upload_record(workspace_id, parent_dir, local_file_path).await?;
+    let record = create_upload_record(workspace_id, parent_dir, local_file_path.clone()).await?;
     // 2. save the record to sqlite
     let conn = self
       .user_service
@@ -359,7 +360,7 @@ impl StorageService for StorageServiceImpl {
       self
         .task_queue
         .queue_task(UploadTask::ImmediateTask {
-          chunks,
+          local_file_path,
           record,
           retry_count: 3,
         })
@@ -368,7 +369,7 @@ impl StorageService for StorageServiceImpl {
       self
         .task_queue
         .queue_task(UploadTask::Task {
-          chunks,
+          local_file_path,
           record,
           retry_count: 0,
         })
@@ -393,7 +394,7 @@ impl StorageService for StorageServiceImpl {
     }
   }
 
-  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, record: &BoxAny) -> Result<(), FlowyError> {
     let file_record = record.downcast_ref::<UploadFileTable>().ok_or_else(|| {
       FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
     })?;
@@ -402,7 +403,6 @@ impl StorageService for StorageServiceImpl {
       &self.cloud_service,
       &self.user_service,
       &self.temp_storage,
-      chunks,
      file_record,
      self.global_notifier.clone(),
    )
@@ -468,18 +468,18 @@ async fn create_upload_record(
   workspace_id: String,
   parent_dir: String,
   local_file_path: String,
-) -> FlowyResult<(ChunkedBytes, UploadFileTable)> {
-  // read file and chunk it base on CHUNK_SIZE. We use MIN_CHUNK_SIZE as the minimum chunk size
-  let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32).await?;
-  let ext = Path::new(&local_file_path)
-    .extension()
-    .and_then(std::ffi::OsStr::to_str)
-    .unwrap_or("")
-    .to_owned();
-  let content_type = mime_guess::from_path(&local_file_path)
+) -> FlowyResult<UploadFileTable> {
+  let file_path = Path::new(&local_file_path);
+  let file = tokio::fs::File::open(&file_path).await?;
+  let metadata = file.metadata().await?;
+  let file_size = metadata.len() as usize;
+
+  // Calculate the total number of chunks
+  let num_chunk = calculate_offsets(file_size, MIN_CHUNK_SIZE).len();
+  let content_type = mime_guess::from_path(&file_path)
     .first_or_octet_stream()
     .to_string();
-  let file_id = FileId::from_bytes(&chunked_bytes.data, ext);
+  let file_id = FileId::from_path(&file_path.to_path_buf()).await?;
   let record = UploadFileTable {
     workspace_id,
     file_id,
@@ -488,12 +488,12 @@ async fn create_upload_record(
     parent_dir,
     local_file_path,
     content_type,
-    chunk_size: chunked_bytes.chunk_size,
-    num_chunk: chunked_bytes.offsets.len() as i32,
+    chunk_size: MIN_CHUNK_SIZE as i32,
+    num_chunk: num_chunk as i32,
     created_at: timestamp(),
     is_finish: false,
   };
-  Ok((chunked_bytes, record))
+  Ok(record)
 }
 
 #[instrument(level = "debug", skip_all, err)]
@@ -501,7 +501,6 @@ async fn start_upload(
   cloud_service: &Arc<dyn StorageCloudService>,
   user_service: &Arc<dyn StorageUserService>,
   temp_storage: &Arc<FileTempStorage>,
-  mut chunked_bytes: ChunkedBytes,
   upload_file: &UploadFileTable,
   global_notifier: GlobalNotifier,
 ) -> FlowyResult<()> {
@@ -515,10 +514,32 @@ async fn start_upload(
       part_number: part.part_num,
     })
     .collect::<Vec<_>>();
+  let upload_offset = completed_parts.len() as u64;
 
-  let upload_offset = completed_parts.len() as i32;
-  let total_parts = chunked_bytes.iter().count();
-  chunked_bytes.set_current_offset(upload_offset);
+  let file_path = Path::new(&upload_file.local_file_path);
+  if !file_path.exists() {
+    error!("[File] file not found: {}", upload_file.local_file_path);
+    if let Ok(uid) = user_service.user_id() {
+      if let Ok(conn) = user_service.sqlite_connection(uid) {
+        delete_upload_file(conn, &upload_file.upload_id)?;
+      }
+    }
+  }
+
+  let mut chunked_bytes =
+    ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE).await?;
+  let total_parts = chunked_bytes.total_chunks();
+  if let Err(err) = chunked_bytes.set_offset(upload_offset).await {
+    error!(
+      "[File] set offset failed: {} for file: {}",
+      err, upload_file.local_file_path
+    );
+    if let Ok(uid) = user_service.user_id() {
+      if let Ok(conn) = user_service.sqlite_connection(uid) {
+        delete_upload_file(conn, &upload_file.upload_id)?;
+      }
+    }
+  }
 
   info!(
     "[File] start upload: workspace: {}, parent_dir: {}, file_id: {}, chunk: {}",
@@ -543,11 +564,7 @@ async fn start_upload(
   )
   .await;
   if let Err(err) = create_upload_resp_result.as_ref() {
-    if err.is_file_limit_exceeded() {
-      make_notification(StorageNotification::FileStorageLimitExceeded)
-        .payload(err.clone())
-        .send();
-    }
+    handle_upload_error(user_service, &err, &upload_file.upload_id);
   }
   let create_upload_resp = create_upload_resp_result?;
 
@@ -572,12 +589,14 @@ async fn start_upload(
   info!(
     "[File] {} start uploading parts:{}, offset:{}",
     upload_file.file_id,
-    chunked_bytes.iter().count(),
+    chunked_bytes.total_chunks(),
     upload_offset,
   );
-  let iter = chunked_bytes.iter().enumerate();
-  for (index, chunk_bytes) in iter {
-    let part_number = upload_offset + index as i32 + 1;
+  let mut part_number = upload_offset + 1;
+  while let Some(chunk_result) = chunked_bytes.next_chunk().await {
+    match chunk_result {
+      Ok(chunk_bytes) => {
         info!(
           "[File] {} uploading {}th part, size:{}KB",
           upload_file.file_id,
@@ -606,6 +625,11 @@ async fn start_upload(
         .await
         {
           Ok(resp) => {
+            trace!(
+              "[File] {} part {} uploaded",
+              upload_file.file_id,
+              part_number
+            );
             let mut progress_value = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
             // The 0.1 is reserved for the complete_upload progress
             if progress_value >= 0.9 {
@@ -626,12 +650,11 @@ async fn start_upload(
             });
           },
           Err(err) => {
-            if err.is_file_limit_exceeded() {
-              make_notification(StorageNotification::FileStorageLimitExceeded)
-                .payload(err.clone())
-                .send();
-            }
+            error!(
+              "[File] {} failed to upload part: {}",
+              upload_file.file_id, err
+            );
+            handle_upload_error(user_service, &err, &upload_file.upload_id);
 
             if let Err(err) = global_notifier.send(FileProgress::new_error(
               file_url,
               upload_file.file_id.clone(),
@@ -642,6 +665,16 @@ async fn start_upload(
             return Err(err);
           },
         }
+        part_number += 1; // Increment part number
+      },
+      Err(e) => {
+        error!(
+          "[File] {} failed to read chunk: {:?}",
+          upload_file.file_id, e
+        );
+        break;
+      },
+    }
   }
 
   // mark it as completed
@@ -655,16 +688,38 @@ async fn start_upload(
   )
   .await;
   if let Err(err) = complete_upload_result {
+    handle_upload_error(user_service, &err, &upload_file.upload_id);
+    return Err(err);
+  }
+
+  Ok(())
+}
+
+fn handle_upload_error(
+  user_service: &Arc<dyn StorageUserService>,
+  err: &FlowyError,
+  upload_id: &str,
+) {
   if err.is_file_limit_exceeded() {
     make_notification(StorageNotification::FileStorageLimitExceeded)
       .payload(err.clone())
       .send();
   }
 
-    return Err(err);
-  }
-
-  Ok(())
+  if err.is_single_file_limit_exceeded() {
+    info!("[File] file exceed limit:{}", upload_id);
+    if let Ok(user_id) = user_service.user_id() {
+      if let Ok(db_conn) = user_service.sqlite_connection(user_id) {
+        if let Err(err) = delete_upload_file(db_conn, upload_id) {
+          error!("[File] delete upload file:{} error:{}", upload_id, err);
+        }
+      }
+    }
+
+    make_notification(StorageNotification::SingleFileLimitExceeded)
+      .payload(err.clone())
+      .send();
+  }
 }
 
 #[instrument(level = "debug", skip_all, err)]
@@ -683,33 +738,15 @@ async fn resume_upload(
     upload_file.local_file_path
   );
 
-  match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await {
-    Ok(chunked_bytes) => {
-      // When there were any parts already uploaded, skip those parts by setting the current offset.
   start_upload(
     cloud_service,
     user_service,
     temp_storage,
-    chunked_bytes,
     &upload_file,
     global_notifier,
   )
   .await?;
-    },
-    Err(err) => match err.kind() {
-      ErrorKind::NotFound => {
-        error!("[File] file not found: {}", upload_file.local_file_path);
-        if let Ok(uid) = user_service.user_id() {
-          if let Ok(conn) = user_service.sqlite_connection(uid) {
-            delete_upload_file(conn, &upload_file.upload_id)?;
-          }
-        }
-      },
-      _ => {
-        error!("[File] read file failed: {}", err);
-      },
-    },
-  }
   Ok(())
 }
 
@@ -796,11 +833,12 @@ async fn complete_upload(
 
       let conn = user_service.sqlite_connection(user_service.user_id()?)?;
       update_upload_file_completed(conn, &upload_file.upload_id)?;
+
       if let Err(err) = temp_storage
         .delete_temp_file(&upload_file.local_file_path)
         .await
       {
-        error!("[File] delete temp file failed: {}", err);
+        trace!("[File] delete temp file failed: {}", err);
       }
     },
     Err(err) => {
@@ -7,6 +7,8 @@ const OBSERVABLE_SOURCE: &str = "storage";
 pub(crate) enum StorageNotification {
   #[default]
   FileStorageLimitExceeded = 0,
+
+  SingleFileLimitExceeded = 1,
 }
 
 impl std::convert::From<StorageNotification> for i32 {
@@ -1,6 +1,5 @@
 use crate::sqlite_sql::UploadFileTable;
 use crate::uploader::UploadTask::BackgroundTask;
-use flowy_storage_pub::chunked_byte::ChunkedBytes;
 use flowy_storage_pub::storage::StorageService;
 use lib_infra::box_any::BoxAny;
 use std::cmp::Ordering;
@@ -154,39 +153,35 @@ impl FileUploader {
 
     match task {
       UploadTask::ImmediateTask {
-        chunks,
+        local_file_path,
         record,
         mut retry_count,
       }
       | UploadTask::Task {
-        chunks,
+        local_file_path,
         record,
         mut retry_count,
       } => {
         let record = BoxAny::new(record);
-        if let Err(err) = self
-          .storage_service
-          .start_upload(chunks.clone(), &record)
-          .await
-        {
+        if let Err(err) = self.storage_service.start_upload(&record).await {
           if err.is_file_limit_exceeded() {
-            error!("[File] Failed to upload file: {}", err);
             self.disable_storage_write();
           }
 
+          if err.should_retry_upload() {
             info!(
               "[File] Failed to upload file: {}, retry_count:{}",
               err, retry_count
             );
 
             let record = record.unbox_or_error().unwrap();
             retry_count += 1;
             self.queue.tasks.write().await.push(UploadTask::Task {
-              chunks,
+              local_file_path,
               record,
               retry_count,
             });
          }
+        }
       },
       UploadTask::BackgroundTask {
         workspace_id,
@@ -205,6 +200,7 @@ impl FileUploader {
           self.disable_storage_write();
         }
 
+        if err.should_retry_upload() {
           info!(
             "[File] failed to resume upload file: {}, retry_count:{}",
             err, retry_count
@@ -218,6 +214,7 @@ impl FileUploader {
             retry_count,
           });
         }
+        }
       },
     }
 
@@ -246,6 +243,10 @@ impl FileUploaderRunner {
 
       if let Some(uploader) = weak_uploader.upgrade() {
        let value = notifier.borrow().clone();
+        trace!(
+          "[File]: Uploader runner received signal, thread_id: {:?}",
+          std::thread::current().id()
+        );
        match value {
          Signal::Stop => {
            info!("[File]:Uploader runner stopped, stop signal received");
@@ -273,12 +274,12 @@ impl FileUploaderRunner {
 
 pub enum UploadTask {
   ImmediateTask {
-    chunks: ChunkedBytes,
+    local_file_path: String,
     record: UploadFileTable,
     retry_count: u8,
   },
   Task {
-    chunks: ChunkedBytes,
+    local_file_path: String,
     record: UploadFileTable,
     retry_count: u8,
   },
|
@ -153,7 +153,7 @@ pub async fn create_upload_file_record(
|
|||||||
local_file_path: String,
|
local_file_path: String,
|
||||||
) -> UploadFileTable {
|
) -> UploadFileTable {
|
||||||
// Create ChunkedBytes from file
|
// Create ChunkedBytes from file
|
||||||
let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32)
|
let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -162,8 +162,17 @@ pub async fn create_upload_file_record(
|
|||||||
.first_or_octet_stream()
|
.first_or_octet_stream()
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
|
// let mut file_path = temp_dir();
|
||||||
|
// file_path.push("test_large_file_with_many_chunks");
|
||||||
|
// let mut file = File::create(&file_path).await.unwrap();
|
||||||
|
// file.write_all(&vec![0; 50 * 1024 * 1024]).await.unwrap(); // 50 MB
|
||||||
|
// file.flush().await.unwrap();
|
||||||
|
|
||||||
// Calculate file ID
|
// Calculate file ID
|
||||||
let file_id = FileId::from_bytes(&chunked_bytes.data, "b".to_string());
|
let file_id = FileId::from_path(&PathBuf::from(&local_file_path))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let num_chunk = chunked_bytes.total_chunks();
|
||||||
|
|
||||||
// Create UploadFileTable record
|
// Create UploadFileTable record
|
||||||
UploadFileTable {
|
UploadFileTable {
|
||||||
@ -174,7 +183,7 @@ pub async fn create_upload_file_record(
|
|||||||
local_file_path,
|
local_file_path,
|
||||||
content_type,
|
content_type,
|
||||||
chunk_size: MIN_CHUNK_SIZE as i32,
|
chunk_size: MIN_CHUNK_SIZE as i32,
|
||||||
num_chunk: chunked_bytes.offsets.len() as i32,
|
num_chunk: num_chunk as i32,
|
||||||
created_at: chrono::Utc::now().timestamp(),
|
created_at: chrono::Utc::now().timestamp(),
|
||||||
is_finish: false,
|
is_finish: false,
|
||||||
}
|
}
|
||||||