Skip to content

Commit

Permalink
Set index cache TTL based on block meta
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Feb 4, 2025
1 parent 57efc2a commit 7811a90
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 71 deletions.
13 changes: 7 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,17 +466,18 @@ func (s *BucketStore) validate() error {

type noopCache struct{}

func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {}
func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string, time.Duration) {}
// FetchMultiPostings is a no-op: it caches nothing, so every requested key
// is returned as a miss and the hit map is always empty.
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
	return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string, _ time.Duration) {
}
// FetchExpandedPostings is a no-op: it always reports a cache miss
// (empty data, false).
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
	return []byte{}, false
}

func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string, time.Duration) {}
// FetchMultiSeries is a no-op: it caches nothing, so every requested series
// ref is returned as a miss and the hit map is always empty.
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
	return map[storage.SeriesRef][]byte{}, ids
}
Expand Down Expand Up @@ -3090,7 +3091,7 @@ func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, p
r.stats.CachedPostingsCompressionTimeSum += compressionDuration
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(length * 4) // Estimate the posting list size.
r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant)
r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant, storecache.CacheTTL(r.block.meta))
}

var bufioReaderPool = sync.Pool{
Expand Down Expand Up @@ -3229,7 +3230,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
stats.add(PostingsTouched, 1, len(diffVarintPostings))

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant)
r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant, storecache.CacheTTL(r.block.meta))
}

stats.PostingsFetchDurationSum += time.Since(begin)
Expand Down Expand Up @@ -3417,7 +3418,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
r.loadedSeries[id] = c
r.loadedSeriesMtx.Unlock()

r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant)
r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant, storecache.CacheTTL(r.block.meta))
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,24 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}

func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.ptr.StorePostings(blockID, l, v, tenant)
func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
c.ptr.StorePostings(blockID, l, v, tenant, ttl)
}

// FetchMultiPostings delegates to whichever IndexCache is currently
// swapped in via SwapWith.
func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
	return c.ptr.FetchMultiPostings(ctx, blockID, keys, tenant)
}

func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
}

// FetchExpandedPostings delegates to whichever IndexCache is currently
// swapped in via SwapWith.
func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
	return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}

func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
c.ptr.StoreSeries(blockID, id, v, tenant)
func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
c.ptr.StoreSeries(blockID, id, v, tenant, ttl)
}

func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
Expand Down
20 changes: 17 additions & 3 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"encoding/base64"
"strconv"
"strings"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block/metadata"
"golang.org/x/crypto/blake2b"

"github.com/thanos-io/thanos/pkg/tenancy"
Expand Down Expand Up @@ -42,26 +44,38 @@ var (
// (potentially with a deadline) as in the original user's request.
type IndexCache interface {
// StorePostings stores postings for a single series.
StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string)
StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration)

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label)

// StoreExpandedPostings stores expanded postings for a set of label matchers.
StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)
StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration)

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)

// StoreSeries stores a single series.
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration)

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef)
}

// CacheTTL returns the cache TTL for a block, derived from the block's time
// range (MaxTime - MinTime, in milliseconds) and rounded up to the next whole
// hour so blocks covering partial hours still receive a full-hour TTL.
func CacheTTL(meta *metadata.Meta) time.Duration {
	span := time.Duration(meta.MaxTime-meta.MinTime) * time.Millisecond

	rem := span % time.Hour
	if rem == 0 {
		// Already an exact multiple of an hour; nothing to round.
		return span
	}
	return span + (time.Hour - rem)
}

// Common metrics that should be used by all cache implementations.
type CommonMetrics struct {
RequestTotal *prometheus.CounterVec
Expand Down
46 changes: 46 additions & 0 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"math"
"strings"
"testing"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"golang.org/x/crypto/blake2b"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil/custom"
)

Expand Down Expand Up @@ -142,6 +145,49 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
}
}

// TestCacheTTL verifies that CacheTTL rounds a block's time range up to the
// next whole hour (30m -> 1h, exact 1h -> 1h, 1h1m -> 2h).
func TestCacheTTL(t *testing.T) {
	t.Parallel()

	for name, tc := range map[string]struct {
		meta *metadata.Meta
		want time.Duration
	}{
		"30m": {
			meta: &metadata.Meta{
				BlockMeta: tsdb.BlockMeta{MinTime: 1730000000000, MaxTime: 1730001800000},
			},
			want: time.Hour,
		},
		"1h": {
			meta: &metadata.Meta{
				BlockMeta: tsdb.BlockMeta{MinTime: 1730000000000, MaxTime: 1730003600000},
			},
			want: time.Hour,
		},
		"1h 1m": {
			meta: &metadata.Meta{
				BlockMeta: tsdb.BlockMeta{MinTime: 1730000000000, MaxTime: 1730003660000},
			},
			want: 2 * time.Hour,
		},
	} {
		t.Run(name, func(t *testing.T) {
			testutil.Equals(t, tc.want, CacheTTL(tc.meta))
		})
	}
}

func BenchmarkCacheKey_string_Postings(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := CacheKey{uid.String(), CacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}
Expand Down
13 changes: 7 additions & 6 deletions pkg/store/cache/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"slices"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -31,9 +32,9 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
if c.postingsEnabled {
c.cache.StorePostings(blockID, l, v, tenant)
c.cache.StorePostings(blockID, l, v, tenant, ttl)
}
}

Expand All @@ -47,9 +48,9 @@ func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
if c.expandedPostingsEnabled {
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
}
}

Expand All @@ -63,9 +64,9 @@ func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
if c.seriesEnabled {
c.cache.StoreSeries(blockID, id, v, tenant)
c.cache.StoreSeries(blockID, id, v, tenant, ttl)
}
}

Expand Down
30 changes: 15 additions & 15 deletions pkg/store/cache/filter_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func TestFilterCache(t *testing.T) {
{
name: "empty enabled items",
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
Expand All @@ -69,9 +69,9 @@ func TestFilterCache(t *testing.T) {
name: "all enabled items",
enabledItems: []string{CacheTypeSeries, CacheTypePostings, CacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
Expand All @@ -90,9 +90,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable postings",
enabledItems: []string{CacheTypePostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
Expand All @@ -110,9 +110,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable expanded postings",
enabledItems: []string{CacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
Expand All @@ -131,9 +131,9 @@ func TestFilterCache(t *testing.T) {
name: "only enable series",
enabledItems: []string{CacheTypeSeries},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
Expand Down
7 changes: 4 additions & 3 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package storecache
import (
"context"
"sync"
"time"
"unsafe"

"github.com/go-kit/log"
Expand Down Expand Up @@ -291,7 +292,7 @@ func copyToKey(l labels.Label) CacheKeyPostings {

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v)))
c.set(CacheTypePostings, CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
}
Expand Down Expand Up @@ -331,7 +332,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
}
Expand All @@ -354,7 +355,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, _ time.Duration) {
c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v)))
c.set(CacheTypeSeries, CacheKey{blockID.String(), CacheKeySeries(id), ""}, v)
}
Expand Down
Loading

0 comments on commit 7811a90

Please sign in to comment.