Skip to content

Commit

Permalink
feat(event): Added event dispatcher (#82)
Browse files Browse the repository at this point in the history
Signed-off-by: Flc゛ <[email protected]>
  • Loading branch information
flc1125 authored Jan 30, 2024
1 parent ad0f9e0 commit 2ff07fd
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
92 changes: 92 additions & 0 deletions event/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package event

import (
"sync"

"github.com/go-kratos/kratos/v2/log"

"github.com/go-kratos-ecosystem/components/v2/feature"
)

type Event string

type Listener interface {
Listen() []Event
Handle(event Event, data interface{})
}

type Dispatcher struct {
listeners map[Event][]Listener
rw sync.RWMutex

recovery func(err interface{}, listener Listener, event Event, data interface{})
}

type Option func(*Dispatcher)

func WithRecovery(handler func(err interface{}, listener Listener, event Event, data interface{})) Option {
return func(d *Dispatcher) {
if handler != nil {
d.recovery = handler
}
}
}

var DefaultRecovery = func(err interface{}, event Event, data interface{}) {
log.Errorf("[Event] handler panic event: %s, data: %v, err: %v", event, data, err)
}

func NewDispatcher(opts ...Option) *Dispatcher {
d := &Dispatcher{
listeners: make(map[Event][]Listener),
}

for _, opt := range opts {
opt(d)
}

return d
}

func (d *Dispatcher) AddListener(listener ...Listener) {
d.rw.Lock()
defer d.rw.Unlock()

for _, l := range listener {
for _, event := range l.Listen() {
if _, ok := d.listeners[event]; !ok {
d.listeners[event] = make([]Listener, 0)
}

d.listeners[event] = append(d.listeners[event], l)
}
}
}

func (d *Dispatcher) Dispatch(event Event, data interface{}) {
d.rw.RLock()
defer d.rw.RUnlock()

if listeners, ok := d.listeners[event]; ok {
for _, listener := range listeners {
// if support Asyncable
if l, ok := listener.(feature.Asyncable); ok && l.Async() {
go d.handle(listener, event, data)
} else {
d.handle(listener, event, data)
}
}
}
}

func (d *Dispatcher) handle(listener Listener, event Event, data interface{}) {
if d.recovery != nil {
defer func() {
if err := recover(); err != nil {
d.recovery(err, listener, event, data)
}
}()
}

listener.Handle(event, data)
}
97 changes: 97 additions & 0 deletions event/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package event

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/go-kratos-ecosystem/components/v2/feature"
)

type result struct {
event Event
data interface{}
err interface{}
}

var recv = make(chan result, 1)

type testListener struct {
feature.AsyncFeature
}

func newTestListener() *testListener {
return &testListener{}
}

func (l *testListener) Listen() []Event {
return []Event{
"test",
"test2",
}
}

func (l *testListener) Handle(event Event, data interface{}) {
if s, ok := data.(string); ok {
recv <- result{
event: event,
data: s,
}
} else {
panic("invalid data")
}
}

type test2Listener struct{}

func (l *test2Listener) Listen() []Event {
return []Event{
"test3",
}
}

func (l *test2Listener) Handle(event Event, data interface{}) {
recv <- result{
event: event,
data: data,
}
}

func TestDispatcher(t *testing.T) {
var (
d = NewDispatcher(
WithRecovery(func(err interface{}, listener Listener, event Event, data interface{}) {
recv <- result{
event: event,
data: data,
err: err,
}
}),
)
l = newTestListener()
)

d.AddListener(l, &test2Listener{})
assert.True(t, l.Async())

d.Dispatch("test", "test data")
r1 := <-recv
assert.Equal(t, Event("test"), r1.event)
assert.Equal(t, "test data", r1.data)

d.Dispatch("test2", "test2 data")
r2 := <-recv
assert.Equal(t, Event("test2"), r2.event)
assert.Equal(t, "test2 data", r2.data)

d.Dispatch("test", 111)
r3 := <-recv
assert.Equal(t, Event("test"), r3.event)
assert.Equal(t, 111, r3.data)
assert.Equal(t, "invalid data", r3.err)

d.Dispatch("test3", "test3 data")
r4 := <-recv
assert.Equal(t, Event("test3"), r4.event)
assert.Equal(t, "test3 data", r4.data)
}
11 changes: 11 additions & 0 deletions feature/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package feature

type Asyncable interface {
Async() bool
}

type AsyncFeature struct{}

func (*AsyncFeature) Async() bool {
return true
}

0 comments on commit 2ff07fd

Please sign in to comment.