From 3ec4325a5f9662c9c4f5cd1762ac9e430f884ba6 Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Thu, 22 Jul 2021 10:47:31 -0700
Subject: [PATCH] feat: make "get providers" non-blocking when there are closer peers

Let's assume there is one provider (or zero) for a record and everyone
is looking for 10 providers.

- Before, peers would wait for all peers along the path to return this
  one provider. However, because there is only one provider, peers
  won't be able to short-circuit anyway.
- Now, peers will go to the end of the path before waiting. This may
  make some queries slower, but it attempts to give "priority" to peers
  that actually _need_ responses, as opposed to peers that are
  "optimistically" waiting for responses.
---
 handlers.go                    | 42 ++++++++++++++++++++++++++--------
 providers/providers_manager.go | 38 +++++++++++++++++++++++++++---
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/handlers.go b/handlers.go
index 3390e01ea..1de82f0d3 100644
--- a/handlers.go
+++ b/handlers.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peerstore"
+	kb "github.com/libp2p/go-libp2p-kbucket"
 	pstore "github.com/libp2p/go-libp2p-peerstore"
 
 	"github.com/gogo/protobuf/proto"
@@ -317,8 +318,39 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 
 	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
 
+	// Find closer peers.
+	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
+	myBucket := true
+	if len(closer) > 0 {
+		// Fill out peer infos.
+		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
+		infos := pstore.PeerInfos(dht.peerstore, closer)
+		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
+
+		// If we have a full bucket of closer peers, check to see if _we're_ in the closest
+		// set.
+		if len(closer) >= dht.bucketSize {
+			// Check to see if _we're_ in the "close" bucket.
+			// If not, we _may_ skip blocking on the provider store below.
+			peers := append(closer, dht.self)
+			peers = kb.SortClosestPeers(peers, kb.ConvertKey(string(pmes.GetKey())))
+			myBucket = peers[len(peers)-1] != dht.self
+		}
+	}
+
 	// setup providers
-	providers := dht.ProviderManager.GetProviders(ctx, key)
+	var providers []peer.ID
+	if myBucket {
+		// If we're in the closest set, block getting providers.
+		providers = dht.ProviderManager.GetProviders(ctx, key)
+	} else {
+		// Otherwise, don't block. The peer will find a closer peer.
+		var err error
+		providers, err = dht.ProviderManager.GetProvidersNonBlocking(ctx, key)
+		if err != nil {
+			logger.Debugw("dropping get providers request", "error", err)
+		}
+	}
 
 	if len(providers) > 0 {
 		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
@@ -326,14 +358,6 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
 	}
 
-	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-	if closer != nil {
-		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-		infos := pstore.PeerInfos(dht.peerstore, closer)
-		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
-	}
-
 	return resp, nil
 }
 
diff --git a/providers/providers_manager.go b/providers/providers_manager.go
index 69fcce780..3e31c79e5 100644
--- a/providers/providers_manager.go
+++ b/providers/providers_manager.go
@@ -36,6 +36,8 @@ var lruCacheSize = 256
 var batchBufferSize = 256
 var log = logging.Logger("providers")
 var defaultProvideBufferSize = 256
+var defaultGetProvidersBufferSize = 16
+var defaultGetProvidersNonBlockingBufferSize = defaultGetProvidersBufferSize / 4
 
 // ProviderManager adds and pulls providers out of the datastore,
 // caching them in between
@@ -107,9 +109,7 @@ type getProv struct {
 func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
 	pm := new(ProviderManager)
 	pm.nonBlocking = true
-	// buffer size of one to reduce context switching.
-	pm.getprovs = make(chan *getProv, 1)
-	// buffer so we can handle bursts.
+	pm.getprovs = make(chan *getProv, defaultGetProvidersBufferSize)
 	pm.newprovs = make(chan *addProv, defaultProvideBufferSize)
 	pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
 	cache, err := lru.NewLRU(lruCacheSize, nil)
@@ -336,6 +336,38 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
 	}
 }
 
+// GetProvidersNonBlocking returns the set of providers for the given key. If the "get providers"
+// queue is full, it returns ErrWouldBlock immediately.
+//
+// This method _does not_ copy the set. Do not modify it.
+func (pm *ProviderManager) GetProvidersNonBlocking(ctx context.Context, k []byte) ([]peer.ID, error) {
+	// If we're "busy", don't even try. This is clearly racy, but it's mostly an "optimistic"
+	// check anyway and it should stabilize pretty quickly when we're under load.
+	//
+	// This helps leave some space for peers that actually need responses.
+	if len(pm.getprovs) > defaultGetProvidersNonBlockingBufferSize {
+		return nil, ErrWouldBlock
+	}
+
+	gp := &getProv{
+		key:  k,
+		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
+	}
+	select {
+	case pm.getprovs <- gp:
+	default:
+		return nil, ErrWouldBlock
+	}
+	select {
+	case <-pm.proc.Closing():
+		return nil, ErrClosing
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case peers := <-gp.resp:
+		return peers, nil
+	}
+}
+
 func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
 	pset, err := pm.getProviderSetForKey(k)
 	if err != nil {
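
Note (not part of the patch): the sketch below shows one way a caller could consume the new
non-blocking path, treating a dropped request as "no providers" rather than an error. The
providerGetter interface, lookupProviders function, and errWouldBlock sentinel are illustrative
stand-ins; the patch references providers.ErrWouldBlock and providers.ErrClosing, which are
assumed to be defined elsewhere in the providers package.

// Sketch only: minimal call pattern for the blocking vs. non-blocking provider lookup.
package example

import (
	"context"
	"errors"

	"github.com/libp2p/go-libp2p-core/peer"
)

// errWouldBlock stands in for the providers.ErrWouldBlock sentinel returned when the
// "get providers" queue is too full to accept another request.
var errWouldBlock = errors.New("get providers: would block")

// providerGetter models the two ProviderManager methods the handler now chooses between.
type providerGetter interface {
	GetProviders(ctx context.Context, key []byte) []peer.ID
	GetProvidersNonBlocking(ctx context.Context, key []byte) ([]peer.ID, error)
}

// lookupProviders blocks on the provider store only when this node believes it is in the
// key's closest set; otherwise it takes the non-blocking path and returns an empty answer
// when the queue is full, letting the requester continue on to closer peers.
func lookupProviders(ctx context.Context, pm providerGetter, key []byte, inClosestSet bool) ([]peer.ID, error) {
	if inClosestSet {
		return pm.GetProviders(ctx, key), nil
	}
	provs, err := pm.GetProvidersNonBlocking(ctx, key)
	if errors.Is(err, errWouldBlock) {
		// Queue is full: reply with closer peers only (no providers).
		return nil, nil
	}
	return provs, err
}

The threshold check against defaultGetProvidersNonBlockingBufferSize in the patch deliberately
rejects non-blocking requests before the queue is completely full, reserving the remaining
slots for blocking callers that actually need an answer.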