diff --git a/common/net/transport.go b/common/net/transport.go index eaf44e5f2..a25d4f6a7 100644 --- a/common/net/transport.go +++ b/common/net/transport.go @@ -4,12 +4,8 @@ import ( "io" ) -const ( - bufferSize = 4 * 1024 -) - -func ReadFrom(reader io.Reader) ([]byte, error) { - buffer := make([]byte, bufferSize) +func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) { + buffer := make([]byte, sizeInKilo<<10) nBytes, err := reader.Read(buffer) if nBytes == 0 { return nil, err @@ -19,8 +15,9 @@ func ReadFrom(reader io.Reader) ([]byte, error) { // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. func ReaderToChan(stream chan<- []byte, reader io.Reader) error { + bufferSizeKilo := 4 for { - data, err := ReadFrom(reader) + data, err := ReadFrom(reader, bufferSizeKilo) if len(data) > 0 { stream <- data } diff --git a/common/net/transport_test.go b/common/net/transport_test.go index 767f87d19..a76a4464d 100644 --- a/common/net/transport_test.go +++ b/common/net/transport_test.go @@ -22,7 +22,7 @@ func TestReaderAndWrite(t *testing.T) { readerBuffer := bytes.NewReader(buffer) writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) - transportChan := make(chan []byte, size/bufferSize*10) + transportChan := make(chan []byte, 1024) err = ReaderToChan(transportChan, readerBuffer) assert.Error(err).Equals(io.EOF) @@ -52,43 +52,96 @@ func (reader *StaticReader) Read(b []byte) (size int, err error) { return } -func BenchmarkTransport(b *testing.B) { +func BenchmarkTransport1K(b *testing.B) { + size := 1 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func BenchmarkTransport2K(b *testing.B) { + size := 2 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func BenchmarkTransport4K(b *testing.B) { + size := 4 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func BenchmarkTransport10K(b *testing.B) { + size := 10 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func BenchmarkTransport100K(b *testing.B) { + size := 100 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func BenchmarkTransport1M(b *testing.B) { size := 1024 * 1024 for i := 0; i < b.N; i++ { - transportChanA := make(chan []byte, 128) - transportChanB := make(chan []byte, 128) - - readerA := &StaticReader{size, 0} - readerB := &StaticReader{size, 0} - - writerA := ioutil.Discard - writerB := ioutil.Discard - - finishA := make(chan bool) - finishB := make(chan bool) - - go func() { - ChanToWriter(writerA, transportChanA) - close(finishA) - }() - - go func() { - ReaderToChan(transportChanA, readerA) - close(transportChanA) - }() - - go func() { - ChanToWriter(writerB, transportChanB) - close(finishB) - }() - - go func() { - ReaderToChan(transportChanB, readerB) - close(transportChanB) - }() - - <-transportChanA - <-transportChanB + runBenchmarkTransport(size) } } + +func BenchmarkTransport10M(b *testing.B) { + size := 10 * 1024 * 1024 + + for i := 0; i < b.N; i++ { + runBenchmarkTransport(size) + } +} + +func runBenchmarkTransport(size int) { + + transportChanA := make(chan []byte, 128) + transportChanB := make(chan []byte, 128) + + readerA := &StaticReader{size, 0} + readerB := &StaticReader{size, 0} + + writerA := ioutil.Discard + writerB := ioutil.Discard + + finishA := make(chan bool) + finishB := make(chan bool) + + go func() { + ChanToWriter(writerA, transportChanA) + close(finishA) + }() + + go func() { + ReaderToChan(transportChanA, readerA) + close(transportChanA) + }() + + go func() { + ChanToWriter(writerB, transportChanB) + close(finishB) + }() + + go func() { + ReaderToChan(transportChanB, readerB) + close(transportChanB) + }() + + <-transportChanA + <-transportChanB +} diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 2adddfd72..7fb55b707 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -65,7 +65,11 @@ func dumpOutput(conn net.Conn, output chan<- []byte, finish *sync.Mutex, udp boo defer finish.Unlock() defer close(output) - response, err := v2net.ReadFrom(conn) + bufferSize := 4 /* KB */ + if udp { + bufferSize = 2 + } + response, err := v2net.ReadFrom(conn, bufferSize) log.Info("Freedom receives %d bytes from %s", len(response), conn.RemoteAddr().String()) if len(response) > 0 { output <- response diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index d28c96046..c97f0c6de 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -158,7 +158,7 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W } dest := request.Destination() - data, err := v2net.ReadFrom(reader) + data, err := v2net.ReadFrom(reader, 4) if err != nil { return err } @@ -192,8 +192,8 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ return err } - reader.SetTimeOut(300) /* 5 minutes */ - v2net.ReadFrom(reader) // Just in case of anything left in the socket + reader.SetTimeOut(300) /* 5 minutes */ + v2net.ReadFrom(reader, 1) // Just in case of anything left in the socket // The TCP connection closes after this method returns. We need to wait until // the client closes it. // TODO: get notified from UDP part @@ -215,7 +215,7 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth } dest := v2net.NewTCPDestination(v2net.IPAddress(auth.IP[:], auth.Port)) - data, err := v2net.ReadFrom(reader) + data, err := v2net.ReadFrom(reader, 4) if err != nil { return err } diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index 8ad1ab84a..08e761925 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -173,7 +173,7 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- return } - buffer, err := v2net.ReadFrom(decryptResponseReader) + buffer, err := v2net.ReadFrom(decryptResponseReader, 4) if err != nil { log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", len(buffer), err) return