
feat: make "get providers" non-blocking when there are closer peers
Let's assume there's one (or zero) providers for a record and everyone
is looking for 10 providers.

- Before, peers would wait for all peers along the path to return this
  one provider. However, because there's only one provider, peers won't be
  able to short-circuit anyway.
- Now, peers will go to the end of the path before waiting.

This may make some queries slower, but it attempts to give "priority" to
peers that actually _need_ responses as opposed to peers that are
"optimistically" waiting for responses.
Stebalien committed Jul 22, 2021
1 parent a804f6a commit 3ec4325
Showing 2 changed files with 68 additions and 12 deletions.
42 changes: 33 additions & 9 deletions handlers.go
@@ -9,6 +9,7 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
kb "github.com/libp2p/go-libp2p-kbucket"
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
@@ -317,23 +318,46 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

// Find closer peers.
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
myBucket := true
if len(closer) > 0 {
// Fill out peer infos.
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, closer)
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)

// If we have a full bucket of closer peers, check to see if _we're_ in the closest
// set.
if len(closer) >= dht.bucketSize {
// Check to see if _we're_ in the "close" bucket.
// If not, we _may_ answer without blocking on the provider store.
peers := append(closer, dht.self)
peers = kb.SortClosestPeers(peers, kb.ConvertKey(string(pmes.GetKey())))
myBucket = peers[len(peers)-1] != dht.self
}
}

// setup providers
providers := dht.ProviderManager.GetProviders(ctx, key)
var providers []peer.ID
if myBucket {
// If we're in the closest set, block getting providers.
providers = dht.ProviderManager.GetProviders(ctx, key)
} else {
// Otherwise, don't block. The peer will find a closer peer.
var err error
providers, err = dht.ProviderManager.GetProvidersNonBlocking(ctx, key)
if err != nil {
logger.Debugw("dropping get providers request", "error", err)
}
}

if len(providers) > 0 {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
}

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
if closer != nil {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, closer)
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
}

return resp, nil
}

38 changes: 35 additions & 3 deletions providers/providers_manager.go
@@ -36,6 +36,8 @@ var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")
var defaultProvideBufferSize = 256
var defaultGetProvidersBufferSize = 16
var defaultGetProvidersNonBlockingBufferSize = defaultGetProvidersBufferSize / 4

// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
@@ -107,9 +109,7 @@ type getProv struct {
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager)
pm.nonBlocking = true
// buffer size of one to reduce context switching.
pm.getprovs = make(chan *getProv, 1)
// buffer so we can handle bursts.
pm.getprovs = make(chan *getProv, defaultGetProvidersBufferSize)
pm.newprovs = make(chan *addProv, defaultProvideBufferSize)
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
@@ -336,6 +336,38 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
}
}

// GetProvidersNonBlocking returns the set of providers for the given key. If the "get providers"
// queue is already too full, it returns ErrWouldBlock instead of waiting.
//
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProvidersNonBlocking(ctx context.Context, k []byte) ([]peer.ID, error) {
// If we're "busy", don't even try. This is clearly racy, but it's mostly an "optimistic"
// check anyways and it should stabalize pretty quickly when we're under load.
//
// This helps leave some space for peers that actually need responses.
if len(pm.getprovs) > defaultGetProvidersNonBlockingBufferSize {
return nil, ErrWouldBlock
}

gp := &getProv{
key: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
}
select {
case pm.getprovs <- gp:
default:
return nil, ErrWouldBlock
}
select {
case <-pm.proc.Closing():
return nil, ErrClosing
case <-ctx.Done():
return nil, ctx.Err()
case peers := <-gp.resp:
return peers, nil
}
}

func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
pset, err := pm.getProviderSetForKey(k)
if err != nil {
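
For context on how these queued requests are served: each getProv lands on pm.getprovs and is
answered by the provider manager's single run loop. A rough sketch of that consumer side is
below, assuming the loop simply looks up the key and replies on gp.resp; the real loop in
providers_manager.go also handles adds, the LRU cache and GC.

// Illustrative sketch only; not the actual run loop in this file.
func (pm *ProviderManager) consumeGetProviders() {
	for {
		select {
		case gp := <-pm.getprovs:
			provs, err := pm.getProvidersForKey(gp.key)
			if err != nil {
				log.Error("error reading providers: ", err)
			}
			// gp.resp has capacity 1, so this send never blocks the loop,
			// even if the requester has already given up (e.g. its context
			// was cancelled after the request was queued).
			gp.resp <- provs
		case <-pm.proc.Closing():
			return
		}
	}
}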
