diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 3afab300e..6e879db46 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -95,6 +95,10 @@ func (b *Buffer) Len() int { return len(b.Value) } +func (b *Buffer) IsEmpty() bool { + return b.Len() == 0 +} + // IsFull returns true if the buffer has no more room to grow. func (b *Buffer) IsFull() bool { return len(b.Value) == cap(b.Value) @@ -120,6 +124,14 @@ func (b *Buffer) Read(data []byte) (int, error) { return nBytes, nil } +func (b *Buffer) FillFrom(reader io.Reader) (int, error) { + begin := b.Len() + b.Value = b.Value[:cap(b.Value)] + nBytes, err := reader.Read(b.Value[begin:]) + b.Value = b.Value[:begin+nBytes] + return nBytes, err +} + type bufferPool struct { chain chan []byte allocator *sync.Pool diff --git a/common/io/buffered_reader.go b/common/io/buffered_reader.go new file mode 100644 index 000000000..9c08ace15 --- /dev/null +++ b/common/io/buffered_reader.go @@ -0,0 +1,54 @@ +package io + +import ( + "io" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +type BufferedReader struct { + reader io.Reader + buffer *alloc.Buffer + cached bool +} + +func NewBufferedReader(rawReader io.Reader) *BufferedReader { + return &BufferedReader{ + reader: rawReader, + buffer: alloc.NewBuffer().Clear(), + cached: true, + } +} + +func (this *BufferedReader) Release() { + this.buffer.Release() +} + +func (this *BufferedReader) Cached() bool { + return this.cached +} + +func (this *BufferedReader) SetCached(cached bool) { + this.cached = cached +} + +func (this *BufferedReader) Read(b []byte) (int, error) { + if !this.cached { + if !this.buffer.IsEmpty() { + return this.buffer.Read(b) + } + return this.reader.Read(b) + } + if this.buffer.IsEmpty() { + _, err := this.buffer.FillFrom(this.reader) + if err != nil { + return 0, err + } + } + + if this.buffer.IsEmpty() { + return 0, nil + } + + return this.buffer.Read(b) +} diff --git a/common/io/buffered_reader_test.go b/common/io/buffered_reader_test.go new file mode 100644 index 000000000..3616ab1a8 --- /dev/null +++ b/common/io/buffered_reader_test.go @@ -0,0 +1,44 @@ +package io_test + +import ( + "testing" + + "github.com/v2ray/v2ray-core/common/alloc" + . "github.com/v2ray/v2ray-core/common/io" + v2testing "github.com/v2ray/v2ray-core/testing" + "github.com/v2ray/v2ray-core/testing/assert" +) + +func TestBufferedReader(t *testing.T) { + v2testing.Current(t) + + content := alloc.NewLargeBuffer() + len := content.Len() + + reader := NewBufferedReader(content) + assert.Bool(reader.Cached()).IsTrue() + + payload := make([]byte, 16) + + nBytes, err := reader.Read(payload) + assert.Int(nBytes).Equals(16) + assert.Error(err).IsNil() + + len2 := content.Len() + assert.Int(len - len2).GreaterThan(16) + + nBytes, err = reader.Read(payload) + assert.Int(nBytes).Equals(16) + assert.Error(err).IsNil() + + assert.Int(content.Len()).Equals(len2) + reader.SetCached(false) + + payload2 := alloc.NewBuffer() + reader.Read(payload2.Value) + + assert.Int(content.Len()).Equals(len2) + + reader.Read(payload2.Value) + assert.Int(content.Len()).LessThan(len2) +} diff --git a/common/io/buffered_writer.go b/common/io/buffered_writer.go new file mode 100644 index 000000000..f150200bc --- /dev/null +++ b/common/io/buffered_writer.go @@ -0,0 +1,63 @@ +package io + +import ( + "io" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +type BufferedWriter struct { + writer io.Writer + buffer *alloc.Buffer + cached bool +} + +func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter { + return &BufferedWriter{ + writer: rawWriter, + buffer: alloc.NewBuffer().Clear(), + cached: true, + } +} + +func (this *BufferedWriter) Write(b []byte) (int, error) { + if !this.cached { + return this.writer.Write(b) + } + nBytes, _ := this.buffer.Write(b) + if this.buffer.IsFull() { + err := this.flush() + if err != nil { + return nBytes, err + } + } + return nBytes, nil +} + +func (this *BufferedWriter) flush() error { + nBytes, err := this.writer.Write(this.buffer.Value) + this.buffer.SliceFrom(nBytes) + if !this.buffer.IsEmpty() { + nBytes, err = this.writer.Write(this.buffer.Value) + this.buffer.SliceFrom(nBytes) + } + if this.buffer.IsEmpty() { + this.buffer.Clear() + } + return err +} + +func (this *BufferedWriter) Cached() bool { + return this.cached +} + +func (this *BufferedWriter) SetCached(cached bool) { + this.cached = cached + if !cached && !this.buffer.IsEmpty() { + this.flush() + } +} + +func (this *BufferedWriter) Release() { + this.buffer.Release() +} diff --git a/common/io/buffered_writer_test.go b/common/io/buffered_writer_test.go new file mode 100644 index 000000000..ae368e26a --- /dev/null +++ b/common/io/buffered_writer_test.go @@ -0,0 +1,30 @@ +package io_test + +import ( + "testing" + + "github.com/v2ray/v2ray-core/common/alloc" + . "github.com/v2ray/v2ray-core/common/io" + v2testing "github.com/v2ray/v2ray-core/testing" + "github.com/v2ray/v2ray-core/testing/assert" +) + +func TestBufferedWriter(t *testing.T) { + v2testing.Current(t) + + content := alloc.NewLargeBuffer().Clear() + + writer := NewBufferedWriter(content) + assert.Bool(writer.Cached()).IsTrue() + + payload := make([]byte, 16) + + nBytes, err := writer.Write(payload) + assert.Int(nBytes).Equals(16) + assert.Error(err).IsNil() + + assert.Bool(content.IsEmpty()).IsTrue() + + writer.SetCached(false) + assert.Int(content.Len()).Equals(16) +}