diff --git a/peer.go b/peer.go index 550b174..4173de8 100644 --- a/peer.go +++ b/peer.go @@ -826,7 +826,7 @@ func (p *Peer) startMonitorPingPong() { select { case p.isUnhealthyCh <- struct{}{}: - default: // Do not block if nothing is ready from channel + default: // Do not block if nothing is reading from channel } p.logger.Warn("peer unhealthy") diff --git a/peer_manager.go b/peer_manager.go index aeaf134..a54dfed 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -52,10 +52,6 @@ func NewPeerManager(logger *slog.Logger, network wire.BitcoinNet, options ...Pee logger.Info("Excessive block size set to", slog.Int64("block size", pm.ebs)) wire.SetLimits(uint64(pm.ebs)) - if pm.restartUnhealthyPeers { - pm.StartMonitorPeerHealth() - } - return pm } @@ -69,6 +65,10 @@ func (pm *PeerManager) AddPeer(peer PeerI) error { pm.peers = append(pm.peers, peer) + if pm.restartUnhealthyPeers { + pm.startMonitorPeerHealth(peer) + } + return nil } @@ -95,23 +95,22 @@ func (pm *PeerManager) Shutdown() { } } -func (pm *PeerManager) StartMonitorPeerHealth() { - - for _, peer := range pm.peers { - pm.waitGroup.Add(1) - go func(p PeerI) { - defer pm.waitGroup.Done() - for { - select { - case <-pm.ctx.Done(): - return - case <-p.IsUnhealthyCh(): - pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected())) - p.Restart() - } +func (pm *PeerManager) startMonitorPeerHealth(peer PeerI) { + pm.logger.Info("Starting peer health monitoring") + + pm.waitGroup.Add(1) + go func(p PeerI) { + defer pm.waitGroup.Done() + for { + select { + case <-pm.ctx.Done(): + return + case <-p.IsUnhealthyCh(): + pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected())) + p.Restart() } - }(peer) - } + } + }(peer) } // AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil diff --git a/peer_manager_test.go b/peer_manager_test.go index 67b371f..1e952e7 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -124,3 +124,45 @@ func TestAnnounceNewTransaction(t *testing.T) { assert.GreaterOrEqual(t, peersMessaged, len(peers)/2) }) } + +func TestMonitorPeerHealth(t *testing.T) { + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + tt := []struct { + name string + restartUnhealthyPeers bool + }{ + { + name: "restart unhealthy peers", + restartUnhealthyPeers: true, + }, + { + name: "do not restart unhealthy peers", + restartUnhealthyPeers: false, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + var opts []PeerManagerOptions + + if tc.restartUnhealthyPeers { + opts = append(opts, WithRestartUnhealthyPeers()) + } + + pm := NewPeerManager(logger, wire.TestNet, opts...) + require.NotNil(t, pm) + + peerHandler := NewMockPeerHandler() + + peer, err := NewPeer(logger, "localhost:18333", peerHandler, wire.TestNet, WithPingInterval(100*time.Millisecond, 200*time.Millisecond)) + require.NoError(t, err) + + err = pm.AddPeer(peer) + require.NoError(t, err) + assert.Len(t, pm.GetPeers(), 1) + time.Sleep(1 * time.Second) + pm.Shutdown() + }) + } +}