v2ray-core/common/collect/timed_queue.go

106 lines
2.3 KiB
Go
Raw Normal View History

2015-09-28 01:11:40 +02:00
package collect
import (
"container/heap"
"sync"
"time"
)
type timedQueueEntry struct {
timeSec int64
value interface{}
}
type timedQueueImpl []*timedQueueEntry
func (queue timedQueueImpl) Len() int {
return len(queue)
}
func (queue timedQueueImpl) Less(i, j int) bool {
return queue[i].timeSec < queue[j].timeSec
}
func (queue timedQueueImpl) Swap(i, j int) {
2015-11-17 14:26:13 +01:00
queue[i], queue[j] = queue[j], queue[i]
2015-09-28 01:11:40 +02:00
}
func (queue *timedQueueImpl) Push(value interface{}) {
entry := value.(*timedQueueEntry)
*queue = append(*queue, entry)
}
func (queue *timedQueueImpl) Pop() interface{} {
old := *queue
n := len(old)
v := old[n-1]
2015-10-10 20:52:23 +02:00
old[n-1] = nil
2015-09-28 01:11:40 +02:00
*queue = old[:n-1]
return v
}
2015-12-04 12:13:35 +00:00
// TimedQueue is a priority queue that entries with oldest timestamp get removed first.
2015-09-28 01:11:40 +02:00
type TimedQueue struct {
2016-01-10 09:11:46 +01:00
queue timedQueueImpl
access sync.Mutex
removedCallback func(interface{})
2015-09-28 01:11:40 +02:00
}
2016-01-10 09:11:46 +01:00
func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue {
2015-09-28 01:11:40 +02:00
queue := &TimedQueue{
2016-01-10 09:11:46 +01:00
queue: make([]*timedQueueEntry, 0, 256),
removedCallback: removedCallback,
access: sync.Mutex{},
2015-09-28 01:11:40 +02:00
}
go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
return queue
}
func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
2016-01-10 09:11:46 +01:00
newEntry := &timedQueueEntry{
2015-09-28 01:11:40 +02:00
timeSec: time2Remove,
value: value,
2016-01-10 09:11:46 +01:00
}
var removedEntry *timedQueueEntry
queue.access.Lock()
nowSec := time.Now().Unix()
if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec {
removedEntry = queue.queue[0]
queue.queue[0] = newEntry
heap.Fix(&queue.queue, 0)
} else {
heap.Push(&queue.queue, newEntry)
}
2015-09-28 01:11:40 +02:00
queue.access.Unlock()
2016-01-10 09:11:46 +01:00
if removedEntry != nil {
2016-01-10 10:24:21 +01:00
queue.removedCallback(removedEntry.value)
2016-01-10 09:11:46 +01:00
}
2015-09-28 01:11:40 +02:00
}
func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
2015-10-10 21:29:37 +02:00
for now := range tick {
nowSec := now.Unix()
2016-01-10 09:11:46 +01:00
removedEntries := make([]*timedQueueEntry, 0, 128)
queue.access.Lock()
changed := false
for i := 0; i < queue.queue.Len(); i++ {
entry := queue.queue[i]
if entry.timeSec < nowSec {
removedEntries = append(removedEntries, entry)
queue.queue.Swap(i, queue.queue.Len()-1)
queue.queue.Pop()
changed = true
2016-01-10 10:31:27 +01:00
} else {
break
2015-10-10 21:29:37 +02:00
}
2016-01-10 09:11:46 +01:00
}
if changed {
heap.Init(&queue.queue)
}
queue.access.Unlock()
for _, entry := range removedEntries {
queue.removedCallback(entry.value)
2015-10-10 21:29:37 +02:00
}
2015-09-28 01:11:40 +02:00
}
}