From eb212992fde9f550df6b1e27677271a9bf244464 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 15:03:05 -0500 Subject: [PATCH 1/7] Plumb cache-hit-level information up the callstack Due to single-flighting, some aspects of tracing are a bit more difficult than we'd usually expect, so plumb back information about cache-hit-levels in the results struct. --- galaxycache.go | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/galaxycache.go b/galaxycache.go index 51ba90a2..e2186eb8 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -340,7 +340,9 @@ func (h hitLevel) isHit() bool { // recordRequest records the corresponding opencensus measurement // to the level at which data was found on Get/load -func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) { +func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritative bool) { + span := trace.FromContext(ctx) + span.Annotatef([]trace.Attribute{trace.StringAttribute("hit_level", h.String())}, "fetched from %s", h) switch h { case hitMaincache: g.Stats.MaincacheHits.Add(1) @@ -354,6 +356,9 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) { case hitBackend: g.Stats.BackendLoads.Add(1) stats.Record(ctx, MBackendLoads.M(1)) + if !localAuthoritative { + span.Annotatef(nil, "failed to fetch from peer, not authoritative for key") + } } } @@ -389,14 +394,14 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { stats.Record(ctx, MKeyLength.M(int64(len(key)))) if hlvl.isHit() { - span.Annotatef(nil, "Cache hit") + span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl) value.stats.touch() - g.recordRequest(ctx, hlvl) + g.recordRequest(ctx, hlvl, false) stats.Record(ctx, MValueLength.M(int64(len(value.data)))) return dest.UnmarshalBinary(value.data) } - span.Annotatef(nil, "Cache miss") + span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", 
false)}, "Cache miss") // Optimization to avoid double unmarshalling or copying: keep // track of whether the dest was already populated. One caller // (if local) will set this; the losers will not. The common @@ -417,8 +422,11 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { } type valWithLevel struct { - val *valWithStat - level hitLevel + val *valWithStat + level hitLevel + localAuthoritative bool + peerErr error + localErr error } // load loads key either by invoking the getter locally or by sending it to another machine. @@ -455,19 +463,21 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi g.Stats.CoalescedMaincacheHits.Add(1) } stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1)) - return &valWithLevel{value, hlvl}, nil + return &valWithLevel{value, hlvl, false, nil, nil}, nil } g.Stats.CoalescedLoads.Add(1) stats.Record(ctx, MCoalescedLoads.M(1)) - var err error + authoritative := true + var peerErr error if peer, ok := g.peerPicker.pickPeer(key); ok { - value, err = g.getFromPeer(ctx, peer, key) - if err == nil { + value, peerErr = g.getFromPeer(ctx, peer, key) + authoritative = false + if peerErr == nil { g.Stats.CoalescedPeerLoads.Add(1) stats.Record(ctx, MCoalescedPeerLoads.M(1)) - return &valWithLevel{value, hitPeer}, nil + return &valWithLevel{value, hitPeer, false, nil, nil}, nil } g.Stats.PeerLoadErrors.Add(1) @@ -489,12 +499,13 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi destPopulated = true // only one caller of load gets this return value value = newValWithStat(data, nil) g.populateCache(key, value, &g.mainCache) - return &valWithLevel{value, hitBackend}, nil + return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) if err == nil { value = viewi.(*valWithLevel).val level := viewi.(*valWithLevel).level - g.recordRequest(ctx, level) // record the hits for all load calls, 
including those that tagged onto the singleflight + authoritative := viewi.(*valWithLevel).localAuthoritative + g.recordRequest(ctx, level, authoritative) // record the hits for all load calls, including those that tagged onto the singleflight } return } From da6a423b60b86fd202cfb2bd0d6e6146cd6400e6 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 15:07:08 -0500 Subject: [PATCH 2/7] grpc: Annotate requests with connection state --- grpc/grpcclient.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/grpc/grpcclient.go b/grpc/grpcclient.go index 1ab9366d..d39f4c94 100644 --- a/grpc/grpcclient.go +++ b/grpc/grpcclient.go @@ -22,6 +22,7 @@ import ( gc "github.com/vimeo/galaxycache" pb "github.com/vimeo/galaxycache/galaxycachepb" "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/trace" "google.golang.org/grpc" "google.golang.org/grpc/status" @@ -45,7 +46,13 @@ type grpcFetcher struct { // certificates on the peers operating as servers should specify // grpc.WithInsecure() as one of the arguments. 
func NewGRPCFetchProtocol(dialOpts ...grpc.DialOption) *GRPCFetchProtocol { - dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{})) + dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{ + StartOptions: trace.StartOptions{ + // Preserve the sampling-decision of the parent span + Sampler: nil, + SpanKind: trace.SpanKindClient, + }, + })) return &GRPCFetchProtocol{PeerDialOptions: dialOpts} } @@ -64,6 +71,8 @@ func (gp *GRPCFetchProtocol) NewFetcher(address string) (gc.RemoteFetcher, error // Fetch here implements the RemoteFetcher interface for // sending Gets to peers over an RPC connection func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { + span := trace.FromContext(ctx) + span.Annotatef(nil, "fetching from %s; connection state %s", g.address, g.conn.GetState()) resp, err := g.client.GetFromPeer(ctx, &pb.GetRequest{ Galaxy: galaxy, Key: key, From 3085b3f17eead8d08475cc737ea07ed65dc531f8 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 15:56:27 -0500 Subject: [PATCH 3/7] opencensus metrics: add cache-{size,entries} stats Also define a (generated) stringer for `CacheType`. --- cachetype_string.go | 26 ++++++++++++++++++++++++++ galaxycache.go | 23 +++++++++++++++++------ observability.go | 5 +++++ 3 files changed, 48 insertions(+), 6 deletions(-) create mode 100644 cachetype_string.go diff --git a/cachetype_string.go b/cachetype_string.go new file mode 100644 index 00000000..7a93c371 --- /dev/null +++ b/cachetype_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=CacheType"; DO NOT EDIT. + +package galaxycache + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. 
+ var x [1]struct{} + _ = x[MainCache-1] + _ = x[HotCache-2] + _ = x[CandidateCache-3] +} + +const _CacheType_name = "MainCacheHotCacheCandidateCache" + +var _CacheType_index = [...]uint8{0, 9, 17, 31} + +func (i CacheType) String() string { + i -= 1 + if i < 0 || i >= CacheType(len(_CacheType_index)-1) { + return "CacheType(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _CacheType_name[_CacheType_index[i]:_CacheType_index[i+1]] +} diff --git a/galaxycache.go b/galaxycache.go index e2186eb8..3211a453 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -127,13 +127,16 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen peerPicker: universe.peerPicker, cacheBytes: cacheBytes, mainCache: cache{ - lru: lru.New(0), + ctype: MainCache, + lru: lru.New(0), }, hotCache: cache{ - lru: lru.New(0), + ctype: HotCache, + lru: lru.New(0), }, candidateCache: cache{ - lru: lru.New(gOpts.maxCandidates), + ctype: CandidateCache, + lru: lru.New(gOpts.maxCandidates), }, hcStatsWithTime: HCStatsWithTime{ hcs: &promoter.HCStats{ @@ -498,7 +501,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi stats.Record(ctx, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value value = newValWithStat(data, nil) - g.populateCache(key, value, &g.mainCache) + g.populateCache(ctx, key, value, &g.mainCache) return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) if err == nil { @@ -537,7 +540,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string } value := newValWithStat(data, kStats) if g.opts.promoter.ShouldPromote(key, value.data, stats) { - g.populateCache(key, value, &g.hotCache) + g.populateCache(ctx, key, value, &g.hotCache) } return value, nil } @@ -558,11 +561,16 @@ func (g *Galaxy) lookupCache(key string) (*valWithStat, hitLevel) { return vi.(*valWithStat), hitHotcache } -func (g *Galaxy) populateCache(key string, value 
*valWithStat, cache *cache) { +func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithStat, cache *cache) { if g.cacheBytes <= 0 { return } cache.add(key, value) + // Record the size of this cache after we've finished evicting any necessary values. + defer func() { + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, cache.ctype.String())}, + MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items())) + }() // Evict items from cache(s) if necessary. for { @@ -619,6 +627,7 @@ func (g *Galaxy) CacheStats(which CacheType) CacheStats { // and counts the size of all keys and values. Candidate cache only // utilizes the lru.Cache and mutex, not the included stats. type cache struct { + ctype CacheType mu sync.Mutex nbytes int64 // of all keys and values lru *lru.Cache @@ -737,3 +746,5 @@ type CacheStats struct { Hits int64 Evictions int64 } + +//go:generate stringer -type=CacheType diff --git a/observability.go b/observability.go index c89f8fe1..933f288f 100644 --- a/observability.go +++ b/observability.go @@ -52,6 +52,9 @@ var ( MValueLength = stats.Int64("galaxycache/value_length", "The length of values", stats.UnitBytes) MRoundtripLatencyMilliseconds = stats.Float64("galaxycache/roundtrip_latency", "Roundtrip latency in milliseconds", stats.UnitMilliseconds) + + MCacheSize = stats.Int64("galaxycache/cache_bytes", "The number of bytes used for storing Keys and Values in the cache", stats.UnitBytes) + MCacheEntries = stats.Int64("galaxycache/cache_entries", "The number of entries in the cache", stats.UnitDimensionless) ) // GalaxyKey tags the name of the galaxy @@ -80,6 +83,8 @@ var AllViews = []*view.View{ {Measure: MValueLength, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultBytesDistribution}, {Measure: MRoundtripLatencyMilliseconds, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultMillisecondsDistribution}, + {Measure: MCacheSize, TagKeys: []tag.Key{GalaxyKey, CacheLevelKey}, Aggregation: view.LastValue()}, + {Measure: 
MCacheEntries, TagKeys: []tag.Key{GalaxyKey, CacheLevelKey}, Aggregation: view.LastValue()}, } func sinceInMilliseconds(start time.Time) float64 { From 2bb6b748679d4dc592ec0b5f88f13685dfbe64ad Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 16:06:06 -0500 Subject: [PATCH 4/7] Keyify a Promoter in a test This cleans up a go vet lint. --- galaxycache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/galaxycache_test.go b/galaxycache_test.go index bc53ec46..6017191a 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -272,7 +272,7 @@ func TestPeers(t *testing.T) { return dest.UnmarshalBinary([]byte("got:" + key)) } - testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{10})) + testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{ProbDenominator: 10})) if tc.initFunc != nil { tc.initFunc(testGalaxy, testproto.TestFetchers) From 061a0ea6dd3edd1b68158affebc7f8fc2183dbbc Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 16:16:43 -0500 Subject: [PATCH 5/7] CacheType: convert to uint8 We only need ~2 bits for the types of cache. `int` is 64-bits on 64-bit platforms, which is significant overkill. Convert CacheType to a uint8 and move it to the end of the `cache` struct so it only affects alignment, but not the actual size. 
(we can pack in other small values at the end later as well) --- cachetype_string.go | 2 +- galaxycache.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cachetype_string.go b/cachetype_string.go index 7a93c371..abc5c2ee 100644 --- a/cachetype_string.go +++ b/cachetype_string.go @@ -19,7 +19,7 @@ var _CacheType_index = [...]uint8{0, 9, 17, 31} func (i CacheType) String() string { i -= 1 - if i < 0 || i >= CacheType(len(_CacheType_index)-1) { + if i >= CacheType(len(_CacheType_index)-1) { return "CacheType(" + strconv.FormatInt(int64(i+1), 10) + ")" } return _CacheType_name[_CacheType_index[i]:_CacheType_index[i+1]] diff --git a/galaxycache.go b/galaxycache.go index 3211a453..a0171146 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -592,7 +592,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt } // CacheType represents a type of cache. -type CacheType int +type CacheType uint8 const ( // MainCache is the cache for items that this peer is the @@ -627,12 +627,12 @@ func (g *Galaxy) CacheStats(which CacheType) CacheStats { // and counts the size of all keys and values. Candidate cache only // utilizes the lru.Cache and mutex, not the included stats. 
type cache struct { - ctype CacheType mu sync.Mutex nbytes int64 // of all keys and values lru *lru.Cache nhit, nget int64 nevict int64 // number of evictions + ctype CacheType } func (c *cache) stats() CacheStats { From b0ef4acd6fa505ba31982d648da8a6beb7e41705 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 16:37:18 -0500 Subject: [PATCH 6/7] observability: separate out CacheTypeKey --- galaxycache.go | 2 +- observability.go | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/galaxycache.go b/galaxycache.go index a0171146..8dfafc10 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -568,7 +568,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt cache.add(key, value) // Record the size of this cache after we've finished evicting any necessary values. defer func() { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheLevelKey, cache.ctype.String())}, + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())}, MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items())) }() diff --git a/observability.go b/observability.go index 933f288f..c1aa2522 100644 --- a/observability.go +++ b/observability.go @@ -57,11 +57,16 @@ var ( MCacheEntries = stats.Int64("galaxycache/cache_entries", "The number of entries in the cache", stats.UnitDimensionless) ) -// GalaxyKey tags the name of the galaxy -var GalaxyKey = tag.MustNewKey("galaxy") +var ( + // GalaxyKey tags the name of the galaxy + GalaxyKey = tag.MustNewKey("galaxy") + + // CacheLevelKey tags the level at which data was found on Get + CacheLevelKey = tag.MustNewKey("cache-hit-level") -// CacheLevelKey tags the level at which data was found on Get -var CacheLevelKey = tag.MustNewKey("cache-hit-level") + // CacheTypeKey tags the galaxy sub-cache the metric applies to + CacheTypeKey = tag.MustNewKey("cache-type") +) // AllViews is a slice of default views for people to use var AllViews = []*view.View{ 
@@ -83,8 +88,8 @@ var AllViews = []*view.View{ {Measure: MValueLength, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultBytesDistribution}, {Measure: MRoundtripLatencyMilliseconds, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultMillisecondsDistribution}, - {Measure: MCacheSize, TagKeys: []tag.Key{GalaxyKey, CacheLevelKey}, Aggregation: view.LastValue()}, - {Measure: MCacheEntries, TagKeys: []tag.Key{GalaxyKey, CacheLevelKey}, Aggregation: view.LastValue()}, + {Measure: MCacheSize, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()}, + {Measure: MCacheEntries, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()}, } func sinceInMilliseconds(start time.Time) float64 { From 9884bcd8f1006ae2a87481ee3708d41a19f1d156 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Mon, 9 Dec 2019 17:17:49 -0500 Subject: [PATCH 7/7] Annotatef -> Annotate (per review comment) --- galaxycache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/galaxycache.go b/galaxycache.go index 8dfafc10..2cfa5872 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -360,7 +360,7 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati g.Stats.BackendLoads.Add(1) stats.Record(ctx, MBackendLoads.M(1)) if !localAuthoritative { - span.Annotatef(nil, "failed to fetch from peer, not authoritative for key") + span.Annotate(nil, "failed to fetch from peer, not authoritative for key") } } }