Skip to content

Commit

Permalink
TUN-8640: Refactor ICMPRouter to support new ICMPResponders
Browse files Browse the repository at this point in the history
A new ICMPResponder interface is introduced to provide different
implementations of how the ICMP flows should return to the QUIC
connection muxer.

Improves usages of netip.AddrPort to leverage the embedded zone
field for IPv6 addresses.

Closes TUN-8640
  • Loading branch information
DevinCarr committed Nov 27, 2024
1 parent 46dc631 commit 9da15b5
Show file tree
Hide file tree
Showing 19 changed files with 199 additions and 236 deletions.
2 changes: 1 addition & 1 deletion cmd/cloudflared/tunnel/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func StartServer(

// Disable ICMP packet routing for quick tunnels
if quickTunnelURL != "" {
tunnelConfig.PacketConfig = nil
tunnelConfig.ICMPRouterServer = nil
}

internalRules := []ingress.Rule{}
Expand Down
20 changes: 7 additions & 13 deletions cmd/cloudflared/tunnel/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ func prepareTunnelConfig(
QUICConnectionLevelFlowControlLimit: c.Uint64(quicConnLevelFlowControlLimit),
QUICStreamLevelFlowControlLimit: c.Uint64(quicStreamLevelFlowControlLimit),
}
packetConfig, err := newPacketConfig(c, log)
icmpRouter, err := newICMPRouter(c, log)
if err != nil {
log.Warn().Err(err).Msg("ICMP proxy feature is disabled")
} else {
tunnelConfig.PacketConfig = packetConfig
tunnelConfig.ICMPRouterServer = icmpRouter
}
orchestratorConfig := &orchestration.Config{
Ingress: &ingressRules,
Expand Down Expand Up @@ -351,7 +351,7 @@ func adjustIPVersionByBindAddress(ipVersion allregions.ConfigIPVersion, ip net.I
}
}

func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) {
func newICMPRouter(c *cli.Context, logger *zerolog.Logger) (ingress.ICMPRouterServer, error) {
ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger)
if err != nil {
return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy")
Expand All @@ -368,16 +368,11 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRou
logger.Info().Msgf("ICMP proxy will use %s as source for IPv6", ipv6Src)
}

icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger, icmpFunnelTimeout)
icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, logger, icmpFunnelTimeout)
if err != nil {
return nil, err
}
return &ingress.GlobalRouterConfig{
ICMPRouter: icmpRouter,
IPv4Src: ipv4Src,
IPv6Src: ipv6Src,
Zone: zone,
}, nil
return icmpRouter, nil
}

func determineICMPv4Src(userDefinedSrc string, logger *zerolog.Logger) (netip.Addr, error) {
Expand Down Expand Up @@ -407,13 +402,12 @@ type interfaceIP struct {

func determineICMPv6Src(userDefinedSrc string, logger *zerolog.Logger, ipv4Src netip.Addr) (addr netip.Addr, zone string, err error) {
if userDefinedSrc != "" {
userDefinedIP, zone, _ := strings.Cut(userDefinedSrc, "%")
addr, err := netip.ParseAddr(userDefinedIP)
addr, err := netip.ParseAddr(userDefinedSrc)
if err != nil {
return netip.Addr{}, "", err
}
if addr.Is6() {
return addr, zone, nil
return addr, addr.Zone(), nil
}
return netip.Addr{}, "", fmt.Errorf("expect IPv6, but %s is IPv4", userDefinedSrc)
}
Expand Down
27 changes: 0 additions & 27 deletions connection/quic_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"net"
"net/http"
"net/netip"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/cloudflare/cloudflared/packet"
cfdquic "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/tracing"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
Expand Down Expand Up @@ -417,28 +415,3 @@ func (np *nopCloserReadWriter) Close() error {

return nil
}

// muxerWrapper wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
type muxerWrapper struct {
muxer *cfdquic.DatagramMuxerV2
}

func (rp *muxerWrapper) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
return rp.muxer.SendPacket(cfdquic.RawPacket(pk))
}

func (rp *muxerWrapper) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
pk, err := rp.muxer.ReceivePacket(ctx)
if err != nil {
return packet.RawPacket{}, err
}
rawPacket, ok := pk.(cfdquic.RawPacket)
if ok {
return packet.RawPacket(rawPacket), nil
}
return packet.RawPacket{}, fmt.Errorf("unexpected packet type %+v", pk)
}

func (rp *muxerWrapper) Close() error {
return nil
}
2 changes: 1 addition & 1 deletion connection/quic_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8)
sessionDemuxChan := make(chan *packet.Session, 4)
datagramMuxer := cfdquic.NewDatagramMuxerV2(conn, &log, sessionDemuxChan)
sessionManager := datagramsession.NewManager(&log, datagramMuxer.SendToSession, sessionDemuxChan)
packetRouter := ingress.NewPacketRouter(nil, datagramMuxer, &log)
packetRouter := ingress.NewPacketRouter(nil, datagramMuxer, 0, &log)

datagramConn := &datagramV2Connection{
conn,
Expand Down
5 changes: 3 additions & 2 deletions connection/quic_datagram_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ type datagramV2Connection struct {

func NewDatagramV2Connection(ctx context.Context,
conn quic.Connection,
packetConfig *ingress.GlobalRouterConfig,
icmpRouter ingress.ICMPRouter,
index uint8,
rpcTimeout time.Duration,
streamWriteTimeout time.Duration,
logger *zerolog.Logger,
) DatagramSessionHandler {
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
datagramMuxer := cfdquic.NewDatagramMuxerV2(conn, logger, sessionDemuxChan)
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
packetRouter := ingress.NewPacketRouter(packetConfig, datagramMuxer, logger)
packetRouter := ingress.NewPacketRouter(icmpRouter, datagramMuxer, index, logger)

return &datagramV2Connection{
conn,
Expand Down
25 changes: 11 additions & 14 deletions ingress/icmp_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ type icmpProxy struct {
srcFunnelTracker *packet.FunnelTracker
echoIDTracker *echoIDTracker
conn *icmp.PacketConn
// Response is handled in one-by-one, so encoder can be shared between funnels
encoder *packet.Encoder
logger *zerolog.Logger
idleTimeout time.Duration
logger *zerolog.Logger
idleTimeout time.Duration
}

// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
Expand Down Expand Up @@ -114,25 +112,24 @@ func (snf echoFunnelID) String() string {
return strconv.FormatUint(uint64(snf), 10)
}

func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
conn, err := newICMPConn(listenIP, zone)
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
conn, err := newICMPConn(listenIP)
if err != nil {
return nil, err
}
logger.Info().Msgf("Created ICMP proxy listening on %s", conn.LocalAddr())
return &icmpProxy{
srcFunnelTracker: packet.NewFunnelTracker(),
echoIDTracker: newEchoIDTracker(),
encoder: packet.NewEncoder(),
conn: conn,
logger: logger,
idleTimeout: idleTimeout,
}, nil
}

func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
_, span := responder.requestSpan(ctx, pk)
defer responder.exportSpan()
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
_, span := responder.RequestSpan(ctx, pk)
defer responder.ExportSpan()

originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
Expand All @@ -154,7 +151,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
}
span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))

shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID)
newFunnelFunc := func() (packet.Funnel, error) {
originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
Expand All @@ -164,7 +161,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
return nil
}
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID)
return icmpFlow, nil
}
funnelID := echoFunnelID(assignedEchoID)
Expand Down Expand Up @@ -265,8 +262,8 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
return err
}

_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
defer icmpFlow.responder.exportSpan()
_, span := icmpFlow.responder.ReplySpan(ctx, ip.logger)
defer icmpFlow.responder.ExportSpan()

if err := icmpFlow.returnToSrc(reply); err != nil {
tracing.EndWithErrorStatus(span, err)
Expand Down
4 changes: 2 additions & 2 deletions ingress/icmp_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s

type icmpProxy struct{}

func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
return errICMPProxyNotImplemented
}

func (ip *icmpProxy) Serve(ctx context.Context) error {
return errICMPProxyNotImplemented
}

func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
return nil, errICMPProxyNotImplemented
}
26 changes: 12 additions & 14 deletions ingress/icmp_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,23 @@ var (
type icmpProxy struct {
srcFunnelTracker *packet.FunnelTracker
listenIP netip.Addr
ipv6Zone string
logger *zerolog.Logger
idleTimeout time.Duration
}

func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
if err := testPermission(listenIP, zone, logger); err != nil {
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
if err := testPermission(listenIP, logger); err != nil {
return nil, err
}
return &icmpProxy{
srcFunnelTracker: packet.NewFunnelTracker(),
listenIP: listenIP,
ipv6Zone: zone,
logger: logger,
idleTimeout: idleTimeout,
}, nil
}

func testPermission(listenIP netip.Addr, zone string, logger *zerolog.Logger) error {
func testPermission(listenIP netip.Addr, logger *zerolog.Logger) error {
// Opens a non-privileged ICMP socket. On Linux the group ID of the process needs to be in ping_group_range
// Only check ping_group_range once for IPv4
if listenIP.Is4() {
Expand All @@ -64,7 +62,7 @@ func testPermission(listenIP netip.Addr, zone string, logger *zerolog.Logger) er
return err
}
}
conn, err := newICMPConn(listenIP, zone)
conn, err := newICMPConn(listenIP)
if err != nil {
return err
}
Expand Down Expand Up @@ -98,9 +96,9 @@ func checkInPingGroup() error {
return fmt.Errorf("did not find group range in %s", pingGroupPath)
}

func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
ctx, span := responder.requestSpan(ctx, pk)
defer responder.exportSpan()
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
ctx, span := responder.RequestSpan(ctx, pk)
defer responder.ExportSpan()

originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
Expand All @@ -109,9 +107,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
}
observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)

shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID)
newFunnelFunc := func() (packet.Funnel, error) {
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
conn, err := newICMPConn(ip.listenIP)
if err != nil {
tracing.EndWithErrorStatus(span, err)
return nil, errors.Wrap(err, "failed to open ICMP socket")
Expand All @@ -127,7 +125,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
span.SetAttributes(attribute.Int("port", localUDPAddr.Port))

echoID := localUDPAddr.Port
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder())
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID)
return icmpFlow, nil
}
funnelID := flow3Tuple{
Expand Down Expand Up @@ -181,8 +179,8 @@ func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) {

// Listens for ICMP response and handles error logging
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) {
_, span := flow.responder.replySpan(ctx, ip.logger)
defer flow.responder.exportSpan()
_, span := flow.responder.ReplySpan(ctx, ip.logger)
defer flow.responder.ExportSpan()

span.SetAttributes(
attribute.Int("originalEchoID", flow.originalEchoID),
Expand Down
28 changes: 9 additions & 19 deletions ingress/icmp_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,20 @@ import (
)

// Opens a non-privileged ICMP socket on Linux and Darwin
func newICMPConn(listenIP netip.Addr, zone string) (*icmp.PacketConn, error) {
func newICMPConn(listenIP netip.Addr) (*icmp.PacketConn, error) {
if listenIP.Is4() {
return icmp.ListenPacket("udp4", listenIP.String())
}
listenAddr := listenIP.String()
if zone != "" {
listenAddr = listenAddr + "%" + zone
}
return icmp.ListenPacket("udp6", listenAddr)
return icmp.ListenPacket("udp6", listenIP.String())
}

func netipAddr(addr net.Addr) (netip.Addr, bool) {
udpAddr, ok := addr.(*net.UDPAddr)
if !ok {
return netip.Addr{}, false
}
return netip.AddrFromSlice(udpAddr.IP)

return udpAddr.AddrPort().Addr(), true
}

type flow3Tuple struct {
Expand All @@ -50,14 +47,12 @@ type icmpEchoFlow struct {
closed *atomic.Bool
src netip.Addr
originConn *icmp.PacketConn
responder *packetResponder
responder ICMPResponder
assignedEchoID int
originalEchoID int
// it's up to the user to ensure respEncoder is not used concurrently
respEncoder *packet.Encoder
}

func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow {
func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder ICMPResponder, assignedEchoID, originalEchoID int) *icmpEchoFlow {
return &icmpEchoFlow{
ActivityTracker: packet.NewActivityTracker(),
closeCallback: closeCallback,
Expand All @@ -67,7 +62,6 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm
responder: responder,
assignedEchoID: assignedEchoID,
originalEchoID: originalEchoID,
respEncoder: respEncoder,
}
}

Expand Down Expand Up @@ -139,11 +133,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
},
Message: reply.msg,
}
serializedPacket, err := ief.respEncoder.Encode(&pk)
if err != nil {
return err
}
return ief.responder.returnPacket(serializedPacket)
return ief.responder.ReturnPacket(&pk)
}

type echoReply struct {
Expand Down Expand Up @@ -184,7 +174,7 @@ func toICMPEchoFlow(funnel packet.Funnel) (*icmpEchoFlow, error) {
return icmpFlow, nil
}

func createShouldReplaceFunnelFunc(logger *zerolog.Logger, muxer muxer, pk *packet.ICMP, originalEchoID int) func(packet.Funnel) bool {
func createShouldReplaceFunnelFunc(logger *zerolog.Logger, responder ICMPResponder, pk *packet.ICMP, originalEchoID int) func(packet.Funnel) bool {
return func(existing packet.Funnel) bool {
existingFlow, err := toICMPEchoFlow(existing)
if err != nil {
Expand All @@ -199,7 +189,7 @@ func createShouldReplaceFunnelFunc(logger *zerolog.Logger, muxer muxer, pk *pack
// If the existing flow has a different muxer, there's a new quic connection where return packets should be
// routed. Otherwise, return packets will be send to the first observed incoming connection, rather than the
// most recently observed connection.
if existingFlow.responder.datagramMuxer != muxer {
if existingFlow.responder.ConnectionIndex() != responder.ConnectionIndex() {
logger.Debug().
Str("src", pk.Src.String()).
Str("dst", pk.Dst.String()).
Expand Down
Loading

0 comments on commit 9da15b5

Please sign in to comment.