diff --git a/app/pubsub/internal/pubsub.go b/app/pubsub/internal/pubsub.go deleted file mode 100644 index d7af629f6..000000000 --- a/app/pubsub/internal/pubsub.go +++ /dev/null @@ -1,64 +0,0 @@ -package internal - -import ( - "sync" - - "github.com/v2ray/v2ray-core/app" - "github.com/v2ray/v2ray-core/app/pubsub" -) - -type TopicHandlerList struct { - sync.RWMutex - handlers []pubsub.TopicHandler -} - -func NewTopicHandlerList(handlers ...pubsub.TopicHandler) *TopicHandlerList { - return &TopicHandlerList{ - handlers: handlers, - } -} - -func (this *TopicHandlerList) Add(handler pubsub.TopicHandler) { - this.Lock() - this.handlers = append(this.handlers, handler) - this.Unlock() -} - -func (this *TopicHandlerList) Dispatch(message pubsub.PubsubMessage) { - this.RLock() - for _, handler := range this.handlers { - go handler(message) - } - this.RUnlock() -} - -type Pubsub struct { - topics map[string]*TopicHandlerList - sync.RWMutex -} - -func New() *Pubsub { - return &Pubsub{ - topics: make(map[string]*TopicHandlerList), - } -} - -func (this *Pubsub) Publish(context app.Context, topic string, message pubsub.PubsubMessage) { - this.RLock() - list, found := this.topics[topic] - this.RUnlock() - - if found { - list.Dispatch(message) - } -} - -func (this *Pubsub) Subscribe(context app.Context, topic string, handler pubsub.TopicHandler) { - this.Lock() - defer this.Unlock() - if list, found := this.topics[topic]; found { - list.Add(handler) - } else { - this.topics[topic] = NewTopicHandlerList(handler) - } -} diff --git a/app/pubsub/internal/pubsub_test.go b/app/pubsub/internal/pubsub_test.go deleted file mode 100644 index 5b16fad53..000000000 --- a/app/pubsub/internal/pubsub_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package internal_test - -import ( - "testing" - "time" - - "github.com/v2ray/v2ray-core/app/pubsub" - . "github.com/v2ray/v2ray-core/app/pubsub/internal" - apptesting "github.com/v2ray/v2ray-core/app/testing" - v2testing "github.com/v2ray/v2ray-core/testing" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestPubsub(t *testing.T) { - v2testing.Current(t) - - messages := make(map[string]pubsub.PubsubMessage) - - ps := New() - ps.Subscribe(&apptesting.Context{}, "t1", func(message pubsub.PubsubMessage) { - messages["t1"] = message - }) - - ps.Subscribe(&apptesting.Context{}, "t2", func(message pubsub.PubsubMessage) { - messages["t2"] = message - }) - - message := pubsub.PubsubMessage([]byte("This is a pubsub message.")) - ps.Publish(&apptesting.Context{}, "t2", message) - <-time.Tick(time.Second) - - _, found := messages["t1"] - assert.Bool(found).IsFalse() - - actualMessage, found := messages["t2"] - assert.Bool(found).IsTrue() - assert.StringLiteral(string(actualMessage)).Equals(string(message)) -} diff --git a/app/pubsub/pubsub.go b/app/pubsub/pubsub.go deleted file mode 100644 index 5841bc391..000000000 --- a/app/pubsub/pubsub.go +++ /dev/null @@ -1,45 +0,0 @@ -package pubsub - -import ( - "github.com/v2ray/v2ray-core/app" -) - -const ( - APP_ID = app.ID(3) -) - -type PubsubMessage []byte -type TopicHandler func(PubsubMessage) - -type Pubsub interface { - Publish(topic string, message PubsubMessage) - Subscribe(topic string, handler TopicHandler) -} - -type pubsubWithContext interface { - Publish(context app.Context, topic string, message PubsubMessage) - Subscribe(context app.Context, topic string, handler TopicHandler) -} - -type contextedPubsub struct { - context app.Context - pubsub pubsubWithContext -} - -func (this *contextedPubsub) Publish(topic string, message PubsubMessage) { - this.pubsub.Publish(this.context, topic, message) -} - -func (this *contextedPubsub) Subscribe(topic string, handler TopicHandler) { - this.pubsub.Subscribe(this.context, topic, handler) -} - -func init() { - app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} { - pubsub := obj.(pubsubWithContext) - return &contextedPubsub{ - context: context, - pubsub: pubsub, - } - }) -}