Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detach operation. #12

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aio_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
ErrDeadline = errors.New("operation exceeded deadline")
// ErrEmptyBuffer means the buffer is nil
ErrEmptyBuffer = errors.New("empty buffer")
// ErrInvalidDetachResult means the result cannot be used
ErrInvalidDetachResult = errors.New("invalid detach result")
// ErrCPUID indicates the given cpuid is invalid
ErrCPUID = errors.New("no such core")
)
Expand All @@ -45,6 +47,8 @@ const (
OpRead OpType = iota
// OpWrite means the aiocb is a write operation
OpWrite
// OpDetach means the aiocb is a detach operation
OpDetach
// internal operation to delete an related resource
opDelete
)
Expand Down
42 changes: 35 additions & 7 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"container/list"
"io"
"net"
"os"
"reflect"
"runtime"
"sync"
Expand Down Expand Up @@ -258,11 +259,32 @@ func (w *watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadl
}

// Free let the watcher to release resources related to this conn immediately,
// like socket file descriptors.
// like socket file descriptors. Will close the conn.
func (w *watcher) Free(conn net.Conn) error {
return w.aioCreate(nil, opDelete, conn, nil, zeroTime, false)
}

// Detach let the watcher to release resources related to this conn immediately,
// but keeps the conn and return its duplicated file descriptor in OpResult.Size.
func (w *watcher) Detach(ctx interface{}, conn net.Conn) error {
return w.aioCreate(ctx, OpDetach, conn, nil, zeroTime, false)
}

// RecoverConnFromDetachResult reassemble the file descriptor stored in
// `OpResult.Size` of a `Detach` result to a `net.Conn`.
func RecoverConnFromDetachResult(r OpResult) (net.Conn, error) {
if r.Operation != OpDetach || r.Error != nil || r.Size == 0 {
return nil, ErrInvalidDetachResult
}
f := os.NewFile(uintptr(r.Size), "")
if f == nil {
return nil, ErrInvalidDetachResult
}
conn, err := net.FileConn(f)
_ = f.Close()
return conn, err
}

// core async-io creation
func (w *watcher) aioCreate(ctx interface{}, op OpType, conn net.Conn, buf []byte, deadline time.Time, readfull bool) error {
select {
Expand Down Expand Up @@ -394,7 +416,7 @@ func (w *watcher) tryWrite(fd int, pcb *aiocb) bool {
}

// release connection related resources
func (w *watcher) releaseConn(ident int) {
func (w *watcher) releaseConn(ident int, close bool) {
if desc, ok := w.descs[ident]; ok {
// delete from heap
for e := desc.readers.Front(); e != nil; e = e.Next() {
Expand All @@ -413,7 +435,9 @@ func (w *watcher) releaseConn(ident int) {
delete(w.descs, ident)
delete(w.connIdents, desc.ptr)
// close socket file descriptor duplicated from net.Conn
syscall.Close(ident)
if close {
_ = syscall.Close(ident)
}
}
}

Expand All @@ -434,7 +458,7 @@ func (w *watcher) loop() {
// defer function to release all resources
defer func() {
for ident := range w.descs {
w.releaseConn(ident)
w.releaseConn(ident, true)
}
}()

Expand Down Expand Up @@ -477,7 +501,7 @@ func (w *watcher) loop() {
if ident, ok := w.connIdents[ptr]; ok {
// since it's gc-ed, queue is impossible to hold net.Conn
// we don't have to send to chIOCompletion,just release here
w.releaseConn(ident)
w.releaseConn(ident, true)
}
w.gc[i] = nil
}
Expand All @@ -498,8 +522,12 @@ func (w *watcher) handlePending(pending []*aiocb) {
for _, pcb := range pending {
ident, ok := w.connIdents[pcb.ptr]
// resource releasing operation
if pcb.op == opDelete && ok {
w.releaseConn(ident)
if (pcb.op == opDelete || pcb.op == OpDetach) && ok {
w.releaseConn(ident, pcb.op == opDelete)
if pcb.op == OpDetach {
pcb.size = ident
w.deliver(pcb)
}
continue
}

Expand Down
199 changes: 199 additions & 0 deletions watcher_detach_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package gaio_test

import (
"math/rand"
"net"
"testing"

"github.com/xtaci/gaio"
)

type detachCtx struct {
id int
cmd [2]byte
times int
conn net.Conn
}

func newDetachCtx() *detachCtx {
return &detachCtx{
cmd: [2]byte{'0', '0'},
}
}

// detachServer responses to character cmd received from connection,
// prints stored cmd or detach the connection and re-watch.
func detachServer(t testing.TB) net.Listener {
addr, _ := net.ResolveTCPAddr("tcp", "localhost:0")
ln, err := net.ListenTCP("tcp", addr)
if err != nil {
t.Fatal(err)
}

w, err := gaio.NewWatcher()
if err != nil {
t.Fatal(err)
}

go func() {
for {
results, err := w.WaitIO()
if err != nil {
return
}

for _, res := range results {
switch res.Operation {
case gaio.OpRead:
if res.Error != nil {
t.Log(res.Error)
_ = w.Free(res.Conn)
continue
}
ctx := res.Context.(*detachCtx)
if res.Size > 0 {
switch res.Buffer[0] {
case 'p': // print current cmd
_ = w.Write(ctx, res.Conn, ctx.cmd[:1])
case 'x': // exit
continue
default: // remember the cmd and detach
ctx.cmd[0] = res.Buffer[0]
_ = w.Detach(ctx, res.Conn)
}
}
case gaio.OpWrite:
if res.Error != nil {
t.Fatal(res.Error)
}
if res.Size > 0 {
// write complete, start read again
_ = w.Read(res.Context, res.Conn, nil)
}
case gaio.OpDetach:
if res.Error != nil {
t.Fatal(res.Error)
}
ctx := res.Context.(*detachCtx)
// conn in detach result should be the old one
if res.Conn != ctx.conn {
t.Error("conn changed", res.Conn, ctx.conn)
}
conn, err := gaio.RecoverConnFromDetachResult(res)
if err != nil {
t.Fatal(err)
}
ctx.conn = conn
// join the watcher again
_ = w.Write(ctx, conn, ctx.cmd[:1])
}
}
}
}()

go func() {
for {
conn, err := ln.AcceptTCP()
if err != nil {
_ = w.Close()
return
}
ctx := newDetachCtx()
ctx.conn = conn
err = w.Read(ctx, conn, nil)
if err != nil {
t.Fatal(err)
}
}
}()
return ln
}

func testDetachParallel(t *testing.T, par int, times int) {
t.Log("testing concurrent:", par, "connections")
ln := detachServer(t)
defer func() { _ = ln.Close() }()

w, err := gaio.NewWatcher()
if err != nil {
t.Fatal(err)
}
defer func() { _ = w.Close() }()

go func() {
for i := 0; i < par; i++ {
conn, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
}
ctx := newDetachCtx()
ctx.id = i
ctx.cmd[0] = 'p' // print init cmd
err = w.Write(ctx, conn, ctx.cmd[:1])
if err != nil {
t.Fatal(err)
}
}
}()

done := 0
for done < par {
results, err := w.WaitIO()
if err != nil {
t.Log(err)
return
}

for _, res := range results {
switch res.Operation {
case gaio.OpWrite:
// recv
if res.Error != nil {
t.Error(res.Error)
}
ctx := res.Context.(*detachCtx)
if ctx.cmd[0] == 'x' {
err = w.Free(res.Conn)
done++
} else {
err = w.Read(ctx, res.Conn, nil)
}
if err != nil {
t.Fatal(err)
}
case gaio.OpRead:
if res.Error != nil {
t.Error(res.Error)
}
ctx := res.Context.(*detachCtx)
if ctx.cmd[1] != res.Buffer[0] {
t.Error("not match", ctx.id, string(ctx.cmd[0]), string(ctx.cmd[1]), string(res.Buffer[0]))
}
ctx.times++
if times > 0 && ctx.times > times {
ctx.cmd[0] = 'x'
} else {
n := rand.Intn(10)
if n > 5 {
ctx.cmd[0] = 'p'
} else {
ctx.cmd[0] = byte(int('0') + n)
ctx.cmd[1] = ctx.cmd[0]
}
}
if err := w.Write(ctx, res.Conn, ctx.cmd[:1]); err != nil {
t.Fatal(err)
}
}
}
}
_ = w.Close()
}

func TestDetach10(t *testing.T) {
testDetachParallel(t, 10, 100)
}

func TestDetach100(t *testing.T) {
testDetachParallel(t, 100, 1000)
}