mirror of
				https://github.com/v2fly/v2ray-core.git
				synced 2025-11-04 11:49:16 +00:00 
			
		
		
		
	Stats: Add ChannelConfig & Return error on subscription
This commit is contained in:
		
							parent
							
								
									2cc8c1aa01
								
							
						
					
					
						commit
						fa37f82b89
					
				@ -5,15 +5,33 @@ package stats
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"v2ray.com/core/common"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Channel is an implementation of stats.Channel.
 | 
					// Channel is an implementation of stats.Channel.
 | 
				
			||||||
type Channel struct {
 | 
					type Channel struct {
 | 
				
			||||||
 | 
						channel     chan interface{}
 | 
				
			||||||
 | 
						subscribers []chan interface{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Synchronization components
 | 
				
			||||||
	access sync.RWMutex
 | 
						access sync.RWMutex
 | 
				
			||||||
	closed chan struct{}
 | 
						closed chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	channel     chan interface{}
 | 
						// Channel options
 | 
				
			||||||
	subscribers []chan interface{}
 | 
						subscriberLimit   int           // Set to 0 as no subscriber limit
 | 
				
			||||||
 | 
						channelBufferSize int           // Set to 0 as no buffering
 | 
				
			||||||
 | 
						broadcastTimeout  time.Duration // Set to 0 as non-blocking immediate timeout
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewChannel creates an instance of Statistics Channel.
 | 
				
			||||||
 | 
					func NewChannel(config *ChannelConfig) *Channel {
 | 
				
			||||||
 | 
						return &Channel{
 | 
				
			||||||
 | 
							channel:           make(chan interface{}, config.BufferSize),
 | 
				
			||||||
 | 
							subscriberLimit:   int(config.SubscriberLimit),
 | 
				
			||||||
 | 
							channelBufferSize: int(config.BufferSize),
 | 
				
			||||||
 | 
							broadcastTimeout:  time.Duration(config.BroadcastTimeout+1) * time.Millisecond,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Channel returns the underlying go channel.
 | 
					// Channel returns the underlying go channel.
 | 
				
			||||||
@ -31,16 +49,19 @@ func (c *Channel) Subscribers() []chan interface{} {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Subscribe implements stats.Channel.
 | 
					// Subscribe implements stats.Channel.
 | 
				
			||||||
func (c *Channel) Subscribe() chan interface{} {
 | 
					func (c *Channel) Subscribe() (chan interface{}, error) {
 | 
				
			||||||
	c.access.Lock()
 | 
						c.access.Lock()
 | 
				
			||||||
	defer c.access.Unlock()
 | 
						defer c.access.Unlock()
 | 
				
			||||||
	subscriber := make(chan interface{})
 | 
						if c.subscriberLimit > 0 && len(c.subscribers) >= c.subscriberLimit {
 | 
				
			||||||
 | 
							return nil, newError("Number of subscribers has reached limit")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						subscriber := make(chan interface{}, c.channelBufferSize)
 | 
				
			||||||
	c.subscribers = append(c.subscribers, subscriber)
 | 
						c.subscribers = append(c.subscribers, subscriber)
 | 
				
			||||||
	return subscriber
 | 
						return subscriber, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Unsubscribe implements stats.Channel.
 | 
					// Unsubscribe implements stats.Channel.
 | 
				
			||||||
func (c *Channel) Unsubscribe(subscriber chan interface{}) {
 | 
					func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
 | 
				
			||||||
	c.access.Lock()
 | 
						c.access.Lock()
 | 
				
			||||||
	defer c.access.Unlock()
 | 
						defer c.access.Unlock()
 | 
				
			||||||
	for i, s := range c.subscribers {
 | 
						for i, s := range c.subscribers {
 | 
				
			||||||
@ -50,9 +71,9 @@ func (c *Channel) Unsubscribe(subscriber chan interface{}) {
 | 
				
			|||||||
			copy(subscribers[:i], c.subscribers[:i])
 | 
								copy(subscribers[:i], c.subscribers[:i])
 | 
				
			||||||
			copy(subscribers[i:], c.subscribers[i+1:])
 | 
								copy(subscribers[i:], c.subscribers[i+1:])
 | 
				
			||||||
			c.subscribers = subscribers
 | 
								c.subscribers = subscribers
 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Publish implements stats.Channel.
 | 
					// Publish implements stats.Channel.
 | 
				
			||||||
@ -85,12 +106,7 @@ func (c *Channel) Running() bool {
 | 
				
			|||||||
func (c *Channel) Start() error {
 | 
					func (c *Channel) Start() error {
 | 
				
			||||||
	c.access.Lock()
 | 
						c.access.Lock()
 | 
				
			||||||
	defer c.access.Unlock()
 | 
						defer c.access.Unlock()
 | 
				
			||||||
	if c.Running() {
 | 
						if !c.Running() {
 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if c.channel == nil { // Initialize publisher channel
 | 
					 | 
				
			||||||
		c.channel = make(chan interface{}, 16)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
		c.closed = make(chan struct{}) // Reset close signal
 | 
							c.closed = make(chan struct{}) // Reset close signal
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
			for {
 | 
								for {
 | 
				
			||||||
@ -99,20 +115,21 @@ func (c *Channel) Start() error {
 | 
				
			|||||||
					for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
 | 
										for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
 | 
				
			||||||
						select {
 | 
											select {
 | 
				
			||||||
						case sub <- message: // Successfully sent message
 | 
											case sub <- message: // Successfully sent message
 | 
				
			||||||
					case <-time.After(100 * time.Millisecond):
 | 
											case <-time.After(c.broadcastTimeout): // Remove timeout subscriber
 | 
				
			||||||
						c.Unsubscribe(sub) // Remove timeout subscriber
 | 
												common.Must(c.Unsubscribe(sub))
 | 
				
			||||||
							close(sub) // Actively close subscriber as notification
 | 
												close(sub) // Actively close subscriber as notification
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				case <-c.closed: // Channel closed
 | 
									case <-c.closed: // Channel closed
 | 
				
			||||||
					for _, sub := range c.Subscribers() { // Remove all subscribers
 | 
										for _, sub := range c.Subscribers() { // Remove all subscribers
 | 
				
			||||||
					c.Unsubscribe(sub)
 | 
											common.Must(c.Unsubscribe(sub))
 | 
				
			||||||
						close(sub)
 | 
											close(sub)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					return
 | 
										return
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1,7 +1,6 @@
 | 
				
			|||||||
package stats_test
 | 
					package stats_test
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
					 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@ -12,25 +11,30 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestStatsChannel(t *testing.T) {
 | 
					func TestStatsChannel(t *testing.T) {
 | 
				
			||||||
	raw, err := common.CreateObject(context.Background(), &Config{})
 | 
						// At most 2 subscribers could be registered
 | 
				
			||||||
 | 
						c := NewChannel(&ChannelConfig{SubscriberLimit: 2})
 | 
				
			||||||
 | 
						source := c.Channel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						a, err := stats.SubscribeRunnableChannel(c)
 | 
				
			||||||
 | 
						common.Must(err)
 | 
				
			||||||
 | 
						if !c.Running() {
 | 
				
			||||||
 | 
							t.Fatal("unexpected failure in running channel after first subscription")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b, err := c.Subscribe()
 | 
				
			||||||
	common.Must(err)
 | 
						common.Must(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	m := raw.(stats.Manager)
 | 
						// Test that third subscriber is forbidden
 | 
				
			||||||
	c, err := m.RegisterChannel("test.channel")
 | 
						_, err = c.Subscribe()
 | 
				
			||||||
	common.Must(err)
 | 
						if err == nil {
 | 
				
			||||||
	common.Must(m.Start())
 | 
							t.Fatal("unexpected successful subscription")
 | 
				
			||||||
	defer m.Close()
 | 
						}
 | 
				
			||||||
 | 
						t.Log("expected error: ", err)
 | 
				
			||||||
	source := c.(*Channel).Channel()
 | 
					 | 
				
			||||||
	a := c.Subscribe()
 | 
					 | 
				
			||||||
	b := c.Subscribe()
 | 
					 | 
				
			||||||
	defer c.Unsubscribe(a)
 | 
					 | 
				
			||||||
	defer c.Unsubscribe(b)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stopCh := make(chan struct{})
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
	errCh := make(chan string)
 | 
						errCh := make(chan string)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() { // Blocking publish
 | 
				
			||||||
		source <- 1
 | 
							source <- 1
 | 
				
			||||||
		source <- 2
 | 
							source <- 2
 | 
				
			||||||
		source <- "3"
 | 
							source <- "3"
 | 
				
			||||||
@ -84,22 +88,31 @@ func TestStatsChannel(t *testing.T) {
 | 
				
			|||||||
		t.Fatal(e)
 | 
							t.Fatal(e)
 | 
				
			||||||
	case <-stopCh:
 | 
						case <-stopCh:
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Test the unsubscription of channel
 | 
				
			||||||
 | 
						common.Must(c.Unsubscribe(b))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Test the last subscriber will close channel with `UnsubscribeClosableChannel`
 | 
				
			||||||
 | 
						common.Must(stats.UnsubscribeClosableChannel(c, a))
 | 
				
			||||||
 | 
						if c.Running() {
 | 
				
			||||||
 | 
							t.Fatal("unexpected running channel after unsubscribing the last subscriber")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestStatsChannelUnsubcribe(t *testing.T) {
 | 
					func TestStatsChannelUnsubcribe(t *testing.T) {
 | 
				
			||||||
	raw, err := common.CreateObject(context.Background(), &Config{})
 | 
						c := NewChannel(&ChannelConfig{})
 | 
				
			||||||
	common.Must(err)
 | 
						common.Must(c.Start())
 | 
				
			||||||
 | 
						defer c.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	m := raw.(stats.Manager)
 | 
						source := c.Channel()
 | 
				
			||||||
	c, err := m.RegisterChannel("test.channel")
 | 
					 | 
				
			||||||
	common.Must(err)
 | 
					 | 
				
			||||||
	common.Must(m.Start())
 | 
					 | 
				
			||||||
	defer m.Close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	a := c.Subscribe()
 | 
						a, err := c.Subscribe()
 | 
				
			||||||
	b := c.Subscribe()
 | 
						common.Must(err)
 | 
				
			||||||
	defer c.Unsubscribe(a)
 | 
						defer c.Unsubscribe(a)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b, err := c.Subscribe()
 | 
				
			||||||
 | 
						common.Must(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pauseCh := make(chan struct{})
 | 
						pauseCh := make(chan struct{})
 | 
				
			||||||
	stopCh := make(chan struct{})
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
	errCh := make(chan string)
 | 
						errCh := make(chan string)
 | 
				
			||||||
@ -119,10 +132,10 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() { // Blocking publish
 | 
				
			||||||
		c.Publish(1)
 | 
							source <- 1
 | 
				
			||||||
		<-pauseCh // Wait for `b` goroutine to resume sending message
 | 
							<-pauseCh // Wait for `b` goroutine to resume sending message
 | 
				
			||||||
		c.Publish(2)
 | 
							source <- 2
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
@ -179,26 +192,27 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestStatsChannelTimeout(t *testing.T) {
 | 
					func TestStatsChannelTimeout(t *testing.T) {
 | 
				
			||||||
	raw, err := common.CreateObject(context.Background(), &Config{})
 | 
						// Do not use buffer so as to create blocking scenario
 | 
				
			||||||
	common.Must(err)
 | 
						c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 50})
 | 
				
			||||||
 | 
						common.Must(c.Start())
 | 
				
			||||||
 | 
						defer c.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	m := raw.(stats.Manager)
 | 
						source := c.Channel()
 | 
				
			||||||
	c, err := m.RegisterChannel("test.channel")
 | 
					 | 
				
			||||||
	common.Must(err)
 | 
					 | 
				
			||||||
	common.Must(m.Start())
 | 
					 | 
				
			||||||
	defer m.Close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	a := c.Subscribe()
 | 
						a, err := c.Subscribe()
 | 
				
			||||||
	b := c.Subscribe()
 | 
						common.Must(err)
 | 
				
			||||||
	defer c.Unsubscribe(a)
 | 
						defer c.Unsubscribe(a)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b, err := c.Subscribe()
 | 
				
			||||||
 | 
						common.Must(err)
 | 
				
			||||||
	defer c.Unsubscribe(b)
 | 
						defer c.Unsubscribe(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stopCh := make(chan struct{})
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
	errCh := make(chan string)
 | 
						errCh := make(chan string)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() { // Blocking publish
 | 
				
			||||||
		c.Publish(1)
 | 
							source <- 1
 | 
				
			||||||
		c.Publish(2)
 | 
							source <- 2
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
@ -229,7 +243,7 @@ func TestStatsChannelTimeout(t *testing.T) {
 | 
				
			|||||||
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
								errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		// Block `b` channel for a time longer than `source`'s timeout
 | 
							// Block `b` channel for a time longer than `source`'s timeout
 | 
				
			||||||
		<-time.After(150 * time.Millisecond)
 | 
							<-time.After(200 * time.Millisecond)
 | 
				
			||||||
		{ // Test `b` has been unsubscribed by source
 | 
							{ // Test `b` has been unsubscribed by source
 | 
				
			||||||
			var aSet, bSet bool
 | 
								var aSet, bSet bool
 | 
				
			||||||
			for _, s := range c.Subscribers() {
 | 
								for _, s := range c.Subscribers() {
 | 
				
			||||||
@ -264,25 +278,27 @@ func TestStatsChannelTimeout(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestStatsChannelConcurrency(t *testing.T) {
 | 
					func TestStatsChannelConcurrency(t *testing.T) {
 | 
				
			||||||
	raw, err := common.CreateObject(context.Background(), &Config{})
 | 
						// Do not use buffer so as to create blocking scenario
 | 
				
			||||||
	common.Must(err)
 | 
						c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 100})
 | 
				
			||||||
 | 
						common.Must(c.Start())
 | 
				
			||||||
 | 
						defer c.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	m := raw.(stats.Manager)
 | 
						source := c.Channel()
 | 
				
			||||||
	c, err := m.RegisterChannel("test.channel")
 | 
					 | 
				
			||||||
	common.Must(err)
 | 
					 | 
				
			||||||
	common.Must(m.Start())
 | 
					 | 
				
			||||||
	defer m.Close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	a := c.Subscribe()
 | 
						a, err := c.Subscribe()
 | 
				
			||||||
	b := c.Subscribe()
 | 
						common.Must(err)
 | 
				
			||||||
	defer c.Unsubscribe(a)
 | 
						defer c.Unsubscribe(a)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b, err := c.Subscribe()
 | 
				
			||||||
 | 
						common.Must(err)
 | 
				
			||||||
 | 
						defer c.Unsubscribe(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stopCh := make(chan struct{})
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
	errCh := make(chan string)
 | 
						errCh := make(chan string)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() { // Blocking publish
 | 
				
			||||||
		c.Publish(1)
 | 
							source <- 1
 | 
				
			||||||
		c.Publish(2)
 | 
							source <- 2
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
 | 
				
			|||||||
@ -1,15 +0,0 @@
 | 
				
			|||||||
// +build !confonly
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
package stats
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"context"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"v2ray.com/core/common"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func init() {
 | 
					 | 
				
			||||||
	common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
 | 
					 | 
				
			||||||
		return NewManager(ctx, config.(*Config))
 | 
					 | 
				
			||||||
	}))
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -63,18 +63,90 @@ func (*Config) Descriptor() ([]byte, []int) {
 | 
				
			|||||||
	return file_app_stats_config_proto_rawDescGZIP(), []int{0}
 | 
						return file_app_stats_config_proto_rawDescGZIP(), []int{0}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type ChannelConfig struct {
 | 
				
			||||||
 | 
						state         protoimpl.MessageState
 | 
				
			||||||
 | 
						sizeCache     protoimpl.SizeCache
 | 
				
			||||||
 | 
						unknownFields protoimpl.UnknownFields
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SubscriberLimit  int32 `protobuf:"varint,1,opt,name=SubscriberLimit,proto3" json:"SubscriberLimit,omitempty"`
 | 
				
			||||||
 | 
						BufferSize       int32 `protobuf:"varint,2,opt,name=BufferSize,proto3" json:"BufferSize,omitempty"`
 | 
				
			||||||
 | 
						BroadcastTimeout int32 `protobuf:"varint,3,opt,name=BroadcastTimeout,proto3" json:"BroadcastTimeout,omitempty"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) Reset() {
 | 
				
			||||||
 | 
						*x = ChannelConfig{}
 | 
				
			||||||
 | 
						if protoimpl.UnsafeEnabled {
 | 
				
			||||||
 | 
							mi := &file_app_stats_config_proto_msgTypes[1]
 | 
				
			||||||
 | 
							ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 | 
				
			||||||
 | 
							ms.StoreMessageInfo(mi)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) String() string {
 | 
				
			||||||
 | 
						return protoimpl.X.MessageStringOf(x)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (*ChannelConfig) ProtoMessage() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) ProtoReflect() protoreflect.Message {
 | 
				
			||||||
 | 
						mi := &file_app_stats_config_proto_msgTypes[1]
 | 
				
			||||||
 | 
						if protoimpl.UnsafeEnabled && x != nil {
 | 
				
			||||||
 | 
							ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 | 
				
			||||||
 | 
							if ms.LoadMessageInfo() == nil {
 | 
				
			||||||
 | 
								ms.StoreMessageInfo(mi)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return ms
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return mi.MessageOf(x)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Deprecated: Use ChannelConfig.ProtoReflect.Descriptor instead.
 | 
				
			||||||
 | 
					func (*ChannelConfig) Descriptor() ([]byte, []int) {
 | 
				
			||||||
 | 
						return file_app_stats_config_proto_rawDescGZIP(), []int{1}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) GetSubscriberLimit() int32 {
 | 
				
			||||||
 | 
						if x != nil {
 | 
				
			||||||
 | 
							return x.SubscriberLimit
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) GetBufferSize() int32 {
 | 
				
			||||||
 | 
						if x != nil {
 | 
				
			||||||
 | 
							return x.BufferSize
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ChannelConfig) GetBroadcastTimeout() int32 {
 | 
				
			||||||
 | 
						if x != nil {
 | 
				
			||||||
 | 
							return x.BroadcastTimeout
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var File_app_stats_config_proto protoreflect.FileDescriptor
 | 
					var File_app_stats_config_proto protoreflect.FileDescriptor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var file_app_stats_config_proto_rawDesc = []byte{
 | 
					var file_app_stats_config_proto_rawDesc = []byte{
 | 
				
			||||||
	0x0a, 0x16, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x66,
 | 
						0x0a, 0x16, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x66,
 | 
				
			||||||
	0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e,
 | 
						0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e,
 | 
				
			||||||
	0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, 0x08,
 | 
						0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, 0x08,
 | 
				
			||||||
	0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e,
 | 
						0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x85, 0x01, 0x0a, 0x0d, 0x43, 0x68, 0x61,
 | 
				
			||||||
	0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73,
 | 
						0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x53, 0x75,
 | 
				
			||||||
	0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f,
 | 
						0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x01, 0x20,
 | 
				
			||||||
	0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73,
 | 
						0x01, 0x28, 0x05, 0x52, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c,
 | 
				
			||||||
	0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70,
 | 
						0x69, 0x6d, 0x69, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x69,
 | 
				
			||||||
	0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 | 
						0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72,
 | 
				
			||||||
 | 
						0x53, 0x69, 0x7a, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73,
 | 
				
			||||||
 | 
						0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10,
 | 
				
			||||||
 | 
						0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74,
 | 
				
			||||||
 | 
						0x42, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f,
 | 
				
			||||||
 | 
						0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18,
 | 
				
			||||||
 | 
						0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61,
 | 
				
			||||||
 | 
						0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79,
 | 
				
			||||||
 | 
						0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62,
 | 
				
			||||||
 | 
						0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
@ -89,9 +161,10 @@ func file_app_stats_config_proto_rawDescGZIP() []byte {
 | 
				
			|||||||
	return file_app_stats_config_proto_rawDescData
 | 
						return file_app_stats_config_proto_rawDescData
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var file_app_stats_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
 | 
					var file_app_stats_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
 | 
				
			||||||
var file_app_stats_config_proto_goTypes = []interface{}{
 | 
					var file_app_stats_config_proto_goTypes = []interface{}{
 | 
				
			||||||
	(*Config)(nil),        // 0: v2ray.core.app.stats.Config
 | 
						(*Config)(nil),        // 0: v2ray.core.app.stats.Config
 | 
				
			||||||
 | 
						(*ChannelConfig)(nil), // 1: v2ray.core.app.stats.ChannelConfig
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
var file_app_stats_config_proto_depIdxs = []int32{
 | 
					var file_app_stats_config_proto_depIdxs = []int32{
 | 
				
			||||||
	0, // [0:0] is the sub-list for method output_type
 | 
						0, // [0:0] is the sub-list for method output_type
 | 
				
			||||||
@ -119,6 +192,18 @@ func file_app_stats_config_proto_init() {
 | 
				
			|||||||
				return nil
 | 
									return nil
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							file_app_stats_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
 | 
				
			||||||
 | 
								switch v := v.(*ChannelConfig); i {
 | 
				
			||||||
 | 
								case 0:
 | 
				
			||||||
 | 
									return &v.state
 | 
				
			||||||
 | 
								case 1:
 | 
				
			||||||
 | 
									return &v.sizeCache
 | 
				
			||||||
 | 
								case 2:
 | 
				
			||||||
 | 
									return &v.unknownFields
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									return nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	type x struct{}
 | 
						type x struct{}
 | 
				
			||||||
	out := protoimpl.TypeBuilder{
 | 
						out := protoimpl.TypeBuilder{
 | 
				
			||||||
@ -126,7 +211,7 @@ func file_app_stats_config_proto_init() {
 | 
				
			|||||||
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 | 
								GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 | 
				
			||||||
			RawDescriptor: file_app_stats_config_proto_rawDesc,
 | 
								RawDescriptor: file_app_stats_config_proto_rawDesc,
 | 
				
			||||||
			NumEnums:      0,
 | 
								NumEnums:      0,
 | 
				
			||||||
			NumMessages:   1,
 | 
								NumMessages:   2,
 | 
				
			||||||
			NumExtensions: 0,
 | 
								NumExtensions: 0,
 | 
				
			||||||
			NumServices:   0,
 | 
								NumServices:   0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
				
			|||||||
@ -9,3 +9,9 @@ option java_multiple_files = true;
 | 
				
			|||||||
message Config {
 | 
					message Config {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					message ChannelConfig {
 | 
				
			||||||
 | 
					    int32 SubscriberLimit = 1;
 | 
				
			||||||
 | 
					    int32 BufferSize = 2;
 | 
				
			||||||
 | 
					    int32 BroadcastTimeout = 3;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -8,6 +8,8 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"v2ray.com/core/common"
 | 
				
			||||||
 | 
						"v2ray.com/core/common/errors"
 | 
				
			||||||
	"v2ray.com/core/features/stats"
 | 
						"v2ray.com/core/features/stats"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -92,10 +94,10 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
 | 
				
			|||||||
		return nil, newError("Channel ", name, " already registered.")
 | 
							return nil, newError("Channel ", name, " already registered.")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	newError("create new channel ", name).AtDebug().WriteToLog()
 | 
						newError("create new channel ", name).AtDebug().WriteToLog()
 | 
				
			||||||
	c := new(Channel)
 | 
						c := NewChannel(&ChannelConfig{BufferSize: 16, BroadcastTimeout: 100})
 | 
				
			||||||
	m.channels[name] = c
 | 
						m.channels[name] = c
 | 
				
			||||||
	if m.running {
 | 
						if m.running {
 | 
				
			||||||
		c.Start()
 | 
							return c, c.Start()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return c, nil
 | 
						return c, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -108,7 +110,7 @@ func (m *Manager) UnregisterChannel(name string) error {
 | 
				
			|||||||
	if c, found := m.channels[name]; found {
 | 
						if c, found := m.channels[name]; found {
 | 
				
			||||||
		newError("remove channel ", name).AtDebug().WriteToLog()
 | 
							newError("remove channel ", name).AtDebug().WriteToLog()
 | 
				
			||||||
		delete(m.channels, name)
 | 
							delete(m.channels, name)
 | 
				
			||||||
		c.Close()
 | 
							return c.Close()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -129,8 +131,14 @@ func (m *Manager) Start() error {
 | 
				
			|||||||
	m.access.Lock()
 | 
						m.access.Lock()
 | 
				
			||||||
	defer m.access.Unlock()
 | 
						defer m.access.Unlock()
 | 
				
			||||||
	m.running = true
 | 
						m.running = true
 | 
				
			||||||
 | 
						errs := []error{}
 | 
				
			||||||
	for _, channel := range m.channels {
 | 
						for _, channel := range m.channels {
 | 
				
			||||||
		channel.Start()
 | 
							if err := channel.Start(); err != nil {
 | 
				
			||||||
 | 
								errs = append(errs, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(errs) != 0 {
 | 
				
			||||||
 | 
							return errors.Combine(errs...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -140,10 +148,22 @@ func (m *Manager) Close() error {
 | 
				
			|||||||
	m.access.Lock()
 | 
						m.access.Lock()
 | 
				
			||||||
	defer m.access.Unlock()
 | 
						defer m.access.Unlock()
 | 
				
			||||||
	m.running = false
 | 
						m.running = false
 | 
				
			||||||
 | 
						errs := []error{}
 | 
				
			||||||
	for name, channel := range m.channels {
 | 
						for name, channel := range m.channels {
 | 
				
			||||||
		newError("remove channel ", name).AtDebug().WriteToLog()
 | 
							newError("remove channel ", name).AtDebug().WriteToLog()
 | 
				
			||||||
		delete(m.channels, name)
 | 
							delete(m.channels, name)
 | 
				
			||||||
		channel.Close()
 | 
							if err := channel.Close(); err != nil {
 | 
				
			||||||
 | 
								errs = append(errs, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(errs) != 0 {
 | 
				
			||||||
 | 
							return errors.Combine(errs...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
 | 
				
			||||||
 | 
							return NewManager(ctx, config.(*Config))
 | 
				
			||||||
 | 
						}))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -42,7 +42,8 @@ func TestStatsChannelRunnable(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("unexpected non-running channel: test.channel.%d", 2)
 | 
							t.Fatalf("unexpected non-running channel: test.channel.%d", 2)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s1 := c1.Subscribe()
 | 
						s1, err := c1.Subscribe()
 | 
				
			||||||
 | 
						common.Must(err)
 | 
				
			||||||
	common.Must(c1.Close())
 | 
						common.Must(c1.Close())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c1.Running() {
 | 
						if c1.Running() {
 | 
				
			||||||
 | 
				
			|||||||
@ -30,9 +30,30 @@ type Channel interface {
 | 
				
			|||||||
	// SubscriberCount returns the number of the subscribers.
 | 
						// SubscriberCount returns the number of the subscribers.
 | 
				
			||||||
	Subscribers() []chan interface{}
 | 
						Subscribers() []chan interface{}
 | 
				
			||||||
	// Subscribe registers for listening to channel stream and returns a new listener channel.
 | 
						// Subscribe registers for listening to channel stream and returns a new listener channel.
 | 
				
			||||||
	Subscribe() chan interface{}
 | 
						Subscribe() (chan interface{}, error)
 | 
				
			||||||
	// Unsubscribe unregisters a listener channel from current Channel object.
 | 
						// Unsubscribe unregisters a listener channel from current Channel object.
 | 
				
			||||||
	Unsubscribe(chan interface{})
 | 
						Unsubscribe(chan interface{}) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SubscribeRunnableChannel subscribes the channel and starts it if there is first subscriber coming.
 | 
				
			||||||
 | 
					func SubscribeRunnableChannel(c Channel) (chan interface{}, error) {
 | 
				
			||||||
 | 
						if len(c.Subscribers()) == 0 {
 | 
				
			||||||
 | 
							if err := c.Start(); err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return c.Subscribe()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UnsubscribeClosableChannel unsubcribes the channel and close it if there is no more subscriber.
 | 
				
			||||||
 | 
					func UnsubscribeClosableChannel(c Channel, sub chan interface{}) error {
 | 
				
			||||||
 | 
						if err := c.Unsubscribe(sub); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(c.Subscribers()) == 0 {
 | 
				
			||||||
 | 
							return c.Close()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Manager is the interface for stats manager.
 | 
					// Manager is the interface for stats manager.
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user