Skip to content

Commit

Permalink
Use connection manager to drive NAT maintenance (#835)
Browse files Browse the repository at this point in the history
Co-authored-by: brad-defined <[email protected]>
  • Loading branch information
nbrownus and brad-defined authored Mar 31, 2023
1 parent 1a6c657 commit ee8e134
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 313 deletions.
320 changes: 130 additions & 190 deletions connection_manager.go

Large diffs are not rendered by default.

89 changes: 48 additions & 41 deletions connection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/flynn/noise"
"github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/iputil"
"github.com/slackhq/nebula/test"
"github.com/slackhq/nebula/udp"
Expand Down Expand Up @@ -60,16 +61,16 @@ func Test_NewConnectionManagerTest(t *testing.T) {
l: l,
}
ifce.certState.Store(cs)
now := time.Now()

// Create manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc := newConnectionManager(ctx, l, ifce, 5, 10)
punchy := NewPunchyFromConfig(l, config.NewC(l))
nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy)
p := []byte("")
nb := make([]byte, 12, 12)
out := make([]byte, mtu)
nc.HandleMonitorTick(now, p, nb, out)

// Add an ip we have established a connection w/ to hostmap
hostinfo := &HostInfo{
vpnIp: vpnIp,
Expand All @@ -84,26 +85,28 @@ func Test_NewConnectionManagerTest(t *testing.T) {

// We saw traffic out to vpnIp
nc.Out(hostinfo.localIndexId)
nc.In(hostinfo.localIndexId)
assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId)
// Move ahead 5s. Nothing should happen
next_tick := now.Add(5 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// Move ahead 6s. We haven't heard back
next_tick = now.Add(6 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// This host should now be up for deletion
assert.Contains(t, nc.out, hostinfo.localIndexId)

// Do a traffic check tick, should not be pending deletion but should not have any in/out packets recorded
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.NotContains(t, nc.out, hostinfo.localIndexId)
assert.NotContains(t, nc.in, hostinfo.localIndexId)

// Do another traffic check tick, this host should be pending deletion now
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.Contains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
assert.NotContains(t, nc.out, hostinfo.localIndexId)
assert.NotContains(t, nc.in, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId)
// Move ahead some more
next_tick = now.Add(45 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// The host should be evicted
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)

// Do a final traffic check tick, the host should now be removed
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.NotContains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
assert.NotContains(t, nc.hostMap.Indexes, hostinfo.localIndexId)
Expand Down Expand Up @@ -136,16 +139,16 @@ func Test_NewConnectionManagerTest2(t *testing.T) {
l: l,
}
ifce.certState.Store(cs)
now := time.Now()

// Create manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc := newConnectionManager(ctx, l, ifce, 5, 10)
punchy := NewPunchyFromConfig(l, config.NewC(l))
nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy)
p := []byte("")
nb := make([]byte, 12, 12)
out := make([]byte, mtu)
nc.HandleMonitorTick(now, p, nb, out)

// Add an ip we have established a connection w/ to hostmap
hostinfo := &HostInfo{
vpnIp: vpnIp,
Expand All @@ -160,30 +163,33 @@ func Test_NewConnectionManagerTest2(t *testing.T) {

// We saw traffic out to vpnIp
nc.Out(hostinfo.localIndexId)
assert.NotContains(t, nc.pendingDeletion, vpnIp)
assert.Contains(t, nc.hostMap.Hosts, vpnIp)
// Move ahead 5s. Nothing should happen
next_tick := now.Add(5 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// Move ahead 6s. We haven't heard back
next_tick = now.Add(6 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// This host should now be up for deletion
nc.In(hostinfo.localIndexId)
assert.NotContains(t, nc.pendingDeletion, hostinfo.vpnIp)
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId)

// Do a traffic check tick, should not be pending deletion but should not have any in/out packets recorded
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.NotContains(t, nc.out, hostinfo.localIndexId)
assert.NotContains(t, nc.in, hostinfo.localIndexId)

// Do another traffic check tick, this host should be pending deletion now
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.Contains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Hosts, vpnIp)
assert.NotContains(t, nc.out, hostinfo.localIndexId)
assert.NotContains(t, nc.in, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId)
// We heard back this time
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)

// We saw traffic, should no longer be pending deletion
nc.In(hostinfo.localIndexId)
// Move ahead some more
next_tick = now.Add(45 * time.Second)
nc.HandleMonitorTick(next_tick, p, nb, out)
nc.HandleDeletionTick(next_tick)
// The host should not be evicted
nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now())
assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
assert.NotContains(t, nc.out, hostinfo.localIndexId)
assert.NotContains(t, nc.in, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId)
assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp)
}

// Check if we can disconnect the peer.
Expand Down Expand Up @@ -257,7 +263,8 @@ func Test_NewConnectionManagerTest_DisconnectInvalid(t *testing.T) {
// Create manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc := newConnectionManager(ctx, l, ifce, 5, 10)
punchy := NewPunchyFromConfig(l, config.NewC(l))
nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy)
ifce.connectionManager = nc
hostinfo, _ := nc.hostMap.AddVpnIp(vpnIp, nil)
hostinfo.ConnectionState = &ConnectionState{
Expand Down
15 changes: 3 additions & 12 deletions handshake_ix.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,7 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via interface{}, packet []b
Info("Handshake message sent")
}

if existing != nil {
// Make sure we are tracking the old primary if there was one, it needs to go away eventually
f.connectionManager.Out(existing.localIndexId)
}

f.connectionManager.Out(hostinfo.localIndexId)
f.connectionManager.AddTrafficWatch(hostinfo.localIndexId)
hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)

return
Expand Down Expand Up @@ -495,12 +490,8 @@ func ixHandshakeStage2(f *Interface, addr *udp.Addr, via interface{}, hostinfo *
hostinfo.CreateRemoteCIDR(remoteCert)

// Complete our handshake and update metrics, this will replace any existing tunnels for this vpnIp
existing := f.handshakeManager.Complete(hostinfo, f)
if existing != nil {
// Make sure we are tracking the old primary if there was one, it needs to go away eventually
f.connectionManager.Out(existing.localIndexId)
}

f.handshakeManager.Complete(hostinfo, f)
f.connectionManager.AddTrafficWatch(hostinfo.localIndexId)
hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)
f.metricHandshakes.Update(duration)

Expand Down
4 changes: 1 addition & 3 deletions handshake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
// Complete is a simpler version of CheckAndComplete when we already know we
// won't have a localIndexId collision because we already have an entry in the
// pendingHostMap. An existing hostinfo is returned if there was one.
func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) *HostInfo {
func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
c.pendingHostMap.Lock()
defer c.pendingHostMap.Unlock()
c.mainHostMap.Lock()
Expand All @@ -395,11 +395,9 @@ func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) *HostInfo
Info("New host shadows existing host remoteIndex")
}

existingHostInfo := c.mainHostMap.Hosts[hostinfo.vpnIp]
// We need to remove from the pending hostmap first to avoid undoing work when after to the main hostmap.
c.pendingHostMap.unlockedDeleteHostInfo(hostinfo)
c.mainHostMap.unlockedAddHostInfo(hostinfo, f)
return existingHostInfo
}

// AddIndexHostInfo generates a unique localIndexId for this HostInfo
Expand Down
49 changes: 0 additions & 49 deletions hostmap.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nebula

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -621,54 +620,6 @@ func (hm *HostMap) unlockedAddHostInfo(hostinfo *HostInfo, f *Interface) {
}
}

// punchList assembles a list of all non nil RemoteList pointer entries in this hostmap
// The caller can then do the its work outside of the read lock
func (hm *HostMap) punchList(rl []*RemoteList) []*RemoteList {
hm.RLock()
defer hm.RUnlock()

for _, v := range hm.Hosts {
if v.remotes != nil {
rl = append(rl, v.remotes)
}
}
return rl
}

// Punchy iterates through the result of punchList() to assemble all known addresses and sends a hole punch packet to them
func (hm *HostMap) Punchy(ctx context.Context, conn *udp.Conn) {
var metricsTxPunchy metrics.Counter
if hm.metricsEnabled {
metricsTxPunchy = metrics.GetOrRegisterCounter("messages.tx.punchy", nil)
} else {
metricsTxPunchy = metrics.NilCounter{}
}

var remotes []*RemoteList
b := []byte{1}

clockSource := time.NewTicker(time.Second * 10)
defer clockSource.Stop()

for {
remotes = hm.punchList(remotes[:0])
for _, rl := range remotes {
//TODO: CopyAddrs generates garbage but ForEach locks for the work here, figure out which way is better
for _, addr := range rl.CopyAddrs(hm.preferredRanges) {
metricsTxPunchy.Inc(1)
conn.WriteTo(b, addr)
}
}

select {
case <-ctx.Done():
return
case <-clockSource.C:
continue
}
}
}

// TryPromoteBest handles re-querying lighthouses and probing for better paths
// NOTE: It is an error to call this if you are a lighthouse since they should not roam clients!
func (i *HostInfo) TryPromoteBest(preferredRanges []*net.IPNet, ifce *Interface) {
Expand Down
7 changes: 4 additions & 3 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type InterfaceConfig struct {
ServeDns bool
HandshakeManager *HandshakeManager
lightHouse *LightHouse
checkInterval int
pendingDeletionInterval int
checkInterval time.Duration
pendingDeletionInterval time.Duration
DropLocalBroadcast bool
DropMulticast bool
routines int
Expand All @@ -43,6 +43,7 @@ type InterfaceConfig struct {
caPool *cert.NebulaCAPool
disconnectInvalid bool
relayManager *relayManager
punchy *Punchy

ConntrackCacheTimeout time.Duration
l *logrus.Logger
Expand Down Expand Up @@ -172,7 +173,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
}

ifce.certState.Store(c.certState)
ifce.connectionManager = newConnectionManager(ctx, c.l, ifce, c.checkInterval, c.pendingDeletionInterval)
ifce.connectionManager = newConnectionManager(ctx, c.l, ifce, c.checkInterval, c.pendingDeletionInterval, c.punchy)

return ifce, nil
}
Expand Down
10 changes: 3 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,6 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
*/

punchy := NewPunchyFromConfig(l, c)
if punchy.GetPunch() && !configTest {
l.Info("UDP hole punching enabled")
go hostMap.Punchy(ctx, udpConns[0])
}

lightHouse, err := NewLightHouseFromConfig(l, c, tunCidr, udpConns[0], punchy)
switch {
case errors.As(err, &util.ContextualError{}):
Expand Down Expand Up @@ -272,8 +267,8 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
ServeDns: serveDns,
HandshakeManager: handshakeManager,
lightHouse: lightHouse,
checkInterval: checkInterval,
pendingDeletionInterval: pendingDeletionInterval,
checkInterval: time.Second * time.Duration(checkInterval),
pendingDeletionInterval: time.Second * time.Duration(pendingDeletionInterval),
DropLocalBroadcast: c.GetBool("tun.drop_local_broadcast", false),
DropMulticast: c.GetBool("tun.drop_multicast", false),
routines: routines,
Expand All @@ -282,6 +277,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
caPool: caPool,
disconnectInvalid: c.GetBool("pki.disconnect_invalid", false),
relayManager: NewRelayManager(ctx, l, hostMap, c),
punchy: punchy,

ConntrackCacheTimeout: conntrackCacheTimeout,
l: l,
Expand Down
3 changes: 0 additions & 3 deletions outside.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by

// closeTunnel closes a tunnel locally, it does not send a closeTunnel packet to the remote
func (f *Interface) closeTunnel(hostInfo *HostInfo) {
//TODO: this would be better as a single function in ConnectionManager that handled locks appropriately
f.connectionManager.ClearLocalIndex(hostInfo.localIndexId)
f.connectionManager.ClearPendingDeletion(hostInfo.localIndexId)
final := f.hostMap.DeleteHostInfo(hostInfo)
if final {
// We no longer have any tunnels with this vpn ip, clear learned lighthouse state to lower memory usage
Expand Down
29 changes: 24 additions & 5 deletions punchy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
)

type Punchy struct {
punch atomic.Bool
respond atomic.Bool
delay atomic.Int64
respondDelay atomic.Int64
l *logrus.Logger
punch atomic.Bool
respond atomic.Bool
delay atomic.Int64
respondDelay atomic.Int64
punchEverything atomic.Bool
l *logrus.Logger
}

func NewPunchyFromConfig(l *logrus.Logger, c *config.C) *Punchy {
Expand All @@ -38,6 +39,12 @@ func (p *Punchy) reload(c *config.C, initial bool) {
}

p.punch.Store(yes)
if yes {
p.l.Info("punchy enabled")
} else {
p.l.Info("punchy disabled")
}

} else if c.HasChanged("punchy.punch") || c.HasChanged("punchy") {
//TODO: it should be relatively easy to support this, just need to be able to cancel the goroutine and boot it up from here
p.l.Warn("Changing punchy.punch with reload is not supported, ignoring.")
Expand Down Expand Up @@ -66,6 +73,14 @@ func (p *Punchy) reload(c *config.C, initial bool) {
p.l.Infof("punchy.delay changed to %s", p.GetDelay())
}
}

if initial || c.HasChanged("punchy.target_all_remotes") {
p.punchEverything.Store(c.GetBool("punchy.target_all_remotes", true))
if !initial {
p.l.WithField("target_all_remotes", p.GetTargetEverything()).Info("punchy.target_all_remotes changed")
}
}

if initial || c.HasChanged("punchy.respond_delay") {
p.respondDelay.Store((int64)(c.GetDuration("punchy.respond_delay", 5*time.Second)))
if !initial {
Expand All @@ -89,3 +104,7 @@ func (p *Punchy) GetDelay() time.Duration {
func (p *Punchy) GetRespondDelay() time.Duration {
return (time.Duration)(p.respondDelay.Load())
}

func (p *Punchy) GetTargetEverything() bool {
return p.punchEverything.Load()
}

0 comments on commit ee8e134

Please sign in to comment.