Skip to content

Commit

Permalink
PeerPicker: methods for incremental peer changes
Browse files Browse the repository at this point in the history
Add methods that are a bit more friendly to work with.
  • Loading branch information
dfinkel committed Nov 3, 2022
1 parent dcfd232 commit ff9c76b
Show file tree
Hide file tree
Showing 4 changed files with 414 additions and 0 deletions.
32 changes: 32 additions & 0 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func NewUniverse(protocol FetchProtocol, selfID string, opts ...UniverseOpt) *Un
peerPicker: newPeerPicker(protocol, selfID, options.hashOpts),
recorder: options.recorder,
}
// Insert the Self-ID into the hash-ring
c.peerPicker.set(Peer{ID: selfID, URI: ""})

return c
}
Expand Down Expand Up @@ -196,10 +198,40 @@ func (universe *Universe) GetGalaxy(name string) *Galaxy {
// Set updates the Universe's list of peers (contained in the PeerPicker).
// Each PeerURL value should be a valid base URL,
// for example "example.net:8000".
// This is a compatibility wrapper around SetPeers which sets the ID and URI
// equal.
func (universe *Universe) Set(peerURLs ...string) error {
return universe.peerPicker.setURLs(peerURLs...)
}

// SetPeers updates the Universe's list of peers (contained in the PeerPicker).
// Each Peer's URI value should be a valid base URL, while the ID may be anything that's unique,
// for example "example.net:8000".
// If Set and SetPeers are mixed, the ID and URI fields must match.
func (universe *Universe) SetPeers(peers ...Peer) error {
return universe.peerPicker.set(peers...)
}

// AddPeer updates the Universe's list of peers to include the passed peer (contained in the PeerPicker).
// The Peer's URI value should be a valid base URL, while the ID may be anything that's unique,
// for example "example.net:8000".
// If Set and AddPeer are mixed, the ID and URI fields must match.
func (universe *Universe) AddPeer(peer Peer) error {
return universe.peerPicker.add(peer)
}

// SetIncludeSelf toggles the inclusion of the "self ID" for the universe in the PeerPicker's hash-ring
func (universe *Universe) SetIncludeSelf(incSelf bool) {
universe.peerPicker.setIncludeSelf(incSelf)
}

// RemovePeers updates the Universe's list of peers to remove the passed peers IDs (contained in the PeerPicker).
// The arguments should match the ID field on SetPeers and AddPeers calls and the URLs passed to Set.
// unrecognized IDs are ignored
func (universe *Universe) RemovePeers(ids ...string) error {
return universe.peerPicker.remove(ids...)
}

// Shutdown closes all open fetcher connections
func (universe *Universe) Shutdown() error {
return universe.peerPicker.shutdown()
Expand Down
11 changes: 11 additions & 0 deletions galaxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,13 @@ func TestCacheEviction(t *testing.T) {
// Testing types to use in TestPeers
type TestProtocol struct {
TestFetchers map[string]*TestFetcher
dialFails map[string]struct{}
mu sync.Mutex
}
type TestFetcher struct {
hits int
fail bool
uri string
}

func (fetcher *TestFetcher) Close() error {
Expand All @@ -191,6 +194,14 @@ func (proto *TestProtocol) NewFetcher(url string) (RemoteFetcher, error) {
newTestFetcher := &TestFetcher{
hits: 0,
fail: false,
uri: url,
}
proto.mu.Lock()
defer proto.mu.Unlock()
if proto.dialFails != nil {
if _, fail := proto.dialFails[url]; fail {
return nil, errors.New("failing due to predetermined error")
}
}
proto.TestFetchers[url] = newTestFetcher
return newTestFetcher, nil
Expand Down
90 changes: 90 additions & 0 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,69 @@ func (pp *PeerPicker) set(peers ...Peer) error {
}
}

func (pp *PeerPicker) checkPeerPresence(peer Peer) bool {
pp.mu.RLock()
defer pp.mu.RUnlock()
_, ok := pp.fetchers[peer.ID]
return ok
}

// returns whether the fetcher was inserted.
// if false is returned, the caller should close the fetcher.
func (pp *PeerPicker) insertPeer(peer Peer, fetcher RemoteFetcher) bool {
pp.mu.Lock()
defer pp.mu.Unlock()
_, ok := pp.fetchers[peer.ID]
if ok {
return false
}

pp.fetchers[peer.ID] = fetcher
// No need to initialize a new peer hashring, we're only adding peers.
pp.peerIDs.Add(peer.ID)
return true
}

func (pp *PeerPicker) add(peer Peer) error {
// Do a quick check to see if this peer is already there before we acquire the heavy write-lock
if pp.checkPeerPresence(peer) {
return nil
}

newFetcher, err := pp.fetchingProtocol.NewFetcher(peer.URI)
if err != nil {
return fmt.Errorf("fetcher init failed for %s (at %s): %w", peer.ID, peer.URI, err)
}

if !pp.insertPeer(peer, newFetcher) {
// Something else raced and already added this fetcher
// close it
newFetcher.Close()
}
return nil
}

func (pp *PeerPicker) removePeers(peerIDs ...string) []RemoteFetcher {
pp.mu.Lock()
defer pp.mu.Unlock()
return pp.removePeersLocked(peerIDs...)

}

func (pp *PeerPicker) removePeersLocked(peerIDs ...string) []RemoteFetcher {
out := make([]RemoteFetcher, 0, len(peerIDs))
for _, peerID := range peerIDs {
f, ok := pp.fetchers[peerID]
if ok {
out = append(out, f)
}
delete(pp.fetchers, peerID)
}

pp.regenerateHashringLocked()
return out
}

func (pp *PeerPicker) regenerateHashringLocked() {
selfAdj := 0
if pp.includeSelf {
Expand Down Expand Up @@ -305,6 +368,33 @@ func (pp *PeerPicker) setIncludeSelf(inc bool) {
pp.regenerateHashringLocked()
}

func (pp *PeerPicker) remove(ids ...string) error {
toClose := pp.removePeers(ids...)

// if there's 0 or 1 to close, just iterate.
// if there are more, we'll spin up goroutines and use an errgroup
if len(toClose) < 2 {
for _, f := range toClose {
if closeErr := f.Close(); closeErr != nil {
return closeErr
}
}
return nil
}
eg := errgroup.Group{}
for _, f := range toClose {
f := f
eg.Go(func() error {
if closeErr := f.Close(); closeErr != nil {
return fmt.Errorf("failed to close RemoteFetcher: %w", closeErr)
}
return nil
})
}

return eg.Wait()
}

func (pp *PeerPicker) shutdown() error {
pp.setIncludeSelf(false)
// Clear out all the existing peers
Expand Down
Loading

0 comments on commit ff9c76b

Please sign in to comment.