Skip to content

Commit

Permalink
Merge pull request #26 from vimeo/generic_lru_stack
Browse files Browse the repository at this point in the history
Leverage Generics to improve LRU type-friendliness
  • Loading branch information
dfinkel authored Jul 29, 2022
2 parents b7e5d71 + 75aac24 commit 83c6207
Show file tree
Hide file tree
Showing 13 changed files with 695 additions and 119 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ jobs:
strategy:
matrix:
os: [macOS-latest, ubuntu-latest]
goversion: [1.13, 1.14, 1.15]
goversion: [1.17, 1.18, '1.19.0-rc.2']
steps:
- name: Set up Go ${{matrix.goversion}} on ${{matrix.os}}
uses: actions/setup-go@v1
uses: actions/setup-go@v3
with:
go-version: ${{matrix.goversion}}
id: go
Expand Down
42 changes: 42 additions & 0 deletions atomic_int.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build !go1.19

/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package galaxycache

import (
"strconv"
"sync/atomic"
)

// An AtomicInt is an int64 to be accessed atomically.
// It thinly wraps atomic.Int64 with go1.19+
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}

func (i *AtomicInt) String() string {
return strconv.FormatInt(i.Get(), 10)
}
44 changes: 44 additions & 0 deletions atomic_int_go1.19.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build go1.19

/*
Copyright 2022 Vimeo Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package galaxycache

import (
"strconv"
"sync/atomic"
)

// An AtomicInt is an int64 to be accessed atomically.
// It thinly wraps atomic.Int64 with go1.19+
type AtomicInt struct {
v atomic.Int64
}

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
i.v.Add(n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
return i.v.Load()
}

func (i *AtomicInt) String() string {
return strconv.FormatInt(i.Get(), 10)
}
121 changes: 27 additions & 94 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/vimeo/galaxycache/lru"
"github.com/vimeo/galaxycache/promoter"
"github.com/vimeo/galaxycache/singleflight"

Expand Down Expand Up @@ -153,22 +150,13 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen
opt.apply(&gOpts)
}
g := &Galaxy{
name: name,
getter: getter,
peerPicker: universe.peerPicker,
cacheBytes: cacheBytes,
mainCache: cache{
ctype: MainCache,
lru: lru.New(0),
},
hotCache: cache{
ctype: HotCache,
lru: lru.New(0),
},
candidateCache: cache{
ctype: CandidateCache,
lru: lru.New(gOpts.maxCandidates),
},
name: name,
getter: getter,
peerPicker: universe.peerPicker,
cacheBytes: cacheBytes,
mainCache: newCache(MainCache),
hotCache: newCache(HotCache),
candidateCache: newCandidateCache(gOpts.maxCandidates),
hcStatsWithTime: HCStatsWithTime{
hcs: &promoter.HCStats{
HCCapacity: cacheBytes / gOpts.hcRatio,
Expand All @@ -178,9 +166,6 @@ func (universe *Universe) NewGalaxy(name string, cacheBytes int64, getter Backen
}
g.mainCache.setLRUOnEvicted(nil)
g.hotCache.setLRUOnEvicted(g.candidateCache.addToCandidateCache)
g.candidateCache.lru.OnEvicted = func(key lru.Key, value interface{}) {
g.candidateCache.nevict++
}
g.parent = universe

universe.galaxies[name] = g
Expand Down Expand Up @@ -245,7 +230,7 @@ type Galaxy struct {
// of key/value pairs that can be stored globally.
hotCache cache

candidateCache cache
candidateCache candidateCache

hcStatsWithTime HCStatsWithTime

Expand Down Expand Up @@ -460,15 +445,15 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
}

type valWithLevel struct {
val *valWithStat
val valWithStat
level hitLevel
localAuthoritative bool
peerErr error
localErr error
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWithStat, destPopulated bool, err error) {
func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value valWithStat, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
g.recordStats(ctx, nil, MLoads.M(1))

Expand Down Expand Up @@ -556,19 +541,18 @@ func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte
return dest.MarshalBinary()
}

func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) {
func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (valWithStat, error) {
data, err := peer.Fetch(ctx, g.name, key)
if err != nil {
return nil, err
return valWithStat{}, err
}
vi, ok := g.candidateCache.get(key)
kStats, ok := g.candidateCache.get(key)
if !ok {
vi = g.addNewToCandidateCache(key)
kStats = g.addNewToCandidateCache(key)
}

g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update

kStats := vi.(*keyStats)
stats := promoter.Stats{
KeyQPS: kStats.val(),
HCStats: g.hcStatsWithTime.hcs,
Expand All @@ -580,23 +564,23 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string
return value, nil
}

func (g *Galaxy) lookupCache(key string) (*valWithStat, hitLevel) {
func (g *Galaxy) lookupCache(key string) (valWithStat, hitLevel) {
if g.cacheBytes <= 0 {
return nil, miss
return valWithStat{}, miss
}
vi, ok := g.mainCache.get(key)
if ok {
return vi.(*valWithStat), hitMaincache
return vi, hitMaincache
}
vi, ok = g.hotCache.get(key)
if !ok {
return nil, miss
return valWithStat{}, miss
}
g.Stats.HotcacheHits.Add(1)
return vi.(*valWithStat), hitHotcache
return vi, hitHotcache
}

func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithStat, cache *cache) {
func (g *Galaxy) populateCache(ctx context.Context, key string, value valWithStat, cache *cache) {
if g.cacheBytes <= 0 {
return
}
Expand Down Expand Up @@ -661,24 +645,13 @@ func (g *Galaxy) CacheStats(which CacheType) CacheStats {
case HotCache:
return g.hotCache.stats()
case CandidateCache:
return g.candidateCache.stats()
// not worth tracking this for the CandidateCache
return CacheStats{}
default:
return CacheStats{}
}
}

// cache is a wrapper around an *lru.Cache that adds synchronization
// and counts the size of all keys and values. Candidate cache only
// utilizes the lru.Cache and mutex, not the included stats.
type cache struct {
mu sync.Mutex
nbytes int64 // of all keys and values
lru *lru.Cache
nhit, nget int64
nevict int64 // number of evictions
ctype CacheType
}

func (c *cache) stats() CacheStats {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -699,44 +672,21 @@ type valWithStat struct {
// sizeOfValWithStats returns the total size of the value in the hot/main
// cache, including the data, key stats, and a pointer to the val itself
func (v *valWithStat) size() int64 {
const statsSize = int64(unsafe.Sizeof(*v.stats))
const ptrSize = int64(unsafe.Sizeof(v))
const vwsSize = int64(unsafe.Sizeof(*v))
// using cap() instead of len() for data leads to inconsistency
// after unmarshaling/marshaling the data
return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v))
}

func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) {
c.lru.OnEvicted = func(key lru.Key, value interface{}) {
val := value.(*valWithStat)
c.nbytes -= int64(len(key.(string))) + val.size()
c.nevict++
if f != nil {
f(key.(string), val.stats)
}
}
return statsSize + ptrSize + vwsSize + int64(len(v.data))
}

func (c *cache) add(key string, value *valWithStat) {
func (c *cache) add(key string, value valWithStat) {
c.mu.Lock()
defer c.mu.Unlock()
c.lru.Add(key, value)
c.nbytes += int64(len(key)) + value.size()
}

func (c *cache) get(key string) (vi interface{}, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.nget++
if c.lru == nil {
return
}
vi, ok = c.lru.Get(key)
if !ok {
return
}
c.nhit++
return vi, true
}

func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -765,23 +715,6 @@ func (c *cache) itemsLocked() int64 {
return int64(c.lru.Len())
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}

func (i *AtomicInt) String() string {
return strconv.FormatInt(i.Get(), 10)
}

// CacheStats are returned by stats accessors on Galaxy.
type CacheStats struct {
Bytes int64
Expand Down
7 changes: 4 additions & 3 deletions galaxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ func (fetcher *TestFetcher) Close() error {
return nil
}

type testFetchers []RemoteFetcher

func (fetcher *TestFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) {
if fetcher.fail {
return nil, errors.New("simulated error from peer")
Expand Down Expand Up @@ -553,7 +551,10 @@ func TestPromotion(t *testing.T) {
}
g.getFromPeer(ctx, tf, key)
val, okHot := g.hotCache.get(key)
if string(val.(*valWithStat).data) != "got:"+testKey {
if !okHot {
t.Errorf("key %q missing from hot cache", key)
}
if string(val.data) != "got:"+testKey {
t.Error("Did not promote from candidacy")
}
},
Expand Down
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
module github.com/vimeo/galaxycache

go 1.12
go 1.18

require (
github.com/golang/protobuf v1.4.3
go.opencensus.io v0.22.5
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect
google.golang.org/grpc v1.35.0
)

require (
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
golang.org/x/text v0.3.3 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.25.0 // indirect
)
Loading

0 comments on commit 83c6207

Please sign in to comment.