Skip to content

Commit

Permalink
Wait for shutdown completion
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Mar 19, 2024
1 parent 15c3f22 commit 1cfa63b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
54 changes: 33 additions & 21 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,27 @@ type Block struct {
}

type Peer struct {
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
quitReadHandler chan struct{}
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
quitReadHandler chan struct{}
quitReadHandlerComplete chan struct{}
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand Down Expand Up @@ -281,8 +282,10 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire

func (p *Peer) startReadHandler() {
p.quitReadHandler = make(chan struct{}, 10)
p.quitReadHandlerComplete = make(chan struct{}, 10)

p.logger.Info("Starting read handler")

p.logger.Info("starting read handler")
go func() {

readConn := p.readConn
Expand All @@ -292,6 +295,13 @@ func (p *Peer) startReadHandler() {
return
}

go func() {
if p.quitReadHandlerComplete != nil {
p.quitReadHandlerComplete <- struct{}{}
}
p.logger.Info("Shutting down read handler")
}()

reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize})
for {
select {
Expand All @@ -306,6 +316,7 @@ func (p *Peer) startReadHandler() {

p.mu.Lock()
p.quitReadHandler = nil
p.quitReadHandlerComplete = nil
p.mu.Unlock()

return
Expand Down Expand Up @@ -686,5 +697,6 @@ func (p *Peer) Shutdown() {
defer p.mu.Unlock()
if p.quitReadHandler != nil {
p.quitReadHandler <- struct{}{}
<-p.quitReadHandlerComplete
}
}
4 changes: 3 additions & 1 deletion peer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestNewPeer(t *testing.T) {

peer, err := NewPeer(logger, "localhost:18333", peerHandler, wire.TestNet)
require.NoError(t, err)
defer peer.Shutdown()

time.Sleep(5 * time.Second)

require.True(t, peer.Connected())
Expand All @@ -111,5 +111,7 @@ func TestNewPeer(t *testing.T) {
// wait longer than the reconnect interval and expect that peer has re-established connection
time.Sleep(reconnectInterval + 2*time.Second)
require.True(t, peer.Connected())

peer.Shutdown()
})
}

0 comments on commit 1cfa63b

Please sign in to comment.