diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index aa4e9ccbf..ee37fa02e 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -17,12 +17,12 @@ const ( // quickly. type Buffer struct { head []byte - pool *BufferPool + pool Pool Value []byte offset int } -func CreateBuffer(container []byte, parent *BufferPool) *Buffer { +func CreateBuffer(container []byte, parent Pool) *Buffer { b := new(Buffer) b.head = container b.pool = parent diff --git a/common/alloc/buffer_pool.go b/common/alloc/buffer_pool.go index 373ba89e9..3ec8ffaf4 100644 --- a/common/alloc/buffer_pool.go +++ b/common/alloc/buffer_pool.go @@ -4,6 +4,11 @@ import ( "sync" ) +type Pool interface { + Allocate() *Buffer + Free(*Buffer) +} + type BufferPool struct { chain chan []byte allocator *sync.Pool diff --git a/transport/internet/kcp/buffer.go b/transport/internet/kcp/buffer.go new file mode 100644 index 000000000..059934961 --- /dev/null +++ b/transport/internet/kcp/buffer.go @@ -0,0 +1,102 @@ +package kcp + +import ( + "sync" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +const ( + NumDistro = 5 + DistroSize = 1600 +) + +type Buffer struct { + sync.Mutex + buffer *alloc.Buffer + + next int + released int + hold bool + distro [NumDistro]*alloc.Buffer +} + +func NewBuffer() *Buffer { + b := &Buffer{ + next: 0, + released: 0, + hold: true, + buffer: alloc.NewBuffer(), + } + for idx := range b.distro { + content := b.buffer.Value[idx*DistroSize : (idx+1)*DistroSize] + b.distro[idx] = alloc.CreateBuffer(content, b) + } + return b +} + +func (this *Buffer) IsEmpty() bool { + this.Lock() + defer this.Unlock() + + return this.next == NumDistro +} + +func (this *Buffer) Allocate() *alloc.Buffer { + this.Lock() + defer this.Unlock() + if this.next == NumDistro { + return nil + } + b := this.distro[this.next] + this.next++ + return b +} + +func (this *Buffer) Free(b *alloc.Buffer) { + this.Lock() + defer this.Unlock() + + this.released++ + if !this.hold && this.released == this.next { + this.ReleaseBuffer() + } +} + +func (this *Buffer) Release() { + this.Lock() + defer this.Unlock() + + if this.next == this.released { + this.ReleaseBuffer() + } + this.hold = false +} + +func (this *Buffer) ReleaseBuffer() { + this.buffer.Release() + this.buffer = nil + for idx := range this.distro { + this.distro[idx] = nil + } +} + +var ( + globalBuffer *Buffer + globalBufferAccess sync.Mutex +) + +func AllocateBuffer() *alloc.Buffer { + globalBufferAccess.Lock() + defer globalBufferAccess.Unlock() + + if globalBuffer == nil { + globalBuffer = NewBuffer() + } + b := globalBuffer.Allocate() + if globalBuffer.IsEmpty() { + globalBuffer.Release() + globalBuffer = nil + } + return b +} diff --git a/transport/internet/kcp/buffer_test.go b/transport/internet/kcp/buffer_test.go new file mode 100644 index 000000000..99d2569a3 --- /dev/null +++ b/transport/internet/kcp/buffer_test.go @@ -0,0 +1,22 @@ +package kcp_test + +import ( + "testing" + + "github.com/v2ray/v2ray-core/testing/assert" + . "github.com/v2ray/v2ray-core/transport/internet/kcp" +) + +func TestBuffer(t *testing.T) { + assert := assert.On(t) + + b := NewBuffer() + + for i := 0; i < NumDistro; i++ { + x := b.Allocate() + assert.Pointer(x).IsNotNil() + x.Release() + } + assert.Pointer(b.Allocate()).IsNil() + b.Release() +} diff --git a/transport/internet/kcp/crypt_test.go b/transport/internet/kcp/crypt_test.go index 174d4e8fa..77f09cac6 100644 --- a/transport/internet/kcp/crypt_test.go +++ b/transport/internet/kcp/crypt_test.go @@ -12,7 +12,7 @@ import ( func TestSimpleAuthenticator(t *testing.T) { assert := assert.On(t) - buffer := alloc.NewBuffer().Clear() + buffer := alloc.NewLocalBuffer(512).Clear() buffer.AppendBytes('a', 'b', 'c', 'd', 'e', 'f', 'g') auth := NewSimpleAuthenticator() @@ -25,7 +25,7 @@ func TestSimpleAuthenticator(t *testing.T) { func TestSimpleAuthenticator2(t *testing.T) { assert := assert.On(t) - buffer := alloc.NewBuffer().Clear() + buffer := alloc.NewLocalBuffer(512).Clear() buffer.AppendBytes('1', '2') auth := NewSimpleAuthenticator() @@ -36,7 +36,7 @@ func TestSimpleAuthenticator2(t *testing.T) { } func BenchmarkSimpleAuthenticator(b *testing.B) { - buffer := alloc.NewBuffer().Clear() + buffer := alloc.NewLocalBuffer(2048).Clear() buffer.Slice(0, 1024) rand.Read(buffer.Value) diff --git a/transport/internet/kcp/receiving_test.go b/transport/internet/kcp/receiving_test.go index 4372ae0c9..019baa92e 100644 --- a/transport/internet/kcp/receiving_test.go +++ b/transport/internet/kcp/receiving_test.go @@ -40,8 +40,8 @@ func TestRecivingQueue(t *testing.T) { assert := assert.On(t) queue := NewReceivingQueue(2) - queue.Put(alloc.NewSmallBuffer().Clear().AppendString("abcd")) - queue.Put(alloc.NewSmallBuffer().Clear().AppendString("efg")) + queue.Put(alloc.NewLocalBuffer(512).Clear().AppendString("abcd")) + queue.Put(alloc.NewLocalBuffer(512).Clear().AppendString("efg")) assert.Bool(queue.IsFull()).IsTrue() b := make([]byte, 1024) @@ -49,7 +49,7 @@ func TestRecivingQueue(t *testing.T) { assert.Int(nBytes).Equals(7) assert.String(string(b[:nBytes])).Equals("abcdefg") - queue.Put(alloc.NewSmallBuffer().Clear().AppendString("1")) + queue.Put(alloc.NewLocalBuffer(512).Clear().AppendString("1")) queue.Close() nBytes = queue.Read(b) assert.Int(nBytes).Equals(0) diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 7a94d9a57..58853ea42 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -176,7 +176,7 @@ func ReadSegment(buf []byte) (Segment, []byte) { if len(buf) < dataLen { return nil, nil } - seg.Data = alloc.NewSmallBuffer().Clear().Append(buf[:dataLen]) + seg.Data = AllocateBuffer().Clear().Append(buf[:dataLen]) buf = buf[dataLen:] return seg, buf diff --git a/transport/internet/kcp/segment_test.go b/transport/internet/kcp/segment_test.go index 4ca2460cd..97fa67c7e 100644 --- a/transport/internet/kcp/segment_test.go +++ b/transport/internet/kcp/segment_test.go @@ -24,7 +24,7 @@ func TestDataSegment(t *testing.T) { Timestamp: 3, Number: 4, SendingNext: 5, - Data: alloc.NewSmallBuffer().Clear().Append([]byte{'a', 'b', 'c', 'd'}), + Data: alloc.NewLocalBuffer(512).Clear().Append([]byte{'a', 'b', 'c', 'd'}), } nBytes := seg.ByteSize() diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index e6022c7aa..c2aca9dc0 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -346,7 +346,7 @@ func (this *SendingWorker) Push(b []byte) int { } else { size = len(b) } - this.queue.Push(alloc.NewSmallBuffer().Clear().Append(b[:size])) + this.queue.Push(AllocateBuffer().Clear().Append(b[:size])) b = b[size:] nBytes += size } diff --git a/transport/internet/kcp/sending_test.go b/transport/internet/kcp/sending_test.go index f627dbab7..19cc5280c 100644 --- a/transport/internet/kcp/sending_test.go +++ b/transport/internet/kcp/sending_test.go @@ -13,10 +13,10 @@ func TestSendingQueue(t *testing.T) { queue := NewSendingQueue(3) - seg0 := alloc.NewBuffer() - seg1 := alloc.NewBuffer() - seg2 := alloc.NewBuffer() - seg3 := alloc.NewBuffer() + seg0 := alloc.NewLocalBuffer(512) + seg1 := alloc.NewLocalBuffer(512) + seg2 := alloc.NewLocalBuffer(512) + seg3 := alloc.NewLocalBuffer(512) assert.Bool(queue.IsEmpty()).IsTrue() assert.Bool(queue.IsFull()).IsFalse() @@ -45,10 +45,10 @@ func TestSendingQueueClear(t *testing.T) { queue := NewSendingQueue(3) - seg0 := alloc.NewBuffer() - seg1 := alloc.NewBuffer() - seg2 := alloc.NewBuffer() - seg3 := alloc.NewBuffer() + seg0 := alloc.NewLocalBuffer(512) + seg1 := alloc.NewLocalBuffer(512) + seg2 := alloc.NewLocalBuffer(512) + seg3 := alloc.NewLocalBuffer(512) queue.Push(seg0) assert.Bool(queue.IsEmpty()).IsFalse()