diff --git a/cachetype_string.go b/cachetype_string.go new file mode 100644 index 00000000..abc5c2ee --- /dev/null +++ b/cachetype_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=CacheType"; DO NOT EDIT. + +package galaxycache + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[MainCache-1] + _ = x[HotCache-2] + _ = x[CandidateCache-3] +} + +const _CacheType_name = "MainCacheHotCacheCandidateCache" + +var _CacheType_index = [...]uint8{0, 9, 17, 31} + +func (i CacheType) String() string { + i -= 1 + if i >= CacheType(len(_CacheType_index)-1) { + return "CacheType(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _CacheType_name[_CacheType_index[i]:_CacheType_index[i+1]] +} diff --git a/galaxycache.go b/galaxycache.go index 51ba90a2..2cfa5872 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -127,13 +127,16 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen peerPicker: universe.peerPicker, cacheBytes: cacheBytes, mainCache: cache{ - lru: lru.New(0), + ctype: MainCache, + lru: lru.New(0), }, hotCache: cache{ - lru: lru.New(0), + ctype: HotCache, + lru: lru.New(0), }, candidateCache: cache{ - lru: lru.New(gOpts.maxCandidates), + ctype: CandidateCache, + lru: lru.New(gOpts.maxCandidates), }, hcStatsWithTime: HCStatsWithTime{ hcs: &promoter.HCStats{ @@ -340,7 +343,9 @@ func (h hitLevel) isHit() bool { // recordRequest records the corresponding opencensus measurement // to the level at which data was found on Get/load -func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) { +func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritative bool) { + span := trace.FromContext(ctx) + span.Annotatef([]trace.Attribute{trace.StringAttribute("hit_level", h.String())}, "fetched from %s", h) switch h { case hitMaincache: g.Stats.MaincacheHits.Add(1) @@ -354,6 +359,9 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) { case hitBackend: g.Stats.BackendLoads.Add(1) stats.Record(ctx, MBackendLoads.M(1)) + if !localAuthoritative { + span.Annotate(nil, "failed to fetch from peer, not authoritative for key") + } } } @@ -389,14 +397,14 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { stats.Record(ctx, MKeyLength.M(int64(len(key)))) if hlvl.isHit() { - span.Annotatef(nil, "Cache hit") + span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl) value.stats.touch() - g.recordRequest(ctx, hlvl) + g.recordRequest(ctx, hlvl, false) stats.Record(ctx, MValueLength.M(int64(len(value.data)))) return dest.UnmarshalBinary(value.data) } - span.Annotatef(nil, "Cache miss") + span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss") // Optimization to avoid double unmarshalling or copying: keep // track of whether the dest was already populated. One caller // (if local) will set this; the losers will not. The common @@ -417,8 +425,11 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { } type valWithLevel struct { - val *valWithStat - level hitLevel + val *valWithStat + level hitLevel + localAuthoritative bool + peerErr error + localErr error } // load loads key either by invoking the getter locally or by sending it to another machine. @@ -455,19 +466,21 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi g.Stats.CoalescedMaincacheHits.Add(1) } stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1)) - return &valWithLevel{value, hlvl}, nil + return &valWithLevel{value, hlvl, false, nil, nil}, nil } g.Stats.CoalescedLoads.Add(1) stats.Record(ctx, MCoalescedLoads.M(1)) - var err error + authoritative := true + var peerErr error if peer, ok := g.peerPicker.pickPeer(key); ok { - value, err = g.getFromPeer(ctx, peer, key) - if err == nil { + value, peerErr = g.getFromPeer(ctx, peer, key) + authoritative = false + if peerErr == nil { g.Stats.CoalescedPeerLoads.Add(1) stats.Record(ctx, MCoalescedPeerLoads.M(1)) - return &valWithLevel{value, hitPeer}, nil + return &valWithLevel{value, hitPeer, false, nil, nil}, nil } g.Stats.PeerLoadErrors.Add(1) @@ -488,13 +501,14 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi stats.Record(ctx, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value value = newValWithStat(data, nil) - g.populateCache(key, value, &g.mainCache) - return &valWithLevel{value, hitBackend}, nil + g.populateCache(ctx, key, value, &g.mainCache) + return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) if err == nil { value = viewi.(*valWithLevel).val level := viewi.(*valWithLevel).level - g.recordRequest(ctx, level) // record the hits for all load calls, including those that tagged onto the singleflight + authoritative := viewi.(*valWithLevel).localAuthoritative + g.recordRequest(ctx, level, authoritative) // record the hits for all load calls, including those that tagged onto the singleflight } return } @@ -526,7 +540,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string } value := newValWithStat(data, kStats) if g.opts.promoter.ShouldPromote(key, value.data, stats) { - g.populateCache(key, value, &g.hotCache) + g.populateCache(ctx, key, value, &g.hotCache) } return value, nil } @@ -547,11 +561,16 @@ func (g *Galaxy) lookupCache(key string) (*valWithStat, hitLevel) { return vi.(*valWithStat), hitHotcache } -func (g *Galaxy) populateCache(key string, value *valWithStat, cache *cache) { +func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithStat, cache *cache) { if g.cacheBytes <= 0 { return } cache.add(key, value) + // Record the size of this cache after we've finished evicting any necessary values. + defer func() { + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())}, + MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items())) + }() // Evict items from cache(s) if necessary. for { @@ -573,7 +592,7 @@ func (g *Galaxy) populateCache(key string, value *valWithStat, cache *cache) { } // CacheType represents a type of cache. -type CacheType int +type CacheType uint8 const ( // MainCache is the cache for items that this peer is the @@ -613,6 +632,7 @@ type cache struct { lru *lru.Cache nhit, nget int64 nevict int64 // number of evictions + ctype CacheType } func (c *cache) stats() CacheStats { @@ -726,3 +746,5 @@ type CacheStats struct { Hits int64 Evictions int64 } + +//go:generate stringer -type=CacheType diff --git a/galaxycache_test.go b/galaxycache_test.go index bc53ec46..6017191a 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -272,7 +272,7 @@ func TestPeers(t *testing.T) { return dest.UnmarshalBinary([]byte("got:" + key)) } - testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{10})) + testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{ProbDenominator: 10})) if tc.initFunc != nil { tc.initFunc(testGalaxy, testproto.TestFetchers) diff --git a/grpc/grpcclient.go b/grpc/grpcclient.go index 1ab9366d..d39f4c94 100644 --- a/grpc/grpcclient.go +++ b/grpc/grpcclient.go @@ -22,6 +22,7 @@ import ( gc "github.com/vimeo/galaxycache" pb "github.com/vimeo/galaxycache/galaxycachepb" "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/trace" "google.golang.org/grpc" "google.golang.org/grpc/status" @@ -45,7 +46,13 @@ type grpcFetcher struct { // certificates on the peers operating as servers should specify // grpc.WithInsecure() as one of the arguments. func NewGRPCFetchProtocol(dialOpts ...grpc.DialOption) *GRPCFetchProtocol { - dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{})) + dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{ + StartOptions: trace.StartOptions{ + // Preserve the sampling-decision of the parent span + Sampler: nil, + SpanKind: trace.SpanKindClient, + }, + })) return &GRPCFetchProtocol{PeerDialOptions: dialOpts} } @@ -64,6 +71,8 @@ func (gp *GRPCFetchProtocol) NewFetcher(address string) (gc.RemoteFetcher, error // Fetch here implements the RemoteFetcher interface for // sending Gets to peers over an RPC connection func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { + span := trace.FromContext(ctx) + span.Annotatef(nil, "fetching from %s; connection state %s", g.address, g.conn.GetState()) resp, err := g.client.GetFromPeer(ctx, &pb.GetRequest{ Galaxy: galaxy, Key: key, diff --git a/observability.go b/observability.go index c89f8fe1..c1aa2522 100644 --- a/observability.go +++ b/observability.go @@ -52,13 +52,21 @@ var ( MValueLength = stats.Int64("galaxycache/value_length", "The length of values", stats.UnitBytes) MRoundtripLatencyMilliseconds = stats.Float64("galaxycache/roundtrip_latency", "Roundtrip latency in milliseconds", stats.UnitMilliseconds) + + MCacheSize = stats.Int64("galaxycache/cache_bytes", "The number of bytes used for storing Keys and Values in the cache", stats.UnitBytes) + MCacheEntries = stats.Int64("galaxycache/cache_entries", "The number of entries in the cache", stats.UnitDimensionless) ) -// GalaxyKey tags the name of the galaxy -var GalaxyKey = tag.MustNewKey("galaxy") +var ( + // GalaxyKey tags the name of the galaxy + GalaxyKey = tag.MustNewKey("galaxy") + + // CacheLevelKey tags the level at which data was found on Get + CacheLevelKey = tag.MustNewKey("cache-hit-level") -// CacheLevelKey tags the level at which data was found on Get -var CacheLevelKey = tag.MustNewKey("cache-hit-level") + // CacheTypeKey tags the galaxy sub-cache the metric applies to + CacheTypeKey = tag.MustNewKey("cache-type") +) // AllViews is a slice of default views for people to use var AllViews = []*view.View{ @@ -80,6 +88,8 @@ var AllViews = []*view.View{ {Measure: MValueLength, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultBytesDistribution}, {Measure: MRoundtripLatencyMilliseconds, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultMillisecondsDistribution}, + {Measure: MCacheSize, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()}, + {Measure: MCacheEntries, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()}, } func sinceInMilliseconds(start time.Time) float64 {