Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matcher cache/series #8045

Merged
merged 9 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storecache.NewNoopMatcherCache()
var cache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type storeConfig struct {
postingGroupMaxKeySeriesRatio float64

indexHeaderLazyDownloadStrategy string

matcherCacheSize int
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -225,6 +227,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize)
alanprot marked this conversation as resolved.
Show resolved Hide resolved

sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -368,6 +372,14 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var matchersCache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
Expand Down Expand Up @@ -413,6 +425,7 @@ func runStore(
}),
store.WithRegistry(reg),
store.WithIndexCache(indexCache),
store.WithMatchersCache(matchersCache),
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
Expand Down
2 changes: 2 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 The size of the cache used for matching against
external labels. Using 0 disables caching.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
will serve only blocks, which happened earlier
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

for _, option := range options {
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ type BucketStore struct {
fetcher block.MetadataFetcher
dir string
indexCache storecache.IndexCache
matcherCache storecache.MatchersCache
indexReaderPool *indexheader.ReaderPool
buffers sync.Pool
chunkPool pool.Pool[byte]
Expand Down Expand Up @@ -517,6 +518,13 @@ func WithIndexCache(cache storecache.IndexCache) BucketStoreOption {
}
}

// WithMatchersCache sets a matchers cache to use instead of a noopCache.
func WithMatchersCache(cache storecache.MatchersCache) BucketStoreOption {
return func(s *BucketStore) {
s.matcherCache = cache
}
}

// WithQueryGate sets a queryGate to use instead of a noopGate.
func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -637,11 +645,12 @@ func NewBucketStore(
options ...BucketStoreOption,
) (*BucketStore, error) {
s := &BucketStore{
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
matcherCache: storecache.NoopMatchersCache,
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
return &b
Expand Down Expand Up @@ -1536,7 +1545,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context())

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
matchers, err := storecache.MatchersToPromMatchersCached(s.matcherCache, req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(1024))
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
Expand All @@ -2096,6 +2099,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
0,
WithLogger(logger),
WithIndexCache(indexCache),
WithMatchersCache(matcherCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))
Expand Down
15 changes: 5 additions & 10 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,14 @@ type MatchersCache interface {

// Ensure implementations satisfy the interface.
var (
_ MatchersCache = (*LruMatchersCache)(nil)
_ MatchersCache = (*NoopMatcherCache)(nil)
_ MatchersCache = (*LruMatchersCache)(nil)
NoopMatchersCache MatchersCache = &noopMatcherCache{}
)

// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything.
type NoopMatcherCache struct{}

// NewNoopMatcherCache creates a new no-op matcher cache.
func NewNoopMatcherCache() MatchersCache {
return &NoopMatcherCache{}
}
type noopMatcherCache struct{}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
func (n *noopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem()
}

Expand Down Expand Up @@ -74,6 +68,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
opt(cache)
}
cache.metrics = newMatcherCacheMetrics(cache.reg)
cache.metrics.maxItems.Set(float64(cache.size))

lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NoopMatchersCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -606,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func NewProxyStore(
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
enableDedup: true,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

for _, option := range options {
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,7 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
responseTimeout: 5 * time.Second,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

var allResps []*storepb.SeriesResponse
Expand Down Expand Up @@ -2312,7 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -2350,7 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewTSDBStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

for _, option := range options {
Expand Down
Loading