diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a24c7ed..fb69112 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -43,6 +43,12 @@ jobs:
- name: End 2 end
run: make e2evv
+ - uses: actions/upload-artifact@v3
+ with:
+ name: e2e packet flow
+ path: e2e/mermaid/
+ if-no-files-found: warn
+
test:
name: Build and test on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
@@ -78,3 +84,9 @@ jobs:
- name: End 2 end
run: make e2evv
+
+ - uses: actions/upload-artifact@v3
+ with:
+ name: e2e packet flow
+ path: e2e/mermaid/
+ if-no-files-found: warn
diff --git a/.gitignore b/.gitignore
index 82b3460..5560418 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@
/cpu.pprof
/build
/*.tar.gz
+/e2e/mermaid/
diff --git a/control_tester.go b/control_tester.go
index 00691de..7852943 100644
--- a/control_tester.go
+++ b/control_tester.go
@@ -63,6 +63,24 @@ func (c *Control) InjectLightHouseAddr(vpnIp net.IP, toAddr *net.UDPAddr) {
}
}
+// InjectRelays will push relayVpnIps into the local lighthouse cache for the vpnIp
+// This is necessary to inform an initiator of possible relays for communicating with a responder
+func (c *Control) InjectRelays(vpnIp net.IP, relayVpnIps []net.IP) {
+ c.f.lightHouse.Lock()
+ remoteList := c.f.lightHouse.unlockedGetRemoteList(iputil.Ip2VpnIp(vpnIp))
+ remoteList.Lock()
+ defer remoteList.Unlock()
+ c.f.lightHouse.Unlock()
+
+ iVpnIp := iputil.Ip2VpnIp(vpnIp)
+ uVpnIp := []uint32{}
+ for _, rVPnIp := range relayVpnIps {
+ uVpnIp = append(uVpnIp, uint32(iputil.Ip2VpnIp(rVPnIp)))
+ }
+
+ remoteList.unlockedSetRelay(iVpnIp, iVpnIp, uVpnIp)
+}
+
// GetFromTun will pull a packet off the tun side of nebula
func (c *Control) GetFromTun(block bool) []byte {
return c.f.inside.(*overlay.TestTun).Get(block)
@@ -118,6 +136,10 @@ func (c *Control) InjectTunUDPPacket(toIp net.IP, toPort uint16, fromPort uint16
c.f.inside.(*overlay.TestTun).Send(buffer.Bytes())
}
+func (c *Control) GetVpnIp() iputil.VpnIp {
+ return c.f.myVpnIp
+}
+
func (c *Control) GetUDPAddr() string {
return c.f.outside.Addr.String()
}
diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go
index 9f239c2..0535204 100644
--- a/e2e/handshakes_test.go
+++ b/e2e/handshakes_test.go
@@ -18,8 +18,8 @@ import (
func TestGoodHandshake(t *testing.T) {
ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
- myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1})
- theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2})
+ myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil)
+ theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil)
// Put their info in our lighthouse
myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
@@ -57,7 +57,9 @@ func TestGoodHandshake(t *testing.T) {
assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIp, theirVpnIp, 80, 80)
t.Log("Do a bidirectional tunnel test")
- assertTunnel(t, myVpnIp, theirVpnIp, myControl, theirControl, router.NewR(myControl, theirControl))
+ r := router.NewR(t, myControl, theirControl)
+ defer r.RenderFlow()
+ assertTunnel(t, myVpnIp, theirVpnIp, myControl, theirControl, r)
myControl.Stop()
theirControl.Stop()
@@ -70,9 +72,9 @@ func TestWrongResponderHandshake(t *testing.T) {
// The IPs here are chosen on purpose:
// The current remote handling will sort by preference, public, and then lexically.
// So we need them to have a higher address than evil (we could apply a preference though)
- myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 100})
- theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 99})
- evilControl, evilVpnIp, evilUdpAddr := newSimpleServer(ca, caKey, "evil", net.IP{10, 0, 0, 2})
+ myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 100}, nil)
+ theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 99}, nil)
+ evilControl, evilVpnIp, evilUdpAddr := newSimpleServer(ca, caKey, "evil", net.IP{10, 0, 0, 2}, nil)
// Add their real udp addr, which should be tried after evil.
myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
@@ -81,7 +83,8 @@ func TestWrongResponderHandshake(t *testing.T) {
myControl.InjectLightHouseAddr(theirVpnIp, evilUdpAddr)
// Build a router so we don't have to reason who gets which packet
- r := router.NewR(myControl, theirControl, evilControl)
+ r := router.NewR(t, myControl, theirControl, evilControl)
+ defer r.RenderFlow()
// Start the servers
myControl.Start()
@@ -130,15 +133,16 @@ func TestWrongResponderHandshake(t *testing.T) {
func Test_Case1_Stage1Race(t *testing.T) {
ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
- myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me ", net.IP{10, 0, 0, 1})
- theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2})
+ myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me ", net.IP{10, 0, 0, 1}, nil)
+ theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil)
// Put their info in our lighthouse and vice versa
myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
theirControl.InjectLightHouseAddr(myVpnIp, myUdpAddr)
// Build a router so we don't have to reason who gets which packet
- r := router.NewR(myControl, theirControl)
+ r := router.NewR(t, myControl, theirControl)
+ defer r.RenderFlow()
// Start the servers
myControl.Start()
@@ -152,16 +156,16 @@ func Test_Case1_Stage1Race(t *testing.T) {
myHsForThem := myControl.GetFromUDP(true)
theirHsForMe := theirControl.GetFromUDP(true)
- t.Log("Now inject both stage 1 handshake packets")
- myControl.InjectUDPPacket(theirHsForMe)
- theirControl.InjectUDPPacket(myHsForThem)
+ r.Log("Now inject both stage 1 handshake packets")
+ r.InjectUDPPacket(theirControl, myControl, theirHsForMe)
+ r.InjectUDPPacket(myControl, theirControl, myHsForThem)
//TODO: they should win, grab their index for me and make sure I use it in the end.
- t.Log("They should not have a stage 2 (won the race) but I should send one")
- theirControl.InjectUDPPacket(myControl.GetFromUDP(true))
+ r.Log("They should not have a stage 2 (won the race) but I should send one")
+ r.InjectUDPPacket(myControl, theirControl, myControl.GetFromUDP(true))
- t.Log("Route for me until I send a message packet to them")
- myControl.WaitForType(1, 0, theirControl)
+ r.Log("Route for me until I send a message packet to them")
+ r.RouteForAllUntilAfterMsgTypeTo(theirControl, header.Message, header.MessageNone)
t.Log("My cached packet should be received by them")
myCachedPacket := theirControl.GetFromTun(true)
@@ -182,4 +186,32 @@ func Test_Case1_Stage1Race(t *testing.T) {
//TODO: assert hostmaps
}
+func TestRelays(t *testing.T) {
+ ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
+ myControl, myVpnIp, _ := newSimpleServer(ca, caKey, "me ", net.IP{10, 0, 0, 1}, m{"relay": m{"use_relays": true}})
+ relayControl, relayVpnIp, relayUdpAddr := newSimpleServer(ca, caKey, "relay ", net.IP{10, 0, 0, 128}, m{"relay": m{"am_relay": true}})
+ theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them ", net.IP{10, 0, 0, 2}, m{"relay": m{"use_relays": true}})
+
+ // Teach my how to get to the relay and that their can be reached via the relay
+ myControl.InjectLightHouseAddr(relayVpnIp, relayUdpAddr)
+ myControl.InjectRelays(theirVpnIp, []net.IP{relayVpnIp})
+ relayControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
+
+ // Build a router so we don't have to reason who gets which packet
+ r := router.NewR(t, myControl, relayControl, theirControl)
+ defer r.RenderFlow()
+
+ // Start the servers
+ myControl.Start()
+ relayControl.Start()
+ theirControl.Start()
+
+ t.Log("Trigger a handshake from me to them via the relay")
+ myControl.InjectTunUDPPacket(theirVpnIp, 80, 80, []byte("Hi from me"))
+
+ p := r.RouteForAllUntilTxTun(theirControl)
+ assertUdpPacket(t, []byte("Hi from me"), p, myVpnIp, theirVpnIp, 80, 80)
+ //TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it
+}
+
//TODO: add a test with many lies
diff --git a/e2e/helpers_test.go b/e2e/helpers_test.go
index 843e08c..2edcd0c 100644
--- a/e2e/helpers_test.go
+++ b/e2e/helpers_test.go
@@ -15,6 +15,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
+ "github.com/imdario/mergo"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula"
"github.com/slackhq/nebula/cert"
@@ -30,7 +31,7 @@ import (
type m map[string]interface{}
// newSimpleServer creates a nebula instance with many assumptions
-func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, udpIp net.IP) (*nebula.Control, net.IP, *net.UDPAddr) {
+func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, udpIp net.IP, overrides m) (*nebula.Control, net.IP, *net.UDPAddr) {
l := NewTestLogger()
vpnIpNet := &net.IPNet{IP: make([]byte, len(udpIp)), Mask: net.IPMask{255, 255, 255, 0}}
@@ -78,6 +79,15 @@ func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, u
"level": l.Level.String(),
},
}
+
+ if overrides != nil {
+ err = mergo.Merge(&overrides, mc, mergo.WithAppendSlice)
+ if err != nil {
+ panic(err)
+ }
+ mc = overrides
+ }
+
cb, err := yaml.Marshal(mc)
if err != nil {
panic(err)
diff --git a/e2e/router/router.go b/e2e/router/router.go
index ac8e02c..f55034a 100644
--- a/e2e/router/router.go
+++ b/e2e/router/router.go
@@ -4,14 +4,23 @@
package router
import (
+ "context"
"fmt"
"net"
+ "os"
+ "path/filepath"
"reflect"
"strconv"
+ "strings"
"sync"
+ "testing"
+ "time"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
"github.com/slackhq/nebula"
"github.com/slackhq/nebula/header"
+ "github.com/slackhq/nebula/iputil"
"github.com/slackhq/nebula/udp"
)
@@ -28,38 +37,93 @@ type R struct {
// map[from address + ":" + to address] => ip:port to rewrite in the udp packet to receiver
outNat map[string]net.UDPAddr
+ // A map of vpn ip to the nebula control it belongs to
+ vpnControls map[iputil.VpnIp]*nebula.Control
+
+ flow []flowEntry
+
// All interactions are locked to help serialize behavior
sync.Mutex
+
+ fn string
+ cancelRender context.CancelFunc
+ t *testing.T
+}
+
+type flowEntry struct {
+ note string
+ packet *packet
+}
+
+type packet struct {
+ from *nebula.Control
+ to *nebula.Control
+ packet *udp.Packet
+ tun bool // a packet pulled off a tun device
+ rx bool // the packet was received by a udp device
}
type ExitType int
const (
- // Keeps routing, the function will get called again on the next packet
+ // KeepRouting the function will get called again on the next packet
KeepRouting ExitType = 0
- // Does not route this packet and exits immediately
+ // ExitNow does not route this packet and exits immediately
ExitNow ExitType = 1
- // Routes this packet and exits immediately afterwards
+ // RouteAndExit routes this packet and exits immediately afterwards
RouteAndExit ExitType = 2
)
type ExitFunc func(packet *udp.Packet, receiver *nebula.Control) ExitType
-func NewR(controls ...*nebula.Control) *R {
+// NewR creates a new router to pass packets in a controlled fashion between the provided controllers.
+// The packet flow will be recorded in a file within the mermaid directory under the same name as the test.
+// Renders will occur automatically, roughly every 100ms, until a call to RenderFlow() is made
+func NewR(t *testing.T, controls ...*nebula.Control) *R {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ if err := os.MkdirAll("mermaid", 0755); err != nil {
+ panic(err)
+ }
+
r := &R{
- controls: make(map[string]*nebula.Control),
- inNat: make(map[string]*nebula.Control),
- outNat: make(map[string]net.UDPAddr),
+ controls: make(map[string]*nebula.Control),
+ vpnControls: make(map[iputil.VpnIp]*nebula.Control),
+ inNat: make(map[string]*nebula.Control),
+ outNat: make(map[string]net.UDPAddr),
+ fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())),
+ t: t,
+ cancelRender: cancel,
}
+ // Try to remove our render file
+ os.Remove(r.fn)
+
for _, c := range controls {
addr := c.GetUDPAddr()
if _, ok := r.controls[addr]; ok {
panic("Duplicate listen address: " + addr)
}
+
+ r.vpnControls[c.GetVpnIp()] = c
r.controls[addr] = c
}
+ // Spin the renderer in case we go nuts and the test never completes
+ go func() {
+ clockSource := time.NewTicker(time.Millisecond * 100)
+ defer clockSource.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-clockSource.C:
+ r.renderFlow()
+ }
+ }
+ }()
+
return r
}
@@ -78,6 +142,112 @@ func (r *R) AddRoute(ip net.IP, port uint16, c *nebula.Control) {
r.inNat[inAddr] = c
}
+// RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening.
+func (r *R) RenderFlow() {
+ r.cancelRender()
+ r.renderFlow()
+}
+
+func (r *R) renderFlow() {
+ f, err := os.OpenFile(r.fn, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644)
+ if err != nil {
+ panic(err)
+ }
+
+ var participants = map[string]struct{}{}
+ var participansVals []string
+
+ fmt.Fprintln(f, "```mermaid")
+ fmt.Fprintln(f, "sequenceDiagram")
+
+ // Assemble participants
+ for _, e := range r.flow {
+ if e.packet == nil {
+ continue
+ }
+
+ addr := e.packet.from.GetUDPAddr()
+ if _, ok := participants[addr]; ok {
+ continue
+ }
+ participants[addr] = struct{}{}
+ sanAddr := strings.Replace(addr, ":", "#58;", 1)
+ participansVals = append(participansVals, sanAddr)
+ fmt.Fprintf(
+ f, " participant %s as Nebula: %s
UDP: %s\n",
+ sanAddr, e.packet.from.GetVpnIp(), sanAddr,
+ )
+ }
+
+ // Print packets
+ h := &header.H{}
+ for _, e := range r.flow {
+ if e.packet == nil {
+ fmt.Fprintf(f, " note over %s: %s\n", strings.Join(participansVals, ", "), e.note)
+ continue
+ }
+
+ p := e.packet
+ if p.tun {
+ fmt.Fprintln(f, r.formatUdpPacket(p))
+
+ } else {
+ if err := h.Parse(p.packet.Data); err != nil {
+ panic(err)
+ }
+
+ line := "--x"
+ if p.rx {
+ line = "->>"
+ }
+
+ fmt.Fprintf(f,
+ " %s%s%s: %s(%s), counter: %v\n",
+ strings.Replace(p.from.GetUDPAddr(), ":", "#58;", 1),
+ line,
+ strings.Replace(p.to.GetUDPAddr(), ":", "#58;", 1),
+ h.TypeName(), h.SubTypeName(), h.MessageCounter,
+ )
+ }
+ }
+ fmt.Fprintln(f, "```")
+}
+
+// InjectFlow can be used to record packet flow if the test is handling the routing on its own.
+// The packet is assumed to have been received
+func (r *R) InjectFlow(from, to *nebula.Control, p *udp.Packet) {
+ r.Lock()
+ defer r.Unlock()
+ r.unlockedInjectFlow(from, to, p, false)
+}
+
+func (r *R) Log(arg ...any) {
+ r.Lock()
+ r.flow = append(r.flow, flowEntry{note: fmt.Sprint(arg...)})
+ r.t.Log(arg...)
+ r.Unlock()
+}
+
+func (r *R) Logf(format string, arg ...any) {
+ r.Lock()
+ r.flow = append(r.flow, flowEntry{note: fmt.Sprintf(format, arg...)})
+ r.t.Logf(format, arg...)
+ r.Unlock()
+}
+
+// unlockedInjectFlow is used by the router to record a packet has been transmitted, the packet is returned and
+// should be marked as received AFTER it has been placed on the receivers channel
+func (r *R) unlockedInjectFlow(from, to *nebula.Control, p *udp.Packet, tun bool) *packet {
+ fp := &packet{
+ from: from,
+ to: to,
+ packet: p.Copy(),
+ tun: tun,
+ }
+ r.flow = append(r.flow, flowEntry{packet: fp})
+ return fp
+}
+
// OnceFrom will route a single packet from sender then return
// If the router doesn't have the nebula controller for that address, we panic
func (r *R) OnceFrom(sender *nebula.Control) {
@@ -96,6 +266,11 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
select {
// Maybe we already have something on the tun for us
case b := <-tunTx:
+ r.Lock()
+ np := udp.Packet{Data: make([]byte, len(b))}
+ copy(np.Data, b)
+ r.unlockedInjectFlow(receiver, receiver, &np, true)
+ r.Unlock()
return b
// Nope, lets push the sender along
@@ -108,13 +283,73 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
r.Unlock()
panic("No control for udp tx")
}
-
+ fp := r.unlockedInjectFlow(sender, c, p, false)
c.InjectUDPPacket(p)
+ fp.rx = true
r.Unlock()
}
}
}
+// RouteForAllUntilTxTun will route for everyone and return when a packet is seen on receivers tun
+// If the router doesn't have the nebula controller for that address, we panic
+func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte {
+ sc := make([]reflect.SelectCase, len(r.controls)+1)
+ cm := make([]*nebula.Control, len(r.controls)+1)
+
+ i := 0
+ sc[i] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(receiver.GetTunTxChan()),
+ Send: reflect.Value{},
+ }
+ cm[i] = receiver
+
+ i++
+ for _, c := range r.controls {
+ sc[i] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(c.GetUDPTxChan()),
+ Send: reflect.Value{},
+ }
+
+ cm[i] = c
+ i++
+ }
+
+ for {
+ x, rx, _ := reflect.Select(sc)
+ r.Lock()
+
+ if x == 0 {
+ // we are the tun tx, we can exit
+ p := rx.Interface().([]byte)
+ np := udp.Packet{Data: make([]byte, len(p))}
+ copy(np.Data, p)
+
+ r.unlockedInjectFlow(cm[x], cm[x], &np, true)
+ r.Unlock()
+ return p
+
+ } else {
+ // we are a udp tx, route and continue
+ p := rx.Interface().(*udp.Packet)
+ outAddr := cm[x].GetUDPAddr()
+
+ inAddr := net.JoinHostPort(p.ToIp.String(), fmt.Sprintf("%v", p.ToPort))
+ c := r.getControl(outAddr, inAddr, p)
+ if c == nil {
+ r.Unlock()
+ panic("No control for udp tx")
+ }
+ fp := r.unlockedInjectFlow(cm[x], c, p, false)
+ c.InjectUDPPacket(p)
+ fp.rx = true
+ }
+ r.Unlock()
+ }
+}
+
// RouteExitFunc will call the whatDo func with each udp packet from sender.
// whatDo can return:
// - exitNow: the packet will not be routed and this call will return immediately
@@ -144,12 +379,16 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
return
case RouteAndExit:
+ fp := r.unlockedInjectFlow(sender, receiver, p, false)
receiver.InjectUDPPacket(p)
+ fp.rx = true
r.Unlock()
return
case KeepRouting:
+ fp := r.unlockedInjectFlow(sender, receiver, p, false)
receiver.InjectUDPPacket(p)
+ fp.rx = true
default:
panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
@@ -175,6 +414,34 @@ func (r *R) RouteUntilAfterMsgType(sender *nebula.Control, msgType header.Messag
})
}
+func (r *R) RouteForAllUntilAfterMsgTypeTo(receiver *nebula.Control, msgType header.MessageType, subType header.MessageSubType) {
+ h := &header.H{}
+ r.RouteForAllExitFunc(func(p *udp.Packet, r *nebula.Control) ExitType {
+ if r != receiver {
+ return KeepRouting
+ }
+
+ if err := h.Parse(p.Data); err != nil {
+ panic(err)
+ }
+
+ if h.Type == msgType && h.Subtype == subType {
+ return RouteAndExit
+ }
+
+ return KeepRouting
+ })
+}
+
+func (r *R) InjectUDPPacket(sender, receiver *nebula.Control, packet *udp.Packet) {
+ r.Lock()
+ defer r.Unlock()
+
+ fp := r.unlockedInjectFlow(sender, receiver, packet, false)
+ receiver.InjectUDPPacket(packet)
+ fp.rx = true
+}
+
// RouteForUntilAfterToAddr will route for sender and return only after it sees and sends a packet destined for toAddr
// finish can be any of the exitType values except `keepRouting`, the default value is `routeAndExit`
// If the router doesn't have the nebula controller for that address, we panic
@@ -234,12 +501,16 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
return
case RouteAndExit:
+ fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
receiver.InjectUDPPacket(p)
+ fp.rx = true
r.Unlock()
return
case KeepRouting:
+ fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
receiver.InjectUDPPacket(p)
+ fp.rx = true
default:
panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
@@ -321,3 +592,31 @@ func (r *R) getControl(fromAddr, toAddr string, p *udp.Packet) *nebula.Control {
return r.controls[toAddr]
}
+
+func (r *R) formatUdpPacket(p *packet) string {
+ packet := gopacket.NewPacket(p.packet.Data, layers.LayerTypeIPv4, gopacket.Lazy)
+ v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
+ if v4 == nil {
+ panic("not an ipv4 packet")
+ }
+
+ from := "unknown"
+ if c, ok := r.vpnControls[iputil.Ip2VpnIp(v4.SrcIP)]; ok {
+ from = c.GetUDPAddr()
+ }
+
+ udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP)
+ if udp == nil {
+ panic("not a udp packet")
+ }
+
+ data := packet.ApplicationLayer()
+ return fmt.Sprintf(
+ " %s-->>%s: src port: %v
dest port: %v
data: \"%v\"\n",
+ strings.Replace(from, ":", "#58;", 1),
+ strings.Replace(p.to.GetUDPAddr(), ":", "#58;", 1),
+ udp.SrcPort,
+ udp.DstPort,
+ string(data.Payload()),
+ )
+}
diff --git a/header/header.go b/header/header.go
index aae8079..50b7d62 100644
--- a/header/header.go
+++ b/header/header.go
@@ -46,6 +46,7 @@ var typeMap = map[MessageType]string{
LightHouse: "lightHouse",
Test: "test",
CloseTunnel: "closeTunnel",
+ Control: "control",
}
const (
@@ -73,7 +74,10 @@ var subTypeTestMap = map[MessageSubType]string{
var subTypeNoneMap = map[MessageSubType]string{0: "none"}
var subTypeMap = map[MessageType]*map[MessageSubType]string{
- Message: &subTypeNoneMap,
+ Message: {
+ MessageNone: "none",
+ MessageRelay: "relay",
+ },
RecvError: &subTypeNoneMap,
LightHouse: &subTypeNoneMap,
Test: &subTypeTestMap,
diff --git a/header/header_test.go b/header/header_test.go
index acb090a..765a006 100644
--- a/header/header_test.go
+++ b/header/header_test.go
@@ -82,10 +82,14 @@ func TestTypeMap(t *testing.T) {
LightHouse: "lightHouse",
Test: "test",
CloseTunnel: "closeTunnel",
+ Control: "control",
}, typeMap)
assert.Equal(t, map[MessageType]*map[MessageSubType]string{
- Message: &subTypeNoneMap,
+ Message: {
+ MessageNone: "none",
+ MessageRelay: "relay",
+ },
RecvError: &subTypeNoneMap,
LightHouse: &subTypeNoneMap,
Test: &subTypeTestMap,
diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go
index 0fdd6ef..8cfb103 100644
--- a/overlay/tun_tester.go
+++ b/overlay/tun_tester.go
@@ -36,8 +36,8 @@ func newTun(l *logrus.Logger, deviceName string, cidr *net.IPNet, _ int, routes
Routes: routes,
routeTree: routeTree,
l: l,
- rxPackets: make(chan []byte, 1),
- TxPackets: make(chan []byte, 1),
+ rxPackets: make(chan []byte, 10),
+ TxPackets: make(chan []byte, 10),
}, nil
}
diff --git a/udp/udp_tester.go b/udp/udp_tester.go
index b4619ff..8ea88b0 100644
--- a/udp/udp_tester.go
+++ b/udp/udp_tester.go
@@ -48,8 +48,8 @@ type Conn struct {
func NewListener(l *logrus.Logger, ip string, port int, _ bool, _ int) (*Conn, error) {
return &Conn{
Addr: &Addr{net.ParseIP(ip), uint16(port)},
- RxPackets: make(chan *Packet, 1),
- TxPackets: make(chan *Packet, 1),
+ RxPackets: make(chan *Packet, 10),
+ TxPackets: make(chan *Packet, 10),
l: l,
}, nil
}