Skip to content

Commit

Permalink
TUN-8419: Add capnp safe transport
Browse files Browse the repository at this point in the history
To help support temporary errors that can occur in the capnp rpc
calls, a wrapper is introduced to inspect the error conditions and
allow for retrying within a short window.
  • Loading branch information
DevinCarr committed May 20, 2024
1 parent eb2e434 commit 2db0021
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 5 deletions.
3 changes: 2 additions & 1 deletion tunnelrpc/proto/tunnelrpc.capnp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tunnelrpc/quic/cloudflared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/google/uuid"

"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

Expand All @@ -29,7 +30,7 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques
if n != len(rpcStreamProtocolSignature) {
return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n)
}
transport := rpc.StreamTransport(stream)
transport := tunnelrpc.SafeTransport(stream)
conn := rpc.NewConn(transport)
client := pogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn)
return &CloudflaredClient{
Expand Down
3 changes: 2 additions & 1 deletion tunnelrpc/quic/cloudflared_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"zombiezen.com/go/capnproto2/rpc"

"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func (s *CloudflaredServer) Serve(ctx context.Context, stream io.ReadWriteCloser
func (s *CloudflaredServer) handleRPC(ctx context.Context, stream io.ReadWriteCloser) error {
ctx, cancel := context.WithTimeout(ctx, s.responseTimeout)
defer cancel()
transport := rpc.StreamTransport(stream)
transport := tunnelrpc.SafeTransport(stream)
defer transport.Close()

main := pogs.CloudflaredServer_ServerToClient(s.sessionManager, s.configManager)
Expand Down
3 changes: 2 additions & 1 deletion tunnelrpc/quic/session_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/uuid"
"zombiezen.com/go/capnproto2/rpc"

"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

Expand All @@ -28,7 +29,7 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim
if n != len(rpcStreamProtocolSignature) {
return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n)
}
transport := rpc.StreamTransport(stream)
transport := tunnelrpc.SafeTransport(stream)
conn := rpc.NewConn(transport)
return &SessionClient{
client: pogs.NewSessionManager_PogsClient(conn.Bootstrap(ctx), conn),
Expand Down
3 changes: 2 additions & 1 deletion tunnelrpc/quic/session_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"zombiezen.com/go/capnproto2/rpc"

"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func (s *SessionManagerServer) Serve(ctx context.Context, stream io.ReadWriteClo
ctx, cancel := context.WithTimeout(ctx, s.responseTimeout)
defer cancel()

transport := rpc.StreamTransport(stream)
transport := tunnelrpc.SafeTransport(stream)
defer transport.Close()

main := pogs.SessionManager_ServerToClient(s.sessionManager)
Expand Down
69 changes: 69 additions & 0 deletions tunnelrpc/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package tunnelrpc

import (
"io"
"time"

"github.com/pkg/errors"
"zombiezen.com/go/capnproto2/rpc"
)

// Defaults for riding out temporary errors on the underlying connection/stream.
//
// The values are deliberately conservative. Surfacing a final (non-temporary)
// error causes the connection to be dropped, after which the other side will
// probably reconnect and add more pressure to the overall system. Giving the
// stream a little time to recover is preferable to giving up quickly.
const (
	// defaultMaxRetries is the number of consecutive temporary read errors
	// tolerated before the read is failed with a non-temporary error.
	defaultMaxRetries = 3

	// defaultSleepBetweenTemporaryError is the pause taken after each
	// temporary read error.
	defaultSleepBetweenTemporaryError = 500 * time.Millisecond
)

// readWriterSafeTemporaryErrorCloser wraps an io.ReadWriteCloser and keeps the
// bookkeeping needed to throttle and bound consecutive temporary read errors.
// Write and Close are promoted unchanged from the embedded value.
type readWriterSafeTemporaryErrorCloser struct {
	// The wrapped stream.
	io.ReadWriteCloser

	// maxRetries is the number of consecutive temporary read errors allowed
	// before Read gives up.
	maxRetries int
	// sleepBetweenRetries is how long Read pauses after a temporary error.
	sleepBetweenRetries time.Duration
	// retries counts the temporary read errors seen since the last
	// successful read.
	retries int
}

// Read reads from the wrapped stream and applies the temporary-error policy.
//
//   - On success (nil error) the retry counter is reset.
//   - On a temporary error with no bytes read, the counter is incremented and
//     the call sleeps for sleepBetweenRetries before returning that same
//     temporary error. The sleep keeps a caller that retries immediately from
//     spinning in a tight read loop.
//   - Once maxRetries such errors have already been counted, the error is
//     wrapped and returned instead, with n forced to 0.
//   - Any other result is passed through untouched.
//
// Read does not retry the underlying read itself; retrying is left to the
// caller.
func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) {
	n, err = r.ReadWriteCloser.Read(p)

	switch {
	case err == nil:
		r.retries = 0
	case n == 0 && isTemporaryError(err):
		if r.retries >= r.maxRetries {
			return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors")
		}
		r.retries++
		time.Sleep(r.sleepBetweenRetries)
	}

	return n, err
}

// SafeTransport returns an rpc.Transport built with rpc.StreamTransport over
// rw, where reads go through a wrapper that throttles and bounds consecutive
// temporary read errors using the package default sleep and retry limits.
func SafeTransport(rw io.ReadWriteCloser) rpc.Transport {
	safe := &readWriterSafeTemporaryErrorCloser{
		ReadWriteCloser:     rw,
		sleepBetweenRetries: defaultSleepBetweenTemporaryError,
		maxRetries:          defaultMaxRetries,
	}
	return rpc.StreamTransport(safe)
}

// isTemporaryError reports whether e has a Temporary() bool method and that
// method returns true. Only e itself is inspected; wrapped errors are not
// unwrapped. A nil error is never temporary.
func isTemporaryError(e error) bool {
	if te, ok := e.(interface{ Temporary() bool }); ok {
		return te.Temporary()
	}
	return false
}

0 comments on commit 2db0021

Please sign in to comment.