From 2acef1cc0787d84e7d4cfde1f6e77aa25f0735ec Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 14 Dec 2017 23:24:40 +0100 Subject: [PATCH] cleanup kcp connection --- transport/internet/kcp/connection.go | 21 ++++++++++----------- transport/internet/kcp/connection_test.go | 2 +- transport/internet/kcp/dialer.go | 7 ++++--- transport/internet/kcp/listener.go | 7 ++++--- transport/internet/kcp/receiving.go | 2 +- transport/internet/kcp/sending.go | 2 +- 6 files changed, 21 insertions(+), 20 deletions(-) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 88d7d05a3..7c118a774 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -166,13 +166,14 @@ func (u *Updater) SetInterval(d time.Duration) { } type ConnMetadata struct { - LocalAddr net.Addr - RemoteAddr net.Addr + LocalAddr net.Addr + RemoteAddr net.Addr + Conversation uint16 } // Connection is a KCP connection over UDP. type Connection struct { - meta *ConnMetadata + meta ConnMetadata closer io.Closer rd time.Time wd time.Time // write deadline @@ -181,7 +182,6 @@ type Connection struct { dataOutput chan bool Config *Config - conv uint16 state State stateBeginTime uint32 lastIncomingTime uint32 @@ -200,11 +200,10 @@ type Connection struct { } // NewConnection create a new KCP connection between local and remote. -func NewConnection(conv uint16, meta *ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection { - log.Trace(newError("creating connection ", conv)) +func NewConnection(meta ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection { + log.Trace(newError("creating connection ", meta.Conversation)) conn := &Connection{ - conv: conv, meta: meta, closer: closer, since: nowMillisec(), @@ -414,7 +413,7 @@ func (v *Connection) SetState(state State) { current := v.Elapsed() atomic.StoreInt32((*int32)(&v.state), int32(state)) atomic.StoreUint32(&v.stateBeginTime, current) - log.Trace(newError("#", v.conv, " entering state ", state, " at ", current).AtDebug()) + log.Trace(newError("#", v.meta.Conversation, " entering state ", state, " at ", current).AtDebug()) switch state { case StateReadyToClose: @@ -553,7 +552,7 @@ func (v *Connection) Input(segments []Segment) { atomic.StoreUint32(&v.lastIncomingTime, current) for _, seg := range segments { - if seg.Conversation() != v.conv { + if seg.Conversation() != v.meta.Conversation { break } @@ -610,7 +609,7 @@ func (v *Connection) flush() { } if v.State() == StateTerminating { - log.Trace(newError("#", v.conv, " sending terminating cmd.").AtDebug()) + log.Trace(newError("#", v.meta.Conversation, " sending terminating cmd.").AtDebug()) v.Ping(current, CommandTerminate) if current-atomic.LoadUint32(&v.stateBeginTime) > 8000 { @@ -641,7 +640,7 @@ func (v *Connection) State() State { func (v *Connection) Ping(current uint32, cmd Command) { seg := NewCmdOnlySegment() - seg.Conv = v.conv + seg.Conv = v.meta.Conversation seg.Cmd = cmd seg.ReceivinNext = v.receivingWorker.NextNumber() seg.SendingNext = v.sendingWorker.FirstUnacknowledged() diff --git a/transport/internet/kcp/connection_test.go b/transport/internet/kcp/connection_test.go index 89a224ca0..b09298def 100644 --- a/transport/internet/kcp/connection_test.go +++ b/transport/internet/kcp/connection_test.go @@ -19,7 +19,7 @@ func (NoOpCloser) Close() error { func TestConnectionReadTimeout(t *testing.T) { assert := With(t) - conn := NewConnection(1, &ConnMetadata{}, &KCPPacketWriter{ + conn := NewConnection(ConnMetadata{Conversation: 1}, &KCPPacketWriter{ Writer: buf.DiscardBytes, }, NoOpCloser(0), &Config{}) conn.SetReadDeadline(time.Now().Add(time.Second)) diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go index ddb0c7411..35fcc94f9 100644 --- a/transport/internet/kcp/dialer.go +++ b/transport/internet/kcp/dialer.go @@ -67,9 +67,10 @@ func DialKCP(ctx context.Context, dest net.Destination) (internet.Connection, er } conv := uint16(atomic.AddUint32(&globalConv, 1)) - session := NewConnection(conv, &ConnMetadata{ - LocalAddr: rawConn.LocalAddr(), - RemoteAddr: rawConn.RemoteAddr(), + session := NewConnection(ConnMetadata{ + LocalAddr: rawConn.LocalAddr(), + RemoteAddr: rawConn.RemoteAddr(), + Conversation: conv, }, writer, rawConn, kcpSettings) go fetchInput(ctx, rawConn, reader, session) diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index b111c5137..aabe680d1 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -124,9 +124,10 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalD Port: int(src.Port), } localAddr := v.hub.Addr() - conn = NewConnection(conv, &ConnMetadata{ - LocalAddr: localAddr, - RemoteAddr: remoteAddr, + conn = NewConnection(ConnMetadata{ + LocalAddr: localAddr, + RemoteAddr: remoteAddr, + Conversation: conv, }, &KCPPacketWriter{ Header: v.header, Security: v.security, diff --git a/transport/internet/kcp/receiving.go b/transport/internet/kcp/receiving.go index b1e37c1f5..65e3afc53 100644 --- a/transport/internet/kcp/receiving.go +++ b/transport/internet/kcp/receiving.go @@ -252,7 +252,7 @@ func (w *ReceivingWorker) Flush(current uint32) { func (w *ReceivingWorker) Write(seg Segment) error { ackSeg := seg.(*AckSegment) - ackSeg.Conv = w.conn.conv + ackSeg.Conv = w.conn.meta.Conversation ackSeg.ReceivingNext = w.nextNumber ackSeg.ReceivingWindow = w.nextNumber + w.windowSize if w.conn.State() == StateReadyToClose { diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index 8e51fe34a..cfd40eaee 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -300,7 +300,7 @@ func (v *SendingWorker) Push() *buf.Buffer { func (v *SendingWorker) Write(seg Segment) error { dataSeg := seg.(*DataSegment) - dataSeg.Conv = v.conn.conv + dataSeg.Conv = v.conn.meta.Conversation dataSeg.SendingNext = v.firstUnacknowledged dataSeg.Option = 0 if v.conn.State() == StateReadyToClose {