Skip to content

Commit

Permalink
cleanup e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Feb 4, 2025
1 parent 4ceaeaf commit 6459ddb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 139 deletions.
172 changes: 45 additions & 127 deletions p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"go.uber.org/fx"

"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -132,22 +133,6 @@ func TestDirectDialWorks(t *testing.T) {
require.Equal(t, holepunch.DirectDialEvtT, events[0].Type)
}

type testHost struct {
H *host.Host
HostOpts []libp2p.Option
Public bool
ConnectToRelay bool
IPAddr string
Port uint16
}

type simnet struct {
Hosts []*testHost
Relay *host.Host

router *simconn.SimpleFirewallRouter
}

func connectToRelay(relayPtr *host.Host) libp2p.Option {
return func(cfg *libp2p.Config) error {
if relayPtr == nil {
Expand All @@ -166,44 +151,6 @@ func connectToRelay(relayPtr *host.Host) libp2p.Option {
}
}

func (s *simnet) Start(t *testing.T) {
s.router = &simconn.SimpleFirewallRouter{}

var err error
*s.Relay, err = libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(true, s.router),
)
require.NoError(t, err)
_, err = relayv2.New(*s.Relay)
require.NoError(t, err)

for _, h := range s.Hosts {
if h.Public {
*h.H, err = libp2p.New(
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", h.IPAddr, h.Port))),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(true, s.router))
require.NoError(t, err)
continue
}
var relay host.Host
if h.ConnectToRelay {
relay = *s.Relay
}
*h.H = mkHostWithStaticAutoRelay(t, h.IPAddr, int(h.Port), relay, s.router, h.HostOpts)
}
}

func (sc *simnet) Stop(t *testing.T) {
for _, h := range sc.Hosts {
(*h.H).Close()
}
(*sc.Relay).Close()
}

func learnAddrs(h1, h2 host.Host) {
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.ConnectedAddrTTL)
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.ConnectedAddrTTL)
Expand All @@ -222,81 +169,53 @@ func pingAtoB(t *testing.T, a, b host.Host) {
require.NoError(t, result.Error)
}

func MustNewHost(t *testing.T, opts ...libp2p.Option) host.Host {
h, err := libp2p.New(opts...)
require.NoError(t, err)
return h
}

func TestEndToEndSimConnect(t *testing.T) {
h1tr := &mockEventTracer{}
h2tr := &mockEventTracer{}
// identify.ActivationThresh = 3

// var observer1 host.Host
// var observer2 host.Host
// var observer3 host.Host

var h1 host.Host
var h2 host.Host
var relay host.Host
n := &simnet{
Hosts: []*testHost{
// {H: &observer1, IPAddr: "3.0.0.1", Port: 8000, Public: true},
// {H: &observer2, IPAddr: "3.0.0.2", Port: 8000, Public: true},
// {H: &observer3, IPAddr: "3.0.0.3", Port: 8000, Public: true},
{H: &h1, IPAddr: "2.2.0.1", Port: 8000,
HostOpts: []libp2p.Option{
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr)),
}},
{H: &h2, IPAddr: "2.2.0.2", Port: 8001,
ConnectToRelay: true,
HostOpts: []libp2p.Option{
libp2p.EnableHolePunching(holepunch.WithTracer(h2tr)),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
fmt.Println("addrs in factory:", addrs)
return addrs
}),
}},
},
Relay: &relay,
}
n.Start(t)
defer n.Stop(t)
fmt.Println("H2 addrs:", h2.Addrs())
panic("stop")

// pingAtoB(t, h2, observer1)
// pingAtoB(t, h2, observer2)
// pingAtoB(t, h2, observer3)
router := &simconn.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.WithFxOption(fx.Invoke(func(h host.Host) {
// Setup relay service
_, err := relayv2.New(h)
require.NoError(t, err)
})),
)
h1 := MustNewHost(t,
quicSimConn(false, router),
libp2p.ForceReachabilityPrivate(),
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
h2 := MustNewHost(t,
quicSimConn(false, router),
libp2p.ForceReachabilityPrivate(),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
libp2p.EnableHolePunching(holepunch.WithTracer(h2tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
)

// time.Sleep(100 * time.Millisecond)
defer h1.Close()
defer h2.Close()
defer relay.Close()

// time.Sleep(100 * time.Millisecond)
p1 := ping.NewPingService(h1)
require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
ID: relay.ID(),
Addrs: relay.Addrs(),
}))
// var raddr ma.Multiaddr
// for _, a := range h2.Addrs() {
// if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
// raddr = a
// break
// }
// }
// require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
// ID: h2.ID(),
// Addrs: []ma.Multiaddr{raddr},
// }))

fmt.Println("h1.ID():", h1.ID(), h1.Addrs())
go func() {
for {
time.Sleep(1 * time.Second)

fmt.Println("h2.ID():", h2.ID(), h2.Addrs())
}
}()
// Wait for holepunch service to start
time.Sleep(500 * time.Millisecond)

learnAddrs(h1, h2)
res := p1.Ping(network.WithAllowLimitedConn(context.Background(), "test"), h2.ID())
result := <-res
require.NoError(t, result.Error)
pingAtoB(t, h1, h2)

// wait till a direct connection is complete
ensureDirectConn(t, h1, h2)
Expand Down Expand Up @@ -574,9 +493,8 @@ func mkHostWithStaticAutoRelay(t *testing.T, ipAddr string, port int, relay host
opts = append(opts, hostOpts...)
opts = append(opts,
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
// libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(false, router))
quicSimConn(false, router))

if relay != nil {
pi := peer.AddrInfo{
Expand Down Expand Up @@ -624,7 +542,7 @@ func (m *MockSourceIPSelector) PreferredSourceIPForDestination(dst *net.UDPAddr)
return *m.ip.Load(), nil
}

func quicReuseOpts(isPublic bool, router *simconn.SimpleFirewallRouter) libp2p.Option {
func quicSimConn(isPublic bool, router *simconn.SimpleFirewallRouter) libp2p.Option {
m := &MockSourceIPSelector{}
return libp2p.QUICReuse(
quicreuse.NewConnManager,
Expand Down Expand Up @@ -656,7 +574,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(true, router),
quicSimConn(true, router),
)
require.NoError(t, err)
_, err = relayv2.New(relay)
Expand All @@ -666,7 +584,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
h, err := libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.2/udp/8000/quic-v1")),
libp2p.ForceReachabilityPrivate(),
quicReuseOpts(false, router),
quicSimConn(false, router),
libp2p.DisableRelay(),
)
require.NoError(t, err)
Expand Down Expand Up @@ -731,7 +649,7 @@ func mkHostWithHolePunchSvc2(t *testing.T, ipAddr string, port int, router *simc
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(false, router),
quicSimConn(false, router),
)
require.NoError(t, err)
hps := addHolePunchService(t, h, nil, opts...)
Expand Down
13 changes: 7 additions & 6 deletions p2p/protocol/holepunch/holepuncher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import (
// ErrHolePunchActive is returned from DirectConnect when another hole punching attempt is currently running
var ErrHolePunchActive = errors.New("another hole punching attempt to this peer is active")

const (
dialTimeout = 5 * time.Second
maxRetries = 3
)
const maxRetries = 3

// The holePuncher is run on the peer that's behind a NAT / Firewall.
// It observes new incoming connections via a relay that it has a reservation with,
Expand All @@ -40,6 +37,8 @@ type holePuncher struct {
ids identify.IDService
listenAddrs func() []ma.Multiaddr

directDialTimeout time.Duration

// active hole punches for deduplicating
activeMx sync.Mutex
active map[peer.ID]struct{}
Expand Down Expand Up @@ -118,7 +117,7 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
for _, a := range hp.host.Peerstore().Addrs(rp) {
if !isRelayAddress(a) && manet.IsPublicAddr(a) {
forceDirectConnCtx := network.WithForceDirectDial(hp.ctx, "hole-punching")
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, 100*time.Millisecond)
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, hp.directDialTimeout)

tstart := time.Now()
// This dials *all* addresses, public and private, from the peerstore.
Expand Down Expand Up @@ -159,7 +158,9 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
}
hp.tracer.StartHolePunch(rp, addrs, rtt)
hp.tracer.HolePunchAttempt(pi.ID)
err := holePunchConnect(hp.ctx, hp.host, pi, false)
ctx, cancel := context.WithTimeout(hp.ctx, hp.directDialTimeout)
err := holePunchConnect(ctx, hp.host, pi, false)
cancel()
dt := time.Since(start)
hp.tracer.EndHolePunch(rp, dt, err)
if err == nil {
Expand Down
20 changes: 17 additions & 3 deletions p2p/protocol/holepunch/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

const defaultDirectDialTimeout = 5 * time.Second

// Protocol is the libp2p protocol for Hole Punching.
const Protocol protocol.ID = "/libp2p/dcutr"

Expand All @@ -38,6 +40,13 @@ var ErrClosed = errors.New("hole punching service closing")

type Option func(*Service) error

func DirectDialTimeout(timeout time.Duration) Option {
return func(s *Service) error {
s.directDialTimeout = timeout
return nil
}
}

// The Service runs on every node that supports the DCUtR protocol.
type Service struct {
ctx context.Context
Expand All @@ -52,8 +61,9 @@ type Service struct {
// publicly reachable relay addresses.
listenAddrs func() []ma.Multiaddr

holePuncherMx sync.Mutex
holePuncher *holePuncher
directDialTimeout time.Duration
holePuncherMx sync.Mutex
holePuncher *holePuncher

hasPublicAddrsChan chan struct{}

Expand Down Expand Up @@ -83,6 +93,7 @@ func NewService(h host.Host, ids identify.IDService, listenAddrs func() []ma.Mul
ids: ids,
listenAddrs: listenAddrs,
hasPublicAddrsChan: make(chan struct{}),
directDialTimeout: defaultDirectDialTimeout,
}

for _, opt := range opts {
Expand Down Expand Up @@ -137,6 +148,7 @@ func (s *Service) waitForPublicAddr() {
return
}
s.holePuncher = newHolePuncher(s.host, s.ids, s.listenAddrs, s.tracer, s.filter)
s.holePuncher.directDialTimeout = s.directDialTimeout
s.holePuncherMx.Unlock()
close(s.hasPublicAddrsChan)
}
Expand Down Expand Up @@ -258,7 +270,9 @@ func (s *Service) handleNewStream(str network.Stream) {
log.Debugw("starting hole punch", "peer", rp)
start := time.Now()
s.tracer.HolePunchAttempt(pi.ID)
err = holePunchConnect(s.ctx, s.host, pi, true)
ctx, cancel := context.WithTimeout(s.ctx, s.directDialTimeout)
err = holePunchConnect(ctx, s.host, pi, true)
cancel()
dt := time.Since(start)
s.tracer.EndHolePunch(rp, dt, err)
s.tracer.HolePunchFinished("receiver", 1, addrs, ownAddrs, getDirectConnection(s.host, rp))
Expand Down
4 changes: 1 addition & 3 deletions p2p/protocol/holepunch/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ func getDirectConnection(h host.Host, p peer.ID) network.Conn {
func holePunchConnect(ctx context.Context, host host.Host, pi peer.AddrInfo, isClient bool) error {
holePunchCtx := network.WithSimultaneousConnect(ctx, isClient, "hole-punching")
forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching")
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout)
defer cancel()

log.Debugw("holepunchConnect", "host", host.ID(), "peer", pi.ID, "addrs", pi.Addrs)
if err := host.Connect(dialCtx, pi); err != nil {
if err := host.Connect(forceDirectConnCtx, pi); err != nil {
log.Debugw("hole punch attempt with peer failed", "peer ID", pi.ID, "error", err)
return err
}
Expand Down

0 comments on commit 6459ddb

Please sign in to comment.