diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go index 4f6c93136..5ae2f6475 100644 --- a/app/dispatcher/impl/default.go +++ b/app/dispatcher/impl/default.go @@ -62,7 +62,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.I } if session.Inbound != nil && session.Inbound.AllowPassiveConnection { - go dispatcher.Dispatch(destination, buf.NewLocalBuffer(32), direct) + go dispatcher.Dispatch(destination, buf.NewLocal(32), direct) } else { go v.FilterPacketAndDispatch(destination, direct, dispatcher) } diff --git a/app/dispatcher/testing/dispatcher.go b/app/dispatcher/testing/dispatcher.go index 9ff73e6aa..dfea35799 100644 --- a/app/dispatcher/testing/dispatcher.go +++ b/app/dispatcher/testing/dispatcher.go @@ -20,7 +20,7 @@ func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic if err != nil { break } - output := buf.NewBuffer() + output := buf.New() output.Append([]byte("Processed: ")) output.Append(payload.Bytes()) payload.Release() diff --git a/app/dns/nameserver.go b/app/dns/nameserver.go index 27be872b0..94adff650 100644 --- a/app/dns/nameserver.go +++ b/app/dns/nameserver.go @@ -145,7 +145,7 @@ func (v *UDPNameServer) HandleResponse(dest v2net.Destination, payload *buf.Buff } func (v *UDPNameServer) BuildQueryA(domain string, id uint16) *buf.Buffer { - buffer := buf.NewBuffer() + buffer := buf.New() msg := new(dns.Msg) msg.Id = id msg.RecursionDesired = true diff --git a/common/buf/buffer.go b/common/buf/buffer.go index dc53d25e8..35957f308 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -1,12 +1,12 @@ -// Package alloc provides a light-weight memory allocation mechanism. +// Package buf provides a light-weight memory allocation mechanism. package buf import ( "io" ) -// BytesWriter is a writer that writes contents into the given buffer. -type BytesWriter func([]byte) int +// Supplier is a writer that writes contents into the given buffer. +type Supplier func([]byte) (int, error) // Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles // the buffer into an internal buffer pool, in order to recreate a buffer more @@ -59,10 +59,11 @@ func (b *Buffer) Append(data []byte) { b.end += nBytes } -// AppendFunc appends the content of a BytesWriter to the buffer. -func (b *Buffer) AppendFunc(writer BytesWriter) { - nBytes := writer(b.v[b.end:]) +// AppendSupplier appends the content of a BytesWriter to the buffer. +func (b *Buffer) AppendSupplier(writer Supplier) error { + nBytes, err := writer(b.v[b.end:]) b.end += nBytes + return err } // Byte returns the bytes at index. @@ -80,9 +81,11 @@ func (b *Buffer) Bytes() []byte { return b.v[b.start:b.end] } -func (b *Buffer) SetBytesFunc(writer BytesWriter) { +func (b *Buffer) Reset(writer Supplier) error { b.start = 0 - b.end = b.start + writer(b.v[b.start:]) + nBytes, err := writer(b.v[b.start:]) + b.end = b.start + nBytes + return err } // BytesRange returns a slice of this buffer with given from and to bounary. @@ -172,33 +175,21 @@ func (b *Buffer) Read(data []byte) (int, error) { return nBytes, nil } -func (b *Buffer) FillFrom(reader io.Reader) (int, error) { - nBytes, err := reader.Read(b.v[b.end:]) - b.end += nBytes - return nBytes, err -} - -func (b *Buffer) FillFullFrom(reader io.Reader, amount int) (int, error) { - nBytes, err := io.ReadFull(reader, b.v[b.end:b.end+amount]) - b.end += nBytes - return nBytes, err -} - func (b *Buffer) String() string { return string(b.Bytes()) } -// NewBuffer creates a Buffer with 8K bytes of arbitrary content. -func NewBuffer() *Buffer { +// New creates a Buffer with 8K bytes of arbitrary content. +func New() *Buffer { return mediumPool.Allocate() } -// NewSmallBuffer returns a buffer with 2K bytes capacity. -func NewSmallBuffer() *Buffer { +// NewSmall returns a buffer with 2K bytes capacity. +func NewSmall() *Buffer { return smallPool.Allocate() } -// NewLocalBuffer creates and returns a buffer on current thread. -func NewLocalBuffer(size int) *Buffer { +// NewLocal creates and returns a buffer on current thread. +func NewLocal(size int) *Buffer { return CreateBuffer(make([]byte, size), nil) } diff --git a/common/buf/buffer_pool.go b/common/buf/buffer_pool.go index ec13cf8e9..2d369f0c1 100644 --- a/common/buf/buffer_pool.go +++ b/common/buf/buffer_pool.go @@ -84,15 +84,15 @@ func (p *BufferPool) Free(buffer *Buffer) { } const ( - BufferSize = 8 * 1024 - SmallBufferSize = 2 * 1024 + Size = 8 * 1024 + SizeSmall = 2 * 1024 PoolSizeEnvKey = "v2ray.buffer.size" ) var ( mediumPool Pool - smallPool = NewSyncPool(2048) + smallPool = NewSyncPool(SizeSmall) ) func init() { @@ -106,8 +106,8 @@ func init() { } if size > 0 { totalByteSize := size * 1024 * 1024 - mediumPool = NewBufferPool(BufferSize, totalByteSize/BufferSize) + mediumPool = NewBufferPool(Size, totalByteSize/Size) } else { - mediumPool = NewSyncPool(BufferSize) + mediumPool = NewSyncPool(Size) } } diff --git a/common/buf/buffer_test.go b/common/buf/buffer_test.go index 61bd01803..fac397b4b 100644 --- a/common/buf/buffer_test.go +++ b/common/buf/buffer_test.go @@ -11,7 +11,7 @@ import ( func TestBufferClear(t *testing.T) { assert := assert.On(t) - buffer := NewBuffer() + buffer := New() defer buffer.Release() payload := "Bytes" @@ -25,7 +25,7 @@ func TestBufferClear(t *testing.T) { func TestBufferIsEmpty(t *testing.T) { assert := assert.On(t) - buffer := NewBuffer() + buffer := New() defer buffer.Release() assert.Bool(buffer.IsEmpty()).IsTrue() @@ -34,17 +34,17 @@ func TestBufferIsEmpty(t *testing.T) { func TestBufferString(t *testing.T) { assert := assert.On(t) - buffer := NewBuffer() + buffer := New() defer buffer.Release() - buffer.AppendFunc(serial.WriteString("Test String")) + buffer.AppendSupplier(serial.WriteString("Test String")) assert.String(buffer.String()).Equals("Test String") } func TestBufferWrite(t *testing.T) { assert := assert.On(t) - buffer := NewLocalBuffer(8) + buffer := NewLocal(8) nBytes, err := buffer.Write([]byte("abcd")) assert.Error(err).IsNil() assert.Int(nBytes).Equals(4) @@ -56,28 +56,28 @@ func TestBufferWrite(t *testing.T) { func BenchmarkNewBuffer8192(b *testing.B) { for i := 0; i < b.N; i++ { - buffer := NewBuffer() + buffer := New() buffer.Release() } } func BenchmarkNewLocalBuffer8192(b *testing.B) { for i := 0; i < b.N; i++ { - buffer := NewLocalBuffer(8192) + buffer := NewLocal(8192) buffer.Release() } } func BenchmarkNewBuffer2048(b *testing.B) { for i := 0; i < b.N; i++ { - buffer := NewSmallBuffer() + buffer := NewSmall() buffer.Release() } } func BenchmarkNewLocalBuffer2048(b *testing.B) { for i := 0; i < b.N; i++ { - buffer := NewLocalBuffer(2048) + buffer := NewLocal(2048) buffer.Release() } } @@ -94,7 +94,7 @@ func BenchmarkBufferValue(b *testing.B) { } func BenchmarkBufferPointer(b *testing.B) { - x := NewSmallBuffer() + x := NewSmall() doSomething := func(a *Buffer) { _ = a.Len() } diff --git a/common/buf/io.go b/common/buf/io.go new file mode 100644 index 000000000..29e760a79 --- /dev/null +++ b/common/buf/io.go @@ -0,0 +1,15 @@ +package buf + +import "io" + +func ReadFrom(reader io.Reader) Supplier { + return func(b []byte) (int, error) { + return reader.Read(b) + } +} + +func ReadFullFrom(reader io.Reader, size int) Supplier { + return func(b []byte) (int, error) { + return io.ReadFull(reader, b[:size]) + } +} diff --git a/common/crypto/auth.go b/common/crypto/auth.go index f142a298c..dd443a680 100644 --- a/common/crypto/auth.go +++ b/common/crypto/auth.go @@ -81,7 +81,7 @@ type AuthenticationReader struct { func NewAuthenticationReader(auth Authenticator, reader io.Reader, aggressive bool) *AuthenticationReader { return &AuthenticationReader{ auth: auth, - buffer: buf.NewLocalBuffer(32 * 1024), + buffer: buf.NewLocal(32 * 1024), reader: reader, aggressive: aggressive, } @@ -135,11 +135,11 @@ func (v *AuthenticationReader) EnsureChunk() error { v.buffer.Clear() } else { leftover := v.buffer.Bytes() - v.buffer.SetBytesFunc(func(b []byte) int { - return copy(b, leftover) + v.buffer.Reset(func(b []byte) (int, error) { + return copy(b, leftover), nil }) } - _, err = v.buffer.FillFrom(v.reader) + err = v.buffer.AppendSupplier(buf.ReadFrom(v.reader)) } return err } diff --git a/common/io/buffered_reader.go b/common/io/buffered_reader.go index bf9c93f92..5cebe90a9 100644 --- a/common/io/buffered_reader.go +++ b/common/io/buffered_reader.go @@ -17,7 +17,7 @@ type BufferedReader struct { func NewBufferedReader(rawReader io.Reader) *BufferedReader { return &BufferedReader{ reader: rawReader, - buffer: buf.NewBuffer(), + buffer: buf.New(), cached: true, } } @@ -54,7 +54,7 @@ func (v *BufferedReader) Read(b []byte) (int, error) { return v.reader.Read(b) } if v.buffer.IsEmpty() { - _, err := v.buffer.FillFrom(v.reader) + err := v.buffer.AppendSupplier(buf.ReadFrom(v.reader)) if err != nil { return 0, err } diff --git a/common/io/buffered_reader_test.go b/common/io/buffered_reader_test.go index 49d9edb39..24bf6f11d 100644 --- a/common/io/buffered_reader_test.go +++ b/common/io/buffered_reader_test.go @@ -1,9 +1,9 @@ package io_test import ( + "crypto/rand" "testing" - "crypto/rand" "v2ray.com/core/common/buf" . "v2ray.com/core/common/io" "v2ray.com/core/testing/assert" @@ -12,8 +12,8 @@ import ( func TestBufferedReader(t *testing.T) { assert := assert.On(t) - content := buf.NewBuffer() - content.FillFrom(rand.Reader) + content := buf.New() + content.AppendSupplier(buf.ReadFrom(rand.Reader)) len := content.Len() diff --git a/common/io/buffered_writer.go b/common/io/buffered_writer.go index 67df6be80..4a868930d 100644 --- a/common/io/buffered_writer.go +++ b/common/io/buffered_writer.go @@ -3,6 +3,7 @@ package io import ( "io" "sync" + "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" ) @@ -17,7 +18,7 @@ type BufferedWriter struct { func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter { return &BufferedWriter{ writer: rawWriter, - buffer: buf.NewSmallBuffer(), + buffer: buf.NewSmall(), cached: true, } } @@ -32,8 +33,9 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) { totalBytes := int64(0) for { - nBytes, err := v.buffer.FillFrom(reader) - totalBytes += int64(nBytes) + oriSize := v.buffer.Len() + err := v.buffer.AppendSupplier(buf.ReadFrom(reader)) + totalBytes += int64(v.buffer.Len() - oriSize) if err != nil { if errors.Cause(err) == io.EOF { return totalBytes, nil diff --git a/common/io/buffered_writer_test.go b/common/io/buffered_writer_test.go index 215b918c0..56e0997af 100644 --- a/common/io/buffered_writer_test.go +++ b/common/io/buffered_writer_test.go @@ -12,7 +12,7 @@ import ( func TestBufferedWriter(t *testing.T) { assert := assert.On(t) - content := buf.NewBuffer() + content := buf.New() writer := NewBufferedWriter(content) assert.Bool(writer.Cached()).IsTrue() @@ -32,7 +32,7 @@ func TestBufferedWriter(t *testing.T) { func TestBufferedWriterLargePayload(t *testing.T) { assert := assert.On(t) - content := buf.NewLocalBuffer(128 * 1024) + content := buf.NewLocal(128 * 1024) writer := NewBufferedWriter(content) assert.Bool(writer.Cached()).IsTrue() diff --git a/common/io/chain_writer.go b/common/io/chain_writer.go index 6cf0d4705..446191fc8 100644 --- a/common/io/chain_writer.go +++ b/common/io/chain_writer.go @@ -28,7 +28,7 @@ func (v *ChainWriter) Write(payload []byte) (int, error) { bytesWritten := 0 size := len(payload) for size > 0 { - buffer := buf.NewBuffer() + buffer := buf.New() nBytes, _ := buffer.Write(payload) size -= nBytes payload = payload[nBytes:] diff --git a/common/io/reader.go b/common/io/reader.go index 5614a4a18..3e6c21996 100644 --- a/common/io/reader.go +++ b/common/io/reader.go @@ -33,24 +33,24 @@ func NewAdaptiveReader(reader io.Reader) *AdaptiveReader { func (v *AdaptiveReader) Read() (*buf.Buffer, error) { if v.highVolumn && v.largeBuffer.IsEmpty() { if v.largeBuffer == nil { - v.largeBuffer = buf.NewLocalBuffer(32 * 1024) + v.largeBuffer = buf.NewLocal(32 * 1024) } - nBytes, err := v.largeBuffer.FillFrom(v.reader) + err := v.largeBuffer.AppendSupplier(buf.ReadFrom(v.reader)) if err != nil { return nil, err } - if nBytes < buf.BufferSize { + if v.largeBuffer.Len() < buf.Size { v.highVolumn = false } } - buffer := buf.NewBuffer() + buffer := buf.New() if !v.largeBuffer.IsEmpty() { - buffer.FillFrom(v.largeBuffer) + buffer.AppendSupplier(buf.ReadFrom(v.largeBuffer)) return buffer, nil } - _, err := buffer.FillFrom(v.reader) + err := buffer.AppendSupplier(buf.ReadFrom(v.reader)) if err != nil { buffer.Release() return nil, err diff --git a/common/io/reader_test.go b/common/io/reader_test.go index 67ab1600d..f991a2af1 100644 --- a/common/io/reader_test.go +++ b/common/io/reader_test.go @@ -19,8 +19,8 @@ func TestAdaptiveReader(t *testing.T) { b1, err := reader.Read() assert.Error(err).IsNil() assert.Bool(b1.IsFull()).IsTrue() - assert.Int(b1.Len()).Equals(buf.BufferSize) - assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.BufferSize) + assert.Int(b1.Len()).Equals(buf.Size) + assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.Size) b2, err := reader.Read() assert.Error(err).IsNil() diff --git a/common/io/writer_test.go b/common/io/writer_test.go index 90ce69cb2..1ac93ad15 100644 --- a/common/io/writer_test.go +++ b/common/io/writer_test.go @@ -13,8 +13,8 @@ import ( func TestAdaptiveWriter(t *testing.T) { assert := assert.On(t) - lb := buf.NewBuffer() - lb.FillFrom(rand.Reader) + lb := buf.New() + lb.AppendSupplier(buf.ReadFrom(rand.Reader)) expectedBytes := append([]byte(nil), lb.Bytes()...) diff --git a/common/serial/hash.go b/common/serial/hash.go index 84683d84b..ffb576377 100644 --- a/common/serial/hash.go +++ b/common/serial/hash.go @@ -6,9 +6,9 @@ import ( "v2ray.com/core/common/buf" ) -func WriteHash(h hash.Hash) buf.BytesWriter { - return func(b []byte) int { +func WriteHash(h hash.Hash) buf.Supplier { + return func(b []byte) (int, error) { h.Sum(b[:0]) - return h.Size() + return h.Size(), nil } } diff --git a/common/serial/numbers.go b/common/serial/numbers.go index bd670b606..692942e09 100644 --- a/common/serial/numbers.go +++ b/common/serial/numbers.go @@ -14,10 +14,10 @@ func Uint16ToString(value uint16) string { return strconv.Itoa(int(value)) } -func WriteUint16(value uint16) buf.BytesWriter { - return func(b []byte) int { +func WriteUint16(value uint16) buf.Supplier { + return func(b []byte) (int, error) { b = Uint16ToBytes(value, b[:0]) - return 2 + return 2, nil } } @@ -29,10 +29,10 @@ func Uint32ToString(value uint32) string { return strconv.FormatUint(uint64(value), 10) } -func WriteUint32(value uint32) buf.BytesWriter { - return func(b []byte) int { +func WriteUint32(value uint32) buf.Supplier { + return func(b []byte) (int, error) { b = Uint32ToBytes(value, b[:0]) - return 4 + return 4, nil } } diff --git a/common/serial/string.go b/common/serial/string.go index e21eb126b..823d96656 100644 --- a/common/serial/string.go +++ b/common/serial/string.go @@ -36,9 +36,8 @@ func Concat(v ...interface{}) string { return strings.Join(values, "") } -func WriteString(s string) buf.BytesWriter { - return func(b []byte) int { - copy(b, []byte(s)) - return len(s) +func WriteString(s string) buf.Supplier { + return func(b []byte) (int, error) { + return copy(b, []byte(s)), nil } } diff --git a/proxy/blackhole/config.go b/proxy/blackhole/config.go index 05e5ab432..c73be808c 100644 --- a/proxy/blackhole/config.go +++ b/proxy/blackhole/config.go @@ -32,8 +32,8 @@ func (v *NoneResponse) AsAny() *any.Any { } func (v *HTTPResponse) WriteTo(writer v2io.Writer) { - b := buf.NewLocalBuffer(512) - b.AppendFunc(serial.WriteString(http403response)) + b := buf.NewLocal(512) + b.AppendSupplier(serial.WriteString(http403response)) writer.Write(b) } diff --git a/proxy/blackhole/config_test.go b/proxy/blackhole/config_test.go index 3d34ab92e..e0bc06cf6 100644 --- a/proxy/blackhole/config_test.go +++ b/proxy/blackhole/config_test.go @@ -14,7 +14,7 @@ import ( func TestHTTPResponse(t *testing.T) { assert := assert.On(t) - buffer := buf.NewBuffer() + buffer := buf.New() httpResponse := new(HTTPResponse) httpResponse.WriteTo(v2io.NewAdaptiveWriter(buffer)) diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index f0584e5f1..30c715cc7 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -47,7 +47,7 @@ func TestSinglePacket(t *testing.T) { traffic := ray.NewRay() data2Send := "Data to be sent to remote" - payload := buf.NewLocalBuffer(2048) + payload := buf.NewLocal(2048) payload.Append([]byte(data2Send)) go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, tcpServer.Port), payload, traffic) diff --git a/proxy/shadowsocks/ota.go b/proxy/shadowsocks/ota.go index 174d2ae62..fbacac299 100644 --- a/proxy/shadowsocks/ota.go +++ b/proxy/shadowsocks/ota.go @@ -27,13 +27,12 @@ func NewAuthenticator(keygen KeyGenerator) *Authenticator { } } -func (v *Authenticator) Authenticate(data []byte) buf.BytesWriter { +func (v *Authenticator) Authenticate(data []byte) buf.Supplier { hasher := hmac.New(sha1.New, v.key()) hasher.Write(data) res := hasher.Sum(nil) - return func(b []byte) int { - copy(b, res[:AuthSize]) - return AuthSize + return func(b []byte) (int, error) { + return copy(b, res[:AuthSize]), nil } } @@ -75,22 +74,22 @@ func (v *ChunkReader) Release() { } func (v *ChunkReader) Read() (*buf.Buffer, error) { - buffer := buf.NewBuffer() - if _, err := buffer.FillFullFrom(v.reader, 2); err != nil { + buffer := buf.New() + if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, 2)); err != nil { buffer.Release() return nil, err } // There is a potential buffer overflow here. Large buffer is 64K bytes, // while uin16 + 10 will be more than that length := serial.BytesToUint16(buffer.BytesTo(2)) + AuthSize - if length > buf.BufferSize { + if length > buf.Size { // Theoretically the size of a chunk is 64K, but most Shadowsocks implementations used <4K buffer. buffer.Release() - buffer = buf.NewLocalBuffer(int(length) + 128) + buffer = buf.NewLocal(int(length) + 128) } buffer.Clear() - if _, err := buffer.FillFullFrom(v.reader, int(length)); err != nil { + if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, int(length))); err != nil { buffer.Release() return nil, err } diff --git a/proxy/shadowsocks/ota_test.go b/proxy/shadowsocks/ota_test.go index 2bad69762..8a93eb341 100644 --- a/proxy/shadowsocks/ota_test.go +++ b/proxy/shadowsocks/ota_test.go @@ -11,7 +11,7 @@ import ( func TestNormalChunkReading(t *testing.T) { assert := assert.On(t) - buffer := buf.NewBuffer() + buffer := buf.New() buffer.AppendBytes( 0, 8, 39, 228, 69, 96, 133, 39, 254, 26, 201, 70, 11, 12, 13, 14, 15, 16, 17, 18) reader := NewChunkReader(buffer, NewAuthenticator(ChunkKeyGenerator( @@ -24,11 +24,11 @@ func TestNormalChunkReading(t *testing.T) { func TestNormalChunkWriting(t *testing.T) { assert := assert.On(t) - buffer := buf.NewLocalBuffer(512) + buffer := buf.NewLocal(512) writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator( []byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}))) - b := buf.NewLocalBuffer(256) + b := buf.NewLocal(256) b.Append([]byte{11, 12, 13, 14, 15, 16, 17, 18}) err := writer.Write(b) assert.Error(err).IsNil() diff --git a/proxy/shadowsocks/protocol.go b/proxy/shadowsocks/protocol.go index 75c8ebd37..e57cb5f7e 100644 --- a/proxy/shadowsocks/protocol.go +++ b/proxy/shadowsocks/protocol.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/rand" "io" + "v2ray.com/core/common/buf" "v2ray.com/core/common/crypto" "v2ray.com/core/common/errors" @@ -29,11 +30,11 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea } account := rawAccount.(*ShadowsocksAccount) - buffer := buf.NewLocalBuffer(512) + buffer := buf.NewLocal(512) defer buffer.Release() ivLen := account.Cipher.IVSize() - _, err = buffer.FillFullFrom(reader, ivLen) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, ivLen)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IV.") } @@ -54,7 +55,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea } buffer.Clear() - _, err = buffer.FillFullFrom(reader, 1) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read address type.") } @@ -74,24 +75,24 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea switch addrType { case AddrTypeIPv4: - _, err := buffer.FillFullFrom(reader, 4) + err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 4)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv4 address.") } request.Address = v2net.IPAddress(buffer.BytesFrom(-4)) case AddrTypeIPv6: - _, err := buffer.FillFullFrom(reader, 16) + err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 16)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv6 address.") } request.Address = v2net.IPAddress(buffer.BytesFrom(-16)) case AddrTypeDomain: - _, err := buffer.FillFullFrom(reader, 1) + err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 1)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain lenth.") } domainLength := int(buffer.BytesFrom(-1)[0]) - _, err = buffer.FillFullFrom(reader, domainLength) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, domainLength)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain.") } @@ -100,7 +101,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea return nil, nil, errors.New("Shadowsocks|TCP: Unknown address type: ", addrType) } - _, err = buffer.FillFullFrom(reader, 2) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read port.") } @@ -110,7 +111,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea actualAuth := make([]byte, AuthSize) authenticator.Authenticate(buffer.Bytes())(actualAuth) - _, err := buffer.FillFullFrom(reader, AuthSize) + err := buffer.AppendSupplier(buf.ReadFullFrom(reader, AuthSize)) if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read OTA.") } @@ -152,7 +153,7 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr writer = crypto.NewCryptionWriter(stream, writer) - header := buf.NewLocalBuffer(512) + header := buf.NewLocal(512) switch request.Address.Family() { case v2net.AddressFamilyIPv4: @@ -168,13 +169,13 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr return nil, errors.New("Shadowsocks|TCP: Unsupported address type: ", request.Address.Family()) } - header.AppendFunc(serial.WriteUint16(uint16(request.Port))) + header.AppendSupplier(serial.WriteUint16(uint16(request.Port))) if request.Option.Has(RequestOptionOneTimeAuth) { header.SetByte(0, header.Byte(0)|0x10) authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv)) - header.AppendFunc(authenticator.Authenticate(header.Bytes())) + header.AppendSupplier(authenticator.Authenticate(header.Bytes())) } _, err = writer.Write(header.Bytes()) @@ -243,9 +244,9 @@ func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf } account := rawAccount.(*ShadowsocksAccount) - buffer := buf.NewSmallBuffer() + buffer := buf.NewSmall() ivLen := account.Cipher.IVSize() - buffer.FillFullFrom(rand.Reader, ivLen) + buffer.AppendSupplier(buf.ReadFullFrom(rand.Reader, ivLen)) iv := buffer.Bytes() switch request.Address.Family() { @@ -262,14 +263,14 @@ func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf return nil, errors.New("Shadowsocks|UDP: Unsupported address type: ", request.Address.Family()) } - buffer.AppendFunc(serial.WriteUint16(uint16(request.Port))) + buffer.AppendSupplier(serial.WriteUint16(uint16(request.Port))) buffer.Append(payload.Bytes()) if request.Option.Has(RequestOptionOneTimeAuth) { authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv)) buffer.SetByte(ivLen, buffer.Byte(ivLen)|0x10) - buffer.AppendFunc(authenticator.Authenticate(buffer.BytesFrom(ivLen))) + buffer.AppendSupplier(authenticator.Authenticate(buffer.BytesFrom(ivLen))) } stream, err := account.Cipher.NewEncodingStream(account.Key, iv) @@ -360,8 +361,8 @@ type UDPReader struct { } func (v *UDPReader) Read() (*buf.Buffer, error) { - buffer := buf.NewSmallBuffer() - _, err := buffer.FillFrom(v.Reader) + buffer := buf.NewSmall() + err := buffer.AppendSupplier(buf.ReadFrom(v.Reader)) if err != nil { buffer.Release() return nil, err diff --git a/proxy/shadowsocks/protocol_test.go b/proxy/shadowsocks/protocol_test.go index d27d78b9d..f730ed016 100644 --- a/proxy/shadowsocks/protocol_test.go +++ b/proxy/shadowsocks/protocol_test.go @@ -30,8 +30,8 @@ func TestUDPEncoding(t *testing.T) { }, } - data := buf.NewLocalBuffer(256) - data.AppendFunc(serial.WriteString("test string")) + data := buf.NewLocal(256) + data.AppendSupplier(serial.WriteString("test string")) encodedData, err := EncodeUDPPacket(request, data) assert.Error(err).IsNil() @@ -60,9 +60,9 @@ func TestTCPRequest(t *testing.T) { }, } - data := buf.NewLocalBuffer(256) - data.AppendFunc(serial.WriteString("test string")) - cache := buf.NewBuffer() + data := buf.NewLocal(256) + data.AppendSupplier(serial.WriteString("test string")) + cache := buf.New() writer, err := WriteTCPRequest(request, cache) assert.Error(err).IsNil() @@ -88,7 +88,7 @@ func TestUDPReaderWriter(t *testing.T) { CipherType: CipherType_CHACHA20_IEFT, }), } - cache := buf.NewBuffer() + cache := buf.New() writer := &UDPWriter{ Writer: cache, Request: &protocol.RequestHeader{ @@ -105,8 +105,8 @@ func TestUDPReaderWriter(t *testing.T) { User: user, } - b := buf.NewBuffer() - b.AppendFunc(serial.WriteString("test payload")) + b := buf.New() + b.AppendSupplier(serial.WriteString("test payload")) err := writer.Write(b) assert.Error(err).IsNil() @@ -114,8 +114,8 @@ func TestUDPReaderWriter(t *testing.T) { assert.Error(err).IsNil() assert.String(payload.String()).Equals("test payload") - b = buf.NewBuffer() - b.AppendFunc(serial.WriteString("test payload 2")) + b = buf.New() + b.AppendSupplier(serial.WriteString("test payload 2")) err = writer.Write(b) assert.Error(err).IsNil() diff --git a/proxy/socks/protocol/socks.go b/proxy/socks/protocol/socks.go index 9408bb22c..4f59c3dcf 100644 --- a/proxy/socks/protocol/socks.go +++ b/proxy/socks/protocol/socks.go @@ -3,12 +3,13 @@ package protocol import ( "fmt" "io" + "v2ray.com/core/common/buf" + "v2ray.com/core/common/crypto" "v2ray.com/core/common/errors" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" - "v2ray.com/core/common/crypto" ) const ( @@ -120,10 +121,10 @@ func (request Socks5UserPassRequest) AuthDetail() string { } func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err error) { - buffer := buf.NewLocalBuffer(512) + buffer := buf.NewLocal(512) defer buffer.Release() - _, err = buffer.FillFullFrom(reader, 2) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2)) if err != nil { return } @@ -131,18 +132,18 @@ func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err e nUsername := int(buffer.Byte(1)) buffer.Clear() - _, err = buffer.FillFullFrom(reader, nUsername) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, nUsername)) if err != nil { return } request.username = string(buffer.Bytes()) - _, err = buffer.FillFullFrom(reader, 1) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1)) if err != nil { return } nPassword := int(buffer.Byte(0)) - _, err = buffer.FillFullFrom(reader, nPassword) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, nPassword)) if err != nil { return } @@ -188,10 +189,10 @@ type Socks5Request struct { } func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { - buffer := buf.NewLocalBuffer(512) + buffer := buf.NewLocal(512) defer buffer.Release() - _, err = buffer.FillFullFrom(reader, 4) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 4)) if err != nil { return } @@ -210,12 +211,12 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { } case AddrTypeDomain: buffer.Clear() - _, err = buffer.FillFullFrom(reader, 1) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1)) if err != nil { return } domainLength := int(buffer.Byte(0)) - _, err = buffer.FillFullFrom(reader, domainLength) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, domainLength)) if err != nil { return } @@ -231,7 +232,7 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { return } - _, err = buffer.FillFullFrom(reader, 2) + err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2)) if err != nil { return } diff --git a/proxy/socks/protocol/socks4_test.go b/proxy/socks/protocol/socks4_test.go index 6050aa3ac..75b9b8986 100644 --- a/proxy/socks/protocol/socks4_test.go +++ b/proxy/socks/protocol/socks4_test.go @@ -31,7 +31,7 @@ func TestSocks4AuthenticationResponseToBytes(t *testing.T) { response := NewSocks4AuthenticationResponse(byte(0x10), 443, []byte{1, 2, 3, 4}) - buffer := buf.NewLocalBuffer(2048) + buffer := buf.NewLocal(2048) defer buffer.Release() response.Write(buffer) diff --git a/proxy/socks/protocol/socks_test.go b/proxy/socks/protocol/socks_test.go index f25164c2b..45f11930f 100644 --- a/proxy/socks/protocol/socks_test.go +++ b/proxy/socks/protocol/socks_test.go @@ -6,10 +6,10 @@ import ( "testing" "v2ray.com/core/common/buf" + "v2ray.com/core/common/crypto" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" "v2ray.com/core/testing/assert" - "v2ray.com/core/common/crypto" ) func TestHasAuthenticationMethod(t *testing.T) { @@ -30,7 +30,7 @@ func TestHasAuthenticationMethod(t *testing.T) { func TestAuthenticationRequestRead(t *testing.T) { assert := assert.On(t) - buffer := buf.NewBuffer() + buffer := buf.New() buffer.AppendBytes( 0x05, // version 0x01, // nMethods @@ -85,7 +85,7 @@ func TestResponseWrite(t *testing.T) { [16]byte{}, v2net.Port(53), } - buffer := buf.NewLocalBuffer(2048) + buffer := buf.NewLocal(2048) defer buffer.Release() response.Write(buffer) @@ -106,7 +106,7 @@ func TestSetIPv6(t *testing.T) { response := NewSocks5Response() response.SetIPv6([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) - buffer := buf.NewLocalBuffer(2048) + buffer := buf.NewLocal(2048) defer buffer.Release() response.Write(buffer) assert.Bytes(buffer.Bytes()).Equals([]byte{ @@ -119,7 +119,7 @@ func TestSetDomain(t *testing.T) { response := NewSocks5Response() response.SetDomain("v2ray.com") - buffer := buf.NewLocalBuffer(2048) + buffer := buf.NewLocal(2048) defer buffer.Release() response.Write(buffer) assert.Bytes(buffer.Bytes()).Equals([]byte{ @@ -129,7 +129,7 @@ func TestSetDomain(t *testing.T) { func TestEmptyAuthRequest(t *testing.T) { assert := assert.On(t) - _, _, err := ReadAuthentication(buf.NewBuffer()) + _, _, err := ReadAuthentication(buf.New()) assert.Error(err).Equals(io.EOF) } @@ -143,7 +143,7 @@ func TestSingleByteAuthRequest(t *testing.T) { func TestZeroAuthenticationMethod(t *testing.T) { assert := assert.On(t) - buffer := buf.NewBuffer() + buffer := buf.New() buffer.AppendBytes(5, 0) _, _, err := ReadAuthentication(buffer) assert.Error(err).Equals(crypto.ErrAuthenticationFailed) @@ -151,7 +151,7 @@ func TestZeroAuthenticationMethod(t *testing.T) { func TestWrongProtocolVersion(t *testing.T) { assert := assert.On(t) - buffer := buf.NewBuffer() + buffer := buf.New() buffer.AppendBytes(6, 1, 0) _, _, err := ReadAuthentication(buffer) assert.Error(err).Equals(proxy.ErrInvalidProtocolVersion) @@ -160,14 +160,14 @@ func TestWrongProtocolVersion(t *testing.T) { func TestEmptyRequest(t *testing.T) { assert := assert.On(t) - _, err := ReadRequest(buf.NewBuffer()) + _, err := ReadRequest(buf.New()) assert.Error(err).Equals(io.EOF) } func TestIPv6Request(t *testing.T) { assert := assert.On(t) - b := buf.NewBuffer() + b := buf.New() b.AppendBytes(5, 1, 0, 4, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 8) request, err := ReadRequest(b) assert.Error(err).IsNil() diff --git a/proxy/socks/protocol/udp.go b/proxy/socks/protocol/udp.go index f85d7001e..89974b6dc 100644 --- a/proxy/socks/protocol/udp.go +++ b/proxy/socks/protocol/udp.go @@ -36,7 +36,7 @@ func (request *Socks5UDPRequest) Write(buffer *buf.Buffer) { buffer.AppendBytes(AddrTypeDomain, byte(len(request.Address.Domain()))) buffer.Append([]byte(request.Address.Domain())) } - buffer.AppendFunc(serial.WriteUint16(request.Port.Value())) + buffer.AppendSupplier(serial.WriteUint16(request.Port.Value())) buffer.Append(request.Data.Bytes()) } @@ -83,7 +83,7 @@ func ReadUDPRequest(packet []byte) (*Socks5UDPRequest, error) { } if len(packet) > dataBegin { - b := buf.NewSmallBuffer() + b := buf.NewSmall() b.Append(packet[dataBegin:]) request.Data = b } diff --git a/proxy/socks/server_udp.go b/proxy/socks/server_udp.go index 05c31063d..2a8a6b191 100644 --- a/proxy/socks/server_udp.go +++ b/proxy/socks/server_udp.go @@ -55,7 +55,7 @@ func (v *Server) handleUDPPayload(payload *buf.Buffer, session *proxy.SessionInf } log.Info("Socks: Writing back UDP response with ", payload.Len(), " bytes to ", destination) - udpMessage := buf.NewLocalBuffer(2048) + udpMessage := buf.NewLocal(2048) response.Write(udpMessage) v.udpMutex.RLock() diff --git a/proxy/vmess/encoding/commands.go b/proxy/vmess/encoding/commands.go index 654bb3cc7..5f6c6af47 100644 --- a/proxy/vmess/encoding/commands.go +++ b/proxy/vmess/encoding/commands.go @@ -32,7 +32,7 @@ func MarshalCommand(command interface{}, writer io.Writer) error { return ErrUnknownCommand } - buffer := buf.NewLocalBuffer(512) + buffer := buf.NewLocal(512) defer buffer.Release() err := factory.Marshal(command, buffer) diff --git a/proxy/vmess/encoding/commands_test.go b/proxy/vmess/encoding/commands_test.go index f5cbe3310..142c73cb7 100644 --- a/proxy/vmess/encoding/commands_test.go +++ b/proxy/vmess/encoding/commands_test.go @@ -21,7 +21,7 @@ func TestSwitchAccount(t *testing.T) { ValidMin: 16, } - buffer := buf.NewBuffer() + buffer := buf.New() err := MarshalCommand(sa, buffer) assert.Error(err).IsNil() diff --git a/proxy/vmess/encoding/encoding_test.go b/proxy/vmess/encoding/encoding_test.go index 7948e920a..e5452834d 100644 --- a/proxy/vmess/encoding/encoding_test.go +++ b/proxy/vmess/encoding/encoding_test.go @@ -36,7 +36,7 @@ func TestRequestSerialization(t *testing.T) { Security: protocol.Security(protocol.SecurityType_AES128_GCM), } - buffer := buf.NewBuffer() + buffer := buf.New() client := NewClientSession(protocol.DefaultIDHash) client.EncodeRequestHeader(expectedRequest, buffer) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 2f43796e8..8b0f562b4 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -225,7 +225,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { } output.Release() if request.Option.Has(protocol.RequestOptionChunkStream) { - if err := bodyWriter.Write(buf.NewLocalBuffer(8)); err != nil { + if err := bodyWriter.Write(buf.NewLocal(8)); err != nil { connection.SetReusable(false) } } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 44255e38d..f7a02f80c 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -116,7 +116,7 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co } if request.Option.Has(protocol.RequestOptionChunkStream) { - err := bodyWriter.Write(buf.NewLocalBuffer(8)) + err := bodyWriter.Write(buf.NewLocal(8)) if err != nil { conn.SetReusable(false) } diff --git a/testing/scenarios/shadowsocks_test.go b/testing/scenarios/shadowsocks_test.go index df7a3fa9a..e32c71956 100644 --- a/testing/scenarios/shadowsocks_test.go +++ b/testing/scenarios/shadowsocks_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "testing" + "v2ray.com/core/common/buf" v2net "v2ray.com/core/common/net" "v2ray.com/core/testing/assert" @@ -42,11 +43,11 @@ func TestShadowsocksTCP(t *testing.T) { //conn.CloseWrite() - response := buf.NewBuffer() + response := buf.New() finished := false expectedResponse := "Processed: " + payload for { - _, err := response.FillFrom(conn) + err := response.AppendSupplier(buf.ReadFrom(conn)) assert.Error(err).IsNil() if err != nil { break diff --git a/transport/internet/authenticator.go b/transport/internet/authenticator.go deleted file mode 100644 index 45391542b..000000000 --- a/transport/internet/authenticator.go +++ /dev/null @@ -1,70 +0,0 @@ -package internet - -import ( - "v2ray.com/core/common" - "v2ray.com/core/common/buf" -) - -type Authenticator interface { - Seal(*buf.Buffer) - Open(*buf.Buffer) bool - Overhead() int -} - -type AuthenticatorFactory interface { - Create(interface{}) Authenticator -} - -var ( - authenticatorCache = make(map[string]AuthenticatorFactory) -) - -func RegisterAuthenticator(name string, factory AuthenticatorFactory) error { - if _, found := authenticatorCache[name]; found { - return common.ErrDuplicatedName - } - authenticatorCache[name] = factory - return nil -} - -func CreateAuthenticator(name string, config interface{}) (Authenticator, error) { - factory, found := authenticatorCache[name] - if !found { - return nil, common.ErrObjectNotFound - } - return factory.Create(config), nil -} - -type AuthenticatorChain struct { - authenticators []Authenticator -} - -func NewAuthenticatorChain(auths ...Authenticator) Authenticator { - return &AuthenticatorChain{ - authenticators: auths, - } -} - -func (v *AuthenticatorChain) Overhead() int { - total := 0 - for _, auth := range v.authenticators { - total += auth.Overhead() - } - return total -} - -func (v *AuthenticatorChain) Open(payload *buf.Buffer) bool { - for _, auth := range v.authenticators { - if !auth.Open(payload) { - return false - } - } - return true -} - -func (v *AuthenticatorChain) Seal(payload *buf.Buffer) { - for i := len(v.authenticators) - 1; i >= 0; i-- { - auth := v.authenticators[i] - auth.Seal(payload) - } -} diff --git a/transport/internet/header.go b/transport/internet/header.go index d49906422..30d4e7136 100644 --- a/transport/internet/header.go +++ b/transport/internet/header.go @@ -4,7 +4,7 @@ import "v2ray.com/core/common" type PacketHeader interface { Size() int - Write([]byte) int + Write([]byte) (int, error) } type PacketHeaderFactory interface { diff --git a/transport/internet/headers/http/http.go b/transport/internet/headers/http/http.go index 7ef1b0610..cd66d456f 100644 --- a/transport/internet/headers/http/http.go +++ b/transport/internet/headers/http/http.go @@ -7,6 +7,7 @@ import ( "net/http" "strings" "time" + "v2ray.com/core/common/buf" "v2ray.com/core/common/loader" "v2ray.com/core/common/serial" @@ -46,9 +47,9 @@ type HeaderReader struct { } func (*HeaderReader) Read(reader io.Reader) (*buf.Buffer, error) { - buffer := buf.NewSmallBuffer() + buffer := buf.NewSmall() for { - _, err := buffer.FillFrom(reader) + err := buffer.AppendSupplier(buf.ReadFrom(reader)) if err != nil { return nil, err } @@ -143,39 +144,39 @@ type HttpAuthenticator struct { } func (v HttpAuthenticator) GetClientWriter() *HeaderWriter { - header := buf.NewSmallBuffer() + header := buf.NewSmall() config := v.config.Request - header.AppendFunc(serial.WriteString(strings.Join([]string{config.Method.GetValue(), config.PickUri(), config.GetFullVersion()}, " "))) - header.AppendFunc(writeCRLF) + header.AppendSupplier(serial.WriteString(strings.Join([]string{config.Method.GetValue(), config.PickUri(), config.GetFullVersion()}, " "))) + header.AppendSupplier(writeCRLF) headers := config.PickHeaders() for _, h := range headers { - header.AppendFunc(serial.WriteString(h)) - header.AppendFunc(writeCRLF) + header.AppendSupplier(serial.WriteString(h)) + header.AppendSupplier(writeCRLF) } - header.AppendFunc(writeCRLF) + header.AppendSupplier(writeCRLF) return &HeaderWriter{ header: header, } } func (v HttpAuthenticator) GetServerWriter() *HeaderWriter { - header := buf.NewSmallBuffer() + header := buf.NewSmall() config := v.config.Response - header.AppendFunc(serial.WriteString(strings.Join([]string{config.GetFullVersion(), config.Status.GetCode(), config.Status.GetReason()}, " "))) - header.AppendFunc(writeCRLF) + header.AppendSupplier(serial.WriteString(strings.Join([]string{config.GetFullVersion(), config.Status.GetCode(), config.Status.GetReason()}, " "))) + header.AppendSupplier(writeCRLF) headers := config.PickHeaders() for _, h := range headers { - header.AppendFunc(serial.WriteString(h)) - header.AppendFunc(writeCRLF) + header.AppendSupplier(serial.WriteString(h)) + header.AppendSupplier(writeCRLF) } if !config.HasHeader("Date") { - header.AppendFunc(serial.WriteString("Date: ")) - header.AppendFunc(serial.WriteString(time.Now().Format(http.TimeFormat))) - header.AppendFunc(writeCRLF) + header.AppendSupplier(serial.WriteString("Date: ")) + header.AppendSupplier(serial.WriteString(time.Now().Format(http.TimeFormat))) + header.AppendSupplier(writeCRLF) } - header.AppendFunc(writeCRLF) + header.AppendSupplier(writeCRLF) return &HeaderWriter{ header: header, } diff --git a/transport/internet/headers/http/http_test.go b/transport/internet/headers/http/http_test.go index 612f08e72..8fb5a4e2e 100644 --- a/transport/internet/headers/http/http_test.go +++ b/transport/internet/headers/http/http_test.go @@ -12,9 +12,9 @@ import ( func TestReaderWriter(t *testing.T) { assert := assert.On(t) - cache := buf.NewBuffer() - b := buf.NewLocalBuffer(256) - b.AppendFunc(serial.WriteString("abcd" + ENDING)) + cache := buf.New() + b := buf.NewLocal(256) + b.AppendSupplier(serial.WriteString("abcd" + ENDING)) writer := NewHeaderWriter(b) writer.Write(cache) cache.Write([]byte{'e', 'f', 'g'}) @@ -41,7 +41,7 @@ func TestRequestHeader(t *testing.T) { }, }).(HttpAuthenticator) - cache := buf.NewBuffer() + cache := buf.New() err := auth.GetClientWriter().Write(cache) assert.Error(err).IsNil() diff --git a/transport/internet/headers/noop/noop.go b/transport/internet/headers/noop/noop.go index b976628e4..4f0ed5946 100644 --- a/transport/internet/headers/noop/noop.go +++ b/transport/internet/headers/noop/noop.go @@ -12,8 +12,8 @@ type NoOpHeader struct{} func (v NoOpHeader) Size() int { return 0 } -func (v NoOpHeader) Write([]byte) int { - return 0 +func (v NoOpHeader) Write([]byte) (int, error) { + return 0, nil } type NoOpHeaderFactory struct{} diff --git a/transport/internet/headers/srtp/srtp.go b/transport/internet/headers/srtp/srtp.go index ce0b1a616..5290a87bc 100644 --- a/transport/internet/headers/srtp/srtp.go +++ b/transport/internet/headers/srtp/srtp.go @@ -17,11 +17,11 @@ func (v *SRTP) Size() int { return 4 } -func (v *SRTP) Write(b []byte) int { +func (v *SRTP) Write(b []byte) (int, error) { v.number++ b = serial.Uint16ToBytes(v.number, b[:0]) b = serial.Uint16ToBytes(v.number, b) - return 4 + return 4, nil } type SRTPFactory struct { diff --git a/transport/internet/headers/srtp/srtp_test.go b/transport/internet/headers/srtp/srtp_test.go index 5e2c0de9c..f0e0be2ba 100644 --- a/transport/internet/headers/srtp/srtp_test.go +++ b/transport/internet/headers/srtp/srtp_test.go @@ -14,8 +14,8 @@ func TestSRTPWrite(t *testing.T) { content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'} srtp := SRTP{} - payload := buf.NewLocalBuffer(2048) - payload.AppendFunc(srtp.Write) + payload := buf.NewLocal(2048) + payload.AppendSupplier(srtp.Write) payload.Append(content) assert.Int(payload.Len()).Equals(len(content) + srtp.Size()) diff --git a/transport/internet/headers/utp/utp.go b/transport/internet/headers/utp/utp.go index 9334483f8..7dab46c33 100644 --- a/transport/internet/headers/utp/utp.go +++ b/transport/internet/headers/utp/utp.go @@ -18,10 +18,10 @@ func (v *UTP) Size() int { return 4 } -func (v *UTP) Write(b []byte) int { +func (v *UTP) Write(b []byte) (int, error) { b = serial.Uint16ToBytes(v.connectionId, b[:0]) b = append(b, v.header, v.extension) - return 4 + return 4, nil } type UTPFactory struct{} diff --git a/transport/internet/headers/utp/utp_test.go b/transport/internet/headers/utp/utp_test.go index 87808904d..3faa3bbcd 100644 --- a/transport/internet/headers/utp/utp_test.go +++ b/transport/internet/headers/utp/utp_test.go @@ -14,8 +14,8 @@ func TestUTPWrite(t *testing.T) { content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'} utp := UTP{} - payload := buf.NewLocalBuffer(2048) - payload.AppendFunc(utp.Write) + payload := buf.NewLocal(2048) + payload.AppendSupplier(utp.Write) payload.Append(content) assert.Int(payload.Len()).Equals(len(content) + utp.Size()) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 53154f674..280b20a38 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -10,7 +10,6 @@ import ( "v2ray.com/core/common/errors" "v2ray.com/core/common/log" "v2ray.com/core/common/predicate" - "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/internal" ) @@ -170,7 +169,6 @@ type SystemConnection interface { type Connection struct { conn SystemConnection connRecycler internal.ConnectionRecyler - block internet.Authenticator rd time.Time wd time.Time // write deadline since int64 diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go index e85fc81c5..301dddfe9 100644 --- a/transport/internet/kcp/dialer.go +++ b/transport/internet/kcp/dialer.go @@ -88,12 +88,12 @@ func (o *ClientConnection) ResetSecurity(header internet.PacketHeader, security } func (o *ClientConnection) Run() { - payload := buf.NewSmallBuffer() + payload := buf.NewSmall() defer payload.Release() for { payload.Clear() - _, err := payload.FillFrom(o.Conn) + err := payload.AppendSupplier(buf.ReadFrom(o.Conn)) if err != nil { payload.Release() return diff --git a/transport/internet/kcp/io.go b/transport/internet/kcp/io.go index bddfa8520..253d48a29 100644 --- a/transport/internet/kcp/io.go +++ b/transport/internet/kcp/io.go @@ -69,7 +69,7 @@ func (v *KCPPacketWriter) Write(b []byte) (int, error) { x := v.buffer[:] size := 0 if v.Header != nil { - nBytes := v.Header.Write(x) + nBytes, _ := v.Header.Write(x) size += nBytes x = x[nBytes:] } diff --git a/transport/internet/kcp/output.go b/transport/internet/kcp/output.go index 3f5861ff1..47bd6ee7e 100644 --- a/transport/internet/kcp/output.go +++ b/transport/internet/kcp/output.go @@ -22,7 +22,7 @@ func NewSegmentWriter(writer io.Writer, mtu uint32) *BufferedSegmentWriter { return &BufferedSegmentWriter{ mtu: mtu, writer: writer, - buffer: buf.NewSmallBuffer(), + buffer: buf.NewSmall(), } } @@ -35,7 +35,7 @@ func (v *BufferedSegmentWriter) Write(seg Segment) { v.FlushWithoutLock() } - v.buffer.AppendFunc(seg.Bytes()) + v.buffer.AppendSupplier(seg.Bytes()) } func (v *BufferedSegmentWriter) FlushWithoutLock() { diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 7514888f6..9b3ed26a4 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -31,7 +31,7 @@ type Segment interface { Conversation() uint16 Command() Command ByteSize() int - Bytes() buf.BytesWriter + Bytes() buf.Supplier } const ( @@ -64,14 +64,14 @@ func (v *DataSegment) Command() Command { func (v *DataSegment) SetData(b []byte) { if v.Data == nil { - v.Data = buf.NewSmallBuffer() + v.Data = buf.NewSmall() } v.Data.Clear() v.Data.Append(b) } -func (v *DataSegment) Bytes() buf.BytesWriter { - return func(b []byte) int { +func (v *DataSegment) Bytes() buf.Supplier { + return func(b []byte) (int, error) { b = serial.Uint16ToBytes(v.Conv, b[:0]) b = append(b, byte(CommandData), byte(v.Option)) b = serial.Uint32ToBytes(v.Timestamp, b) @@ -79,7 +79,7 @@ func (v *DataSegment) Bytes() buf.BytesWriter { b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint16ToBytes(uint16(v.Data.Len()), b) b = append(b, v.Data.Bytes()...) - return v.ByteSize() + return v.ByteSize(), nil } } @@ -137,8 +137,8 @@ func (v *AckSegment) ByteSize() int { return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int(v.Count)*4 } -func (v *AckSegment) Bytes() buf.BytesWriter { - return func(b []byte) int { +func (v *AckSegment) Bytes() buf.Supplier { + return func(b []byte) (int, error) { b = serial.Uint16ToBytes(v.Conv, b[:0]) b = append(b, byte(CommandACK), byte(v.Option)) b = serial.Uint32ToBytes(v.ReceivingWindow, b) @@ -148,7 +148,7 @@ func (v *AckSegment) Bytes() buf.BytesWriter { for i := byte(0); i < v.Count; i++ { b = serial.Uint32ToBytes(v.NumberList[i], b) } - return v.ByteSize() + return v.ByteSize(), nil } } @@ -181,14 +181,14 @@ func (v *CmdOnlySegment) ByteSize() int { return 2 + 1 + 1 + 4 + 4 + 4 } -func (v *CmdOnlySegment) Bytes() buf.BytesWriter { - return func(b []byte) int { +func (v *CmdOnlySegment) Bytes() buf.Supplier { + return func(b []byte) (int, error) { b = serial.Uint16ToBytes(v.Conv, b[:0]) b = append(b, byte(v.Cmd), byte(v.Option)) b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint32ToBytes(v.ReceivinNext, b) b = serial.Uint32ToBytes(v.PeerRTO, b) - return v.ByteSize() + return v.ByteSize(), nil } } diff --git a/transport/internet/kcp/segment_test.go b/transport/internet/kcp/segment_test.go index 079969308..571a6defb 100644 --- a/transport/internet/kcp/segment_test.go +++ b/transport/internet/kcp/segment_test.go @@ -19,7 +19,7 @@ func TestBadSegment(t *testing.T) { func TestDataSegment(t *testing.T) { assert := assert.On(t) - b := buf.NewLocalBuffer(512) + b := buf.NewLocal(512) b.Append([]byte{'a', 'b', 'c', 'd'}) seg := &DataSegment{ Conv: 1, diff --git a/transport/internet/udp/hub.go b/transport/internet/udp/hub.go index c95a6c412..4aca426a1 100644 --- a/transport/internet/udp/hub.go +++ b/transport/internet/udp/hub.go @@ -135,16 +135,14 @@ func (v *UDPHub) start() { oobBytes := make([]byte, 256) for v.Running() { - buffer := buf.NewSmallBuffer() + buffer := buf.NewSmall() var noob int var addr *net.UDPAddr - var err error - buffer.AppendFunc(func(b []byte) int { + err := buffer.AppendSupplier(func(b []byte) (int, error) { n, nb, _, a, e := ReadUDPMsg(v.conn, b, oobBytes) noob = nb addr = a - err = e - return n + return n, e }) if err != nil {