diff --git a/internal/socket/callback.go b/internal/socket/callback.go index d506e53..6db3230 100644 --- a/internal/socket/callback.go +++ b/internal/socket/callback.go @@ -4,7 +4,6 @@ import ( "io" "github.com/jjeffcaii/reactor-go" - "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/rx" "github.com/rsocket/rsocket-go/rx/flux" "github.com/rsocket/rsocket-go/rx/mono" @@ -26,13 +25,11 @@ func (s requestStreamCallback) stopWithError(err error) { } type requestResponseCallback struct { - sink mono.Sink - cache interface{} + sink mono.Sink } func (s requestResponseCallback) stopWithError(err error) { s.sink.Error(err) - common.TryRelease(s.cache) } type requestChannelCallback struct { diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index db21bea..cfc45c2 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -139,7 +139,7 @@ func (dc *DuplexConnection) Close() error { dc.destroySndQueue() dc.destroySndBacklog() - dc.destroyFragment() + dc.destroyAllFragments() if dc.destroyReqSche { _ = dc.reqSche.Close() @@ -172,7 +172,7 @@ func (dc *DuplexConnection) destroyHandler(err error) { } } -func (dc *DuplexConnection) destroyFragment() { +func (dc *DuplexConnection) destroyAllFragments() { dc.fragments.Range(func(_, i interface{}) bool { common.TryRelease(i) return true @@ -255,11 +255,10 @@ func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono) handler := &requestResponseCallback{} onFinally := func(s reactor.SignalType, d reactor.Disposable) { - common.TryRelease(handler.cache) if s == reactor.SignalTypeCancel { dc.sendFrame(framing.NewWriteableCancelFrame(sid)) } - // Unregister handler w/sink (processor). + // Unregisters handler w/sink (processor), releases fragment. dc.unregister(sid) // Dispose sink (processor). d.Dispose() @@ -925,7 +924,6 @@ func (dc *DuplexConnection) onFramePayload(frame core.BufferedFrame) error { switch handler := v.(type) { case *requestResponseCallback: - handler.cache = next handler.sink.Success(next) case requestStreamCallback: fg := h.Flag()