From 5756aea09b9a2c9de54050d88cfd4bf746038713 Mon Sep 17 00:00:00 2001 From: liukun Date: Sat, 30 Jan 2021 18:49:00 +0800 Subject: [PATCH] Add detach operation. --- aio_generic.go | 4 + watcher.go | 42 +++++++-- watcher_detach_test.go | 199 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+), 7 deletions(-) create mode 100644 watcher_detach_test.go diff --git a/aio_generic.go b/aio_generic.go index d7f4857..7af84f4 100644 --- a/aio_generic.go +++ b/aio_generic.go @@ -31,6 +31,8 @@ var ( ErrDeadline = errors.New("operation exceeded deadline") // ErrEmptyBuffer means the buffer is nil ErrEmptyBuffer = errors.New("empty buffer") + // ErrInvalidDetachResult means the result cannot be used + ErrInvalidDetachResult = errors.New("invalid detach result") ) var ( @@ -45,6 +47,8 @@ const ( OpRead OpType = iota // OpWrite means the aiocb is a write operation OpWrite + // OpDetach means the aiocb is a detach operation + OpDetach // internal operation to delete an related resource opDelete ) diff --git a/watcher.go b/watcher.go index 14bef6d..7951197 100644 --- a/watcher.go +++ b/watcher.go @@ -11,6 +11,7 @@ import ( "container/list" "io" "net" + "os" "reflect" "runtime" "sync" @@ -212,11 +213,32 @@ func (w *watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadl } // Free let the watcher to release resources related to this conn immediately, -// like socket file descriptors. +// like socket file descriptors. Will close the conn. func (w *watcher) Free(conn net.Conn) error { return w.aioCreate(nil, opDelete, conn, nil, zeroTime, false) } +// Detach let the watcher to release resources related to this conn immediately, +// but keeps the conn and return its duplicated file descriptor in OpResult.Size. +func (w *watcher) Detach(ctx interface{}, conn net.Conn) error { + return w.aioCreate(ctx, OpDetach, conn, nil, zeroTime, false) +} + +// RecoverConnFromDetachResult reassemble the file descriptor stored in +// `OpResult.Size` of a `Detach` result to a `net.Conn`. +func RecoverConnFromDetachResult(r OpResult) (net.Conn, error) { + if r.Operation != OpDetach || r.Error != nil || r.Size == 0 { + return nil, ErrInvalidDetachResult + } + f := os.NewFile(uintptr(r.Size), "") + if f == nil { + return nil, ErrInvalidDetachResult + } + conn, err := net.FileConn(f) + _ = f.Close() + return conn, err +} + // core async-io creation func (w *watcher) aioCreate(ctx interface{}, op OpType, conn net.Conn, buf []byte, deadline time.Time, readfull bool) error { select { @@ -336,7 +358,7 @@ func (w *watcher) tryWrite(fd int, pcb *aiocb) bool { } // release connection related resources -func (w *watcher) releaseConn(ident int) { +func (w *watcher) releaseConn(ident int, close bool) { if desc, ok := w.descs[ident]; ok { // delete from heap for e := desc.readers.Front(); e != nil; e = e.Next() { @@ -356,7 +378,9 @@ func (w *watcher) releaseConn(ident int) { delete(w.descs, ident) delete(w.connIdents, desc.ptr) // close socket file descriptor duplicated from net.Conn - syscall.Close(ident) + if close { + _ = syscall.Close(ident) + } } } @@ -393,7 +417,7 @@ func (w *watcher) loop() { // defer function to release all resources defer func() { for ident := range w.descs { - w.releaseConn(ident) + w.releaseConn(ident, true) } }() @@ -439,7 +463,7 @@ func (w *watcher) loop() { if ident, ok := w.connIdents[ptr]; ok { // since it's gc-ed, queue is impossible to hold net.Conn // we don't have to send to chIOCompletion,just release here - w.releaseConn(ident) + w.releaseConn(ident, true) } w.gc[i] = nil } @@ -457,8 +481,12 @@ func (w *watcher) handlePending(pending []*aiocb) { for _, pcb := range pending { ident, ok := w.connIdents[pcb.ptr] // resource releasing operation - if pcb.op == opDelete && ok { - w.releaseConn(ident) + if (pcb.op == opDelete || pcb.op == OpDetach) && ok { + w.releaseConn(ident, pcb.op == opDelete) + if pcb.op == OpDetach { + pcb.size = ident + w.deliver(pcb) + } continue } diff --git a/watcher_detach_test.go b/watcher_detach_test.go new file mode 100644 index 0000000..8e0f965 --- /dev/null +++ b/watcher_detach_test.go @@ -0,0 +1,199 @@ +package gaio_test + +import ( + "math/rand" + "net" + "testing" + + "github.com/xtaci/gaio" +) + +type detachCtx struct { + id int + cmd [2]byte + times int + conn net.Conn +} + +func newDetachCtx() *detachCtx { + return &detachCtx{ + cmd: [2]byte{'0', '0'}, + } +} + +// detachServer responses to character cmd received from connection, +// prints stored cmd or detach the connection and re-watch. +func detachServer(t testing.TB) net.Listener { + addr, _ := net.ResolveTCPAddr("tcp", "localhost:0") + ln, err := net.ListenTCP("tcp", addr) + if err != nil { + t.Fatal(err) + } + + w, err := gaio.NewWatcher() + if err != nil { + t.Fatal(err) + } + + go func() { + for { + results, err := w.WaitIO() + if err != nil { + return + } + + for _, res := range results { + switch res.Operation { + case gaio.OpRead: + if res.Error != nil { + t.Log(res.Error) + _ = w.Free(res.Conn) + continue + } + ctx := res.Context.(*detachCtx) + if res.Size > 0 { + switch res.Buffer[0] { + case 'p': // print current cmd + _ = w.Write(ctx, res.Conn, ctx.cmd[:1]) + case 'x': // exit + continue + default: // remember the cmd and detach + ctx.cmd[0] = res.Buffer[0] + _ = w.Detach(ctx, res.Conn) + } + } + case gaio.OpWrite: + if res.Error != nil { + t.Fatal(res.Error) + } + if res.Size > 0 { + // write complete, start read again + _ = w.Read(res.Context, res.Conn, nil) + } + case gaio.OpDetach: + if res.Error != nil { + t.Fatal(res.Error) + } + ctx := res.Context.(*detachCtx) + // conn in detach result should be the old one + if res.Conn != ctx.conn { + t.Error("conn changed", res.Conn, ctx.conn) + } + conn, err := gaio.RecoverConnFromDetachResult(res) + if err != nil { + t.Fatal(err) + } + ctx.conn = conn + // join the watcher again + _ = w.Write(ctx, conn, ctx.cmd[:1]) + } + } + } + }() + + go func() { + for { + conn, err := ln.AcceptTCP() + if err != nil { + _ = w.Close() + return + } + ctx := newDetachCtx() + ctx.conn = conn + err = w.Read(ctx, conn, nil) + if err != nil { + t.Fatal(err) + } + } + }() + return ln +} + +func testDetachParallel(t *testing.T, par int, times int) { + t.Log("testing concurrent:", par, "connections") + ln := detachServer(t) + defer func() { _ = ln.Close() }() + + w, err := gaio.NewWatcher() + if err != nil { + t.Fatal(err) + } + defer func() { _ = w.Close() }() + + go func() { + for i := 0; i < par; i++ { + conn, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + ctx := newDetachCtx() + ctx.id = i + ctx.cmd[0] = 'p' // print init cmd + err = w.Write(ctx, conn, ctx.cmd[:1]) + if err != nil { + t.Fatal(err) + } + } + }() + + done := 0 + for done < par { + results, err := w.WaitIO() + if err != nil { + t.Log(err) + return + } + + for _, res := range results { + switch res.Operation { + case gaio.OpWrite: + // recv + if res.Error != nil { + t.Error(res.Error) + } + ctx := res.Context.(*detachCtx) + if ctx.cmd[0] == 'x' { + err = w.Free(res.Conn) + done++ + } else { + err = w.Read(ctx, res.Conn, nil) + } + if err != nil { + t.Fatal(err) + } + case gaio.OpRead: + if res.Error != nil { + t.Error(res.Error) + } + ctx := res.Context.(*detachCtx) + if ctx.cmd[1] != res.Buffer[0] { + t.Error("not match", ctx.id, string(ctx.cmd[0]), string(ctx.cmd[1]), string(res.Buffer[0])) + } + ctx.times++ + if times > 0 && ctx.times > times { + ctx.cmd[0] = 'x' + } else { + n := rand.Intn(10) + if n > 5 { + ctx.cmd[0] = 'p' + } else { + ctx.cmd[0] = byte(int('0') + n) + ctx.cmd[1] = ctx.cmd[0] + } + } + if err := w.Write(ctx, res.Conn, ctx.cmd[:1]); err != nil { + t.Fatal(err) + } + } + } + } + _ = w.Close() +} + +func TestDetach10(t *testing.T) { + testDetachParallel(t, 10, 100) +} + +func TestDetach100(t *testing.T) { + testDetachParallel(t, 100, 1000) +}