Skip to content

Commit

Permalink
TUN-8709: Add session migration for datagram v3
Browse files Browse the repository at this point in the history
When a registration response from cloudflared gets lost on it's way back to the edge, the edge service will retry and send another registration request. Since cloudflared already has bound the local UDP socket for the provided request id, we want to re-send the registration response.

There are three types of retries that the edge will send:

1. A retry from the same QUIC connection index; cloudflared will just respond back with a registration response and reset the idle timer for the session.
2. A retry from a different QUIC connection index; cloudflared will need to migrate the current session connection to this new QUIC connection and reset the idle timer for the session.
3. A retry to a different cloudflared connector; cloudflared will eventually time the session out since no further packets will arrive to the session at the original connector.

Closes TUN-8709
  • Loading branch information
DevinCarr committed Nov 6, 2024
1 parent 70393b6 commit 952622a
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 75 deletions.
3 changes: 2 additions & 1 deletion connection/quic_datagram_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type datagramV3Connection struct {
func NewDatagramV3Connection(ctx context.Context,
conn quic.Connection,
sessionManager cfdquic.SessionManager,
index uint8,
logger *zerolog.Logger,
) DatagramSessionHandler {
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, logger)
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, logger)

return &datagramV3Connection{
conn,
Expand Down
2 changes: 0 additions & 2 deletions quic/v3/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ const (
ResponseDestinationUnreachable SessionRegistrationResp = 0x01
// Session registration was unable to bind to a local UDP socket.
ResponseUnableToBindSocket SessionRegistrationResp = 0x02
// Session registration is already bound to another connection.
ResponseSessionAlreadyConnected SessionRegistrationResp = 0x03
// Session registration failed with an unexpected error but provided a message.
ResponseErrorWithMsg SessionRegistrationResp = 0xff
)
Expand Down
16 changes: 11 additions & 5 deletions quic/v3/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ import (
)

var (
ErrSessionNotFound = errors.New("session not found")
// ErrSessionNotFound indicates that a session has not been registered yet for the request id.
ErrSessionNotFound = errors.New("session not found")
// ErrSessionBoundToOtherConn is returned when a registration already exists for a different connection.
ErrSessionBoundToOtherConn = errors.New("session is in use by another connection")
// ErrSessionAlreadyRegistered is returned when a registration already exists for this connection.
ErrSessionAlreadyRegistered = errors.New("session is already registered for this connection")
)

type SessionManager interface {
// RegisterSession will register a new session if it does not already exist for the request ID.
// During new session creation, the session will also bind the UDP socket for the origin.
// If the session exists for a different connection, it will return [ErrSessionBoundToOtherConn].
RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramWriter) (Session, error)
RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramConn) (Session, error)
// GetSession returns an active session if available for the provided connection.
// If the session does not exist, it will return [ErrSessionNotFound]. If the session exists for a different
// connection, it will return [ErrSessionBoundToOtherConn].
Expand All @@ -45,12 +49,14 @@ func NewSessionManager(log *zerolog.Logger, originDialer DialUDP) SessionManager
}
}

func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramWriter) (Session, error) {
func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramConn) (Session, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Check to make sure session doesn't already exist for requestID
_, exists := s.sessions[request.RequestID]
if exists {
if session, exists := s.sessions[request.RequestID]; exists {
if conn.ID() == session.ConnectionID() {
return nil, ErrSessionAlreadyRegistered
}
return nil, ErrSessionBoundToOtherConn
}
// Attempt to bind the UDP socket for the new session
Expand Down
8 changes: 7 additions & 1 deletion quic/v3/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ func TestRegisterSession(t *testing.T) {

// We shouldn't be able to register another session with the same request id
_, err = manager.RegisterSession(&request, &noopEyeball{})
if !errors.Is(err, v3.ErrSessionAlreadyRegistered) {
t.Fatalf("session is already registered for this connection: %v", err)
}

// We shouldn't be able to register another session with the same request id for a different connection
_, err = manager.RegisterSession(&request, &noopEyeball{connID: 1})
if !errors.Is(err, v3.ErrSessionBoundToOtherConn) {
t.Fatalf("session should not be able to be registered again: %v", err)
t.Fatalf("session is already registered for a separate connection: %v", err)
}

// Get session
Expand Down
75 changes: 63 additions & 12 deletions quic/v3/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type DatagramConn interface {
DatagramWriter
// Serve provides a server interface to process and handle incoming QUIC datagrams and demux their datagram v3 payloads.
Serve(context.Context) error
// ID indicates connection index identifier
ID() uint8
}

// DatagramWriter provides the Muxer interface to create proper Datagrams when sending over a connection.
Expand All @@ -41,24 +43,30 @@ type QuicConnection interface {

type datagramConn struct {
conn QuicConnection
index uint8
sessionManager SessionManager
logger *zerolog.Logger

datagrams chan []byte
readErrors chan error
}

func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, logger *zerolog.Logger) DatagramConn {
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, logger *zerolog.Logger) DatagramConn {
log := logger.With().Uint8("datagramVersion", 3).Logger()
return &datagramConn{
conn: conn,
index: index,
sessionManager: sessionManager,
logger: &log,
datagrams: make(chan []byte, demuxChanCapacity),
readErrors: make(chan error, 2),
}
}

func (c datagramConn) ID() uint8 {
return c.index
}

func (c *datagramConn) SendUDPSessionDatagram(datagram []byte) error {
return c.conn.SendDatagram(datagram)
}
Expand Down Expand Up @@ -163,9 +171,20 @@ func (c *datagramConn) Serve(ctx context.Context) error {
// This method handles new registrations of a session and the serve loop for the session.
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) {
session, err := c.sessionManager.RegisterSession(datagram, c)
if err != nil {
switch err {
case nil:
// Continue as normal
case ErrSessionAlreadyRegistered:
// Session is already registered and likely the response got lost
c.handleSessionAlreadyRegistered(datagram.RequestID)
return
case ErrSessionBoundToOtherConn:
// Session is already registered but to a different connection
c.handleSessionMigration(datagram.RequestID)
return
default:
c.logger.Err(err).Msgf("session registration failure")
c.handleSessionRegistrationFailure(datagram.RequestID, err)
c.handleSessionRegistrationFailure(datagram.RequestID)
return
}
// Make sure to eventually remove the session from the session manager when the session is closed
Expand Down Expand Up @@ -197,17 +216,49 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
c.logger.Err(err).Msgf("session was closed with an error")
}

func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, regErr error) {
var errResp SessionRegistrationResp
switch regErr {
case ErrSessionBoundToOtherConn:
errResp = ResponseSessionAlreadyConnected
default:
errResp = ResponseUnableToBindSocket
func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
// Send another registration response since the session is already active
err := c.SendUDPSessionResponse(requestID, ResponseOk)
if err != nil {
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
return
}
err := c.SendUDPSessionResponse(requestID, errResp)

session, err := c.sessionManager.GetSession(requestID)
if err != nil {
// If for some reason we can not find the session after attempting to register it, we can just return
// instead of trying to reset the idle timer for it.
return
}
// The session is already running in another routine so we want to restart the idle timeout since no proxied
// packets have come down yet.
session.ResetIdleTimer()
}

func (c *datagramConn) handleSessionMigration(requestID RequestID) {
// We need to migrate the currently running session to this edge connection.
session, err := c.sessionManager.GetSession(requestID)
if err != nil {
// If for some reason we can not find the session after attempting to register it, we can just return
// instead of trying to reset the idle timer for it.
return
}

// Migrate the session to use this edge connection instead of the currently running one.
session.Migrate(c)

// Send another registration response since the session is already active
err = c.SendUDPSessionResponse(requestID, ResponseOk)
if err != nil {
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
return
}
}

func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID) {
err := c.SendUDPSessionResponse(requestID, ResponseUnableToBindSocket)
if err != nil {
c.logger.Err(err).Msgf("unable to send session registration error response (%d)", errResp)
c.logger.Err(err).Msgf("unable to send session registration error response (%d)", ResponseUnableToBindSocket)
}
}

Expand Down
Loading

0 comments on commit 952622a

Please sign in to comment.