Skip to content

Commit

Permalink
Merge pull request #21 from DarthPestilane/refactor/non-block-handling
Browse files Browse the repository at this point in the history
refactor: non-block handling
  • Loading branch information
DarthPestilane authored Oct 26, 2021
2 parents f0cc85b + 2d69d54 commit fb167fc
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ lint-fix:

.PHONY: test
test:
CGO_ENABLED=1 go test -race -count=1 -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m . ./message
go test -count=1 -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m . ./message

.PHONY: test-v
test-v:
CGO_ENABLED=1 go test -race -count=1 -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m -v . ./message
go test -count=1 -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m -v . ./message

.PHONY: cover-view
cover-view:
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ go test -bench=. -run=none -benchmem -benchtime=250000x
goos: darwin
goarch: amd64
pkg: github.com/DarthPestilane/easytcp
Benchmark_NoHandler-8 250000 4479 ns/op 86 B/op 2 allocs/op
Benchmark_OneHandler-8 250000 4377 ns/op 85 B/op 2 allocs/op
Benchmark_OneHandlerCtxGetSet-8 250000 4076 ns/op 363 B/op 4 allocs/op
Benchmark_OneHandlerMessageGetSet-8 250000 4202 ns/op 396 B/op 5 allocs/op
Benchmark_NoHandler-8 250000 5330 ns/op 95 B/op 2 allocs/op
Benchmark_OneHandler-8 250000 5495 ns/op 96 B/op 2 allocs/op
Benchmark_OneHandlerCtxGetSet-8 250000 5120 ns/op 433 B/op 5 allocs/op
Benchmark_OneHandlerMessageGetSet-8 250000 5329 ns/op 466 B/op 6 allocs/op
PASS
ok github.com/DarthPestilane/easytcp 4.609s
ok github.com/DarthPestilane/easytcp 5.427s
```

*since easytcp is built on the top of golang `net` library, the benchmark of networks does not make much sense.*
Expand Down
27 changes: 16 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (s *Session) Close() {
// The loop breaks if errors occurred or the session is closed.
func (s *Session) readInbound(router *Router, timeout time.Duration) {
for {
select {
case <-s.closed:
return
default:
}
if timeout > 0 {
if err := s.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
Log.Errorf("session %s set read deadline err: %s", s.id, err)
Expand All @@ -87,17 +92,17 @@ func (s *Session) readInbound(router *Router, timeout time.Duration) {
continue
}

ctx := s.ctxPool.Get().(*Context)
ctx.reset(s, reqEntry)

if err := router.handleRequest(ctx); err != nil {
Log.Errorf("handle request err: %s", err)
}

if err := s.SendResp(ctx); err != nil {
Log.Errorf("send resp context err: %s", err)
break
}
// don't block the loop.
go func() {
ctx := s.ctxPool.Get().(*Context)
ctx.reset(s, reqEntry)
if err := router.handleRequest(ctx); err != nil {
Log.Errorf("handle request err: %s", err)
}
if err := s.SendResp(ctx); err != nil {
Log.Errorf("send resp context err: %s", err)
}
}()
}
Log.Tracef("session %s readInbound exit because of error", s.id)
s.Close()
Expand Down
9 changes: 3 additions & 6 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ func TestTCPSession_readInbound(t *testing.T) {
defer ctrl.Finish()

packer := mock.NewMockPacker(ctrl)
packer.EXPECT().Unpack(gomock.Any()).Return(&message.Entry{ID: 1, Data: []byte("test")}, nil)
packer.EXPECT().Unpack(gomock.Any()).AnyTimes().Return(&message.Entry{ID: 1, Data: []byte("test")}, nil)

r := newRouter()
r.register(1, func(ctx *Context) error {
ctx.session.Close()
return fmt.Errorf("route error")
})

sess := newSession(nil, &SessionOption{Packer: packer, respQueueSize: 10})
sess.Close()
loopDone := make(chan struct{})
go func() {
sess.readInbound(r, 0)
Expand All @@ -138,15 +138,12 @@ func TestTCPSession_readInbound(t *testing.T) {
}
})

codec := mock.NewMockCodec(ctrl)
codec.EXPECT().Encode(gomock.Any()).Return([]byte("encode ok"), nil)

r := newRouter()
r.register(1, func(ctx *Context) error {
return ctx.Response(2, []byte("ok"))
})

sess := newSession(nil, &SessionOption{Packer: packer, Codec: codec, respQueueSize: 10})
sess := newSession(nil, &SessionOption{Packer: packer, Codec: nil, respQueueSize: 10})
readDone := make(chan struct{})
go func() {
sess.readInbound(r, 0)
Expand Down

0 comments on commit fb167fc

Please sign in to comment.