diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/protobuf.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/protobuf.dart index fd31971c76..79780212c2 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/protobuf.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/protobuf.dart @@ -2,5 +2,6 @@ export './ws.pb.dart'; export './observable.pb.dart'; export './errors.pb.dart'; +export './revision.pb.dart'; export './event.pb.dart'; export './doc.pb.dart'; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart new file mode 100644 index 0000000000..152a95f90c --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart @@ -0,0 +1,101 @@ +/// +// Generated code. Do not modify. +// source: revision.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields + +import 'dart:core' as $core; + +import 'package:fixnum/fixnum.dart' as $fixnum; +import 'package:protobuf/protobuf.dart' as $pb; + +class Revision extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Revision', createEmptyInstance: create) + ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId') + ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') + ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'delta', $pb.PbFieldType.OY) + ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'md5') + ..hasRequiredFields = false + ; + + Revision._() : super(); + factory Revision({ + $fixnum.Int64? baseRevId, + $fixnum.Int64? revId, + $core.List<$core.int>? delta, + $core.String? md5, + }) { + final _result = create(); + if (baseRevId != null) { + _result.baseRevId = baseRevId; + } + if (revId != null) { + _result.revId = revId; + } + if (delta != null) { + _result.delta = delta; + } + if (md5 != null) { + _result.md5 = md5; + } + return _result; + } + factory Revision.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory Revision.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + Revision clone() => Revision()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + Revision copyWith(void Function(Revision) updates) => super.copyWith((message) => updates(message as Revision)) as Revision; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static Revision create() => Revision._(); + Revision createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static Revision getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static Revision? _defaultInstance; + + @$pb.TagNumber(1) + $fixnum.Int64 get baseRevId => $_getI64(0); + @$pb.TagNumber(1) + set baseRevId($fixnum.Int64 v) { $_setInt64(0, v); } + @$pb.TagNumber(1) + $core.bool hasBaseRevId() => $_has(0); + @$pb.TagNumber(1) + void clearBaseRevId() => clearField(1); + + @$pb.TagNumber(2) + $fixnum.Int64 get revId => $_getI64(1); + @$pb.TagNumber(2) + set revId($fixnum.Int64 v) { $_setInt64(1, v); } + @$pb.TagNumber(2) + $core.bool hasRevId() => $_has(1); + @$pb.TagNumber(2) + void clearRevId() => clearField(2); + + @$pb.TagNumber(3) + $core.List<$core.int> get delta => $_getN(2); + @$pb.TagNumber(3) + set delta($core.List<$core.int> v) { $_setBytes(2, v); } + @$pb.TagNumber(3) + $core.bool hasDelta() => $_has(2); + @$pb.TagNumber(3) + void clearDelta() => clearField(3); + + @$pb.TagNumber(4) + $core.String get md5 => $_getSZ(3); + @$pb.TagNumber(4) + set md5($core.String v) { $_setString(3, v); } + @$pb.TagNumber(4) + $core.bool hasMd5() => $_has(3); + @$pb.TagNumber(4) + void clearMd5() => clearField(4); +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbenum.dart new file mode 100644 index 0000000000..81d387d5be --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbenum.dart @@ -0,0 +1,7 @@ +/// +// Generated code. Do not modify. +// source: revision.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart new file mode 100644 index 0000000000..ef2e3b1275 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart @@ -0,0 +1,23 @@ +/// +// Generated code. Do not modify. +// source: revision.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package + +import 'dart:core' as $core; +import 'dart:convert' as $convert; +import 'dart:typed_data' as $typed_data; +@$core.Deprecated('Use revisionDescriptor instead') +const Revision$json = const { + '1': 'Revision', + '2': const [ + const {'1': 'base_rev_id', '3': 1, '4': 1, '5': 3, '10': 'baseRevId'}, + const {'1': 'rev_id', '3': 2, '4': 1, '5': 3, '10': 'revId'}, + const {'1': 'delta', '3': 3, '4': 1, '5': 12, '10': 'delta'}, + const {'1': 'md5', '3': 4, '4': 1, '5': 9, '10': 'md5'}, + ], +}; + +/// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1'); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbserver.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbserver.dart new file mode 100644 index 0000000000..4797cc3361 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbserver.dart @@ -0,0 +1,9 @@ +/// +// Generated code. Do not modify. +// source: revision.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package + +export 'revision.pb.dart'; + diff --git a/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql b/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql index b7e0c538af..def7ef3ce5 100644 --- a/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql +++ b/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql @@ -1,7 +1,7 @@ -- Your SQL goes here CREATE TABLE op_table ( - base_rev BIGINT NOT NULL DEFAULT 0, - rev BIGINT NOT NULL PRIMARY KEY, + base_rev_id BIGINT NOT NULL DEFAULT 0, + rev_id BIGINT NOT NULL PRIMARY KEY, data BLOB NOT NULL DEFAULT (x''), md5 TEXT NOT NULL DEFAULT '', state INTEGER NOT NULL DEFAULT 0 diff --git a/rust-lib/flowy-database/src/schema.rs b/rust-lib/flowy-database/src/schema.rs index 81b52a1d29..2a003445e0 100644 --- a/rust-lib/flowy-database/src/schema.rs +++ b/rust-lib/flowy-database/src/schema.rs @@ -22,9 +22,9 @@ table! { } table! { - op_table (rev) { - base_rev -> BigInt, - rev -> BigInt, + op_table (rev_id) { + base_rev_id -> BigInt, + rev_id -> BigInt, data -> Binary, md5 -> Text, state -> Integer, diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index 95cc70d716..fbdde1ed88 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -60,6 +60,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "UpdateDocParams" | "DocDelta" | "QueryDocParams" + | "Revision" | "WsDocumentData" | "DocError" | "FFIRequest" diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index 3f5ad07d94..9e0e056af1 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -35,6 +35,8 @@ url = "2.2" serde = { version = "1.0", features = ["derive"] } serde_json = {version = "1.0"} chrono = "0.4.19" +futures-core = { version = "0.3", default-features = false } +md5 = "0.7.0" [dev-dependencies] flowy-test = { path = "../flowy-test" } diff --git a/rust-lib/flowy-document/src/entities/doc/mod.rs b/rust-lib/flowy-document/src/entities/doc/mod.rs index 4d548bdd5e..3c607e690c 100644 --- a/rust-lib/flowy-document/src/entities/doc/mod.rs +++ b/rust-lib/flowy-document/src/entities/doc/mod.rs @@ -1,4 +1,6 @@ mod doc; pub mod parser; +mod revision; pub use doc::*; +pub use revision::*; diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs new file mode 100644 index 0000000000..b89ca8ccc7 --- /dev/null +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -0,0 +1,27 @@ +use flowy_derive::ProtoBuf; + +#[derive(Debug, Clone, Default, ProtoBuf)] +pub struct Revision { + #[pb(index = 1)] + pub base_rev_id: i64, + + #[pb(index = 2)] + pub rev_id: i64, + + #[pb(index = 3)] + pub delta: Vec, + + #[pb(index = 4)] + pub md5: String, +} + +impl Revision { + pub fn new(base_rev_id: i64, rev_id: i64, delta: Vec, md5: String) -> Revision { + Self { + base_rev_id, + rev_id, + delta, + md5, + } + } +} diff --git a/rust-lib/flowy-document/src/errors.rs b/rust-lib/flowy-document/src/errors.rs index 45d3015b57..bac02d51d1 100644 --- a/rust-lib/flowy-document/src/errors.rs +++ b/rust-lib/flowy-document/src/errors.rs @@ -104,6 +104,10 @@ impl std::convert::From for DocError { fn from(error: serde_json::Error) -> Self { DocError::internal().context(error) } } +impl std::convert::From for DocError { + fn from(e: protobuf::ProtobufError) -> Self { DocError::internal().context(e) } +} + // impl std::convert::From<::r2d2::Error> for DocError { // fn from(error: r2d2::Error) -> Self { // ErrorBuilder::new(ErrorCode::InternalError).error(error).build() } } diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index 040b6666e0..58aaa64ead 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use bytes::Bytes; + use diesel::SqliteConnection; use parking_lot::RwLock; diff --git a/rust-lib/flowy-document/src/protobuf/model/mod.rs b/rust-lib/flowy-document/src/protobuf/model/mod.rs index 92ccda63e6..eb8e589277 100644 --- a/rust-lib/flowy-document/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-document/src/protobuf/model/mod.rs @@ -9,6 +9,9 @@ pub use observable::*; mod errors; pub use errors::*; +mod revision; +pub use revision::*; + mod event; pub use event::*; diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs new file mode 100644 index 0000000000..e6bbf22838 --- /dev/null +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -0,0 +1,327 @@ +// This file is generated by rust-protobuf 2.22.1. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![cfg_attr(rustfmt, rustfmt::skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `revision.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; + +#[derive(PartialEq,Clone,Default)] +pub struct Revision { + // message fields + pub base_rev_id: i64, + pub rev_id: i64, + pub delta: ::std::vec::Vec, + pub md5: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a Revision { + fn default() -> &'a Revision { + ::default_instance() + } +} + +impl Revision { + pub fn new() -> Revision { + ::std::default::Default::default() + } + + // int64 base_rev_id = 1; + + + pub fn get_base_rev_id(&self) -> i64 { + self.base_rev_id + } + pub fn clear_base_rev_id(&mut self) { + self.base_rev_id = 0; + } + + // Param is passed by value, moved + pub fn set_base_rev_id(&mut self, v: i64) { + self.base_rev_id = v; + } + + // int64 rev_id = 2; + + + pub fn get_rev_id(&self) -> i64 { + self.rev_id + } + pub fn clear_rev_id(&mut self) { + self.rev_id = 0; + } + + // Param is passed by value, moved + pub fn set_rev_id(&mut self, v: i64) { + self.rev_id = v; + } + + // bytes delta = 3; + + + pub fn get_delta(&self) -> &[u8] { + &self.delta + } + pub fn clear_delta(&mut self) { + self.delta.clear(); + } + + // Param is passed by value, moved + pub fn set_delta(&mut self, v: ::std::vec::Vec) { + self.delta = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_delta(&mut self) -> &mut ::std::vec::Vec { + &mut self.delta + } + + // Take field + pub fn take_delta(&mut self) -> ::std::vec::Vec { + ::std::mem::replace(&mut self.delta, ::std::vec::Vec::new()) + } + + // string md5 = 4; + + + pub fn get_md5(&self) -> &str { + &self.md5 + } + pub fn clear_md5(&mut self) { + self.md5.clear(); + } + + // Param is passed by value, moved + pub fn set_md5(&mut self, v: ::std::string::String) { + self.md5 = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_md5(&mut self) -> &mut ::std::string::String { + &mut self.md5 + } + + // Take field + pub fn take_md5(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.md5, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for Revision { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.base_rev_id = tmp; + }, + 2 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.rev_id = tmp; + }, + 3 => { + ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.delta)?; + }, + 4 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.md5)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if self.base_rev_id != 0 { + my_size += ::protobuf::rt::value_size(1, self.base_rev_id, ::protobuf::wire_format::WireTypeVarint); + } + if self.rev_id != 0 { + my_size += ::protobuf::rt::value_size(2, self.rev_id, ::protobuf::wire_format::WireTypeVarint); + } + if !self.delta.is_empty() { + my_size += ::protobuf::rt::bytes_size(3, &self.delta); + } + if !self.md5.is_empty() { + my_size += ::protobuf::rt::string_size(4, &self.md5); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if self.base_rev_id != 0 { + os.write_int64(1, self.base_rev_id)?; + } + if self.rev_id != 0 { + os.write_int64(2, self.rev_id)?; + } + if !self.delta.is_empty() { + os.write_bytes(3, &self.delta)?; + } + if !self.md5.is_empty() { + os.write_string(4, &self.md5)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> Revision { + Revision::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "base_rev_id", + |m: &Revision| { &m.base_rev_id }, + |m: &mut Revision| { &mut m.base_rev_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "rev_id", + |m: &Revision| { &m.rev_id }, + |m: &mut Revision| { &mut m.rev_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "delta", + |m: &Revision| { &m.delta }, + |m: &mut Revision| { &mut m.delta }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "md5", + |m: &Revision| { &m.md5 }, + |m: &mut Revision| { &mut m.md5 }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "Revision", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static Revision { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(Revision::new) + } +} + +impl ::protobuf::Clear for Revision { + fn clear(&mut self) { + self.base_rev_id = 0; + self.rev_id = 0; + self.delta.clear(); + self.md5.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for Revision { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for Revision { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x0erevision.proto\"i\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\ + \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ + evId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\n\x03md5\ + \x18\x04\x20\x01(\tR\x03md5J\x86\x02\n\x06\x12\x04\0\0\x06\x01\n\x08\n\ + \x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x01\0\x06\x01\n\n\n\x03\ + \x04\0\x01\x12\x03\x01\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x02\x04\ + \x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x02\x04\t\n\x0c\n\x05\x04\0\x02\ + \0\x01\x12\x03\x02\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02\x18\x19\ + \n\x0b\n\x04\x04\0\x02\x01\x12\x03\x03\x04\x15\n\x0c\n\x05\x04\0\x02\x01\ + \x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x03\n\x10\n\ + \x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x03\x13\x14\n\x0b\n\x04\x04\0\x02\ + \x02\x12\x03\x04\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x04\x04\t\ + \n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x04\n\x0f\n\x0c\n\x05\x04\0\x02\ + \x02\x03\x12\x03\x04\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x05\x04\ + \x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x05\x04\n\n\x0c\n\x05\x04\0\ + \x02\x03\x01\x12\x03\x05\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ + \x05\x11\x12b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto new file mode 100644 index 0000000000..1b88b1bf0d --- /dev/null +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +message Revision { + int64 base_rev_id = 1; + int64 rev_id = 2; + bytes delta = 3; + string md5 = 4; +} diff --git a/rust-lib/flowy-document/src/services/cache.rs b/rust-lib/flowy-document/src/services/cache.rs index 84df313353..abf5abb612 100644 --- a/rust-lib/flowy-document/src/services/cache.rs +++ b/rust-lib/flowy-document/src/services/cache.rs @@ -1,19 +1,10 @@ -use std::{convert::TryInto, fmt::Debug, sync::Arc}; +use std::sync::Arc; -use bytes::Bytes; use dashmap::DashMap; -use parking_lot::RwLock; - -use flowy_database::ConnectionPool; -use flowy_ot::{core::Delta, errors::OTError}; use crate::{ - entities::doc::Doc, errors::DocError, - services::{ - doc::edit_context::{DocId, EditDocContext}, - ws::WsManager, - }, + services::doc::edit_context::{DocId, EditDocContext}, }; pub(crate) struct DocCache { diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index d41e58bedc..e87aacef8a 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -2,19 +2,15 @@ use crate::{ entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams}, errors::{internal_error, DocError}, module::DocumentUser, - services::{ - cache::DocCache, - doc::edit_context::{DocId, EditDocContext, EditDocPersistence}, - server::Server, - ws::WsManager, - }, - sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql, OpTableSql}, + services::{cache::DocCache, doc::edit_context::EditDocContext, server::Server, ws::WsManager}, + sql_tables::doc::{DocTable, DocTableSql, OpTableSql}, }; use bytes::Bytes; use flowy_database::{ConnectionPool, SqliteConnection}; +use flowy_infra::future::ClosureFuture; + use parking_lot::RwLock; use std::sync::Arc; -use tokio::task::JoinHandle; pub(crate) struct DocController { server: Server, @@ -107,29 +103,17 @@ impl DocController { } #[tracing::instrument(level = "debug", skip(self, pool), err)] - fn read_doc_from_server( - &self, - params: QueryDocParams, - pool: Arc, - ) -> Result, DocError>>, DocError> { + async fn read_doc_from_server(&self, params: QueryDocParams, pool: Arc) -> Result, DocError> { let token = self.user.token()?; - let server = self.server.clone(); - let doc_sql = self.doc_sql.clone(); - let op_sql = self.op_sql.clone(); - let ws = self.ws.clone(); - let cache = self.cache.clone(); - - Ok(tokio::spawn(async move { - match server.read_doc(&token, params).await? { - None => Err(DocError::not_found()), - Some(doc) => { - let doc_table = DocTable::new(doc.clone()); - let _ = doc_sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?; - let edit_doc_ctx = make_edit_context(ws, cache, op_sql, doc)?; - Ok(edit_doc_ctx) - }, - } - })) + match self.server.read_doc(&token, params).await? { + None => Err(DocError::not_found()), + Some(doc) => { + let edit = self.make_edit_context(doc.clone())?; + let conn = &*(pool.get().map_err(internal_error)?); + let _ = self.doc_sql.create_doc_table(doc.into(), conn)?; + Ok(edit) + }, + } } #[tracing::instrument(level = "debug", skip(self), err)] @@ -149,44 +133,26 @@ impl DocController { } async fn _open(&self, params: QueryDocParams, pool: Arc) -> Result, DocError> { - match self.doc_sql.read_doc_table(¶ms.doc_id, &*(pool.get().map_err(internal_error)?)) { - Ok(doc_table) => { - let edit_doc_ctx = make_edit_context(self.ws.clone(), self.cache.clone(), self.op_sql.clone(), doc_table.into())?; - Ok(edit_doc_ctx) - }, + match self.doc_sql.read_doc_table(¶ms.doc_id, pool.clone()) { + Ok(doc_table) => Ok(self.make_edit_context(doc_table.into())?), Err(error) => { if error.is_record_not_found() { log::debug!("Doc:{} don't exist, reading from server", params.doc_id); - match self.read_doc_from_server(params, pool)?.await.map_err(internal_error)? { - Ok(edit_doc_ctx) => Ok(edit_doc_ctx), - Err(error) => Err(error), - } + Ok(self.read_doc_from_server(params, pool.clone()).await?) } else { return Err(error); } }, } } -} -fn make_edit_context( - ws: Arc>, - cache: Arc, - op_sql: Arc, - doc: Doc, -) -> Result, DocError> { - // Opti: require upgradable_read lock and then upgrade to write lock using - // RwLockUpgradableReadGuard::upgrade(xx) - let edit_doc_ctx = Arc::new(EditDocContext::new(doc, ws.read().sender.clone(), op_sql)?); - ws.write().register_handler(edit_doc_ctx.id.as_ref(), edit_doc_ctx.clone()); - cache.set(edit_doc_ctx.clone()); - Ok(edit_doc_ctx) -} - -impl EditDocPersistence for DocController { - fn save(&self, params: UpdateDocParams, pool: Arc) -> Result<(), DocError> { - let changeset = DocTableChangeset::new(params.clone()); - let _ = self.doc_sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?; - Ok(()) + fn make_edit_context(&self, doc: Doc) -> Result, DocError> { + // Opti: require upgradable_read lock and then upgrade to write lock using + // RwLockUpgradableReadGuard::upgrade(xx) of ws + let sender = self.ws.read().sender.clone(); + let edit_ctx = Arc::new(EditDocContext::new(doc, sender, self.op_sql.clone())?); + self.ws.write().register_handler(edit_ctx.id.as_ref(), edit_ctx.clone()); + self.cache.set(edit_ctx.clone()); + Ok(edit_ctx) } } diff --git a/rust-lib/flowy-document/src/services/doc/document/document.rs b/rust-lib/flowy-document/src/services/doc/document/document.rs index 081ed43bc5..4d64a53c46 100644 --- a/rust-lib/flowy-document/src/services/doc/document/document.rs +++ b/rust-lib/flowy-document/src/services/doc/document/document.rs @@ -4,7 +4,6 @@ use crate::{ }; use bytes::Bytes; use flowy_ot::core::*; -use std::convert::TryInto; pub trait DocumentData { fn into_string(self) -> Result; @@ -24,24 +23,10 @@ impl CustomDocument for FlowyDoc { fn init_delta() -> Delta { DeltaBuilder::new().insert("\n").build() } } -#[derive(Debug, Clone)] -pub struct RevId(pub usize); - -#[derive(Debug, Clone)] -pub struct Revision { - rev_id: RevId, - pub delta: Delta, -} - -impl Revision { - pub fn new(rev_id: RevId, delta: Delta) -> Revision { Self { rev_id, delta } } -} - pub struct Document { delta: Delta, history: History, view: View, - rev_id_counter: usize, last_edit_time: usize, } @@ -53,7 +38,6 @@ impl Document { delta, history: History::new(), view: View::new(), - rev_id_counter: 1, last_edit_time: 0, } } @@ -67,16 +51,11 @@ impl Document { pub fn to_bytes(&self) -> Vec { self.delta.clone().into_bytes() } - pub fn to_string(&self) -> String { self.delta.apply("").unwrap() } + pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() } pub fn apply_delta(&mut self, data: Bytes) -> Result<(), DocError> { let new_delta = Delta::from_bytes(data.to_vec())?; - log::debug!("Apply delta: {}", new_delta); - - let rev_id = self.next_rev_id(); - let revision = Revision::new(rev_id, new_delta.clone()); - let _ = self.add_delta(&new_delta)?; log::debug!("Document: {}", self.to_json()); Ok(()) @@ -173,7 +152,6 @@ impl Document { fn add_delta(&mut self, delta: &Delta) -> Result<(), DocError> { let composed_delta = self.delta.compose(delta)?; let mut undo_delta = delta.invert(&self.delta); - self.rev_id_counter += 1; let now = chrono::Utc::now().timestamp_millis() as usize; if now - self.last_edit_time < RECORD_THRESHOLD { @@ -206,9 +184,6 @@ impl Document { let inverted_delta = change.invert(&self.delta); Ok((new_delta, inverted_delta)) } - - #[allow(dead_code)] - fn next_rev_id(&self) -> RevId { RevId(self.rev_id_counter) } } fn validate_interval(delta: &Delta, interval: &Interval) -> Result<(), DocError> { diff --git a/rust-lib/flowy-document/src/services/doc/edit_context.rs b/rust-lib/flowy-document/src/services/doc/edit_context.rs index 617b1d2d53..b14d5e02e5 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_context.rs @@ -1,40 +1,30 @@ use crate::{ entities::{ - doc::{Doc, UpdateDocParams}, + doc::{Doc, Revision}, ws::{WsDocumentData, WsSource}, }, - errors::DocError, + errors::{internal_error, DocError}, services::{ doc::Document, ws::{WsHandler, WsSender}, }, - sql_tables::doc::OpTableSql, + sql_tables::doc::{OpState, OpTable, OpTableSql}, }; use bytes::Bytes; use flowy_database::ConnectionPool; use flowy_ot::core::Delta; -use parking_lot::RwLock; -use std::{convert::TryInto, sync::Arc}; - -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct DocId(pub(crate) String); -impl AsRef for DocId { - fn as_ref(&self) -> &str { &self.0 } -} -impl std::convert::From for DocId -where - T: ToString, -{ - fn from(s: T) -> Self { DocId(s.to_string()) } -} - -pub(crate) trait EditDocPersistence: Send + Sync { - fn save(&self, params: UpdateDocParams, pool: Arc) -> Result<(), DocError>; -} +use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; +use std::{ + convert::TryInto, + sync::{ + atomic::{AtomicI64, AtomicUsize, Ordering::SeqCst}, + Arc, + }, +}; pub(crate) struct EditDocContext { pub(crate) id: DocId, - pub(crate) revision: i64, + pub(crate) rev_counter: RevCounter, document: RwLock, ws_sender: Arc, op_sql: Arc, @@ -43,13 +33,13 @@ pub(crate) struct EditDocContext { impl EditDocContext { pub(crate) fn new(doc: Doc, ws_sender: Arc, op_sql: Arc) -> Result { let id: DocId = doc.id.into(); - let revision = doc.revision; + let rev_counter = RevCounter::new(doc.revision); let delta: Delta = doc.data.try_into()?; let document = RwLock::new(Document::from_delta(delta)); Ok(Self { id, - revision, + rev_counter, document, ws_sender, op_sql, @@ -58,33 +48,57 @@ impl EditDocContext { pub(crate) fn doc(&self) -> Doc { Doc { - id: self.id.0.clone(), + id: self.id.clone().into(), data: self.document.read().to_bytes(), - revision: self.revision, + revision: self.rev_counter.value(), } } pub(crate) fn apply_delta(&self, data: Bytes, pool: Arc) -> Result<(), DocError> { - let mut write_guard = self.document.write(); - let _ = write_guard.apply_delta(data.clone())?; + let mut guard = self.document.write(); + let base_rev_id = self.rev_counter.value(); + let rev_id = self.rev_counter.next(); + let _ = guard.apply_delta(data.clone())?; + let json = guard.to_json(); + drop(guard); - match self.ws_sender.send_data(data) { - Ok(_) => {}, + // Opti: it is necessary to save the rev if send success? + let md5 = format!("{:x}", md5::compute(json)); + let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5); + self.save_revision(revision.clone(), pool.clone()); + match self.ws_sender.send_data(revision.try_into()?) { + Ok(_) => { + // TODO: remove the rev if send success + // let _ = self.delete_revision(rev_id, pool)?; + }, Err(e) => { - // TODO: save to local and retry log::error!("Send delta failed: {:?}", e); }, } + Ok(()) + } +} - // Opti: strategy to save the document - let save = UpdateDocParams { - doc_id: self.id.0.clone(), - data: write_guard.to_bytes(), - }; - // let _ = self.persistence.save(save, pool)?; +impl EditDocContext { + fn save_revision(&self, revision: Revision, pool: Arc) -> Result<(), DocError> { + let conn = &*pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, DocError, _>(|| { + let op_table: OpTable = revision.into(); + let _ = self.op_sql.create_op_table(op_table, conn)?; + Ok(()) + })?; Ok(()) } + + fn delete_revision(&self, rev_id: i64, pool: Arc) -> Result<(), DocError> { + let conn = &*pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, DocError, _>(|| { + let _ = self.op_sql.delete_op_table(rev_id, conn)?; + Ok(()) + })?; + Ok(()) + } } impl WsHandler for EditDocContext { @@ -94,3 +108,33 @@ impl WsHandler for EditDocContext { } } } + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct DocId(pub(crate) String); + +impl AsRef for DocId { + fn as_ref(&self) -> &str { &self.0 } +} + +impl std::convert::From for DocId +where + T: ToString, +{ + fn from(s: T) -> Self { DocId(s.to_string()) } +} + +impl std::convert::Into for DocId { + fn into(self) -> String { self.0.clone() } +} + +#[derive(Debug)] +pub struct RevCounter(pub AtomicI64); + +impl RevCounter { + pub fn new(n: i64) -> Self { Self(AtomicI64::new(n)) } + pub fn next(&self) -> i64 { + let _ = self.0.fetch_add(1, SeqCst); + self.value() + } + pub fn value(&self) -> i64 { self.0.load(SeqCst) } +} diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs index de0876b365..312714ef6f 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs @@ -17,7 +17,7 @@ impl OpTableSql { } pub(crate) fn update_op_table(&self, changeset: OpChangeset, conn: &SqliteConnection) -> Result<(), DocError> { - let filter = dsl::op_table.filter(op_table::dsl::rev.eq(changeset.rev)); + let filter = dsl::op_table.filter(op_table::dsl::rev_id.eq(changeset.rev_id)); let affected_row = diesel::update(filter).set(changeset).execute(conn)?; debug_assert_eq!(affected_row, 1); Ok(()) @@ -28,8 +28,8 @@ impl OpTableSql { Ok(ops) } - pub(crate) fn delete_op_table(&self, rev: i64, conn: &SqliteConnection) -> Result<(), DocError> { - let filter = dsl::op_table.filter(op_table::dsl::rev.eq(rev)); + pub(crate) fn delete_op_table(&self, rev_id: i64, conn: &SqliteConnection) -> Result<(), DocError> { + let filter = dsl::op_table.filter(op_table::dsl::rev_id.eq(rev_id)); let affected_row = diesel::delete(filter).execute(conn)?; debug_assert_eq!(affected_row, 1); Ok(()) diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs index b8888f04e7..0f8fb89a7e 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs @@ -1,12 +1,13 @@ +use crate::entities::doc::Revision; use diesel::sql_types::Integer; use flowy_database::schema::op_table; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] #[table_name = "op_table"] -#[primary_key(rev)] +#[primary_key(rev_id)] pub(crate) struct OpTable { - pub(crate) base_rev: i64, - pub(crate) rev: i64, + pub(crate) base_rev_id: i64, + pub(crate) rev_id: i64, pub(crate) data: Vec, pub(crate) md5: String, pub(crate) state: OpState, @@ -47,8 +48,20 @@ impl_sql_integer_expression!(OpState); #[derive(AsChangeset, Identifiable, Default, Debug)] #[table_name = "op_table"] -#[primary_key(rev)] +#[primary_key(rev_id)] pub(crate) struct OpChangeset { - pub(crate) rev: i64, + pub(crate) rev_id: i64, pub(crate) state: Option, } + +impl std::convert::Into for Revision { + fn into(self) -> OpTable { + OpTable { + base_rev_id: self.base_rev_id, + rev_id: self.rev_id, + data: self.delta, + md5: self.md5, + state: OpState::Local, + } + } +} diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs index 9e7f57e43e..a0d5077c24 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs @@ -1,12 +1,14 @@ use crate::{ - errors::DocError, + errors::{internal_error, DocError}, sql_tables::doc::{DocTable, DocTableChangeset}, }; use flowy_database::{ prelude::*, schema::{doc_table, doc_table::dsl}, + ConnectionPool, SqliteConnection, }; +use std::sync::Arc; pub struct DocTableSql {} @@ -21,7 +23,8 @@ impl DocTableSql { Ok(()) } - pub(crate) fn read_doc_table(&self, doc_id: &str, conn: &SqliteConnection) -> Result { + pub(crate) fn read_doc_table(&self, doc_id: &str, pool: Arc) -> Result { + let conn = &*pool.get().map_err(internal_error)?; let doc_table = dsl::doc_table.filter(doc_table::id.eq(doc_id)).first::(conn)?; Ok(doc_table) diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs index ce4ea2daa8..b465536d9a 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs @@ -44,3 +44,13 @@ impl std::convert::Into for DocTable { } } } + +impl std::convert::From for DocTable { + fn from(doc: Doc) -> Self { + Self { + id: doc.id, + data: doc.data, + revision: doc.revision, + } + } +} diff --git a/rust-lib/flowy-document/tests/editor/mod.rs b/rust-lib/flowy-document/tests/editor/mod.rs index d8ff30d1c6..5891d0e884 100644 --- a/rust-lib/flowy-document/tests/editor/mod.rs +++ b/rust-lib/flowy-document/tests/editor/mod.rs @@ -174,7 +174,7 @@ impl TestBuilder { std::thread::sleep(Duration::from_millis(*mills_sec as u64)); }, TestOp::AssertStr(delta_i, expected) => { - assert_eq!(&self.documents[*delta_i].to_string(), expected); + assert_eq!(&self.documents[*delta_i].to_plain_string(), expected); }, TestOp::AssertOpsJson(delta_i, expected) => { @@ -199,10 +199,6 @@ impl TestBuilder { } } -pub fn debug_print_delta(delta: &Delta) { - eprintln!("😁 {}", serde_json::to_string(delta).unwrap()); -} - pub struct Rng(StdRng); impl Default for Rng { @@ -210,6 +206,7 @@ impl Default for Rng { } impl Rng { + #[allow(dead_code)] pub fn from_seed(seed: [u8; 32]) -> Self { Rng(StdRng::from_seed(seed)) } pub fn gen_string(&mut self, len: usize) -> String { (0..len).map(|_| self.0.gen::()).collect() } diff --git a/rust-lib/flowy-infra/src/future.rs b/rust-lib/flowy-infra/src/future.rs index 73778ffa26..63363b102d 100644 --- a/rust-lib/flowy-infra/src/future.rs +++ b/rust-lib/flowy-infra/src/future.rs @@ -8,12 +8,12 @@ use std::{ }; #[pin_project] -pub struct RequestFuture { +pub struct ClosureFuture { #[pin] pub fut: Pin + Sync + Send>>, } -impl Future for RequestFuture +impl Future for ClosureFuture where T: Send + Sync, { diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 1015634e2b..ecc97972fe 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -5,7 +5,7 @@ use flowy_document::{ prelude::{WsManager, WsSender, WS_ID}, }; -use flowy_user::services::user::UserSession; +use flowy_user::{errors::ErrorCode, services::user::UserSession}; use flowy_ws::{WsMessage, WsMessageHandler}; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; @@ -51,9 +51,19 @@ impl DocumentUser for DocumentUserImpl { Ok(doc_dir) } - fn user_id(&self) -> Result { self.user.user_id().map_err(|e| DocError::internal().context(e)) } + fn user_id(&self) -> Result { + self.user.user_id().map_err(|e| match e.code { + ErrorCode::InternalError => DocError::internal().context(e.msg), + _ => DocError::internal().context(e), + }) + } - fn token(&self) -> Result { self.user.token().map_err(|e| DocError::internal().context(e)) } + fn token(&self) -> Result { + self.user.token().map_err(|e| match e.code { + ErrorCode::InternalError => DocError::internal().context(e.msg), + _ => DocError::internal().context(e), + }) + } } struct WsSenderImpl { diff --git a/rust-lib/flowy-sqlite/src/pool.rs b/rust-lib/flowy-sqlite/src/pool.rs index a4a645249d..8522687b83 100644 --- a/rust-lib/flowy-sqlite/src/pool.rs +++ b/rust-lib/flowy-sqlite/src/pool.rs @@ -49,32 +49,6 @@ impl ConnectionPool { } } -#[derive(Default, Debug, Clone)] -pub struct ConnCounter(Arc); - -impl std::ops::Deref for ConnCounter { - type Target = ConnCounterInner; - - fn deref(&self) -> &Self::Target { &*self.0 } -} - -#[derive(Default, Debug)] -pub struct ConnCounterInner { - max_number: AtomicUsize, - current_number: AtomicUsize, -} - -impl ConnCounterInner { - pub fn get_max_num(&self) -> usize { self.max_number.load(SeqCst) } - - pub fn reset(&self) { - // reset max_number to current_number - let _ = self - .max_number - .fetch_update(SeqCst, SeqCst, |_| Some(self.current_number.load(SeqCst))); - } -} - pub type OnExecFunc = Box Box + Send + Sync>; pub struct PoolConfig { diff --git a/rust-lib/flowy-user/src/errors.rs b/rust-lib/flowy-user/src/errors.rs index 6686f3101b..2443eef466 100644 --- a/rust-lib/flowy-user/src/errors.rs +++ b/rust-lib/flowy-user/src/errors.rs @@ -110,7 +110,12 @@ impl std::convert::From<::r2d2::Error> for UserError { } impl std::convert::From for UserError { - fn from(error: flowy_ws::errors::WsError) -> Self { UserError::internal().context(error) } + fn from(error: flowy_ws::errors::WsError) -> Self { + match error.code { + flowy_ws::errors::ErrorCode::InternalError => UserError::internal().context(error.msg), + _ => UserError::internal().context(error), + } + } } // use diesel::result::{Error, DatabaseErrorKind}; diff --git a/rust-lib/flowy-workspace/src/sql_tables/app/app_table.rs b/rust-lib/flowy-workspace/src/sql_tables/app/app_table.rs index 4e65067f95..cde5d06671 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/app/app_table.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/app/app_table.rs @@ -3,7 +3,6 @@ use crate::{ app::{App, ColorStyle, UpdateAppParams}, view::RepeatedView, }, - impl_sql_binary_expression, sql_tables::workspace::WorkspaceTable, }; use diesel::sql_types::Binary; diff --git a/rust-lib/flowy-workspace/src/sql_tables/view/view_table.rs b/rust-lib/flowy-workspace/src/sql_tables/view/view_table.rs index 8aef18a34c..69bea0f907 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/view/view_table.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/view/view_table.rs @@ -1,6 +1,5 @@ use crate::{ entities::view::{RepeatedView, UpdateViewParams, View, ViewType}, - impl_sql_integer_expression, sql_tables::app::AppTable, }; use diesel::sql_types::Integer; diff --git a/rust-lib/flowy-ws/src/errors.rs b/rust-lib/flowy-ws/src/errors.rs index 77bb360cd1..9064dda60f 100644 --- a/rust-lib/flowy-ws/src/errors.rs +++ b/rust-lib/flowy-ws/src/errors.rs @@ -8,10 +8,10 @@ use url::ParseError; #[derive(Debug, Default, Clone, ProtoBuf)] pub struct WsError { #[pb(index = 1)] - code: ErrorCode, + pub code: ErrorCode, #[pb(index = 2)] - msg: String, + pub msg: String, } macro_rules! static_user_error {