Skip to content

Commit

Permalink
Shutdown function for stopping read handler
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Mar 21, 2024
1 parent 0ceb6e0 commit fa1cc1b
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"bufio"
"context"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -71,6 +72,7 @@ type Peer struct {
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand Down Expand Up @@ -252,8 +254,17 @@ func (p *Peer) String() string {
return p.address
}

func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

//ctx, cancel := context.WithCancel(context.Background())
//ctx := context.Background()
policyContext := backoff.WithContext(policy, ctx)

//p.mu.Lock()
//p.cancelReadHandler = cancel
//p.mu.Unlock()

operation := func() (wire.Message, error) {
msg, _, err := wire.ReadMessage(r, pver, bsvnet)
if err != nil {
Expand All @@ -270,7 +281,7 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire
}
}

msg, err := backoff.RetryNotifyWithData(operation, policy, notifyAndReconnect)
msg, err := backoff.RetryNotifyWithData(operation, policyContext, notifyAndReconnect)
if err != nil {
return nil, err
}
Expand All @@ -281,14 +292,19 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire
func (p *Peer) readHandler() {
readConn := p.readConn

ctx, cancel := context.WithCancel(context.Background())
p.cancelReadHandler = cancel

p.logger.Info("Starting read handler")

if readConn == nil {
p.logger.Error("no connection")
return
}

reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize})
for {
msg, err := p.readRetry(reader, wire.ProtocolVersion, p.network)
msg, err := p.readRetry(ctx, reader, wire.ProtocolVersion, p.network)
if err != nil {
p.disconnect()

Expand Down Expand Up @@ -664,3 +680,10 @@ func (p *Peer) IsHealthy() bool {

return p.isHealthy
}

func (p *Peer) Shutdown() {
if p.cancelReadHandler != nil {
p.cancelReadHandler()
//p.cancelReadHandlerWaitGroup.Wait()
}
}

0 comments on commit fa1cc1b

Please sign in to comment.