From c1dedb4e6e0d7830163c1e97b3196db747a8eef3 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 20 Nov 2022 12:13:30 +0100 Subject: [PATCH] improve handling of dead peers --- comm.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/comm.go b/comm.go index 86380a74..4dd6d3bc 100644 --- a/comm.go +++ b/comm.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio" - "github.com/libp2p/go-msgio/protoio" pb "github.com/libp2p/go-libp2p-pubsub/pb" ) @@ -126,7 +125,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } go p.handleSendingMessages(ctx, s, outgoing) - go p.handlePeerEOF(ctx, s) + go p.handlePeerDead(s) select { case p.newPeerStream <- s: case <-ctx.Done(): @@ -142,19 +141,16 @@ func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, back } } -func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { +func (p *PubSub) handlePeerDead(s network.Stream) { pid := s.Conn().RemotePeer() - r := protoio.NewDelimitedReader(s, p.maxMessageSize) - rpc := new(RPC) - for { - err := r.ReadMsg(&rpc.RPC) - if err != nil { - p.notifyPeerDead(pid) - return - } + _, err := s.Read([]byte{}) + if err == nil { log.Debugf("unexpected message from %s", pid) } + + s.Reset() + p.notifyPeerDead(pid) } func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {