Skip to content

Commit 78c6ede

Browse files
committed
fix queue handler
1 parent 187da7f commit 78c6ede

File tree

4 files changed

+74
-36
lines changed

4 files changed

+74
-36
lines changed

concurrency/barrier.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,8 @@ func (b *Barrier) Done() {
2828
if b == nil {
2929
return
3030
}
31-
b.c <- struct{}{}
31+
select {
32+
case b.c <- struct{}{}:
33+
default:
34+
}
3235
}

queue/queue.go

+59-32
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package queue
22

33
import (
4+
"errors"
45
"fmt"
56
"sync"
67
"time"
78

9+
"github.com/leopoldxx/go-utils/concurrency"
810
"github.com/leopoldxx/go-utils/trace"
911

1012
"context"
@@ -16,22 +18,35 @@ const (
1618
)
1719

1820
// HandlerWrap function for Handler interface
19-
func HandlerWrap(f func(ctx context.Context, data []byte) error) *HandlerWrapper {
20-
return &HandlerWrapper{f}
21+
func HandlerWrap(name string, f func(ctx context.Context, data []byte) error) *HandlerWrapper {
22+
return &HandlerWrapper{name, f}
2123
}
2224

2325
// HandlerWrapper for Handler
2426
type HandlerWrapper struct {
25-
Impl func(ctx context.Context, data []byte) error
27+
NameValue string
28+
Impl func(ctx context.Context, data []byte) error
2629
}
2730

2831
// Handle of hw
2932
func (hw *HandlerWrapper) Handle(ctx context.Context, data []byte) error {
33+
if hw == nil {
34+
return errors.New("nil handler")
35+
}
3036
return hw.Impl(ctx, data)
3137
}
3238

39+
// Name of the handler
40+
func (hw *HandlerWrapper) Name() string {
41+
if hw == nil {
42+
return "<nil>"
43+
}
44+
return hw.NameValue
45+
}
46+
3347
// Handler of MsgQue
3448
type Handler interface {
49+
Name() string
3550
Handle(ctx context.Context, data []byte) error
3651
}
3752

@@ -76,40 +91,51 @@ func (mq *MsgQueue) Stop() {
7691

7792
// Run the background processor
7893
func (mq *MsgQueue) Run() {
94+
handleBarrier := concurrency.NewBarrier(100)
7995
handle := func(mq *MsgQueue, mb *msgBody) {
80-
var wg sync.WaitGroup
8196
mq.mu.Lock()
82-
if hs, ok := mq.handlers[mb.topic]; ok {
83-
ctx, cancel := context.WithTimeout(mq.stopCtx, HandleTimeout)
97+
hs, ok := mq.handlers[mb.topic]
98+
tmphs := make([]Handler, 0, len(hs))
99+
if ok {
84100
for h := range hs {
85-
wg.Add(1)
86-
go func(h Handler, ctx context.Context, body []byte) {
87-
tracer := trace.GetTraceFromContext(ctx)
88-
defer wg.Done()
89-
90-
tmpCh := make(chan error, 1)
91-
defer close(tmpCh)
92-
go func() {
93-
defer func() {
94-
if r := recover(); r != nil {
95-
tracer.Errorf("panic: %v\n", r)
96-
}
97-
}()
98-
tmpCh <- h.Handle(ctx, body)
99-
}()
101+
tmphs = append(tmphs, h)
102+
}
103+
}
104+
mq.mu.Unlock()
105+
defer handleBarrier.Done()
100106

101-
select {
102-
case <-tmpCh:
103-
case <-ctx.Done():
107+
var wg sync.WaitGroup
108+
ctx, cancel := context.WithTimeout(mq.stopCtx, HandleTimeout)
109+
defer cancel()
110+
for h := range tmphs {
111+
wg.Add(1)
112+
go func(hd Handler, ctx context.Context, body []byte) {
113+
ctx = trace.WithTraceForContext(ctx, hd.Name())
114+
tracer := trace.GetTraceFromContext(ctx)
115+
defer func() {
116+
if r := recover(); r != nil {
117+
tracer.Errorf("handler %s panic: %v\n", hd.Name(), r)
104118
}
105-
}(h, ctx, mb.body)
106-
}
107-
mq.mu.Unlock()
108-
wg.Wait()
109-
cancel()
110-
} else {
111-
mq.mu.Unlock()
119+
}()
120+
defer wg.Done()
121+
122+
tmpCh := make(chan error, 1)
123+
go func() {
124+
defer func() {
125+
if r := recover(); r != nil {
126+
tracer.Errorf("handler %s panic: %v\n", hd.Name(), r)
127+
}
128+
}()
129+
tmpCh <- hd.Handle(ctx, body)
130+
}()
131+
132+
select {
133+
case <-tmpCh:
134+
case <-ctx.Done():
135+
}
136+
}(tmphs[h], ctx, mb.body)
112137
}
138+
wg.Wait()
113139
}
114140

115141
for {
@@ -118,7 +144,8 @@ func (mq *MsgQueue) Run() {
118144
close(mq.data)
119145
return
120146
case msg := <-mq.data:
121-
handle(mq, &msg)
147+
handleBarrier.Advance()
148+
go handle(mq, &msg)
122149
}
123150
}
124151
}

queue/queue_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (fh *fakeHandle) NeedWrapHandle(ctx context.Context, data []byte) error {
2525
return nil
2626
}
2727

28+
func (fh *fakeHandle) Name() string {
29+
return fh.name
30+
}
31+
2832
func (fh *fakeHandle) Handle(ctx context.Context, data []byte) error {
2933
fh.Lock()
3034
defer fh.Unlock()
@@ -41,8 +45,8 @@ func TestSubPub(t *testing.T) {
4145
fh1 := &fakeHandle{name: "fh1", t: t}
4246
fh2 := &fakeHandle{name: "fh2", t: t}
4347

44-
unsub1, _ := que.Sub("topic-all", &HandlerWrapper{fh1.NeedWrapHandle})
45-
unsub2, _ := que.Sub("topic-all", HandlerWrap(fh2.NeedWrapHandle))
48+
unsub1, _ := que.Sub("topic-all", &HandlerWrapper{"fake-handle", fh1.NeedWrapHandle})
49+
unsub2, _ := que.Sub("topic-all", HandlerWrap("fake-handle2", fh2.NeedWrapHandle))
4650
unsub3, _ := que.Sub("topic-add", fh1)
4751

4852
checkNum := func(n1, n2, n3 int) {
@@ -67,6 +71,8 @@ func TestSubPub(t *testing.T) {
6771
}
6872
}
6973

74+
checkNum(2, 2, 1)
75+
7076
que.Pub("topic-all", []byte("msg1"))
7177
time.Sleep(10 * time.Millisecond)
7278
checkCount(fh1, 1)

trace/trace_handler.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ import (
1919
"net/http"
2020
)
2121

22+
type key int
23+
2224
const (
23-
tracerLogHandlerID = "tracer-handler-id-757b345cf9312183e788faaee990d349"
25+
tracerLogHandlerID key = 32702 // random key
2426
)
2527

2628
// Handler wrap a trace handler outer the original http.Handler

0 commit comments

Comments
 (0)