From 1cfa63b5fbb2981bf0007c5436139278155d0802 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Tue, 19 Mar 2024 15:08:54 +0100 Subject: [PATCH] Wait for shutdown completion --- Peer.go | 54 ++++++++++++++++++++++++---------------- peer_integration_test.go | 4 ++- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/Peer.go b/Peer.go index 448b7ea..7498c51 100644 --- a/Peer.go +++ b/Peer.go @@ -52,26 +52,27 @@ type Block struct { } type Peer struct { - address string - network wire.BitcoinNet - mu sync.RWMutex - readConn net.Conn - writeConn net.Conn - incomingConn net.Conn - dial func(network, address string) (net.Conn, error) - peerHandler PeerHandlerI - writeChan chan wire.Message - quit chan struct{} - pingPongAlive chan struct{} - logger *slog.Logger - sentVerAck atomic.Bool - receivedVerAck atomic.Bool - batchDelay time.Duration - invBatcher *batcher.Batcher[chainhash.Hash] - dataBatcher *batcher.Batcher[chainhash.Hash] - maximumMessageSize int64 - isHealthy bool - quitReadHandler chan struct{} + address string + network wire.BitcoinNet + mu sync.RWMutex + readConn net.Conn + writeConn net.Conn + incomingConn net.Conn + dial func(network, address string) (net.Conn, error) + peerHandler PeerHandlerI + writeChan chan wire.Message + quit chan struct{} + pingPongAlive chan struct{} + logger *slog.Logger + sentVerAck atomic.Bool + receivedVerAck atomic.Bool + batchDelay time.Duration + invBatcher *batcher.Batcher[chainhash.Hash] + dataBatcher *batcher.Batcher[chainhash.Hash] + maximumMessageSize int64 + isHealthy bool + quitReadHandler chan struct{} + quitReadHandlerComplete chan struct{} } // NewPeer returns a new bitcoin peer for the provided address and configuration. @@ -281,8 +282,10 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire func (p *Peer) startReadHandler() { p.quitReadHandler = make(chan struct{}, 10) + p.quitReadHandlerComplete = make(chan struct{}, 10) + + p.logger.Info("Starting read handler") - p.logger.Info("starting read handler") go func() { readConn := p.readConn @@ -292,6 +295,13 @@ func (p *Peer) startReadHandler() { return } + go func() { + if p.quitReadHandlerComplete != nil { + p.quitReadHandlerComplete <- struct{}{} + } + p.logger.Info("Shutting down read handler") + }() + reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize}) for { select { @@ -306,6 +316,7 @@ func (p *Peer) startReadHandler() { p.mu.Lock() p.quitReadHandler = nil + p.quitReadHandlerComplete = nil p.mu.Unlock() return @@ -686,5 +697,6 @@ func (p *Peer) Shutdown() { defer p.mu.Unlock() if p.quitReadHandler != nil { p.quitReadHandler <- struct{}{} + <-p.quitReadHandlerComplete } } diff --git a/peer_integration_test.go b/peer_integration_test.go index 8be841a..4a401d3 100644 --- a/peer_integration_test.go +++ b/peer_integration_test.go @@ -92,7 +92,7 @@ func TestNewPeer(t *testing.T) { peer, err := NewPeer(logger, "localhost:18333", peerHandler, wire.TestNet) require.NoError(t, err) - defer peer.Shutdown() + time.Sleep(5 * time.Second) require.True(t, peer.Connected()) @@ -111,5 +111,7 @@ func TestNewPeer(t *testing.T) { // wait longer than the reconnect interval and expect that peer has re-established connection time.Sleep(reconnectInterval + 2*time.Second) require.True(t, peer.Connected()) + + peer.Shutdown() }) }