diff --git a/galaxycache.go b/galaxycache.go index a81460c5..e9b8f1c1 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -96,7 +96,7 @@ type Universe struct { // NewUniverse is the main constructor for the Universe object. It is passed a // FetchProtocol (to specify fetching via GRPC or HTTP) and its own URL along // with options. -func NewUniverse(protocol FetchProtocol, selfURL string, opts ...UniverseOpt) *Universe { +func NewUniverse(protocol FetchProtocol, selfID string, opts ...UniverseOpt) *Universe { options := &universeOpts{} for _, opt := range opts { opt(options) @@ -104,7 +104,7 @@ func NewUniverse(protocol FetchProtocol, selfURL string, opts ...UniverseOpt) *U c := &Universe{ galaxies: make(map[string]*Galaxy), - peerPicker: newPeerPicker(protocol, selfURL, options.hashOpts), + peerPicker: newPeerPicker(protocol, selfID, options.hashOpts), recorder: options.recorder, } @@ -114,8 +114,8 @@ func NewUniverse(protocol FetchProtocol, selfURL string, opts ...UniverseOpt) *U // NewUniverseWithOpts is a deprecated constructor for the Universe object that // defines a non-default hash function and number of replicas. Please use // `NewUniverse` with the `WithHashOpts` option instead. -func NewUniverseWithOpts(protocol FetchProtocol, selfURL string, options *HashOptions) *Universe { - return NewUniverse(protocol, selfURL, WithHashOpts(options)) +func NewUniverseWithOpts(protocol FetchProtocol, selfID string, options *HashOptions) *Universe { + return NewUniverse(protocol, selfID, WithHashOpts(options)) } // NewGalaxy creates a coordinated galaxy-aware BackendGetter from a @@ -197,7 +197,7 @@ func (universe *Universe) GetGalaxy(name string) *Galaxy { // Each PeerURL value should be a valid base URL, // for example "example.net:8000". func (universe *Universe) Set(peerURLs ...string) error { - return universe.peerPicker.set(peerURLs...) + return universe.peerPicker.setURLs(peerURLs...) } // Shutdown closes all open fetcher connections diff --git a/galaxycache_test.go b/galaxycache_test.go index 0445606e..675e6961 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -45,7 +45,7 @@ const ( ) func initSetup() (*Universe, context.Context, chan string) { - return NewUniverse(&TestProtocol{}, "test"), context.TODO(), make(chan string) + return NewUniverse(&TestProtocol{TestFetchers: map[string]*TestFetcher{}}, "test"), context.TODO(), make(chan string) } func setupStringGalaxyTest(cacheFills *AtomicInt) (*Galaxy, context.Context, chan string) { @@ -222,13 +222,17 @@ func TestPeers(t *testing.T) { numGets: 200, expectedHits: map[string]int{"fetcher0": 50, "fetcher1": 50, "fetcher2": 50, "fetcher3": 50}, cacheSize: 1 << 20, + initFunc: func(g *Galaxy, fetchers map[string]*TestFetcher) { + fetchers["fetcher0"] = &TestFetcher{} + }, }, { testName: "cached_base", numGets: 200, expectedHits: map[string]int{"fetcher0": 0, "fetcher1": 48, "fetcher2": 47, "fetcher3": 48}, cacheSize: 1 << 20, - initFunc: func(g *Galaxy, _ map[string]*TestFetcher) { + initFunc: func(g *Galaxy, fetchers map[string]*TestFetcher) { + fetchers["fetcher0"] = &TestFetcher{} for i := 0; i < 200; i++ { key := fmt.Sprintf("%d", i) var got StringCodec @@ -246,6 +250,7 @@ func TestPeers(t *testing.T) { expectedHits: map[string]int{"fetcher0": 100, "fetcher1": 50, "fetcher2": 0, "fetcher3": 50}, cacheSize: 1 << 20, initFunc: func(g *Galaxy, fetchers map[string]*TestFetcher) { + fetchers["fetcher0"] = &TestFetcher{} fetchers["fetcher2"].fail = true }, }, @@ -266,7 +271,9 @@ func TestPeers(t *testing.T) { universe := NewUniverseWithOpts(testproto, "fetcher0", hashOpts) dummyCtx := context.TODO() - universe.Set("fetcher0", "fetcher1", "fetcher2", "fetcher3") + if setErr := universe.Set("fetcher0", "fetcher1", "fetcher2", "fetcher3"); setErr != nil { + t.Fatalf("failed to set peers on universe: %s", setErr) + } getter := func(_ context.Context, key string, dest Codec) error { // these are local hits @@ -688,7 +695,7 @@ func TestRecorder(t *testing.T) { getter := func(_ context.Context, key string, dest Codec) error { return dest.UnmarshalBinary([]byte("got:" + key)) } - u := NewUniverse(&TestProtocol{}, "test-universe", WithRecorder(meter)) + u := NewUniverse(&TestProtocol{TestFetchers: map[string]*TestFetcher{}}, "test-universe", WithRecorder(meter)) g := u.NewGalaxy("test", 1024, GetterFunc(getter)) var s StringCodec err := g.Get(context.Background(), "foo", &s) diff --git a/go.mod b/go.mod index 99be0fb1..0fa8fc1e 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/vimeo/go-clocks v1.1.2 go.opencensus.io v0.22.5 + golang.org/x/sync v0.1.0 google.golang.org/grpc v1.35.0 ) diff --git a/go.sum b/go.sum index a4a9f11a..411f592c 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/peers.go b/peers.go index 764f0a51..8304c2d9 100644 --- a/peers.go +++ b/peers.go @@ -29,6 +29,8 @@ import ( "fmt" "sync" + "golang.org/x/sync/errgroup" + "github.com/vimeo/galaxycache/consistenthash" ) @@ -48,9 +50,11 @@ type RemoteFetcher interface { // peers, and a map of RemoteFetchers to those peers type PeerPicker struct { fetchingProtocol FetchProtocol - selfURL string - peers *consistenthash.Map - fetchers map[string]RemoteFetcher + selfID string + includeSelf bool + peerIDs *consistenthash.Map + fetchers map[string]RemoteFetcher // keyed by ID + mapGen peerSetGeneration mu sync.RWMutex opts HashOptions } @@ -68,11 +72,12 @@ type HashOptions struct { } // Creates a peer picker; called when creating a new Universe -func newPeerPicker(proto FetchProtocol, selfURL string, options *HashOptions) *PeerPicker { +func newPeerPicker(proto FetchProtocol, selfID string, options *HashOptions) *PeerPicker { pp := &PeerPicker{ fetchingProtocol: proto, - selfURL: selfURL, + selfID: selfID, fetchers: make(map[string]RemoteFetcher), + includeSelf: true, } if options != nil { pp.opts = *options @@ -80,7 +85,7 @@ func newPeerPicker(proto FetchProtocol, selfURL string, options *HashOptions) *P if pp.opts.Replicas == 0 { pp.opts.Replicas = defaultReplicas } - pp.peers = consistenthash.New(pp.opts.Replicas, pp.opts.HashFn) + pp.peerIDs = consistenthash.New(pp.opts.Replicas, pp.opts.HashFn) return pp } @@ -89,58 +94,221 @@ func newPeerPicker(proto FetchProtocol, selfURL string, options *HashOptions) *P func (pp *PeerPicker) pickPeer(key string) (RemoteFetcher, bool) { pp.mu.Lock() defer pp.mu.Unlock() - if URL := pp.peers.Get(key); URL != "" && URL != pp.selfURL { + if URL := pp.peerIDs.Get(key); URL != "" && URL != pp.selfID { peer, ok := pp.fetchers[URL] return peer, ok } return nil, false } -func (pp *PeerPicker) set(peerURLs ...string) error { - pp.mu.Lock() - defer pp.mu.Unlock() +// setURLs assumes that peerURL == peerID (legacy reasons) +func (pp *PeerPicker) setURLs(peerURLs ...string) error { + peers := make([]Peer, len(peerURLs)) + for i, url := range peerURLs { + peers[i] = Peer{URI: url, ID: url} + } + return pp.set(peers...) +} + +// Peer is an ID and ip:port/url tuple for a specific peer +type Peer struct { + // Unique ID for this peer (e.g. in k8s may be a pod name) + ID string + // URI or URL that the registered PeerFetcher can connect to + // URI should be a valid base URL, + // for example "example.net:8000" or "10.32.54.231:8123". + URI string +} + +type peerSetGeneration uint64 + +type peerSetDiff struct { + added []Peer + removed map[string]struct{} + selfIncluded bool + generation peerSetGeneration +} + +func (pp *PeerPicker) diffAbsolutePeers(peers []Peer) peerSetDiff { + pp.mu.RLock() + defer pp.mu.RUnlock() currFetchers := make(map[string]struct{}) for url := range pp.fetchers { currFetchers[url] = struct{}{} } - for _, url := range peerURLs { + selfIncluded := false + newPeers := make([]Peer, 0, len(peers)) + for _, peer := range peers { + if peer.ID == pp.selfID { + selfIncluded = true + + continue + } // open a new fetcher if there is currently no peer at url - if _, ok := pp.fetchers[url]; !ok { - newFetcher, err := pp.fetchingProtocol.NewFetcher(url) - if err != nil { - return err - } - pp.fetchers[url] = newFetcher + // also skip the self ID + if _, ok := pp.fetchers[peer.ID]; !ok { + newPeers = append(newPeers, peer) + continue } - delete(currFetchers, url) + delete(currFetchers, peer.ID) } - for url := range currFetchers { - err := pp.fetchers[url].Close() - delete(pp.fetchers, url) - if err != nil { - return err + return peerSetDiff{ + added: newPeers, + removed: currFetchers, + selfIncluded: selfIncluded, + generation: pp.mapGen, + } +} + +// if nil, false is returned, there's a version mismatch +// newFetchers should match indices in diff.added +func (pp *PeerPicker) updatePeers(diff peerSetDiff, newFetchers []RemoteFetcher) ([]RemoteFetcher, bool) { + pp.mu.Lock() + defer pp.mu.Unlock() + if diff.generation != pp.mapGen { + return nil, false + } + pp.includeSelf = diff.selfIncluded + // be optimistic: assume that we didn't race with anything. (we can do + // some extra allocations in the uncommon/racy case) + toClose := make([]RemoteFetcher, 0, len(diff.removed)) + for i, fetcher := range newFetchers { + if _, ok := pp.fetchers[diff.added[i].ID]; ok { + toClose = append(toClose, fetcher) + continue } + pp.fetchers[diff.added[i].ID] = fetcher } - pp.peers = consistenthash.New(pp.opts.Replicas, pp.opts.HashFn) - pp.peers.Add(peerURLs...) - return nil + + for remID := range diff.removed { + if fetcher, ok := pp.fetchers[remID]; ok { + toClose = append(toClose, fetcher) + } + delete(pp.fetchers, remID) + } + + pp.regenerateHashringLocked() + + return toClose, true } -func (pp *PeerPicker) shutdown() error { - errs := []error{} - for _, fetcher := range pp.fetchers { - err := fetcher.Close() - if err != nil { - errs = append(errs, err) +func (pp *PeerPicker) set(peers ...Peer) error { + loopPersistentFetchers := map[string]RemoteFetcher{} + defer func() { + for _, f := range loopPersistentFetchers { + f.Close() } + }() + for { + diff := pp.diffAbsolutePeers(peers) + + newfetchers := make([]RemoteFetcher, len(diff.added)) + dialEG := errgroup.Group{} + for i, peerIter := range diff.added { + if f, ok := loopPersistentFetchers[peerIter.ID]; ok { + newfetchers[i] = f + delete(loopPersistentFetchers, peerIter.ID) + continue + } + peer := peerIter + i := i + dialEG.Go(func() error { + newFetcher, err := pp.fetchingProtocol.NewFetcher(peer.URI) + if err != nil { + return err + } + newfetchers[i] = newFetcher + return nil + }) + } + if dialErr := dialEG.Wait(); dialErr != nil { + // NB: as of writing: we shouldn't get here in any real case as + // neither the HTTP nor the gRPC RemoteFetcher implementations + // actually do work when first constructed. + for _, fetcher := range newfetchers { + if fetcher == nil { + continue + } + fetcher.Close() + } + return fmt.Errorf("failed to dial at least one backend: %w", dialErr) + } + + // regenerate the hashring before we try to close any of the fetchers + // so if they fail we don't end up with a hashring that's inconsistent + // with the set of fetchers. + rmFetchers, updated := pp.updatePeers(diff, newfetchers) + if !updated { + // Stash all the fetchers that we've already opened before looping + for i, fetcher := range newfetchers { + loopPersistentFetchers[diff.added[i].ID] = fetcher + } + continue + } + + // if there's 0 or 1 to close, just iterate. + // if there are more, we'll spin up goroutines and use an errgroup + // (more for error-handling than efficiency) + if len(rmFetchers) < 2 { + for _, fetcher := range rmFetchers { + err := fetcher.Close() + if err != nil { + return err + } + } + return nil + } + closeEG := errgroup.Group{} + for _, fetcher := range rmFetchers { + f := fetcher + closeEG.Go(func() error { + if closeErr := f.Close(); closeErr != nil { + return fmt.Errorf("failed to close RemoteFetcher: %w", closeErr) + } + return nil + }) + } + if closeErr := closeEG.Wait(); closeErr != nil { + return fmt.Errorf("failed to close fetcher(s): %w", closeErr) + } + return nil } - if len(errs) > 0 { - return fmt.Errorf("failed to close: %v", errs) +} + +func (pp *PeerPicker) regenerateHashringLocked() { + selfAdj := 0 + if pp.includeSelf { + selfAdj = 1 } - return nil + + newPeerIDs := make([]string, selfAdj, len(pp.fetchers)+selfAdj) + if pp.includeSelf { + newPeerIDs[0] = pp.selfID + } + for id := range pp.fetchers { + newPeerIDs = append(newPeerIDs, id) + } + + // the consistenthash ring doesn't support removals so regenerate! + pp.peerIDs = consistenthash.New(pp.opts.Replicas, pp.opts.HashFn) + pp.peerIDs.Add(newPeerIDs...) + pp.mapGen++ +} + +func (pp *PeerPicker) setIncludeSelf(inc bool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.includeSelf = inc + pp.regenerateHashringLocked() +} + +func (pp *PeerPicker) shutdown() error { + pp.setIncludeSelf(false) + // Clear out all the existing peers + return pp.set() } // FetchProtocol defines the chosen fetching protocol to peers (namely