From ea080b2ebfaf96ce9a50c709e9f70b2e7184e24a Mon Sep 17 00:00:00 2001 From: v2ray Date: Mon, 11 Jan 2016 22:51:35 +0100 Subject: [PATCH] rewrite id hash queue --- common/collect/sized_queue.go | 24 ++++++ common/collect/timed_queue.go | 105 --------------------------- common/collect/timed_queue_test.go | 56 -------------- proxy/vmess/protocol/user/userset.go | 73 +++++++++++-------- 4 files changed, 68 insertions(+), 190 deletions(-) create mode 100644 common/collect/sized_queue.go delete mode 100644 common/collect/timed_queue.go delete mode 100644 common/collect/timed_queue_test.go diff --git a/common/collect/sized_queue.go b/common/collect/sized_queue.go new file mode 100644 index 000000000..6386ecef5 --- /dev/null +++ b/common/collect/sized_queue.go @@ -0,0 +1,24 @@ +package collect + +type SizedQueue struct { + elements []interface{} + nextPos int +} + +func NewSizedQueue(size int) *SizedQueue { + return &SizedQueue{ + elements: make([]interface{}, size), + nextPos: 0, + } +} + +// Put puts a new element into the queue and pop out the first element if queue is full. +func (this *SizedQueue) Put(element interface{}) interface{} { + res := this.elements[this.nextPos] + this.elements[this.nextPos] = element + this.nextPos++ + if this.nextPos == len(this.elements) { + this.nextPos = 0 + } + return res +} diff --git a/common/collect/timed_queue.go b/common/collect/timed_queue.go deleted file mode 100644 index 3a498b584..000000000 --- a/common/collect/timed_queue.go +++ /dev/null @@ -1,105 +0,0 @@ -package collect - -import ( - "container/heap" - "sync" - "time" -) - -type timedQueueEntry struct { - timeSec int64 - value interface{} -} - -type timedQueueImpl []*timedQueueEntry - -func (queue timedQueueImpl) Len() int { - return len(queue) -} - -func (queue timedQueueImpl) Less(i, j int) bool { - return queue[i].timeSec < queue[j].timeSec -} - -func (queue timedQueueImpl) Swap(i, j int) { - queue[i], queue[j] = queue[j], queue[i] -} - -func (queue *timedQueueImpl) Push(value interface{}) { - entry := value.(*timedQueueEntry) - *queue = append(*queue, entry) -} - -func (queue *timedQueueImpl) Pop() interface{} { - old := *queue - n := len(old) - v := old[n-1] - old[n-1] = nil - *queue = old[:n-1] - return v -} - -// TimedQueue is a priority queue that entries with oldest timestamp get removed first. -type TimedQueue struct { - queue timedQueueImpl - access sync.Mutex - removedCallback func(interface{}) -} - -func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue { - queue := &TimedQueue{ - queue: make([]*timedQueueEntry, 0, 256), - removedCallback: removedCallback, - access: sync.Mutex{}, - } - go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second)) - return queue -} - -func (queue *TimedQueue) Add(value interface{}, time2Remove int64) { - newEntry := &timedQueueEntry{ - timeSec: time2Remove, - value: value, - } - var removedEntry *timedQueueEntry - queue.access.Lock() - nowSec := time.Now().Unix() - if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec { - removedEntry = queue.queue[0] - queue.queue[0] = newEntry - heap.Fix(&queue.queue, 0) - } else { - heap.Push(&queue.queue, newEntry) - } - queue.access.Unlock() - if removedEntry != nil { - queue.removedCallback(removedEntry.value) - } -} - -func (queue *TimedQueue) cleanup(tick <-chan time.Time) { - for now := range tick { - nowSec := now.Unix() - removedEntries := make([]*timedQueueEntry, 0, 128) - queue.access.Lock() - changed := false - for i := 0; i < queue.queue.Len(); i++ { - entry := queue.queue[i] - if entry.timeSec < nowSec { - removedEntries = append(removedEntries, entry) - queue.queue.Swap(i, queue.queue.Len()-1) - queue.queue.Pop() - changed = true - } else { - break - } - } - if changed { - heap.Init(&queue.queue) - } - queue.access.Unlock() - for _, entry := range removedEntries { - queue.removedCallback(entry.value) - } - } -} diff --git a/common/collect/timed_queue_test.go b/common/collect/timed_queue_test.go deleted file mode 100644 index 940f64e15..000000000 --- a/common/collect/timed_queue_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package collect - -import ( - "testing" - "time" - - v2testing "github.com/v2ray/v2ray-core/testing" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestTimedQueue(t *testing.T) { - v2testing.Current(t) - - removed := make(map[string]bool) - - nowSec := time.Now().Unix() - q := NewTimedQueue(2, func(v interface{}) { - removed[v.(string)] = true - }) - - q.Add("Value1", nowSec) - q.Add("Value2", nowSec+5) - - v1, ok := removed["Value1"] - assert.Bool(ok).IsFalse() - - v2, ok := removed["Value2"] - assert.Bool(ok).IsFalse() - - tick := time.Tick(4 * time.Second) - <-tick - - v1, ok = removed["Value1"] - assert.Bool(ok).IsTrue() - assert.Bool(v1).IsTrue() - removed["Value1"] = false - - v2, ok = removed["Value2"] - assert.Bool(ok).IsFalse() - - <-tick - v2, ok = removed["Value2"] - assert.Bool(ok).IsTrue() - assert.Bool(v2).IsTrue() - removed["Value2"] = false - - <-tick - assert.Bool(removed["Values"]).IsFalse() - - q.Add("Value1", time.Now().Unix()+10) - - <-tick - v1, ok = removed["Value1"] - assert.Bool(ok).IsTrue() - assert.Bool(v1).IsFalse() -} diff --git a/proxy/vmess/protocol/user/userset.go b/proxy/vmess/protocol/user/userset.go index 333dfa5d1..85501e19f 100644 --- a/proxy/vmess/protocol/user/userset.go +++ b/proxy/vmess/protocol/user/userset.go @@ -13,16 +13,23 @@ const ( cacheDurationSec = 120 ) +type idEntry struct { + id *vmess.ID + userIdx int + lastSec int64 + hashes *collect.SizedQueue +} + type UserSet interface { AddUser(user vmess.User) error GetUser(timeHash []byte) (vmess.User, int64, bool) } type TimedUserSet struct { - validUsers []vmess.User - userHash map[string]indexTimePair - userHashDeleteQueue *collect.TimedQueue - access sync.RWMutex + validUsers []vmess.User + userHash map[string]indexTimePair + ids []*idEntry + access sync.RWMutex } type indexTimePair struct { @@ -35,55 +42,63 @@ func NewTimedUserSet() UserSet { validUsers: make([]vmess.User, 0, 16), userHash: make(map[string]indexTimePair, 512), access: sync.RWMutex{}, + ids: make([]*idEntry, 0, 512), } - tus.userHashDeleteQueue = collect.NewTimedQueue(updateIntervalSec, tus.removeEntry) go tus.updateUserHash(time.Tick(updateIntervalSec * time.Second)) return tus } -func (us *TimedUserSet) removeEntry(entry interface{}) { - us.access.Lock() - delete(us.userHash, entry.(string)) - us.access.Unlock() -} - -func (us *TimedUserSet) generateNewHashes(lastSec, nowSec int64, idx int, id *vmess.ID) { +func (us *TimedUserSet) generateNewHashes(nowSec int64, idx int, entry *idEntry) { idHash := NewTimeHash(HMACHash{}) - for lastSec < nowSec { - idHash := idHash.Hash(id.Bytes(), lastSec) + for entry.lastSec <= nowSec { + idHashSlice := idHash.Hash(entry.id.Bytes(), entry.lastSec) + hashValue := string(idHashSlice) us.access.Lock() - us.userHash[string(idHash)] = indexTimePair{idx, lastSec} + us.userHash[hashValue] = indexTimePair{idx, entry.lastSec} us.access.Unlock() - us.userHashDeleteQueue.Add(string(idHash), lastSec+2*cacheDurationSec) - lastSec++ + + hash2Remove := entry.hashes.Put(hashValue) + if hash2Remove != nil { + us.access.Lock() + delete(us.userHash, hash2Remove.(string)) + us.access.Unlock() + } + entry.lastSec++ } } func (us *TimedUserSet) updateUserHash(tick <-chan time.Time) { - lastSec := time.Now().Unix() - cacheDurationSec - for now := range tick { nowSec := now.Unix() + cacheDurationSec - for idx, user := range us.validUsers { - us.generateNewHashes(lastSec, nowSec, idx, user.ID()) - for _, alterid := range user.AlterIDs() { - us.generateNewHashes(lastSec, nowSec, idx, alterid) - } + for _, entry := range us.ids { + us.generateNewHashes(nowSec, entry.userIdx, entry) } - lastSec = nowSec } } func (us *TimedUserSet) AddUser(user vmess.User) error { - id := user.ID() idx := len(us.validUsers) us.validUsers = append(us.validUsers, user) nowSec := time.Now().Unix() - lastSec := nowSec - cacheDurationSec - us.generateNewHashes(lastSec, nowSec+cacheDurationSec, idx, id) + + entry := &idEntry{ + id: user.ID(), + userIdx: idx, + lastSec: nowSec - cacheDurationSec, + hashes: collect.NewSizedQueue(2*cacheDurationSec + 1), + } + us.generateNewHashes(nowSec+cacheDurationSec, idx, entry) + us.ids = append(us.ids, entry) for _, alterid := range user.AlterIDs() { - us.generateNewHashes(lastSec, nowSec+cacheDurationSec, idx, alterid) + entry := &idEntry{ + id: alterid, + userIdx: idx, + lastSec: nowSec - cacheDurationSec, + hashes: collect.NewSizedQueue(2*cacheDurationSec + 1), + } + us.generateNewHashes(nowSec+cacheDurationSec, idx, entry) + us.ids = append(us.ids, entry) } return nil