Skip to content

Commit

Permalink
Merge pull request #15 from vimeo/trace_enhancement
Browse files Browse the repository at this point in the history
Annotate traces with cache-hit-level data and add cache-size stats
  • Loading branch information
dfinkel authored Dec 9, 2019
2 parents ed31064 + 9884bcd commit 04dcb1b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 26 deletions.
26 changes: 26 additions & 0 deletions cachetype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 42 additions & 20 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen
peerPicker: universe.peerPicker,
cacheBytes: cacheBytes,
mainCache: cache{
lru: lru.New(0),
ctype: MainCache,
lru: lru.New(0),
},
hotCache: cache{
lru: lru.New(0),
ctype: HotCache,
lru: lru.New(0),
},
candidateCache: cache{
lru: lru.New(gOpts.maxCandidates),
ctype: CandidateCache,
lru: lru.New(gOpts.maxCandidates),
},
hcStatsWithTime: HCStatsWithTime{
hcs: &promoter.HCStats{
Expand Down Expand Up @@ -340,7 +343,9 @@ func (h hitLevel) isHit() bool {

// recordRequest records the corresponding opencensus measurement
// to the level at which data was found on Get/load
func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) {
func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritative bool) {
span := trace.FromContext(ctx)
span.Annotatef([]trace.Attribute{trace.StringAttribute("hit_level", h.String())}, "fetched from %s", h)
switch h {
case hitMaincache:
g.Stats.MaincacheHits.Add(1)
Expand All @@ -354,6 +359,9 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel) {
case hitBackend:
g.Stats.BackendLoads.Add(1)
stats.Record(ctx, MBackendLoads.M(1))
if !localAuthoritative {
span.Annotate(nil, "failed to fetch from peer, not authoritative for key")
}
}
}

Expand Down Expand Up @@ -389,14 +397,14 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
stats.Record(ctx, MKeyLength.M(int64(len(key))))

if hlvl.isHit() {
span.Annotatef(nil, "Cache hit")
span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl)
value.stats.touch()
g.recordRequest(ctx, hlvl)
g.recordRequest(ctx, hlvl, false)
stats.Record(ctx, MValueLength.M(int64(len(value.data))))
return dest.UnmarshalBinary(value.data)
}

span.Annotatef(nil, "Cache miss")
span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss")
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
Expand All @@ -417,8 +425,11 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
}

type valWithLevel struct {
val *valWithStat
level hitLevel
val *valWithStat
level hitLevel
localAuthoritative bool
peerErr error
localErr error
}

// load loads key either by invoking the getter locally or by sending it to another machine.
Expand Down Expand Up @@ -455,19 +466,21 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
g.Stats.CoalescedMaincacheHits.Add(1)
}
stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(CacheLevelKey, hlvl.String())}, MCoalescedCacheHits.M(1))
return &valWithLevel{value, hlvl}, nil
return &valWithLevel{value, hlvl, false, nil, nil}, nil

}
g.Stats.CoalescedLoads.Add(1)
stats.Record(ctx, MCoalescedLoads.M(1))

var err error
authoritative := true
var peerErr error
if peer, ok := g.peerPicker.pickPeer(key); ok {
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
value, peerErr = g.getFromPeer(ctx, peer, key)
authoritative = false
if peerErr == nil {
g.Stats.CoalescedPeerLoads.Add(1)
stats.Record(ctx, MCoalescedPeerLoads.M(1))
return &valWithLevel{value, hitPeer}, nil
return &valWithLevel{value, hitPeer, false, nil, nil}, nil
}

g.Stats.PeerLoadErrors.Add(1)
Expand All @@ -488,13 +501,14 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
stats.Record(ctx, MCoalescedBackendLoads.M(1))
destPopulated = true // only one caller of load gets this return value
value = newValWithStat(data, nil)
g.populateCache(key, value, &g.mainCache)
return &valWithLevel{value, hitBackend}, nil
g.populateCache(ctx, key, value, &g.mainCache)
return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil
})
if err == nil {
value = viewi.(*valWithLevel).val
level := viewi.(*valWithLevel).level
g.recordRequest(ctx, level) // record the hits for all load calls, including those that tagged onto the singleflight
authoritative := viewi.(*valWithLevel).localAuthoritative
g.recordRequest(ctx, level, authoritative) // record the hits for all load calls, including those that tagged onto the singleflight
}
return
}
Expand Down Expand Up @@ -526,7 +540,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string
}
value := newValWithStat(data, kStats)
if g.opts.promoter.ShouldPromote(key, value.data, stats) {
g.populateCache(key, value, &g.hotCache)
g.populateCache(ctx, key, value, &g.hotCache)
}
return value, nil
}
Expand All @@ -547,11 +561,16 @@ func (g *Galaxy) lookupCache(key string) (*valWithStat, hitLevel) {
return vi.(*valWithStat), hitHotcache
}

func (g *Galaxy) populateCache(key string, value *valWithStat, cache *cache) {
func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithStat, cache *cache) {
if g.cacheBytes <= 0 {
return
}
cache.add(key, value)
// Record the size of this cache after we've finished evicting any necessary values.
defer func() {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(CacheTypeKey, cache.ctype.String())},
MCacheSize.M(cache.bytes()), MCacheEntries.M(cache.items()))
}()

// Evict items from cache(s) if necessary.
for {
Expand All @@ -573,7 +592,7 @@ func (g *Galaxy) populateCache(key string, value *valWithStat, cache *cache) {
}

// CacheType represents a type of cache.
type CacheType int
type CacheType uint8

const (
// MainCache is the cache for items that this peer is the
Expand Down Expand Up @@ -613,6 +632,7 @@ type cache struct {
lru *lru.Cache
nhit, nget int64
nevict int64 // number of evictions
ctype CacheType
}

func (c *cache) stats() CacheStats {
Expand Down Expand Up @@ -726,3 +746,5 @@ type CacheStats struct {
Hits int64
Evictions int64
}

//go:generate stringer -type=CacheType
2 changes: 1 addition & 1 deletion galaxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestPeers(t *testing.T) {
return dest.UnmarshalBinary([]byte("got:" + key))
}

testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{10}))
testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{ProbDenominator: 10}))

if tc.initFunc != nil {
tc.initFunc(testGalaxy, testproto.TestFetchers)
Expand Down
11 changes: 10 additions & 1 deletion grpc/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
gc "github.com/vimeo/galaxycache"
pb "github.com/vimeo/galaxycache/galaxycachepb"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
Expand All @@ -45,7 +46,13 @@ type grpcFetcher struct {
// certificates on the peers operating as servers should specify
// grpc.WithInsecure() as one of the arguments.
func NewGRPCFetchProtocol(dialOpts ...grpc.DialOption) *GRPCFetchProtocol {
dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{
StartOptions: trace.StartOptions{
// Preserve the sampling-decision of the parent span
Sampler: nil,
SpanKind: trace.SpanKindClient,
},
}))
return &GRPCFetchProtocol{PeerDialOptions: dialOpts}
}

Expand All @@ -64,6 +71,8 @@ func (gp *GRPCFetchProtocol) NewFetcher(address string) (gc.RemoteFetcher, error
// Fetch here implements the RemoteFetcher interface for
// sending Gets to peers over an RPC connection
func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) {
span := trace.FromContext(ctx)
span.Annotatef(nil, "fetching from %s; connection state %s", g.address, g.conn.GetState())
resp, err := g.client.GetFromPeer(ctx, &pb.GetRequest{
Galaxy: galaxy,
Key: key,
Expand Down
18 changes: 14 additions & 4 deletions observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,21 @@ var (
MValueLength = stats.Int64("galaxycache/value_length", "The length of values", stats.UnitBytes)

MRoundtripLatencyMilliseconds = stats.Float64("galaxycache/roundtrip_latency", "Roundtrip latency in milliseconds", stats.UnitMilliseconds)

MCacheSize = stats.Int64("galaxycache/cache_bytes", "The number of bytes used for storing Keys and Values in the cache", stats.UnitBytes)
MCacheEntries = stats.Int64("galaxycache/cache_entries", "The number of entries in the cache", stats.UnitDimensionless)
)

// GalaxyKey tags the name of the galaxy
var GalaxyKey = tag.MustNewKey("galaxy")
var (
// GalaxyKey tags the name of the galaxy
GalaxyKey = tag.MustNewKey("galaxy")

// CacheLevelKey tags the level at which data was found on Get
CacheLevelKey = tag.MustNewKey("cache-hit-level")

// CacheLevelKey tags the level at which data was found on Get
var CacheLevelKey = tag.MustNewKey("cache-hit-level")
// CacheTypeKey tags the galaxy sub-cache the metric applies to
CacheTypeKey = tag.MustNewKey("cache-type")
)

// AllViews is a slice of default views for people to use
var AllViews = []*view.View{
Expand All @@ -80,6 +88,8 @@ var AllViews = []*view.View{
{Measure: MValueLength, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultBytesDistribution},

{Measure: MRoundtripLatencyMilliseconds, TagKeys: []tag.Key{GalaxyKey}, Aggregation: defaultMillisecondsDistribution},
{Measure: MCacheSize, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()},
{Measure: MCacheEntries, TagKeys: []tag.Key{GalaxyKey, CacheTypeKey}, Aggregation: view.LastValue()},
}

func sinceInMilliseconds(start time.Time) float64 {
Expand Down

0 comments on commit 04dcb1b

Please sign in to comment.