Skip to content

Commit

Permalink
Merge pull request #23 from libsv/refactor/set-unhealthy
Browse files Browse the repository at this point in the history
refactor peer health monitor
  • Loading branch information
boecklim authored Jul 22, 2024
2 parents 6e9795c + ba6e38b commit b5ce3c9
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 77 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ lint:
.PHONY: install
install:
go install honnef.co/go/tools/cmd/staticcheck@latest

.PHONY: gen_go
gen_go:
go generate ./...
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type PeerI interface {
RequestBlock(blockHash *chainhash.Hash)
Network() wire.BitcoinNet
IsHealthy() bool
IsUnhealthyCh() <-chan struct{}
Shutdown()
Restart()
}
Expand Down
76 changes: 40 additions & 36 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
pingIntervalDefault = 2 * time.Minute
connectionHealthTickerDurationDefault = 3 * time.Minute
)

type Block struct {
Expand Down Expand Up @@ -71,13 +71,15 @@ type Peer struct {
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
isHealthy atomic.Bool
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
nrWriteHandlers int

ctx context.Context
isUnhealthyCh chan struct{}
pingInterval time.Duration
connectionHealthThreshold time.Duration
ctx context.Context

cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
Expand Down Expand Up @@ -105,13 +107,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
isUnhealthyCh: make(chan struct{}),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
nrWriteHandlers: nrWriteHandlersDefault,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault,
pingInterval: pingIntervalDefault,
connectionHealthThreshold: connectionHealthTickerDurationDefault,
writerWg: &sync.WaitGroup{},
readerWg: &sync.WaitGroup{},
reconnectingWg: &sync.WaitGroup{},
Expand All @@ -132,6 +137,9 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
}

func (p *Peer) start() {

p.logger.Info("Starting peer")

ctx, cancelAll := context.WithCancel(context.Background())
p.cancelAll = cancelAll
p.ctx = ctx
Expand Down Expand Up @@ -788,15 +796,15 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion {
func (p *Peer) startMonitorPingPong() {
p.healthMonitorWg.Add(1)

pingTicker := time.NewTicker(pingInterval)
pingTicker := time.NewTicker(p.pingInterval)

go func() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration)
monitorConnectionTicker := time.NewTicker(p.connectionHealthThreshold)

defer func() {
p.healthMonitorWg.Done()
checkConnectionHealthTicker.Stop()
monitorConnectionTicker.Stop()
}()

for {
Expand All @@ -809,50 +817,42 @@ func (p *Peer) startMonitorPingPong() {
}
p.writeChan <- wire.NewMsgPing(nonce)
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()
// if ping/pong signal is received reset the ticker
monitorConnectionTicker.Reset(p.connectionHealthThreshold)
p.setHealthy()
case <-monitorConnectionTicker.C:

// if ping/pong is received signal reset the ticker
checkConnectionHealthTicker.Reset(connectionHealthTickerDuration)
case <-checkConnectionHealthTicker.C:
p.isHealthy.Store(false)

select {
case p.isUnhealthyCh <- struct{}{}:
default: // Do not block if nothing is ready from channel
}

p.mu.Lock()
p.isHealthy = false
p.logger.Warn("peer unhealthy")
p.mu.Unlock()
case <-p.ctx.Done():
return
}
}
}()
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.isHealthy
// IsUnhealthyCh returns the peer's isUnhealthyCh channel as a receive-only channel.
// The ping/pong monitor does a non-blocking send on this channel each time its
// connection health ticker fires; the channel is created unbuffered, so that signal
// is dropped when no receiver is waiting at that moment.
func (p *Peer) IsUnhealthyCh() <-chan struct{} {
return p.isUnhealthyCh
}

func (p *Peer) stopReadHandler() {
if p.cancelReadHandler == nil {
func (p *Peer) setHealthy() {

if p.isHealthy.Load() {
return
}
p.logger.Debug("Cancelling read handlers")
p.cancelReadHandler()
p.logger.Debug("Waiting for read handlers to stop")
p.readerWg.Wait()

p.logger.Info("peer healthy")
p.isHealthy.Store(true)
}

func (p *Peer) stopWriteHandler() {
if p.cancelWriteHandler == nil {
return
}
p.logger.Debug("Cancelling write handlers")
p.cancelWriteHandler()
p.logger.Debug("Waiting for writer handlers to stop")
p.writerWg.Wait()
// IsHealthy reports the current value of the peer's atomic isHealthy flag.
// The flag is stored as true by setHealthy and as false by the ping/pong monitor
// when the connection health ticker fires.
func (p *Peer) IsHealthy() bool {
return p.isHealthy.Load()
}

func (p *Peer) Restart() {
Expand All @@ -862,10 +862,14 @@ func (p *Peer) Restart() {
}

// Shutdown stops the peer: it calls cancelAll and then blocks until the
// reconnecting, health monitor, writer and reader wait groups are all done.
// The start and the completion of the shutdown are logged at info level.
func (p *Peer) Shutdown() {
p.logger.Info("Shutting down")

// cancelAll cancels the context set up in start(); the ping/pong monitor, for
// one, returns once p.ctx is done.
p.cancelAll()

// Wait for every goroutine group before reporting completion.
p.reconnectingWg.Wait()
p.healthMonitorWg.Wait()
p.writerWg.Wait()
p.readerWg.Wait()

p.logger.Info("Shutdown complete")
}
32 changes: 15 additions & 17 deletions peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type PeerManager struct {
logger *slog.Logger
ebs int64
restartUnhealthyPeers bool
monitorPeersInterval time.Duration
waitGroup sync.WaitGroup
cancelAll context.CancelFunc
ctx context.Context
Expand Down Expand Up @@ -84,6 +83,7 @@ func (pm *PeerManager) GetPeers() []PeerI {
}

func (pm *PeerManager) Shutdown() {
pm.logger.Info("Shutting down peer manager")

if pm.cancelAll != nil {
pm.cancelAll()
Expand All @@ -96,24 +96,22 @@ func (pm *PeerManager) Shutdown() {
}

func (pm *PeerManager) StartMonitorPeerHealth() {
ticker := time.NewTicker(pm.monitorPeersInterval)
pm.waitGroup.Add(1)
go func() {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
for _, peer := range pm.GetPeers() {
if !peer.IsHealthy() {
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected()))
peer.Restart()
}

for _, peer := range pm.peers {
pm.waitGroup.Add(1)
go func(p PeerI) {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
}
}
}()
}(peer)
}
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
Expand Down
3 changes: 1 addition & 2 deletions peer_manager_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ func WithExcessiveBlockSize(ebs int64) PeerManagerOptions {
}
}

func WithRestartUnhealthyPeers(monitorPeersInterval time.Duration) PeerManagerOptions {
func WithRestartUnhealthyPeers() PeerManagerOptions {
return func(p *PeerManager) {
p.restartUnhealthyPeers = true
p.monitorPeersInterval = monitorPeersInterval
}
}
6 changes: 2 additions & 4 deletions peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewPeerManager(t *testing.T) {
err = pm.AddPeer(peer)
require.NoError(t, err)
assert.Len(t, pm.GetPeers(), 1)
peer.Shutdown()
pm.Shutdown()
})

t.Run("1 peer - de dup", func(t *testing.T) {
Expand All @@ -64,9 +64,7 @@ func TestNewPeerManager(t *testing.T) {

assert.Len(t, pm.GetPeers(), 4)

for _, peer := range peers {
peer.Shutdown()
}
pm.Shutdown()
})
}

Expand Down
4 changes: 4 additions & 0 deletions peer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (p *PeerMock) IsHealthy() bool {
return true
}

// IsUnhealthyCh returns a newly made receive-only channel on every call.
// Nothing sends on or closes the returned channel, so a receive from it never
// completes, i.e. the mock never signals that it is unhealthy.
func (p *PeerMock) IsUnhealthyCh() <-chan struct{} {
return make(<-chan struct{})
}

func (p *PeerMock) Connected() bool {
return true
}
Expand Down
11 changes: 11 additions & 0 deletions peer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@ func WithNrOfWriteHandlers(NrWriteHandlers int) PeerOptions {
return nil
}
}

// invalidIntervalError reports a non-positive duration passed to WithPingInterval.
type invalidIntervalError string

// Error implements the error interface.
func (e invalidIntervalError) Error() string { return string(e) }

// WithPingInterval overrides the ping interval and the connection health threshold of a peer.
//
// pingInterval is the period at which the peer sends a ping message.
// connectionHealthThreshold is the time after which the connection is marked
// unhealthy if no ping/pong signal has been received.
//
// Both durations are used to create tickers with time.NewTicker, which panics on a
// non-positive value. The returned option therefore rejects such values with an
// error and leaves the peer's current settings untouched.
func WithPingInterval(pingInterval time.Duration, connectionHealthThreshold time.Duration) PeerOptions {
	return func(p *Peer) error {
		if pingInterval <= 0 {
			return invalidIntervalError("ping interval must be greater than zero")
		}
		if connectionHealthThreshold <= 0 {
			return invalidIntervalError("connection health threshold must be greater than zero")
		}

		p.pingInterval = pingInterval
		p.connectionHealthThreshold = connectionHealthThreshold
		return nil
	}
}
24 changes: 22 additions & 2 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ func TestReconnect(t *testing.T) {

if tc.cancelRead {
// cancel reader so that writer will disconnect
peer.stopReadHandler()
stopReadHandler(peer)
} else {
// cancel writer so that reader will disconnect
peer.stopWriteHandler()
stopWriteHandler(peer)
}

// break connection
Expand Down Expand Up @@ -637,3 +637,23 @@ func doHandshake(t *testing.T, p *Peer, myConn net.Conn) {
assert.NoError(t, err)
assert.Equal(t, wire.CmdVerAck, msg.Command())
}

// stopReadHandler is a test helper that invokes the peer's read-handler cancel
// function and then waits on the peer's reader wait group.
// It is a no-op when the peer has no read-handler cancel function set.
func stopReadHandler(p *Peer) {
	cancelRead := p.cancelReadHandler
	if cancelRead != nil {
		p.logger.Debug("Cancelling read handlers")
		cancelRead()

		p.logger.Debug("Waiting for read handlers to stop")
		p.readerWg.Wait()
	}
}

// stopWriteHandler is a test helper that invokes the peer's write-handler cancel
// function and then waits on the peer's writer wait group.
// It is a no-op when the peer has no write-handler cancel function set.
func stopWriteHandler(p *Peer) {
	cancelWrite := p.cancelWriteHandler
	if cancelWrite != nil {
		p.logger.Debug("Cancelling write handlers")
		cancelWrite()

		p.logger.Debug("Waiting for writer handlers to stop")
		p.writerWg.Wait()
	}
}
38 changes: 22 additions & 16 deletions test/peer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,12 @@ func TestNewPeer(t *testing.T) {
t.Log("shutdown finished")
})

t.Run("announce transaction", func(t *testing.T) {
t.Run("restart", func(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

pm := p2p.NewPeerManager(logger, wire.TestNet)
require.NotNil(t, pm)

peerHandler := &p2p.PeerHandlerIMock{
HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
return [][]byte{TX1RawBytes}, nil
},
}

peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet)
require.NoError(t, err)
peerHandler := p2p.NewMockPeerHandler()

err = pm.AddPeer(peer)
peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet, p2p.WithUserAgent("agent", "0.0.1"))
require.NoError(t, err)

t.Log("expect that peer has connected")
Expand All @@ -175,14 +165,30 @@ func TestNewPeer(t *testing.T) {
break connectLoop
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatal("peer did not disconnect")
t.Fatal("peer did not connect")
}
}

pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer})
t.Log("restart peer")
peer.Restart()

time.Sleep(100 * time.Millisecond)
t.Log("expect that peer has re-established connection")
reconnectLoop:
for {
select {
case <-time.NewTicker(200 * time.Millisecond).C:
if peer.Connected() {
break reconnectLoop
}
case <-time.NewTimer(2 * time.Second).C:
t.Fatal("peer did not reconnect")
}
}

require.NoError(t, err)

t.Log("shutdown")
peer.Shutdown()
t.Log("shutdown finished")
})
}
Loading

0 comments on commit b5ce3c9

Please sign in to comment.