Set index cache ttl based on block meta #8089

Draft · wants to merge 1 commit into main
13 changes: 7 additions & 6 deletions pkg/store/bucket.go
@@ -466,17 +466,18 @@ func (s *BucketStore) validate() error {

type noopCache struct{}

-func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {}
+func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string, time.Duration) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

-func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
+func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string, _ time.Duration) {
+}
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
return []byte{}, false
}

-func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
+func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string, time.Duration) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
return map[storage.SeriesRef][]byte{}, ids
}
@@ -3090,7 +3091,7 @@ func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, p
r.stats.CachedPostingsCompressionTimeSum += compressionDuration
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(length * 4) // Estimate the posting list size.
-r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant)
+r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant, storecache.CacheTTL(r.block.meta))
}

var bufioReaderPool = sync.Pool{
@@ -3229,7 +3230,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
stats.add(PostingsTouched, 1, len(diffVarintPostings))

-r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant)
+r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant, storecache.CacheTTL(r.block.meta))
}

stats.PostingsFetchDurationSum += time.Since(begin)
@@ -3417,7 +3418,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
r.loadedSeries[id] = c
r.loadedSeriesMtx.Unlock()

-r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant)
+r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant, storecache.CacheTTL(r.block.meta))
}
return nil
}
12 changes: 6 additions & 6 deletions pkg/store/bucket_e2e_test.go
@@ -59,24 +59,24 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}

-func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
-c.ptr.StorePostings(blockID, l, v, tenant)
+func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
+c.ptr.StorePostings(blockID, l, v, tenant, ttl)
}

func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
return c.ptr.FetchMultiPostings(ctx, blockID, keys, tenant)
}

-func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
-c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
+func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
+c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
}

func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}

-func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
-c.ptr.StoreSeries(blockID, id, v, tenant)
+func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
+c.ptr.StoreSeries(blockID, id, v, tenant, ttl)
}

func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
22 changes: 19 additions & 3 deletions pkg/store/cache/cache.go
@@ -8,12 +8,14 @@ import (
"encoding/base64"
"strconv"
"strings"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block/metadata"
"golang.org/x/crypto/blake2b"

"github.com/thanos-io/thanos/pkg/tenancy"
@@ -42,26 +44,40 @@ var (
// (potentially with a deadline) as in the original user's request.
type IndexCache interface {
// StorePostings stores postings for a single series.
-StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string)
+StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration)

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label)

// StoreExpandedPostings stores expanded postings for a set of label matchers.
-StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)
+StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration)

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)

// StoreSeries stores a single series.
-StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
+StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration)

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef)
}
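
The new ttl parameter only has an effect in backends that support per-entry expiry. Below is a minimal sketch of what honoring it could look like, assuming a hypothetical remote client with a SetAsync(key, value, ttl) method; the client interface, type name, and key layout are illustrative, not Thanos's actual remote cache code:

```go
package storecache

import (
	"time"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"
)

// remoteClient stands in for any cache client that supports per-entry TTLs
// (memcached- or Redis-style).
type remoteClient interface {
	SetAsync(key string, value []byte, ttl time.Duration)
}

type ttlIndexCache struct {
	client remoteClient
}

// StorePostings forwards the per-block TTL so that the entry expires around
// the time its source block is likely to have been compacted away.
func (c *ttlIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
	key := "P:" + tenant + ":" + blockID.String() + ":" + l.Name + "=" + l.Value // illustrative key layout
	c.client.SetAsync(key, v, ttl)
}
```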

+// CacheTTL returns a cache TTL based on the block meta. A short TTL is useful
+// for temporary blocks that will be compacted soon, and a long TTL for large
+// blocks that won't be compacted.
+func CacheTTL(meta *metadata.Meta) time.Duration {
+	ttl := time.Duration(meta.MaxTime-meta.MinTime) * time.Millisecond
+
+	// Ceil to the next hour.
+	if ttl%time.Hour != 0 {
+		ttl += time.Hour - ttl%time.Hour
+	}
+
+	return ttl
+}

Review comment (Contributor): For my use case, I would like to take the cache item type into consideration. I want to cache expanded postings with a larger TTL and postings with a smaller TTL. Do we have a good way to make this customizable?
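
One possible direction for the review question above, sketched only as an idea and not implemented in this PR, would be to also pass the cache item type into CacheTTL, or to scale its result with a per-type multiplier from configuration, so expanded postings could opt into a longer TTL than plain postings.

To make the rounding rule concrete, here is a small driver; the block durations follow Thanos's default compaction levels (2h raw, then 8h, 2d, 14d) and the boundaries are illustrative, since only CacheTTL itself comes from this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/tsdb"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func main() {
	for _, d := range []time.Duration{
		2 * time.Hour,       // freshly uploaded raw block
		8 * time.Hour,       // first compaction level
		14 * 24 * time.Hour, // final compaction level
		31 * time.Minute,    // partial block: ceiled to the next hour
	} {
		meta := &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: d.Milliseconds()}}
		fmt.Printf("%v -> %v\n", d, storecache.CacheTTL(meta))
	}
	// Output:
	// 2h0m0s -> 2h0m0s
	// 8h0m0s -> 8h0m0s
	// 336h0m0s -> 336h0m0s
	// 31m0s -> 1h0m0s
}
```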

// Common metrics that should be used by all cache implementations.
type CommonMetrics struct {
RequestTotal *prometheus.CounterVec
46 changes: 46 additions & 0 deletions pkg/store/cache/cache_test.go
@@ -9,12 +9,15 @@ import (
"math"
"strings"
"testing"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"golang.org/x/crypto/blake2b"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil/custom"
)

@@ -142,6 +145,49 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
}
}

+func TestCacheTTL(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		meta        *metadata.Meta
+		expectedTTL time.Duration
+	}{
+		"30m": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730001800000,
+				},
+			},
+			expectedTTL: time.Hour,
+		},
+		"1h": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730003600000,
+				},
+			},
+			expectedTTL: time.Hour,
+		},
+		"1h 1m": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730003660000,
+				},
+			},
+			expectedTTL: time.Hour * 2,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			testutil.Equals(t, testData.expectedTTL, CacheTTL(testData.meta))
+		})
+	}
+}
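
Checking the "30m" case by hand: MaxTime - MinTime = 1730001800000 - 1730000000000 = 1,800,000 ms = 30 min, which is not a whole hour, so CacheTTL ceils it to the expected 1h. The "1h 1m" case spans 3,660,000 ms = 61 min and likewise rounds up to 2h.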

func BenchmarkCacheKey_string_Postings(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := CacheKey{uid.String(), CacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}
13 changes: 7 additions & 6 deletions pkg/store/cache/filter_cache.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"slices"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
@@ -31,9 +32,9 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
-func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
if c.postingsEnabled {
-c.cache.StorePostings(blockID, l, v, tenant)
+c.cache.StorePostings(blockID, l, v, tenant, ttl)
}
}

@@ -47,9 +48,9 @@ func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
-func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
+func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
if c.expandedPostingsEnabled {
-c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
+c.cache.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
}
}

@@ -63,9 +64,9 @@ func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
-func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
+func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
if c.seriesEnabled {
-c.cache.StoreSeries(blockID, id, v, tenant)
+c.cache.StoreSeries(blockID, id, v, tenant, ttl)
}
}
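
For completeness, a short usage sketch of how the filtered cache composes with the new TTL plumbing. The helper names, tenant string, and label values below are made up for illustration; NewFilteredIndexCache, the CacheType* constants, and CacheTTL all appear elsewhere in this diff:

```go
package cacheexample

import (
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

// newPostingsOnlyCache wraps any index cache so that only postings and series
// are stored; StoreExpandedPostings becomes a no-op.
func newPostingsOnlyCache(underlying storecache.IndexCache) storecache.IndexCache {
	return storecache.NewFilteredIndexCache(underlying, []string{
		storecache.CacheTypePostings,
		storecache.CacheTypeSeries,
	})
}

// storeWithBlockTTL shows the calling convention after this PR: every Store*
// call now carries the block-derived TTL.
func storeWithBlockTTL(c storecache.IndexCache, meta *metadata.Meta, blockID ulid.ULID, v []byte) {
	c.StorePostings(blockID, labels.Label{Name: "job", Value: "api"}, v, "tenant", storecache.CacheTTL(meta))
}
```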

30 changes: 15 additions & 15 deletions pkg/store/cache/filter_cache_test.go
@@ -48,9 +48,9 @@ func TestFilterCache(t *testing.T) {
{
name: "empty enabled items",
verifyFunc: func(t *testing.T, c IndexCache) {
-c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
@@ -69,9 +69,9 @@ func TestFilterCache(t *testing.T) {
name: "all enabled items",
enabledItems: []string{CacheTypeSeries, CacheTypePostings, CacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
-c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
@@ -90,9 +90,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable postings",
enabledItems: []string{CacheTypePostings},
verifyFunc: func(t *testing.T, c IndexCache) {
-c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
@@ -110,9 +110,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable expanded postings",
enabledItems: []string{CacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
-c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
@@ -131,9 +131,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable series",
enabledItems: []string{CacheTypeSeries},
verifyFunc: func(t *testing.T, c IndexCache) {
-c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
7 changes: 4 additions & 3 deletions pkg/store/cache/inmemory.go
@@ -6,6 +6,7 @@ package storecache
import (
"context"
"sync"
"time"
"unsafe"

"github.com/go-kit/log"
@@ -291,7 +292,7 @@ func copyToKey(l labels.Label) CacheKeyPostings {

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
-func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v)))
c.set(CacheTypePostings, CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
}
@@ -331,7 +332,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
-func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
}
@@ -354,7 +355,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
-func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v)))
c.set(CacheTypeSeries, CacheKey{blockID.String(), CacheKeySeries(id), ""}, v)
}