From f1529d7724b42c7b2f60c96c736c9579dc046fdc Mon Sep 17 00:00:00 2001 From: David Finkel Date: Fri, 29 Jul 2022 15:32:54 -0400 Subject: [PATCH] hot/candidate cache: remove damping on key qps The damping was a bit problematic in that it prevented any hits less than a second after the initial candidate-cache insertion from having a non-zero qps calculated. In this new windowed-QPS type, we specifically avoid including a time.Time so there are no pointers for the Garbage Collector to scan. As such, we now measure time relative to a new `baseTime` in the Galaxy. To facilitate testing, time is now managed by an implementation of the go-clocks package's Clock interface. Additionally, this has a few other properties: - After some time a value is considered "stale" and reset. (so as not to penalize a key that's hung in the candidate cache for hours relative to one that's been evicted and brought back) - QPS is calculated relative to the current "epoch" for that key (which is either the creation time or the last time it was reset due to staleness). To facilitate a simpler Promoter implementation, we add a Hits field that's guaranteed to be 0 on the first fetch. 
--- galaxycache.go | 84 ++++++++++++---- galaxycache_test.go | 232 ++++++++++++++++++++++++++++++------------- go.mod | 1 + go.sum | 2 + hotcache.go | 114 +++++++++++---------- promoter/promoter.go | 7 +- 6 files changed, 296 insertions(+), 144 deletions(-) diff --git a/galaxycache.go b/galaxycache.go index 59209d36..a995f3c9 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -35,6 +35,8 @@ import ( "github.com/vimeo/galaxycache/promoter" "github.com/vimeo/galaxycache/singleflight" + "github.com/vimeo/go-clocks" + "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opencensus.io/trace" @@ -142,21 +144,26 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen } gOpts := galaxyOpts{ - promoter: &promoter.DefaultPromoter{}, - hcRatio: 8, // default hotcache size is 1/8th of cacheBytes - maxCandidates: 100, + promoter: &promoter.DefaultPromoter{}, + hcRatio: 8, // default hotcache size is 1/8th of cacheBytes + maxCandidates: 100, + clock: clocks.DefaultClock(), + resetIdleStatsAge: time.Minute, } for _, opt := range opts { opt.apply(&gOpts) } g := &Galaxy{ - name: name, - getter: getter, - peerPicker: universe.peerPicker, - cacheBytes: cacheBytes, - mainCache: newCache(MainCache), - hotCache: newCache(HotCache), - candidateCache: newCandidateCache(gOpts.maxCandidates), + name: name, + getter: getter, + peerPicker: universe.peerPicker, + cacheBytes: cacheBytes, + mainCache: newCache(MainCache), + hotCache: newCache(HotCache), + candidateCache: newCandidateCache(gOpts.maxCandidates), + baseTime: gOpts.clock.Now(), + resetIdleStatsAge: gOpts.resetIdleStatsAge, + clock: gOpts.clock, hcStatsWithTime: HCStatsWithTime{ hcs: &promoter.HCStats{ HCCapacity: cacheBytes / gOpts.hcRatio, @@ -241,6 +248,16 @@ type Galaxy struct { opts galaxyOpts + baseTime time.Time + + // Time that must elapse without any touches to a key before we clear + // its stats with the next touch. 
+ // This protects intermittently hot keys from having very low qps + // calculations during a traffic burst. + resetIdleStatsAge time.Duration + + clock clocks.Clock + _ int32 // force Stats to be 8-byte aligned on 32-bit platforms // Stats are statistics on the galaxy. @@ -250,6 +267,11 @@ type Galaxy struct { parent *Universe } +// now returns the current time relative to the baseTime +func (g *Galaxy) now() time.Duration { + return g.clock.Now().Sub(g.baseTime) +} + // GalaxyOption is an interface for implementing functional galaxy options type GalaxyOption interface { apply(*galaxyOpts) @@ -258,9 +280,11 @@ type GalaxyOption interface { // galaxyOpts contains optional fields for the galaxy (each with a default // value if not set) type galaxyOpts struct { - promoter promoter.Interface - hcRatio int64 - maxCandidates int + promoter promoter.Interface + hcRatio int64 + maxCandidates int + clock clocks.Clock + resetIdleStatsAge time.Duration } type funcGalaxyOption func(*galaxyOpts) @@ -298,6 +322,23 @@ func WithMaxCandidates(n int) GalaxyOption { }) } +// WithClock lets one override the clock used internally for key-stats +// accounting (among other things). +func WithClock(clk clocks.Clock) GalaxyOption { + return newFuncGalaxyOption(func(g *galaxyOpts) { + g.clock = clk + }) +} + +// WithIdleStatsAgeResetWindow overrides the default interval after which a key +// that's been idle for a while gets its stats reset (such that that hit is +// recorded as if it were the first). +func WithIdleStatsAgeResetWindow(age time.Duration) GalaxyOption { + return newFuncGalaxyOption(func(g *galaxyOpts) { + g.resetIdleStatsAge = age + }) +} + // flightGroup is defined as an interface which flightgroup.Group // satisfies. We define this so that we may test with an alternate // implementation. 
@@ -418,7 +459,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { if hlvl.isHit() { span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", true)}, "Cache hit in %s", hlvl) - value.stats.touch() + value.stats.touch(g.resetIdleStatsAge, g.now()) g.recordRequest(ctx, hlvl, false) g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) return dest.UnmarshalBinary(value.data) @@ -436,7 +477,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { g.recordStats(ctx, nil, MLoadErrors.M(1)) return err } - value.stats.touch() + value.stats.touch(g.resetIdleStatsAge, g.now()) g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) if destPopulated { return nil @@ -520,7 +561,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value valWit g.Stats.CoalescedBackendLoads.Add(1) g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value - value = newValWithStat(data, nil) + value = g.newValWithStat(data, nil) g.populateCache(ctx, key, value, &g.mainCache) return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) @@ -549,15 +590,22 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string kStats, ok := g.candidateCache.get(key) if !ok { kStats = g.addNewToCandidateCache(key) + // NB: we do not touch() kStats here because that's reserved + // for code outside the singleflight block. + // This has the advantageous effect of guaranteeing that + // hitCount is 0 if it's a new key, thus making it easy for a + // promoter to distinguish a new key. 
} g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update + hitCount, keyQPS := kStats.val(g.now()) stats := promoter.Stats{ - KeyQPS: kStats.val(), + KeyQPS: keyQPS, + Hits: hitCount, HCStats: g.hcStatsWithTime.hcs, } - value := newValWithStat(data, kStats) + value := g.newValWithStat(data, kStats) if g.opts.promoter.ShouldPromote(key, value.data, stats) { g.populateCache(ctx, key, value, &g.hotCache) } diff --git a/galaxycache_test.go b/galaxycache_test.go index c89cbabe..15c97108 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -32,6 +32,7 @@ import ( "unsafe" "github.com/vimeo/galaxycache/promoter" + "github.com/vimeo/go-clocks/fake" "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -383,8 +384,8 @@ func TestNoDedup(t *testing.T) { // If the singleflight callback doesn't double-check the cache again // upon entry, we would increment nbytes twice but the entry would // only be in the cache once. - testKStats := keyStats{dQPS: dampedQPS{period: time.Second}} - testvws := newValWithStat([]byte(testval), &testKStats) + testKStats := keyStats{dQPS: windowedAvgQPS{}} + testvws := g.newValWithStat([]byte(testval), &testKStats) wantBytes := int64(len(testkey)) + testvws.size() if g.mainCache.nbytes != wantBytes { t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes) @@ -402,88 +403,103 @@ func TestGalaxyStatsAlignment(t *testing.T) { func TestHotcache(t *testing.T) { keyToAdd := "hi" hcTests := []struct { - name string - numGets int - numHeatBursts int - burstInterval time.Duration - timeToVal time.Duration - expectedBurstQPS float64 - expectedValQPS float64 + name string + numGets int + numHeatBursts int + resetIdleAge time.Duration + burstInterval time.Duration + timeToVal time.Duration + expectedBurstQPS float64 + expectedBurstCount int64 + expectedValQPS float64 + expectedFinalCount int64 }{ { - name: "10k_heat_burst_1_sec", - numGets: 10000, - numHeatBursts: 5, - burstInterval: 1 * 
time.Second, - timeToVal: 5 * time.Second, - expectedBurstQPS: 1559.0, - expectedValQPS: 1316.0, + name: "10k_heat_burst_1_sec", + numGets: 10000, + numHeatBursts: 5, + resetIdleAge: time.Minute, + burstInterval: 1 * time.Second, + timeToVal: 5 * time.Second, + expectedBurstQPS: 10000.0, + expectedBurstCount: 50000, + expectedValQPS: 5000.0, + expectedFinalCount: 50000, }, { - name: "10k_heat_burst_5_secs", - numGets: 10000, - numHeatBursts: 5, - burstInterval: 5 * time.Second, - timeToVal: 5 * time.Second, - expectedBurstQPS: 1067.0, - expectedValQPS: 900.5, + name: "10k_heat_burst_5_secs", + numGets: 10000, + numHeatBursts: 5, + resetIdleAge: time.Minute, + burstInterval: 5 * time.Second, + timeToVal: 25 * time.Second, + expectedBurstQPS: 2000.0, + expectedBurstCount: 50000, + expectedValQPS: 1000.0, + expectedFinalCount: 50000, }, { - name: "1k_heat_burst_1_secs", - numGets: 1000, - numHeatBursts: 5, - burstInterval: 1 * time.Second, - timeToVal: 5 * time.Second, - expectedBurstQPS: 155.9, - expectedValQPS: 131.6, - }, - { - name: "1k_heat_burst_5_secs", - numGets: 1000, - numHeatBursts: 5, - burstInterval: 5 * time.Second, - timeToVal: 5 * time.Second, - expectedBurstQPS: 106.7, - expectedValQPS: 90.05, + name: "1k_heat_burst_4s_3s_reset_interval", + numGets: 1000, + numHeatBursts: 3, + resetIdleAge: time.Second * 3, + burstInterval: 4 * time.Second, + timeToVal: 5 * time.Second, + expectedBurstQPS: 250.0, // only the last burst + expectedBurstCount: 1000, + expectedValQPS: 111.1, + expectedFinalCount: 1000, }, } for _, tc := range hcTests { t.Run(tc.name, func(t *testing.T) { u := NewUniverse(&TestProtocol{}, "test-universe") + nowTime := time.Now() + fc := fake.NewClock(nowTime) g := u.NewGalaxy("test-galaxy", 1<<20, GetterFunc(func(_ context.Context, key string, dest Codec) error { return dest.UnmarshalBinary([]byte("hello")) - })) + }), WithClock(fc), WithIdleStatsAgeResetWindow(tc.resetIdleAge)) + relNow := nowTime.Sub(g.baseTime) kStats := &keyStats{ - 
dQPS: dampedQPS{ - period: time.Second, - }, + dQPS: windowedAvgQPS{trackEpoch: relNow}, } - value := newValWithStat([]byte("hello"), kStats) + value := g.newValWithStat([]byte("hello"), kStats) g.hotCache.add(keyToAdd, value) - now := time.Now() + // blast the key in the hotcache with a bunch of hypothetical gets every few seconds for k := 0; k < tc.numHeatBursts; k++ { - for k := 0; k < tc.numGets; k++ { - kStats.dQPS.touch(now) + for ik := 0; ik < tc.numGets; ik++ { + // force this key up to the top of the LRU + g.hotCache.get(keyToAdd) + kStats.dQPS.touch(g.resetIdleStatsAge, relNow) } - t.Logf("QPS on %d gets in 1 second on burst #%d: %f\n", tc.numGets, k+1, kStats.dQPS.curDQPS) - now = now.Add(tc.burstInterval) + t.Logf("QPS on %d gets in 1 second on burst #%d: %d\n", tc.numGets, k+1, kStats.dQPS.count) + relNow += tc.burstInterval + fc.Advance(tc.burstInterval) } - val := kStats.dQPS.val(now) + cnt, val := kStats.dQPS.val(relNow) if math.Abs(val-tc.expectedBurstQPS) > val/100 { // ensure less than %1 error t.Errorf("QPS after bursts: %f, Wanted: %f", val, tc.expectedBurstQPS) } - value2 := newValWithStat([]byte("hello there"), nil) + if cnt != tc.expectedBurstCount { + t.Errorf("hit-count unexpected: %d; wanted %d", cnt, tc.expectedBurstCount) + } + value2 := g.newValWithStat([]byte("hello there"), nil) g.hotCache.add(keyToAdd+"2", value2) // ensure that hcStats are properly updated after adding g.maybeUpdateHotCacheStats() t.Logf("Hottest QPS: %f, Coldest QPS: %f\n", g.hcStatsWithTime.hcs.MostRecentQPS, g.hcStatsWithTime.hcs.LeastRecentQPS) - now = now.Add(tc.timeToVal) - val = kStats.dQPS.val(now) - if math.Abs(val-tc.expectedValQPS) > val/100 { - t.Errorf("QPS after delayed Val() call: %f, Wanted: %f", val, tc.expectedBurstQPS) + relNow += tc.timeToVal + fc.Advance(tc.timeToVal) + cnt2, val2 := kStats.dQPS.val(relNow) + if math.Abs(val2-tc.expectedValQPS) > val2/100 { + t.Errorf("QPS after delayed Val() call; %s elapsed since val birth: %f, Wanted: %f", + 
relNow-kStats.dQPS.trackEpoch, val2, tc.expectedBurstQPS) + } + if cnt2 != tc.expectedFinalCount { + t.Errorf("hit-count unexpected: %d; wanted %d", cnt2, tc.expectedFinalCount) } }) } @@ -501,23 +517,33 @@ func (p *promoteFromCandidate) ShouldPromote(key string, data []byte, stats prom return false } +type trackingNeverPromoter struct { + stats []promoter.Stats +} + +func (t *trackingNeverPromoter) ShouldPromote(key string, data []byte, stats promoter.Stats) bool { + t.stats = append(t.stats, stats) + return false +} + // Ensures cache entries move properly through the stages of candidacy // to full hotcache member. Simulates a galaxy where elements are always promoted, // never promoted, etc func TestPromotion(t *testing.T) { - ctx := context.Background() - testKey := "to-get" + outerCtx := context.Background() + const testKey = "to-get" testCases := []struct { - testName string - promoter promoter.Interface - cacheSize int64 - checkCache func(ctx context.Context, t testing.TB, key string, val interface{}, okCand bool, okHot bool, tf *TestFetcher, g *Galaxy) + testName string + promoter promoter.Interface + cacheSize int64 + firstCheck func(ctx context.Context, t testing.TB, key string, val valWithStat, okCand bool, okHot bool, tf *TestFetcher, g *Galaxy) + secondCheck func(ctx context.Context, t testing.TB, key string, val valWithStat, okCand bool, okHot bool, tf *TestFetcher, g *Galaxy) }{ { testName: "never_promote", promoter: promoter.Func(func(key string, data []byte, stats promoter.Stats) bool { return false }), cacheSize: 1 << 20, - checkCache: func(_ context.Context, t testing.TB, _ string, _ interface{}, okCand bool, okHot bool, _ *TestFetcher, _ *Galaxy) { + firstCheck: func(_ context.Context, t testing.TB, _ string, _ valWithStat, okCand bool, okHot bool, _ *TestFetcher, _ *Galaxy) { if !okCand { t.Error("Candidate not found in candidate cache") } @@ -530,10 +556,10 @@ func TestPromotion(t *testing.T) { testName: "always_promote", promoter: 
promoter.Func(func(key string, data []byte, stats promoter.Stats) bool { return true }), cacheSize: 1 << 20, - checkCache: func(_ context.Context, t testing.TB, _ string, val interface{}, _ bool, okHot bool, _ *TestFetcher, _ *Galaxy) { + firstCheck: func(_ context.Context, t testing.TB, _ string, val valWithStat, _ bool, okHot bool, _ *TestFetcher, _ *Galaxy) { if !okHot { t.Error("Key not found in hotcache") - } else if val == nil { + } else if val.data == nil { t.Error("Found element in hotcache, but no associated data") } }, @@ -542,7 +568,7 @@ func TestPromotion(t *testing.T) { testName: "candidate_promotion", promoter: &promoteFromCandidate{}, cacheSize: 1 << 20, - checkCache: func(ctx context.Context, t testing.TB, key string, _ interface{}, okCand bool, okHot bool, tf *TestFetcher, g *Galaxy) { + firstCheck: func(ctx context.Context, t testing.TB, key string, _ valWithStat, okCand bool, okHot bool, tf *TestFetcher, g *Galaxy) { if !okCand { t.Error("Candidate not found in candidate cache") } @@ -559,24 +585,88 @@ func TestPromotion(t *testing.T) { } }, }, + { + testName: "never_promote_but_track", + promoter: &trackingNeverPromoter{}, + cacheSize: 1 << 20, + firstCheck: func(_ context.Context, t testing.TB, _ string, _ valWithStat, okCand bool, okHot bool, _ *TestFetcher, g *Galaxy) { + if okHot { + t.Errorf("value unexpectedly in hot-cache") + } + if !okCand { + t.Errorf("value unexpectedly missing from candidate-cache") + } + pro := g.opts.promoter.(*trackingNeverPromoter) + if pro.stats == nil { + t.Errorf("no stats recorded; promoter not called") + return + } else if len(pro.stats) != 1 { + t.Errorf("incorrect call-count for promoter: %d; expected %d", len(pro.stats), 1) + return + } + if pro.stats[0].Hits != 0 { + t.Errorf("first hit had non-zero hits: %d", pro.stats[0].Hits) + } + if pro.stats[0].KeyQPS != 0 { + t.Errorf("first hit had non-zero QPS: %f", pro.stats[0].KeyQPS) + } + }, + secondCheck: func(_ context.Context, t testing.TB, _ string, _ 
valWithStat, okCand bool, okHot bool, _ *TestFetcher, g *Galaxy) { + if okHot { + t.Errorf("value unexpectedly in hot-cache") + } + if !okCand { + t.Errorf("value unexpectedly missing from candidate-cache") + } + pro := g.opts.promoter.(*trackingNeverPromoter) + if pro.stats == nil { + t.Errorf("no stats recorded; promoter not called") + return + } else if len(pro.stats) != 2 { + t.Errorf("incorrect call-count for promoter: %d; expected %d", len(pro.stats), 2) + return + } + if pro.stats[1].Hits != 1 { + t.Errorf("second hit had unexpected hits: got %d; want %d", pro.stats[1].Hits, 1) + } + // one hit with near-zero-time passed == time will be bounded by the minimum time, which + // is 100ms. + if pro.stats[1].KeyQPS != 10.0 { + t.Errorf("second hit had unexpected QPS: got %f; want %f", pro.stats[1].KeyQPS, 10.0) + } + }, + }, } for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(outerCtx) defer cancel() fetcher := &TestFetcher{} - testProto := &TestProtocol{} + testProto := &TestProtocol{TestFetchers: map[string]*TestFetcher{"foobar": fetcher}} getter := func(_ context.Context, key string, dest Codec) error { return dest.UnmarshalBinary([]byte("got:" + key)) } universe := NewUniverse(testProto, "promotion-test") + universe.Set("foobar") galaxy := universe.NewGalaxy("test-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(tc.promoter)) - galaxy.getFromPeer(ctx, fetcher, testKey) - _, okCandidate := galaxy.candidateCache.get(testKey) - value, okHot := galaxy.hotCache.get(testKey) - tc.checkCache(ctx, t, testKey, value, okCandidate, okHot, fetcher, galaxy) + sc := StringCodec("") + { + galaxy.Get(ctx, testKey, &sc) + _, okCandidate := galaxy.candidateCache.get(testKey) + value, okHot := galaxy.hotCache.get(testKey) + tc.firstCheck(ctx, t, testKey, value, okCandidate, okHot, fetcher, galaxy) + } + if tc.secondCheck == nil { + return + } + { + galaxy.Get(ctx, testKey, 
&sc) + _, okCandidate := galaxy.candidateCache.get(testKey) + value, okHot := galaxy.hotCache.get(testKey) + tc.secondCheck(ctx, t, testKey, value, okCandidate, okHot, fetcher, galaxy) + } }) } diff --git a/go.mod b/go.mod index d47f95cc..99be0fb1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/golang/protobuf v1.4.3 + github.com/vimeo/go-clocks v1.1.2 go.opencensus.io v0.22.5 google.golang.org/grpc v1.35.0 ) diff --git a/go.sum b/go.sum index 1a6b1974..a4a9f11a 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/vimeo/go-clocks v1.1.2 h1:HAh+gZT4PIIWJEIc/cXi1EPExZWMUgL/9lO0SA+EPrw= +github.com/vimeo/go-clocks v1.1.2/go.mod h1:coJz9AfolJ/xWbjgudyoJew7Kw/kV17P3fLIumNLjEg= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/hotcache.go b/hotcache.go index 69e2111b..a1c9ab95 100644 --- a/hotcache.go +++ b/hotcache.go @@ -17,7 +17,6 @@ limitations under the License. 
package galaxycache import ( - "math" "sync" "time" @@ -29,17 +28,18 @@ import ( func (g *Galaxy) maybeUpdateHotCacheStats() { g.mu.Lock() defer g.mu.Unlock() - now := time.Now() + now := g.clock.Now() if now.Sub(g.hcStatsWithTime.t) < time.Second { return } + nowRel := now.Sub(g.baseTime) mruEleQPS := 0.0 lruEleQPS := 0.0 mruEle := g.hotCache.mostRecent() lruEle := g.hotCache.leastRecent() if mruEle != nil { // lru contains at least one element - mruEleQPS = mruEle.stats.val() - lruEleQPS = lruEle.stats.val() + _, mruEleQPS = mruEle.stats.val(nowRel) + _, lruEleQPS = lruEle.stats.val(nowRel) } newHCS := &promoter.HCStats{ @@ -54,12 +54,18 @@ func (g *Galaxy) maybeUpdateHotCacheStats() { // keyStats keeps track of the hotness of a key type keyStats struct { - dQPS dampedQPS + dQPS windowedAvgQPS } -func newValWithStat(data []byte, kStats *keyStats) valWithStat { +func (g *Galaxy) newValWithStat(data []byte, kStats *keyStats) valWithStat { if kStats == nil { - kStats = &keyStats{dampedQPS{period: time.Second}} + kStats = &keyStats{ + dQPS: windowedAvgQPS{ + trackEpoch: g.now(), + lastRef: g.now(), + count: 0, + }, + } } return valWithStat{ @@ -68,69 +74,69 @@ func newValWithStat(data []byte, kStats *keyStats) valWithStat { } } -func (k *keyStats) val() float64 { - return k.dQPS.val(time.Now()) +func (k *keyStats) val(now time.Duration) (int64, float64) { + return k.dQPS.val(now) } -func (k *keyStats) touch() { - k.dQPS.touch(time.Now()) +func (k *keyStats) touch(resetIdleAge, now time.Duration) { + k.dQPS.touch(resetIdleAge, now) } -// dampedQPS is an average that recombines the current state with the previous. 
-type dampedQPS struct { - mu sync.Mutex - period time.Duration - t time.Time - curDQPS float64 - count float64 -} +type windowedAvgQPS struct { + // time used for the denominator of the calculation + // measured relative to the galaxy baseTime + trackEpoch time.Duration + + // last time the count was updated + lastRef time.Duration -// must be between 0 and 1, the fraction of the new value that comes from -// current rather than previous. -// if `samples` is the number of samples into the damped weighted average you -// want to maximize the fraction of the contribution after; x is the damping -// constant complement (this way we don't have to multiply out (1-x) ^ samples) -// f(x) = (1 - x) * x ^ samples = x ^samples - x ^(samples + 1) -// f'(x) = samples * x ^ (samples - 1) - (samples + 1) * x ^ samples -// this yields a critical point at x = (samples - 1) / samples -const dampingConstant = (1.0 / 30.0) // 30 seconds (30 samples at a 1s interval) -const dampingConstantComplement = 1.0 - dampingConstant - -func (d *dampedQPS) touch(now time.Time) { - d.mu.Lock() - defer d.mu.Unlock() - d.maybeFlush(now) - d.count++ + count int64 + + // protects all above + mu sync.Mutex } -// d.mu must be held when calling maybeFlush (otherwise racy) -func (d *dampedQPS) maybeFlush(now time.Time) { - if d.t.IsZero() { - d.t = now +func (a *windowedAvgQPS) touch(resetIdleAge, now time.Duration) { + a.mu.Lock() + defer a.mu.Unlock() + + // if the last touch was longer ago than resetIdleAge, pretend this is + // a new entry. + // This protects against the case where an entry manages to hang in the + // cache while idle but only gets hit hard periodically. + if now-a.lastRef > resetIdleAge { + a.trackEpoch = now + a.lastRef = now + a.count = 1 return } - if now.Sub(d.t) < d.period { - return + + if a.lastRef < now { + // another goroutine may have grabbed a later "now" value + // before acquiring this lock before this one. + // Try not to let the timestamp go backwards due to races. 
+ a.lastRef = now } - curDQPS, cur := d.curDQPS, d.count - exponent := math.Floor(float64(now.Sub(d.t))/float64(d.period)) - 1 - d.curDQPS = ((dampingConstant * cur) + (dampingConstantComplement * curDQPS)) * math.Pow(dampingConstantComplement, exponent) - d.count = 0 - d.t = now + a.count++ } -func (d *dampedQPS) val(now time.Time) float64 { - d.mu.Lock() - defer d.mu.Unlock() - d.maybeFlush(now) - return d.curDQPS +func (a *windowedAvgQPS) val(now time.Duration) (int64, float64) { + a.mu.Lock() + defer a.mu.Unlock() + age := now - a.trackEpoch + + // Set a small minimum interval so one request that was super-recent + // doesn't give a huge rate. + const minInterval = 100 * time.Millisecond + if age < minInterval { + age = minInterval + } + return a.count, float64(a.count) / age.Seconds() } func (g *Galaxy) addNewToCandidateCache(key string) *keyStats { kStats := &keyStats{ - dQPS: dampedQPS{ - period: time.Second, - }, + dQPS: windowedAvgQPS{trackEpoch: g.now()}, } g.candidateCache.addToCandidateCache(key, kStats) diff --git a/promoter/promoter.go b/promoter/promoter.go index 9c82e76a..b1f09e5f 100644 --- a/promoter/promoter.go +++ b/promoter/promoter.go @@ -30,7 +30,12 @@ type HCStats struct { // Stats contains both the KeyQPS and a pointer to the galaxy-wide // HCStats type Stats struct { - KeyQPS float64 + // Request-rate for this key (possibly with some windowing applied) + KeyQPS float64 + // Number of hits for this key (also possibly with some windowing applied) + // This will be zero if there is no record of this key (not seen before + // or tracking expired) + Hits int64 HCStats *HCStats }