Skip to content

Commit

Permalink
Fix matcher cache (#8039)
Browse files Browse the repository at this point in the history
* Fix matcher cache

Signed-off-by: alanprot <[email protected]>

* Simplifying cache interface

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Jan 6, 2025
1 parent 6e29530 commit bed76cf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 47 deletions.
47 changes: 11 additions & 36 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,23 @@
package storecache

import (
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/singleflight"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

const DefaultCacheSize = 200

type NewItemFunc func(matcher ConversionLabelMatcher) (*labels.Matcher, error)
type NewItemFunc func() (*labels.Matcher, error)

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
Expand All @@ -41,14 +38,14 @@ func NewNoopMatcherCache() MatchersCache {
}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem(key)
func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem()
}

// LruMatchersCache implements MatchersCache with an LRU cache and metrics.
type LruMatchersCache struct {
reg prometheus.Registerer
cache *lru.Cache[ConversionLabelMatcher, *labels.Matcher]
cache *lru.Cache[string, *labels.Matcher]
metrics *matcherCacheMetrics
size int
sf singleflight.Group
Expand Down Expand Up @@ -78,7 +75,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
}
cache.metrics = newMatcherCacheMetrics(cache.reg)

lruCache, err := lru.NewWithEvict[ConversionLabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
return nil, err
}
Expand All @@ -87,16 +84,15 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) {
c.metrics.requestsTotal.Inc()

v, err, _ := c.sf.Do(key.String(), func() (interface{}, error) {
v, err, _ := c.sf.Do(key, func() (interface{}, error) {
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
return item, nil
}

item, err := newItem(key)
item, err := newItem()
if err != nil {
return nil, err
}
Expand All @@ -111,7 +107,7 @@ func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemF
return v.(*labels.Matcher), nil
}

func (c *LruMatchersCache) onEvict(_ ConversionLabelMatcher, _ *labels.Matcher) {
func (c *LruMatchersCache) onEvict(_ string, _ *labels.Matcher) {
c.metrics.evicted.Inc()
c.metrics.numItems.Set(float64(c.cache.Len()))
}
Expand Down Expand Up @@ -155,32 +151,11 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for i := range ms {
pm, err := cache.GetOrSet(&ms[i], MatcherToPromMatcher)
pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

func MatcherToPromMatcher(m ConversionLabelMatcher) (*labels.Matcher, error) {
mi, ok := m.(*storepb.LabelMatcher)
if !ok {
return nil, fmt.Errorf("invalid matcher type. Got: %T", m)
}

return storepb.MatcherToPromMatcher(*mi)
}

// ConversionLabelMatcher is a common interface for the Prometheus and Thanos label matchers.
type ConversionLabelMatcher interface {
String() string
GetName() string
GetValue() string
}

var (
_ ConversionLabelMatcher = (*storepb.LabelMatcher)(nil)
_ ConversionLabelMatcher = (*prompb.LabelMatcher)(nil)
)
26 changes: 15 additions & 11 deletions pkg/store/cache/matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,51 +36,53 @@ func TestMatchersCache(t *testing.T) {
}

var cacheHit bool
newItem := func(matcher storecache.ConversionLabelMatcher) (*labels.Matcher, error) {
cacheHit = false
return storecache.MatcherToPromMatcher(matcher)
newItem := func(matcher *storepb.LabelMatcher) func() (*labels.Matcher, error) {
return func() (*labels.Matcher, error) {
cacheHit = false
return storepb.MatcherToPromMatcher(*matcher)
}
}
expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val")
expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3")
expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3")

item, err := cache.GetOrSet(matcher, newItem)
item, err := cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher, newItem)
item, err = cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected2.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected2.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher, newItem)
item, err = cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected, item)

cacheHit = true
item, err = cache.GetOrSet(matcher3, newItem)
item, err = cache.GetOrSet(matcher3.String(), newItem(matcher3))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected3, item)

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected2.String(), item.String())
Expand All @@ -104,7 +106,9 @@ func BenchmarkMatchersCache(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
matcher := matchers[i%len(matchers)]
_, err := cache.GetOrSet(matcher, storecache.MatcherToPromMatcher)
_, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) {
return storepb.MatcherToPromMatcher(*matcher)
})
if err != nil {
b.Fatalf("failed to get or set cache item: %v", err)
}
Expand Down

0 comments on commit bed76cf

Please sign in to comment.