Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event): Added event dispatcher #82

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions event/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package event

import (
"sync"

"github.com/go-kratos/kratos/v2/log"

"github.com/go-kratos-ecosystem/components/v2/feature"
)

type Event string

type Listener interface {
Listen() []Event
Handle(event Event, data interface{})
}

type Dispatcher struct {
listeners map[Event][]Listener
rw sync.RWMutex

recovery func(err interface{}, listener Listener, event Event, data interface{})
}

type Option func(*Dispatcher)

func WithRecovery(handler func(err interface{}, listener Listener, event Event, data interface{})) Option {
return func(d *Dispatcher) {
if handler != nil {
d.recovery = handler
}
}
}

var DefaultRecovery = func(err interface{}, event Event, data interface{}) {
log.Errorf("[Event] handler panic event: %s, data: %v, err: %v", event, data, err)
}

func NewDispatcher(opts ...Option) *Dispatcher {
d := &Dispatcher{
listeners: make(map[Event][]Listener),
}

for _, opt := range opts {
opt(d)
}

return d
}

func (d *Dispatcher) AddListener(listener ...Listener) {
d.rw.Lock()
defer d.rw.Unlock()

for _, l := range listener {
for _, event := range l.Listen() {
if _, ok := d.listeners[event]; !ok {
d.listeners[event] = make([]Listener, 0)
}

d.listeners[event] = append(d.listeners[event], l)
}
}
}

func (d *Dispatcher) Dispatch(event Event, data interface{}) {
d.rw.RLock()
defer d.rw.RUnlock()

if listeners, ok := d.listeners[event]; ok {
for _, listener := range listeners {
// if support Asyncable
if l, ok := listener.(feature.Asyncable); ok && l.Async() {
go d.handle(listener, event, data)
} else {
d.handle(listener, event, data)
}
}
}
}

func (d *Dispatcher) handle(listener Listener, event Event, data interface{}) {
if d.recovery != nil {
defer func() {
if err := recover(); err != nil {
d.recovery(err, listener, event, data)
}
}()
}

listener.Handle(event, data)
}
97 changes: 97 additions & 0 deletions event/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package event

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/go-kratos-ecosystem/components/v2/feature"
)

type result struct {
event Event
data interface{}
err interface{}
}

var recv = make(chan result, 1)

type testListener struct {
feature.AsyncFeature
}

func newTestListener() *testListener {
return &testListener{}
}

func (l *testListener) Listen() []Event {
return []Event{
"test",
"test2",
}
}

func (l *testListener) Handle(event Event, data interface{}) {
if s, ok := data.(string); ok {
recv <- result{
event: event,
data: s,
}
} else {
panic("invalid data")
}
}

type test2Listener struct{}

func (l *test2Listener) Listen() []Event {
return []Event{
"test3",
}
}

func (l *test2Listener) Handle(event Event, data interface{}) {
recv <- result{
event: event,
data: data,
}
}

func TestDispatcher(t *testing.T) {
var (
d = NewDispatcher(
WithRecovery(func(err interface{}, listener Listener, event Event, data interface{}) {
recv <- result{
event: event,
data: data,
err: err,
}
}),
)
l = newTestListener()
)

d.AddListener(l, &test2Listener{})
assert.True(t, l.Async())

d.Dispatch("test", "test data")
r1 := <-recv
assert.Equal(t, Event("test"), r1.event)
assert.Equal(t, "test data", r1.data)

d.Dispatch("test2", "test2 data")
r2 := <-recv
assert.Equal(t, Event("test2"), r2.event)
assert.Equal(t, "test2 data", r2.data)

d.Dispatch("test", 111)
r3 := <-recv
assert.Equal(t, Event("test"), r3.event)
assert.Equal(t, 111, r3.data)
assert.Equal(t, "invalid data", r3.err)

d.Dispatch("test3", "test3 data")
r4 := <-recv
assert.Equal(t, Event("test3"), r4.event)
assert.Equal(t, "test3 data", r4.data)
}
11 changes: 11 additions & 0 deletions feature/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package feature

type Asyncable interface {
Async() bool
}

type AsyncFeature struct{}

func (*AsyncFeature) Async() bool {
return true
}
Loading