Skip to content

Commit

Permalink
map based subscriber list
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed Jul 1, 2018
1 parent 4c18b61 commit b23b218
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
64 changes: 34 additions & 30 deletions common/signal/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"time"

"v2ray.com/core/common"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task"
)

type Subscriber struct {
name string
buffer chan interface{}
removed chan struct{}
buffer chan interface{}
done *done.Instance
}

func (s *Subscriber) push(msg interface{}) {
Expand All @@ -25,62 +25,66 @@ func (s *Subscriber) Wait() <-chan interface{} {
return s.buffer
}

func (s *Subscriber) Close() {
close(s.removed)
func (s *Subscriber) Close() error {
return s.done.Close()
}

func (s *Subscriber) IsClosed() bool {
select {
case <-s.removed:
return true
default:
return false
}
return s.done.Done()
}

type Service struct {
sync.RWMutex
subs []*Subscriber
subs map[string][]*Subscriber
ctask *task.Periodic
}

func NewService() *Service {
s := &Service{}
s := &Service{
subs: make(map[string][]*Subscriber),
}
s.ctask = &task.Periodic{
Execute: s.cleanup,
Execute: s.Cleanup,
Interval: time.Second * 30,
}
common.Must(s.ctask.Start())
return s
}

func (s *Service) cleanup() error {
// Cleanup cleans up internal caches of subscribers.
// Visible for testing only.
func (s *Service) Cleanup() error {
s.Lock()
defer s.Unlock()

if len(s.subs) < 16 {
return nil
}

newSub := make([]*Subscriber, 0, len(s.subs))
for _, sub := range s.subs {
if !sub.IsClosed() {
newSub = append(newSub, sub)
for name, subs := range s.subs {
newSub := make([]*Subscriber, 0, len(s.subs))
for _, sub := range subs {
if !sub.IsClosed() {
newSub = append(newSub, sub)
}
}
if len(newSub) == 0 {
delete(s.subs, name)
} else {
s.subs[name] = newSub
}
}

s.subs = newSub
if len(s.subs) == 0 {
s.subs = make(map[string][]*Subscriber)
}
return nil
}

func (s *Service) Subscribe(name string) *Subscriber {
sub := &Subscriber{
name: name,
buffer: make(chan interface{}, 16),
removed: make(chan struct{}),
buffer: make(chan interface{}, 16),
done: done.New(),
}
s.Lock()
s.subs = append(s.subs, sub)
subs := append(s.subs[name], sub)
s.subs[name] = subs
s.Unlock()
return sub
}
Expand All @@ -89,8 +93,8 @@ func (s *Service) Publish(name string, message interface{}) {
s.RLock()
defer s.RUnlock()

for _, sub := range s.subs {
if sub.name == name && !sub.IsClosed() {
for _, sub := range s.subs[name] {
if !sub.IsClosed() {
sub.push(message)
}
}
Expand Down
2 changes: 2 additions & 0 deletions common/signal/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ func TestPubsub(t *testing.T) {
t.Fail()
default:
}

service.Cleanup()
}

0 comments on commit b23b218

Please sign in to comment.