From ff9c76b3965c7ccc0a17a48ff4da6d1e10b95bdb Mon Sep 17 00:00:00 2001 From: David Finkel Date: Wed, 26 Oct 2022 09:20:25 -0400 Subject: [PATCH] PeerPicker: methods for incremental peer changes Add methods that are a bit more friendly to work with. --- galaxycache.go | 32 +++++ galaxycache_test.go | 11 ++ peers.go | 90 ++++++++++++++ peers_test.go | 281 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 414 insertions(+) create mode 100644 peers_test.go diff --git a/galaxycache.go b/galaxycache.go index e9b8f1c1..0b0912a9 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -107,6 +107,8 @@ func NewUniverse(protocol FetchProtocol, selfID string, opts ...UniverseOpt) *Un peerPicker: newPeerPicker(protocol, selfID, options.hashOpts), recorder: options.recorder, } + // Insert the Self-ID into the hash-ring + c.peerPicker.set(Peer{ID: selfID, URI: ""}) return c } @@ -196,10 +198,40 @@ func (universe *Universe) GetGalaxy(name string) *Galaxy { // Set updates the Universe's list of peers (contained in the PeerPicker). // Each PeerURL value should be a valid base URL, // for example "example.net:8000". +// This is a compatibility wrapper around SetPeers which sets the ID and URI +// equal. func (universe *Universe) Set(peerURLs ...string) error { return universe.peerPicker.setURLs(peerURLs...) } +// SetPeers updates the Universe's list of peers (contained in the PeerPicker). +// Each Peer's URI value should be a valid base URL, while the ID may be anything that's unique, +// for example "example.net:8000". +// If Set and SetPeers are mixed, the ID and URI fields must match. +func (universe *Universe) SetPeers(peers ...Peer) error { + return universe.peerPicker.set(peers...) +} + +// AddPeer updates the Universe's list of peers to include the passed peer (contained in the PeerPicker). +// The Peer's URI value should be a valid base URL, while the ID may be anything that's unique, +// for example "example.net:8000". +// If Set and AddPeer are mixed, the ID and URI fields must match. +func (universe *Universe) AddPeer(peer Peer) error { + return universe.peerPicker.add(peer) +} + +// SetIncludeSelf toggles the inclusion of the "self ID" for the universe in the PeerPicker's hash-ring +func (universe *Universe) SetIncludeSelf(incSelf bool) { + universe.peerPicker.setIncludeSelf(incSelf) +} + +// RemovePeers updates the Universe's list of peers to remove the passed peers IDs (contained in the PeerPicker). +// The arguments should match the ID field on SetPeers and AddPeers calls and the URLs passed to Set. +// unrecognized IDs are ignored +func (universe *Universe) RemovePeers(ids ...string) error { + return universe.peerPicker.remove(ids...) +} + // Shutdown closes all open fetcher connections func (universe *Universe) Shutdown() error { return universe.peerPicker.shutdown() diff --git a/galaxycache_test.go b/galaxycache_test.go index 675e6961..bd258102 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -169,10 +169,13 @@ func TestCacheEviction(t *testing.T) { // Testing types to use in TestPeers type TestProtocol struct { TestFetchers map[string]*TestFetcher + dialFails map[string]struct{} + mu sync.Mutex } type TestFetcher struct { hits int fail bool + uri string } func (fetcher *TestFetcher) Close() error { @@ -191,6 +194,14 @@ func (proto *TestProtocol) NewFetcher(url string) (RemoteFetcher, error) { newTestFetcher := &TestFetcher{ hits: 0, fail: false, + uri: url, + } + proto.mu.Lock() + defer proto.mu.Unlock() + if proto.dialFails != nil { + if _, fail := proto.dialFails[url]; fail { + return nil, errors.New("failing due to predetermined error") + } } proto.TestFetchers[url] = newTestFetcher return newTestFetcher, nil diff --git a/peers.go b/peers.go index 8304c2d9..782f0b6e 100644 --- a/peers.go +++ b/peers.go @@ -278,6 +278,69 @@ func (pp *PeerPicker) set(peers ...Peer) error { } } +func (pp *PeerPicker) checkPeerPresence(peer Peer) bool { + pp.mu.RLock() + defer pp.mu.RUnlock() + _, ok := pp.fetchers[peer.ID] + return ok +} + +// returns whether the fetcher was inserted. +// if false is returned, the caller should close the fetcher. +func (pp *PeerPicker) insertPeer(peer Peer, fetcher RemoteFetcher) bool { + pp.mu.Lock() + defer pp.mu.Unlock() + _, ok := pp.fetchers[peer.ID] + if ok { + return false + } + + pp.fetchers[peer.ID] = fetcher + // No need to initialize a new peer hashring, we're only adding peers. + pp.peerIDs.Add(peer.ID) + return true +} + +func (pp *PeerPicker) add(peer Peer) error { + // Do a quick check to see if this peer is already there before we acquire the heavy write-lock + if pp.checkPeerPresence(peer) { + return nil + } + + newFetcher, err := pp.fetchingProtocol.NewFetcher(peer.URI) + if err != nil { + return fmt.Errorf("fetcher init failed for %s (at %s): %w", peer.ID, peer.URI, err) + } + + if !pp.insertPeer(peer, newFetcher) { + // Something else raced and already added this fetcher + // close it + newFetcher.Close() + } + return nil +} + +func (pp *PeerPicker) removePeers(peerIDs ...string) []RemoteFetcher { + pp.mu.Lock() + defer pp.mu.Unlock() + return pp.removePeersLocked(peerIDs...) + +} + +func (pp *PeerPicker) removePeersLocked(peerIDs ...string) []RemoteFetcher { + out := make([]RemoteFetcher, 0, len(peerIDs)) + for _, peerID := range peerIDs { + f, ok := pp.fetchers[peerID] + if ok { + out = append(out, f) + } + delete(pp.fetchers, peerID) + } + + pp.regenerateHashringLocked() + return out +} + func (pp *PeerPicker) regenerateHashringLocked() { selfAdj := 0 if pp.includeSelf { @@ -305,6 +368,33 @@ func (pp *PeerPicker) setIncludeSelf(inc bool) { pp.regenerateHashringLocked() } +func (pp *PeerPicker) remove(ids ...string) error { + toClose := pp.removePeers(ids...) + + // if there's 0 or 1 to close, just iterate. + // if there are more, we'll spin up goroutines and use an errgroup + if len(toClose) < 2 { + for _, f := range toClose { + if closeErr := f.Close(); closeErr != nil { + return closeErr + } + } + return nil + } + eg := errgroup.Group{} + for _, f := range toClose { + f := f + eg.Go(func() error { + if closeErr := f.Close(); closeErr != nil { + return fmt.Errorf("failed to close RemoteFetcher: %w", closeErr) + } + return nil + }) + } + + return eg.Wait() +} + func (pp *PeerPicker) shutdown() error { pp.setIncludeSelf(false) // Clear out all the existing peers diff --git a/peers_test.go b/peers_test.go new file mode 100644 index 00000000..5c709a30 --- /dev/null +++ b/peers_test.go @@ -0,0 +1,281 @@ +package galaxycache + +import ( + "sync" + "testing" +) + +// TestPeers tests to ensure that an instance with given hash +// function results in the expected number of gets both locally and into each other peer +func TestPeersIncremental(t *testing.T) { + + hashOpts := &HashOptions{ + Replicas: 2, + HashFn: nil, + } + + type addRemoveStep struct { + add []Peer + remove []string + expectedPeers []Peer // don't include self (covered by includeSelf) + parallel bool // step should be split up and run in parallel + expectFailAdd bool + expectFailRm bool + includeSelf bool + setIncSelf bool + } + + const selfID = "selfImpossibleFetcher" + testCases := []struct { + name string + initFunc func(testProtocol *TestProtocol) + cacheSize int64 + includeSelf bool + steps []addRemoveStep + }{ + { + name: "base_add_remove_serial", + initFunc: func(*TestProtocol) {}, + cacheSize: 1 << 20, + steps: []addRemoveStep{ + { + add: []Peer{{ID: "fizzlebat3", URI: "fizzleboot3"}, {ID: "fizzlebat4", URI: "fizzleboot4"}}, + remove: []string{}, + expectedPeers: []Peer{{ID: "fizzlebat3", URI: "fizzleboot3"}, {ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + { + add: []Peer{}, + remove: []string{"fizzlebat3", "fizzleboat3"}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + { + add: []Peer{}, + remove: []string{}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, + setIncSelf: true, + }, + { + add: []Peer{}, + remove: []string{}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: false, + setIncSelf: true, + }, + { + add: []Peer{}, + remove: []string{}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, + setIncSelf: true, + }, + }, + }, + { + name: "base_add_parallel", + initFunc: func(*TestProtocol) {}, + cacheSize: 1 << 20, + steps: []addRemoveStep{ + { + add: []Peer{{ID: "fizzlebat3", URI: "fizzleboot3"}, {ID: "fizzlebat4", URI: "fizzleboot4"}, {ID: "fizzlebat5", URI: "fizzleboot5"}}, + remove: []string{}, + expectedPeers: []Peer{{ID: "fizzlebat3", URI: "fizzleboot3"}, {ID: "fizzlebat4", URI: "fizzleboot4"}, {ID: "fizzlebat5", URI: "fizzleboot5"}}, + parallel: true, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + { + add: []Peer{}, + remove: []string{"fizzlebat3", "fizzleboat3"}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}, {ID: "fizzlebat5", URI: "fizzleboot5"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + }, + }, + { + name: "one_peer_down", + cacheSize: 1 << 20, + initFunc: func(proto *TestProtocol) { + proto.dialFails = map[string]struct{}{"fizzleboot3": {}} + }, + steps: []addRemoveStep{ + { + add: []Peer{{ID: "fizzlebat3", URI: "fizzleboot3"}}, + remove: []string{}, + expectedPeers: []Peer{}, + parallel: false, + expectFailAdd: true, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + { + add: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + remove: []string{}, + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + { + add: []Peer{}, + remove: []string{"fizzlebat3", "fizzleboat3"}, // remove a name that doesn't exist + expectedPeers: []Peer{{ID: "fizzlebat4", URI: "fizzleboot4"}}, + parallel: false, + expectFailAdd: false, + expectFailRm: false, + includeSelf: true, // include self, but don't alter the default + setIncSelf: false, + }, + }, + }, + } + + for _, itbl := range testCases { + tbl := itbl + + t.Run(tbl.name, func(t *testing.T) { + t.Parallel() + // instantiate test fetchers with the test protocol + testproto := TestProtocol{ + TestFetchers: make(map[string]*TestFetcher), + } + + checkErr := func(expErr bool, addErr error, opName string, stepIdx, opIdx int) { + t.Helper() + if expErr { + if addErr == nil { + t.Errorf("error expected at step %d (%dth %s) (got nil)", + stepIdx, opIdx, opName) + } + } else if addErr != nil { + t.Errorf("error %sing peer at step %d (%dth %s): %s", + opName, stepIdx, stepIdx, opName, addErr) + } + } + + u := NewUniverseWithOpts(&testproto, selfID, hashOpts) + + tbl.initFunc(&testproto) + + for si, step := range tbl.steps { + if step.setIncSelf { + u.SetIncludeSelf(step.includeSelf) + } + if !step.parallel { + for z, p := range step.add { + addErr := u.AddPeer(p) + // for now; assume that all adds for the step will fail if any of them + // will + checkErr(step.expectFailAdd, addErr, "add", z, si) + } + if len(step.remove) > 0 { + removeErr := u.RemovePeers(step.remove...) + checkErr(step.expectFailRm, removeErr, "remove", 0, si) + } + } else { + // unbuffered channel that we'll close after all goroutines are spun up to + // ensure they all run at roughly the same time + gate := make(chan struct{}) + wg := sync.WaitGroup{} + for iz, ip := range step.add { + wg.Add(1) + go func(i int, peer Peer) { + defer wg.Done() + <-gate + addErr := u.AddPeer(peer) + // for now; assume that all parallel adds for the step will fail + // if any of them will + checkErr(step.expectFailAdd, addErr, "add", i, si) + }(iz, ip) + } + for iz, ip := range step.remove { + wg.Add(1) + go func(i int, peer string) { + defer wg.Done() + <-gate + addErr := u.RemovePeers(peer) + // for now; assume that all parallel adds for the step will fail + // if any of them will + checkErr(step.expectFailRm, addErr, "remove", i, si) + }(iz, ip) + } + close(gate) + wg.Wait() + } + + allPeersSlice := u.peerPicker.peerIDs.GetReplicated("a", 10) + allPeers := make(map[string]struct{}, len(allPeersSlice)) + for _, pn := range allPeersSlice { + allPeers[pn] = struct{}{} + } + + fetcherNames := make(map[string]struct{}, len(u.peerPicker.fetchers)) + fetcherURIs := make(map[string]struct{}, len(u.peerPicker.fetchers)) + for fn, f := range u.peerPicker.fetchers { + fetcherNames[fn] = struct{}{} + fetcherURIs[f.(*TestFetcher).uri] = struct{}{} + } + + for _, expPeer := range step.expectedPeers { + if _, ok := allPeers[expPeer.ID]; !ok { + t.Errorf("missing peer %q from hashring at step %d", expPeer.ID, si) + } + delete(allPeers, expPeer.ID) + if _, ok := fetcherNames[expPeer.ID]; !ok { + t.Errorf("missing peer %q from fetchers map keys at step %d", expPeer.ID, si) + } + delete(fetcherNames, expPeer.ID) + if _, ok := fetcherURIs[expPeer.URI]; !ok { + t.Errorf("missing peer %q (URI %q) from fetchers values at step %d", + expPeer.ID, expPeer.URI, si) + } + delete(fetcherURIs, expPeer.URI) + } + if step.includeSelf { + if _, ok := allPeers[selfID]; !ok { + t.Errorf("missing self entry in hashring at step %d", si) + } + delete(allPeers, selfID) + } + if len(allPeers) > 0 { + t.Errorf("unexpected peer(s) in hashring at step %d: %v", si, allPeers) + } + if len(fetcherNames) > 0 { + t.Errorf("unexpected peer(s) in fetcher-map at step %d: %v", si, fetcherNames) + } + if len(fetcherURIs) > 0 { + t.Errorf("unexpected peer(s)' URI(s) in fetcher-map at step %d: %v", si, fetcherURIs) + } + } + }) + } + +}