Skip to content

Commit

Permalink
Allow Recording Stats to a Specific stats.Recorder
Browse files Browse the repository at this point in the history
It can be useful to be able to record to specific recorder, so make that
an option.

This also deprecates the NewUniverseWithOpts constructor in favor of the
NewUniverse constructor which is now variadic.
  • Loading branch information
sergiosalvatore committed Jan 20, 2021
1 parent 4e5f50b commit 60c8d79
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 32 deletions.
100 changes: 72 additions & 28 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,63 @@ func (f GetterFunc) Get(ctx context.Context, key string, dest Codec) error {
return f(ctx, key, dest)
}

type universeOpts struct {
hashOpts *HashOptions
recorder stats.Recorder
}

// UniverseOpt is a functional Universe option.
type UniverseOpt func(*universeOpts)

// WithHashOpts sets the HashOptions on a universe.
func WithHashOpts(hashOpts *HashOptions) UniverseOpt {
return func(u *universeOpts) {
u.hashOpts = hashOpts
}
}

// WithRecorder allows you to override the default stats.Recorder used for
// stats.
func WithRecorder(recorder stats.Recorder) UniverseOpt {
return func(u *universeOpts) {
u.recorder = recorder
}
}

// Universe defines the primary container for all galaxycache operations.
// It contains the galaxies and PeerPicker
type Universe struct {
mu sync.RWMutex
galaxies map[string]*Galaxy // galaxies are indexed by their name
peerPicker *PeerPicker
recorder stats.Recorder
}

// NewUniverse is the default constructor for the Universe object.
// It is passed a FetchProtocol (to specify fetching via GRPC or HTTP)
// and its own URL
func NewUniverse(protocol FetchProtocol, selfURL string) *Universe {
return NewUniverseWithOpts(protocol, selfURL, nil)
}
// NewUniverse is the main constructor for the Universe object. It is passed a
// FetchProtocol (to specify fetching via GRPC or HTTP) and its own URL along
// with options.
func NewUniverse(protocol FetchProtocol, selfURL string, opts ...UniverseOpt) *Universe {
options := &universeOpts{}
for _, opt := range opts {
opt(options)
}

// NewUniverseWithOpts is the optional constructor for the Universe
// object that defines a non-default hash function and number of replicas
func NewUniverseWithOpts(protocol FetchProtocol, selfURL string, options *HashOptions) *Universe {
c := &Universe{
galaxies: make(map[string]*Galaxy),
peerPicker: newPeerPicker(protocol, selfURL, options),
peerPicker: newPeerPicker(protocol, selfURL, options.hashOpts),
recorder: options.recorder,
}

return c
}

// NewUniverseWithOpts is a deprecated constructor for the Universe object that
// defines a non-default hash function and number of replicas. Please use
// `NewUniverse` with the `WithHashOpts` option instead.
func NewUniverseWithOpts(protocol FetchProtocol, selfURL string, options *HashOptions) *Universe {
return NewUniverse(protocol, selfURL, WithHashOpts(options))
}

// NewGalaxy creates a coordinated galaxy-aware BackendGetter from a
// BackendGetter.
//
Expand Down Expand Up @@ -150,6 +181,7 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen
g.candidateCache.lru.OnEvicted = func(key lru.Key, value interface{}) {
g.candidateCache.nevict++
}
g.parent = universe

universe.galaxies[name] = g
return g
Expand Down Expand Up @@ -228,6 +260,9 @@ type Galaxy struct {

// Stats are statistics on the galaxy.
Stats GalaxyStats

// pointer to the parent universe that created this galaxy
parent *Universe
}

// GalaxyOption is an interface for implementing functional galaxy options
Expand Down Expand Up @@ -349,16 +384,16 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati
switch h {
case hitMaincache:
g.Stats.MaincacheHits.Add(1)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1))
g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1))
case hitHotcache:
g.Stats.HotcacheHits.Add(1)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1))
g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, h.String())}, MCacheHits.M(1))
case hitPeer:
g.Stats.PeerLoads.Add(1)
stats.Record(ctx, MPeerLoads.M(1))
g.recordStats(ctx, nil, MPeerLoads.M(1))
case hitBackend:
g.Stats.BackendLoads.Add(1)
stats.Record(ctx, MBackendLoads.M(1))
g.recordStats(ctx, nil, MBackendLoads.M(1))
if !localAuthoritative {
span.Annotate(nil, "failed to fetch from peer, not authoritative for key")
}
Expand All @@ -383,24 +418,24 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
ctx, span := trace.StartSpan(ctx, "galaxycache.(*Galaxy).Get on "+g.name)
startTime := time.Now()
defer func() {
stats.Record(ctx, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime)))
g.recordStats(ctx, nil, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime)))
span.End()
}()

g.Stats.Gets.Add(1)
stats.Record(ctx, MGets.M(1))
g.recordStats(ctx, nil, MGets.M(1))
if dest == nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: "no Codec was provided"})
return errors.New("galaxycache: no Codec was provided")
}
value, hlvl := g.lookupCache(key)
stats.Record(ctx, MKeyLength.M(int64(len(key))))
g.recordStats(ctx, nil, MKeyLength.M(int64(len(key))))

if hlvl.isHit() {
span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl)
value.stats.touch()
g.recordRequest(ctx, hlvl, false)
stats.Record(ctx, MValueLength.M(int64(len(value.data))))
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data))))
return dest.UnmarshalBinary(value.data)
}

Expand All @@ -413,11 +448,11 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: "Failed to load key: " + err.Error()})
stats.Record(ctx, MLoadErrors.M(1))
g.recordStats(ctx, nil, MLoadErrors.M(1))
return err
}
value.stats.touch()
stats.Record(ctx, MValueLength.M(int64(len(value.data))))
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data))))
if destPopulated {
return nil
}
Expand All @@ -435,7 +470,7 @@ type valWithLevel struct {
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWithStat, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
stats.Record(ctx, MLoads.M(1))
g.recordStats(ctx, nil, MLoads.M(1))

viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
Expand Down Expand Up @@ -465,12 +500,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
} else {
g.Stats.CoalescedMaincacheHits.Add(1)
}
stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1))
g.recordStats(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1))
return &valWithLevel{value, hlvl, false, nil, nil}, nil

}
g.Stats.CoalescedLoads.Add(1)
stats.Record(ctx, MCoalescedLoads.M(1))
g.recordStats(ctx, nil, MCoalescedLoads.M(1))

authoritative := true
var peerErr error
Expand All @@ -479,12 +514,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
authoritative = false
if peerErr == nil {
g.Stats.CoalescedPeerLoads.Add(1)
stats.Record(ctx, MCoalescedPeerLoads.M(1))
g.recordStats(ctx, nil, MCoalescedPeerLoads.M(1))
return &valWithLevel{value, hitPeer, false, nil, nil}, nil
}

g.Stats.PeerLoadErrors.Add(1)
stats.Record(ctx, MPeerLoadErrors.M(1))
g.recordStats(ctx, nil, MPeerLoadErrors.M(1))
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /galaxycache? It's
// probably boring (normal task movement), so not
Expand All @@ -493,12 +528,12 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
data, err := g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.BackendLoadErrors.Add(1)
stats.Record(ctx, MBackendLoadErrors.M(1))
g.recordStats(ctx, nil, MBackendLoadErrors.M(1))
return nil, err
}

g.Stats.CoalescedBackendLoads.Add(1)
stats.Record(ctx, MCoalescedBackendLoads.M(1))
g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1))
destPopulated = true // only one caller of load gets this return value
value = newValWithStat(data, nil)
g.populateCache(ctx, key, value, &g.mainCache)
Expand Down Expand Up @@ -568,7 +603,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt
cache.add(key, value)
// Record the size of this cache after we've finished evicting any necessary values.
defer func() {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())},
g.recordStats(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())},
MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items()))
}()

Expand All @@ -591,6 +626,15 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt
}
}

func (g *Galaxy) recordStats(ctx context.Context, mutators []tag.Mutator, measurements ...stats.Measurement) {
stats.RecordWithOptions(
ctx,
stats.WithMeasurements(measurements...),
stats.WithTags(mutators...),
stats.WithRecorder(g.parent.recorder),
)
}

// CacheType represents a type of cache.
type CacheType uint8

Expand Down
34 changes: 34 additions & 0 deletions galaxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"unsafe"

"github.com/vimeo/galaxycache/promoter"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

const (
Expand Down Expand Up @@ -579,3 +581,35 @@ func TestPromotion(t *testing.T) {
}

}

func TestRecorder(t *testing.T) {
meter := view.NewMeter()
meter.Start()
defer meter.Stop()
testView := &view.View{
Measure: MGets,
TagKeys: []tag.Key{GalaxyKey},
Aggregation: view.Count(),
}
meter.Register(testView)

getter := func(_ context.Context, key string, dest Codec) error {
return dest.UnmarshalBinary([]byte("got:" + key))
}
u := NewUniverse(&TestProtocol{}, "test-universe", WithRecorder(meter))
g := u.NewGalaxy("test", 1024, GetterFunc(getter))
var s StringCodec
err := g.Get(context.Background(), "foo", &s)
if err != nil {
t.Fatalf("error getting foo: %s", err)
}

rows, retErr := meter.RetrieveData(testView.Name)
if retErr != nil {
t.Fatalf("error getting data from view: %s", retErr)
}

if len(rows) != 1 {
t.Errorf("expected 1 row, got %d", len(rows))
}
}
21 changes: 17 additions & 4 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ type HTTPFetchProtocol struct {
basePath string
}

// HTTPOptions specifies a base path for serving and fetching.
// *ONLY SPECIFY IF NOT USING THE DEFAULT "/_galaxycache/" BASE PATH*.
// HTTPOptions can specify the transport, base path, and stats.Recorder for
// serving and fetching. *ONLY SPECIFY IF NOT USING THE DEFAULT "/_galaxycache/"
// BASE PATH*.
type HTTPOptions struct {
Transport http.RoundTripper
BasePath string
Recorder stats.Recorder
}

// NewHTTPFetchProtocol creates an HTTP fetch protocol to be passed
Expand Down Expand Up @@ -90,6 +92,7 @@ func (hp *HTTPFetchProtocol) NewFetcher(url string) (gc.RemoteFetcher, error) {
type HTTPHandler struct {
universe *gc.Universe
basePath string
recorder stats.Recorder
}

// RegisterHTTPHandler sets up an HTTPHandler with a user specified path
Expand All @@ -103,10 +106,16 @@ type HTTPHandler struct {
// if specifying a serveMux.
func RegisterHTTPHandler(universe *gc.Universe, opts *HTTPOptions, serveMux *http.ServeMux) {
basePath := defaultBasePath
var recorder stats.Recorder
if opts != nil {
basePath = opts.BasePath
recorder = opts.Recorder
}
newHTTPHandler := &HTTPHandler{
basePath: basePath,
universe: universe,
recorder: recorder,
}
newHTTPHandler := &HTTPHandler{basePath: basePath, universe: universe}
if serveMux == nil {
http.Handle(basePath, &ochttp.Handler{
Handler: ochttp.WithRouteTag(newHTTPHandler, basePath),
Expand Down Expand Up @@ -160,7 +169,11 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// TODO: remove galaxy.Stats from here
galaxy.Stats.ServerRequests.Add(1)
stats.Record(ctx, gc.MServerRequests.M(1))
stats.RecordWithOptions(
ctx,
stats.WithMeasurements(gc.MServerRequests.M(1)),
stats.WithRecorder(h.recorder),
)
var value gc.ByteCodec
err := galaxy.Get(ctx, key, &value)
if err != nil {
Expand Down

0 comments on commit 60c8d79

Please sign in to comment.