From 60c8d79851a9834f465a8542db03260ffce8d45f Mon Sep 17 00:00:00 2001 From: Sergio Salvatore Date: Tue, 19 Jan 2021 19:31:09 -0500 Subject: [PATCH] Allow Recording Stats to a Specific stats.Recorder It can be useful to be able to record to specific recorder, so make that an option. This also deprecates the NewUniverseWithOpts constructor in favor of the NewUniverse constructor which is now variadic. --- galaxycache.go | 100 +++++++++++++++++++++++++++++++------------- galaxycache_test.go | 34 +++++++++++++++ http/http.go | 21 ++++++++-- 3 files changed, 123 insertions(+), 32 deletions(-) diff --git a/galaxycache.go b/galaxycache.go index 2cfa5872..626db13d 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -62,32 +62,63 @@ func (f GetterFunc) Get(ctx context.Context, key string, dest Codec) error { return f(ctx, key, dest) } +type universeOpts struct { + hashOpts *HashOptions + recorder stats.Recorder +} + +// UniverseOpt is a functional Universe option. +type UniverseOpt func(*universeOpts) + +// WithHashOpts sets the HashOptions on a universe. +func WithHashOpts(hashOpts *HashOptions) UniverseOpt { + return func(u *universeOpts) { + u.hashOpts = hashOpts + } +} + +// WithRecorder allows you to override the default stats.Recorder used for +// stats. +func WithRecorder(recorder stats.Recorder) UniverseOpt { + return func(u *universeOpts) { + u.recorder = recorder + } +} + // Universe defines the primary container for all galaxycache operations. // It contains the galaxies and PeerPicker type Universe struct { mu sync.RWMutex galaxies map[string]*Galaxy // galaxies are indexed by their name peerPicker *PeerPicker + recorder stats.Recorder } -// NewUniverse is the default constructor for the Universe object. -// It is passed a FetchProtocol (to specify fetching via GRPC or HTTP) -// and its own URL -func NewUniverse(protocol FetchProtocol, selfURL string) *Universe { - return NewUniverseWithOpts(protocol, selfURL, nil) -} +// NewUniverse is the main constructor for the Universe object. It is passed a +// FetchProtocol (to specify fetching via GRPC or HTTP) and its own URL along +// with options. +func NewUniverse(protocol FetchProtocol, selfURL string, opts ...UniverseOpt) *Universe { + options := &universeOpts{} + for _, opt := range opts { + opt(options) + } -// NewUniverseWithOpts is the optional constructor for the Universe -// object that defines a non-default hash function and number of replicas -func NewUniverseWithOpts(protocol FetchProtocol, selfURL string, options *HashOptions) *Universe { c := &Universe{ galaxies: make(map[string]*Galaxy), - peerPicker: newPeerPicker(protocol, selfURL, options), + peerPicker: newPeerPicker(protocol, selfURL, options.hashOpts), + recorder: options.recorder, } return c } +// NewUniverseWithOpts is a deprecated constructor for the Universe object that +// defines a non-default hash function and number of replicas. Please use +// `NewUniverse` with the `WithHashOpts` option instead. +func NewUniverseWithOpts(protocol FetchProtocol, selfURL string, options *HashOptions) *Universe { + return NewUniverse(protocol, selfURL, WithHashOpts(options)) +} + // NewGalaxy creates a coordinated galaxy-aware BackendGetter from a // BackendGetter. // @@ -150,6 +181,7 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen g.candidateCache.lru.OnEvicted = func(key lru.Key, value interface{}) { g.candidateCache.nevict++ } + g.parent = universe universe.galaxies[name] = g return g @@ -228,6 +260,9 @@ type Galaxy struct { // Stats are statistics on the galaxy. Stats GalaxyStats + + // pointer to the parent universe that created this galaxy + parent *Universe } // GalaxyOption is an interface for implementing functional galaxy options @@ -349,16 +384,16 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati switch h { case hitMaincache: g.Stats.MaincacheHits.Add(1) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1)) + g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1)) case hitHotcache: g.Stats.HotcacheHits.Add(1) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1)) + g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1)) case hitPeer: g.Stats.PeerLoads.Add(1) - stats.Record(ctx, MPeerLoads.M(1)) + g.recordStats(ctx, nil, MPeerLoads.M(1)) case hitBackend: g.Stats.BackendLoads.Add(1) - stats.Record(ctx, MBackendLoads.M(1)) + g.recordStats(ctx, nil, MBackendLoads.M(1)) if !localAuthoritative { span.Annotate(nil, "failed to fetch from peer, not authoritative for key") } @@ -383,24 +418,24 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { ctx, span := trace.StartSpan(ctx, "galaxycache.(*Galaxy).Get on "+g.name) startTime := time.Now() defer func() { - stats.Record(ctx, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime))) + g.recordStats(ctx, nil, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime))) span.End() }() g.Stats.Gets.Add(1) - stats.Record(ctx, MGets.M(1)) + g.recordStats(ctx, nil, MGets.M(1)) if dest == nil { span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: "no Codec was provided"}) return errors.New("galaxycache: no Codec was provided") } value, hlvl := g.lookupCache(key) - stats.Record(ctx, MKeyLength.M(int64(len(key)))) + g.recordStats(ctx, nil, MKeyLength.M(int64(len(key)))) if hlvl.isHit() { span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl) value.stats.touch() g.recordRequest(ctx, hlvl, false) - stats.Record(ctx, MValueLength.M(int64(len(value.data)))) + g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) return dest.UnmarshalBinary(value.data) } @@ -413,11 +448,11 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { value, destPopulated, err := g.load(ctx, key, dest) if err != nil { span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: "Failed to load key: " + err.Error()}) - stats.Record(ctx, MLoadErrors.M(1)) + g.recordStats(ctx, nil, MLoadErrors.M(1)) return err } value.stats.touch() - stats.Record(ctx, MValueLength.M(int64(len(value.data)))) + g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) if destPopulated { return nil } @@ -435,7 +470,7 @@ type valWithLevel struct { // load loads key either by invoking the getter locally or by sending it to another machine. func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWithStat, destPopulated bool, err error) { g.Stats.Loads.Add(1) - stats.Record(ctx, MLoads.M(1)) + g.recordStats(ctx, nil, MLoads.M(1)) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { // Check the cache again because singleflight can only dedup calls @@ -465,12 +500,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi } else { g.Stats.CoalescedMaincacheHits.Add(1) } - stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1)) + g.recordStats(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1)) return &valWithLevel{value, hlvl, false, nil, nil}, nil } g.Stats.CoalescedLoads.Add(1) - stats.Record(ctx, MCoalescedLoads.M(1)) + g.recordStats(ctx, nil, MCoalescedLoads.M(1)) authoritative := true var peerErr error @@ -479,12 +514,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi authoritative = false if peerErr == nil { g.Stats.CoalescedPeerLoads.Add(1) - stats.Record(ctx, MCoalescedPeerLoads.M(1)) + g.recordStats(ctx, nil, MCoalescedPeerLoads.M(1)) return &valWithLevel{value, hitPeer, false, nil, nil}, nil } g.Stats.PeerLoadErrors.Add(1) - stats.Record(ctx, MPeerLoadErrors.M(1)) + g.recordStats(ctx, nil, MPeerLoadErrors.M(1)) // TODO(bradfitz): log the peer's error? keep // log of the past few for /galaxycache? It's // probably boring (normal task movement), so not @@ -493,12 +528,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi data, err := g.getLocally(ctx, key, dest) if err != nil { g.Stats.BackendLoadErrors.Add(1) - stats.Record(ctx, MBackendLoadErrors.M(1)) + g.recordStats(ctx, nil, MBackendLoadErrors.M(1)) return nil, err } g.Stats.CoalescedBackendLoads.Add(1) - stats.Record(ctx, MCoalescedBackendLoads.M(1)) + g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value value = newValWithStat(data, nil) g.populateCache(ctx, key, value, &g.mainCache) @@ -568,7 +603,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt cache.add(key, value) // Record the size of this cache after we've finished evicting any necessary values. defer func() { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())}, + g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())}, MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items())) }() @@ -591,6 +626,15 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt } } +func (g *Galaxy) recordStats(ctx context.Context, mutators []tag.Mutator, measurements ...stats.Measurement) { + stats.RecordWithOptions( + ctx, + stats.WithMeasurements(measurements...), + stats.WithTags(mutators...), + stats.WithRecorder(g.parent.recorder), + ) +} + // CacheType represents a type of cache. type CacheType uint8 diff --git a/galaxycache_test.go b/galaxycache_test.go index 6017191a..fac06279 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -32,6 +32,8 @@ import ( "unsafe" "github.com/vimeo/galaxycache/promoter" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" ) const ( @@ -579,3 +581,35 @@ func TestPromotion(t *testing.T) { } } + +func TestRecorder(t *testing.T) { + meter := view.NewMeter() + meter.Start() + defer meter.Stop() + testView := &view.View{ + Measure: MGets, + TagKeys: []tag.Key{GalaxyKey}, + Aggregation: view.Count(), + } + meter.Register(testView) + + getter := func(_ context.Context, key string, dest Codec) error { + return dest.UnmarshalBinary([]byte("got:" + key)) + } + u := NewUniverse(&TestProtocol{}, "test-universe", WithRecorder(meter)) + g := u.NewGalaxy("test", 1024, GetterFunc(getter)) + var s StringCodec + err := g.Get(context.Background(), "foo", &s) + if err != nil { + t.Fatalf("error getting foo: %s", err) + } + + rows, retErr := meter.RetrieveData(testView.Name) + if retErr != nil { + t.Fatalf("error getting data from view: %s", retErr) + } + + if len(rows) != 1 { + t.Errorf("expected 1 row, got %d", len(rows)) + } +} diff --git a/http/http.go b/http/http.go index 98d874ee..928dad0d 100644 --- a/http/http.go +++ b/http/http.go @@ -42,11 +42,13 @@ type HTTPFetchProtocol struct { basePath string } -// HTTPOptions specifies a base path for serving and fetching. -// *ONLY SPECIFY IF NOT USING THE DEFAULT "/_galaxycache/" BASE PATH*. +// HTTPOptions can specify the transport, base path, and stats.Recorder for +// serving and fetching. *ONLY SPECIFY IF NOT USING THE DEFAULT "/_galaxycache/" +// BASE PATH*. type HTTPOptions struct { Transport http.RoundTripper BasePath string + Recorder stats.Recorder } // NewHTTPFetchProtocol creates an HTTP fetch protocol to be passed @@ -90,6 +92,7 @@ func (hp *HTTPFetchProtocol) NewFetcher(url string) (gc.RemoteFetcher, error) { type HTTPHandler struct { universe *gc.Universe basePath string + recorder stats.Recorder } // RegisterHTTPHandler sets up an HTTPHandler with a user specified path @@ -103,10 +106,16 @@ type HTTPHandler struct { // if specifying a serveMux. func RegisterHTTPHandler(universe *gc.Universe, opts *HTTPOptions, serveMux *http.ServeMux) { basePath := defaultBasePath + var recorder stats.Recorder if opts != nil { basePath = opts.BasePath + recorder = opts.Recorder + } + newHTTPHandler := &HTTPHandler{ + basePath: basePath, + universe: universe, + recorder: recorder, } - newHTTPHandler := &HTTPHandler{basePath: basePath, universe: universe} if serveMux == nil { http.Handle(basePath, &ochttp.Handler{ Handler: ochttp.WithRouteTag(newHTTPHandler, basePath), @@ -160,7 +169,11 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO: remove galaxy.Stats from here galaxy.Stats.ServerRequests.Add(1) - stats.Record(ctx, gc.MServerRequests.M(1)) + stats.RecordWithOptions( + ctx, + stats.WithMeasurements(gc.MServerRequests.M(1)), + stats.WithRecorder(h.recorder), + ) var value gc.ByteCodec err := galaxy.Get(ctx, key, &value) if err != nil {