Skip to content

Commit

Permalink
TUN-8158: Bring back commit e653741 and fixes infinite loop on linux …
Browse files Browse the repository at this point in the history
…when the socket is closed
  • Loading branch information
chungthuang committed Jan 22, 2024
1 parent f75503b commit 9c1f5c3
Show file tree
Hide file tree
Showing 15 changed files with 376 additions and 64 deletions.
7 changes: 5 additions & 2 deletions cmd/cloudflared/tunnel/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

const secretValue = "*****"
const (
secretValue = "*****"
icmpFunnelTimeout = time.Second * 10
)

var (
developerPortal = "https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup"
Expand Down Expand Up @@ -361,7 +364,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRou
logger.Info().Msgf("ICMP proxy will use %s as source for IPv6", ipv6Src)
}

icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger)
icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger, icmpFunnelTimeout)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 h1:E2s37DuLxFhQD
github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getsentry/sentry-go v0.16.0 h1:owk+S+5XcgJLlGR/3+3s6N4d+uKwqYvh/eS0AIMjPWo=
Expand Down
18 changes: 7 additions & 11 deletions ingress/icmp_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,16 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
}

func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
ctx, span := responder.requestSpan(ctx, pk)
_, span := responder.requestSpan(ctx, pk)
defer responder.exportSpan()

originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
tracing.EndWithErrorStatus(span, err)
return err
}
span.SetAttributes(
attribute.Int("originalEchoID", originalEcho.ID),
attribute.Int("seq", originalEcho.Seq),
)
observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)

echoIDTrackerKey := flow3Tuple{
srcIP: pk.Src,
dstIP: pk.Dst,
Expand Down Expand Up @@ -189,6 +187,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
tracing.EndWithErrorStatus(span, err)
return err
}

err = icmpFlow.sendToDst(pk.Dst, pk.Message)
if err != nil {
tracing.EndWithErrorStatus(span, err)
Expand Down Expand Up @@ -269,15 +268,12 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
defer icmpFlow.responder.exportSpan()

span.SetAttributes(
attribute.String("dst", reply.from.String()),
attribute.Int("echoID", reply.echo.ID),
attribute.Int("seq", reply.echo.Seq),
attribute.Int("originalEchoID", icmpFlow.originalEchoID),
)
if err := icmpFlow.returnToSrc(reply); err != nil {
tracing.EndWithErrorStatus(span, err)
return err
}
observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
tracing.End(span)
return nil
}
50 changes: 22 additions & 28 deletions ingress/icmp_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
tracing.EndWithErrorStatus(span, err)
return err
}
span.SetAttributes(
attribute.Int("originalEchoID", originalEcho.ID),
attribute.Int("seq", originalEcho.Seq),
)
observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)

shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
newFunnelFunc := func() (packet.Funnel, error) {
Expand Down Expand Up @@ -156,14 +153,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
Int("originalEchoID", originalEcho.ID).
Msg("New flow")
go func() {
defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
if err := ip.listenResponse(ctx, icmpFlow); err != nil {
ip.logger.Debug().Err(err).
Str("src", pk.Src.String()).
Str("dst", pk.Dst.String()).
Int("originalEchoID", originalEcho.ID).
Msg("Failed to listen for ICMP echo response")
}
ip.listenResponse(ctx, icmpFlow)
ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
}()
}
if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil {
Expand All @@ -179,17 +170,17 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
return ctx.Err()
}

func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error {
func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) {
buf := make([]byte, mtu)
for {
retryable, err := ip.handleResponse(ctx, flow, buf)
if err != nil && !retryable {
return err
if done := ip.handleResponse(ctx, flow, buf); done {
return
}
}
}

func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) {
// Listens for ICMP response and handles error logging
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) {
_, span := flow.responder.replySpan(ctx, ip.logger)
defer flow.responder.exportSpan()

Expand All @@ -199,33 +190,36 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf

n, from, err := flow.originConn.ReadFrom(buf)
if err != nil {
if flow.IsClosed() {
tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed"))
return true
}
ip.logger.Error().Err(err).Str("socket", flow.originConn.LocalAddr().String()).Msg("Failed to read from ICMP socket")
tracing.EndWithErrorStatus(span, err)
return false, err
return true
}
reply, err := parseReply(from, buf[:n])
if err != nil {
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
tracing.EndWithErrorStatus(span, err)
return true, err
return false
}
if !isEchoReply(reply.msg) {
err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type)
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
tracing.EndWithErrorStatus(span, err)
return true, err
return false
}
span.SetAttributes(
attribute.String("dst", reply.from.String()),
attribute.Int("echoID", reply.echo.ID),
attribute.Int("seq", reply.echo.Seq),
)

if err := flow.returnToSrc(reply); err != nil {
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
tracing.EndWithErrorStatus(span, err)
return true, err
return false
}

observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq)
tracing.End(span)
return true, nil
return false
}

// Only linux uses flow3Tuple as FunnelID
Expand Down
8 changes: 8 additions & 0 deletions ingress/icmp_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net"
"net/netip"
"sync/atomic"

"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -46,6 +47,7 @@ type flow3Tuple struct {
type icmpEchoFlow struct {
*packet.ActivityTracker
closeCallback func() error
closed *atomic.Bool
src netip.Addr
originConn *icmp.PacketConn
responder *packetResponder
Expand All @@ -59,6 +61,7 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm
return &icmpEchoFlow{
ActivityTracker: packet.NewActivityTracker(),
closeCallback: closeCallback,
closed: &atomic.Bool{},
src: src,
originConn: originConn,
responder: responder,
Expand Down Expand Up @@ -86,9 +89,14 @@ func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool {
}

func (ief *icmpEchoFlow) Close() error {
ief.closed.Store(true)
return ief.closeCallback()
}

func (ief *icmpEchoFlow) IsClosed() bool {
return ief.closed.Load()
}

// sendToDst rewrites the echo ID to the one assigned to this flow
func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
ief.UpdateLastActive()
Expand Down
10 changes: 9 additions & 1 deletion ingress/icmp_posix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +19,8 @@ import (
)

func TestFunnelIdleTimeout(t *testing.T) {
defer leaktest.Check(t)()

const (
idleTimeout = time.Second
echoID = 42573
Expand Down Expand Up @@ -73,13 +76,16 @@ func TestFunnelIdleTimeout(t *testing.T) {
require.NoError(t, proxy.Request(ctx, &pk, &newResponder))
validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk)

time.Sleep(idleTimeout * 2)
cancel()
<-proxyDone
}

func TestReuseFunnel(t *testing.T) {
defer leaktest.Check(t)()

const (
idleTimeout = time.Second
idleTimeout = time.Millisecond * 100
echoID = 42573
startSeq = 8129
)
Expand Down Expand Up @@ -135,6 +141,8 @@ func TestReuseFunnel(t *testing.T) {
require.True(t, found)
require.Equal(t, funnel1, funnel2)

time.Sleep(idleTimeout * 2)

cancel()
<-proxyDone
}
17 changes: 7 additions & 10 deletions ingress/icmp_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
if err != nil {
return err
}
requestSpan.SetAttributes(
attribute.Int("originalEchoID", echo.ID),
attribute.Int("seq", echo.Seq),
)
observeICMPRequest(ip.logger, requestSpan, pk.Src.String(), pk.Dst.String(), echo.ID, echo.Seq)

resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo)
if err != nil {
Expand All @@ -296,17 +293,17 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
responder.exportSpan()

_, replySpan := responder.replySpan(ctx, ip.logger)
replySpan.SetAttributes(
attribute.Int("originalEchoID", echo.ID),
attribute.Int("seq", echo.Seq),
attribute.Int64("rtt", int64(resp.rtt())),
attribute.String("status", resp.status().String()),
)
err = ip.handleEchoReply(pk, echo, resp, responder)
if err != nil {
ip.logger.Err(err).Msg("Failed to send ICMP reply")
tracing.EndWithErrorStatus(replySpan, err)
return errors.Wrap(err, "failed to handle ICMP echo reply")
}
observeICMPReply(ip.logger, replySpan, pk.Dst.String(), echo.ID, echo.Seq)
replySpan.SetAttributes(
attribute.Int64("rtt", int64(resp.rtt())),
attribute.String("status", resp.status().String()),
)
tracing.End(replySpan)
return nil
}
Expand Down
33 changes: 28 additions & 5 deletions ingress/origin_icmp_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
Expand All @@ -15,9 +17,7 @@ import (
)

const (
// funnelIdleTimeout controls how long to wait to close a funnel without send/return
funnelIdleTimeout = time.Second * 10
mtu = 1500
mtu = 1500
// icmpRequestTimeoutMs controls how long to wait for a reply
icmpRequestTimeoutMs = 1000
)
Expand All @@ -32,8 +32,9 @@ type icmpRouter struct {
}

// NewICMPRouter doesn't return an error if either ipv4 proxy or ipv6 proxy can be created. The machine might only
// support one of them
func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger) (*icmpRouter, error) {
// support one of them.
// funnelIdleTimeout controls how long to wait to close a funnel without send/return
func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger, funnelIdleTimeout time.Duration) (*icmpRouter, error) {
ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout)
ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout)
if ipv4Err != nil && ipv6Err != nil {
Expand Down Expand Up @@ -102,3 +103,25 @@ func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) {
func isEchoReply(msg *icmp.Message) bool {
return msg.Type == ipv4.ICMPTypeEchoReply || msg.Type == ipv6.ICMPTypeEchoReply
}

func observeICMPRequest(logger *zerolog.Logger, span trace.Span, src string, dst string, echoID int, seq int) {
logger.Debug().
Str("src", src).
Str("dst", dst).
Int("originalEchoID", echoID).
Int("originalEchoSeq", seq).
Msg("Received ICMP request")
span.SetAttributes(
attribute.Int("originalEchoID", echoID),
attribute.Int("seq", seq),
)
}

func observeICMPReply(logger *zerolog.Logger, span trace.Span, dst string, echoID int, seq int) {
logger.Debug().Str("dst", dst).Int("echoID", echoID).Int("seq", seq).Msg("Sent ICMP reply to edge")
span.SetAttributes(
attribute.String("dst", dst),
attribute.Int("echoID", echoID),
attribute.Int("seq", seq),
)
}
Loading

0 comments on commit 9c1f5c3

Please sign in to comment.