diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 7e23a388d1..4f85a34963 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -466,17 +466,18 @@ func (s *BucketStore) validate() error {
 
 type noopCache struct{}
 
-func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {}
+func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string, time.Duration) {}
 func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
 	return map[labels.Label][]byte{}, keys
 }
 
-func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
+func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string, _ time.Duration) {
+}
 func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
 	return []byte{}, false
 }
 
-func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
+func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string, time.Duration) {}
 func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
 	return map[storage.SeriesRef][]byte{}, ids
 }
@@ -3090,7 +3091,7 @@ func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, p
 	r.stats.CachedPostingsCompressionTimeSum += compressionDuration
 	r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
 	r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(length * 4) // Estimate the posting list size.
-	r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant)
+	r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant, storecache.CacheTTL(r.block.meta))
 }
 
 var bufioReaderPool = sync.Pool{
@@ -3229,7 +3230,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 			stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
 			stats.add(PostingsTouched, 1, len(diffVarintPostings))
 
-			r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant)
+			r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant, storecache.CacheTTL(r.block.meta))
 		}
 
 		stats.PostingsFetchDurationSum += time.Since(begin)
@@ -3417,7 +3418,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
 		r.loadedSeries[id] = c
 		r.loadedSeriesMtx.Unlock()
 
-		r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant)
+		r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant, storecache.CacheTTL(r.block.meta))
 	}
 	return nil
 }
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index bee87898c6..7fa2c674f3 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -59,24 +59,24 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
 	c.ptr = ptr2
 }
 
-func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
-	c.ptr.StorePostings(blockID, l, v, tenant)
+func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
+	c.ptr.StorePostings(blockID, l, v, tenant, ttl)
 }
 
 func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) {
 	return c.ptr.FetchMultiPostings(ctx, blockID, keys, tenant)
 }
 
-func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
-	c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
+func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
+	c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
 }
 
 func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
 	return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
 }
 
-func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
-	c.ptr.StoreSeries(blockID, id, v, tenant)
+func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
+	c.ptr.StoreSeries(blockID, id, v, tenant, ttl)
 }
 
 func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go
index c20a1f2459..9ac79c09d9 100644
--- a/pkg/store/cache/cache.go
+++ b/pkg/store/cache/cache.go
@@ -8,12 +8,14 @@ import (
 	"encoding/base64"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"golang.org/x/crypto/blake2b"
 
 	"github.com/thanos-io/thanos/pkg/tenancy"
@@ -42,26 +44,40 @@ var (
 // (potentially with a deadline) as in the original user's request.
 type IndexCache interface {
 	// StorePostings stores postings for a single series.
-	StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string)
+	StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration)
 
 	// FetchMultiPostings fetches multiple postings - each identified by a label -
 	// and returns a map containing cache hits, along with a list of missing keys.
 	FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label)
 
 	// StoreExpandedPostings stores expanded postings for a set of label matchers.
-	StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)
+	StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration)
 
 	// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
 	FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)
 
 	// StoreSeries stores a single series.
-	StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
+	StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration)
 
 	// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
 	// and returns a map containing cache hits, along with a list of missing IDs.
 	FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef)
 }
 
+// CacheTTL returns cache TTL based on the block meta. A short TTL is useful for
+// temporary blocks that will be compacted soon and a long TTL for large blocks
+// that won't be compacted.
+func CacheTTL(meta *metadata.Meta) time.Duration {
+	ttl := time.Duration(meta.MaxTime-meta.MinTime) * time.Millisecond
+
+	// Ceil to the next hour.
+	if ttl%time.Hour != 0 {
+		ttl += time.Hour - ttl%time.Hour
+	}
+
+	return ttl
+}
+
 // Common metrics that should be used by all cache implementations.
 type CommonMetrics struct {
 	RequestTotal *prometheus.CounterVec
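To make the rounding concrete: CacheTTL converts the block's MinTime/MaxTime range (Unix milliseconds) to a duration and ceils it to the next full hour, so a 90-minute block gets a two-hour TTL. A standalone sketch of this behavior, with illustrative values that are not taken from the patch:

	package main

	import (
		"fmt"
		"time"

		"github.com/prometheus/prometheus/tsdb"
		"github.com/thanos-io/thanos/pkg/block/metadata"
		storecache "github.com/thanos-io/thanos/pkg/store/cache"
	)

	func main() {
		// A 90m block is not a whole number of hours, so its TTL is
		// rounded up to 2h; a 2h block would map to exactly 2h.
		meta := &metadata.Meta{BlockMeta: tsdb.BlockMeta{
			MinTime: 0,
			MaxTime: (90 * time.Minute).Milliseconds(),
		}}
		fmt.Println(storecache.CacheTTL(meta)) // 2h0m0s
	}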
diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go
index 2d568d1321..1cc3df94b3 100644
--- a/pkg/store/cache/cache_test.go
+++ b/pkg/store/cache/cache_test.go
@@ -9,12 +9,15 @@ import (
 	"math"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/tsdb"
 	"golang.org/x/crypto/blake2b"
 
 	"github.com/efficientgo/core/testutil"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/testutil/custom"
 )
 
@@ -142,6 +145,49 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
 	}
 }
 
+func TestCacheTTL(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		meta        *metadata.Meta
+		expectedTTL time.Duration
+	}{
+		"30m": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730001800000,
+				},
+			},
+			expectedTTL: time.Hour,
+		},
+		"1h": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730003600000,
+				},
+			},
+			expectedTTL: time.Hour,
+		},
+		"1h 1m": {
+			meta: &metadata.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					MinTime: 1730000000000,
+					MaxTime: 1730003660000,
+				},
+			},
+			expectedTTL: time.Hour * 2,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			testutil.Equals(t, testData.expectedTTL, CacheTTL(testData.meta))
+		})
+	}
+}
+
 func BenchmarkCacheKey_string_Postings(b *testing.B) {
 	uid := ulid.MustNew(1, nil)
 	key := CacheKey{uid.String(), CacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}
diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go
index ade9da4c81..6601270c63 100644
--- a/pkg/store/cache/filter_cache.go
+++ b/pkg/store/cache/filter_cache.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"slices"
+	"time"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/labels"
@@ -31,9 +32,9 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd
 
 // StorePostings sets the postings identified by the ulid and label to the value v,
 // if the postings already exists in the cache it is not mutated.
-func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
 	if c.postingsEnabled {
-		c.cache.StorePostings(blockID, l, v, tenant)
+		c.cache.StorePostings(blockID, l, v, tenant, ttl)
 	}
 }
 
@@ -47,9 +48,9 @@ func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
 }
 
 // StoreExpandedPostings stores expanded postings for a set of label matchers.
-func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
+func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
 	if c.expandedPostingsEnabled {
-		c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
+		c.cache.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
 	}
 }
 
@@ -63,9 +64,9 @@ func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID
 
 // StoreSeries sets the series identified by the ulid and id to the value v,
 // if the series already exists in the cache it is not mutated.
-func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
+func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
 	if c.seriesEnabled {
-		c.cache.StoreSeries(blockID, id, v, tenant)
+		c.cache.StoreSeries(blockID, id, v, tenant, ttl)
 	}
 }
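The filter wrapper forwards the new ttl argument only for item types that are enabled, which the tests below verify. A brief usage sketch, where `inner` is an assumed pre-existing IndexCache implementation and the call is made from outside package storecache:

	// Sketch: wrap an existing cache so that only postings are stored;
	// StoreSeries and StoreExpandedPostings become no-ops, and the TTL
	// is forwarded untouched for the enabled type.
	func wrapPostingsOnly(inner storecache.IndexCache) storecache.IndexCache {
		return storecache.NewFilteredIndexCache(inner, []string{storecache.CacheTypePostings})
	}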
diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go
index ea3144fb8d..0a4fa2c01b 100644
--- a/pkg/store/cache/filter_cache_test.go
+++ b/pkg/store/cache/filter_cache_test.go
@@ -48,9 +48,9 @@ func TestFilterCache(t *testing.T) {
 		{
 			name: "empty enabled items",
 			verifyFunc: func(t *testing.T, c IndexCache) {
-				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)
 
 				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
 				testutil.Equals(t, 0, len(missed))
@@ -69,9 +69,9 @@ func TestFilterCache(t *testing.T) {
 			name:         "all enabled items",
 			enabledItems: []string{CacheTypeSeries, CacheTypePostings, CacheTypeExpandedPostings},
 			verifyFunc: func(t *testing.T, c IndexCache) {
-				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)
 
 				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
 				testutil.Equals(t, 0, len(missed))
@@ -90,9 +90,9 @@ func TestFilterCache(t *testing.T) {
 			name:         "only enable postings",
 			enabledItems: []string{CacheTypePostings},
 			verifyFunc: func(t *testing.T, c IndexCache) {
-				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)
 
 				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
 				testutil.Equals(t, 0, len(missed))
@@ -110,9 +110,9 @@ func TestFilterCache(t *testing.T) {
 			name:         "only enable expanded postings",
 			enabledItems: []string{CacheTypeExpandedPostings},
 			verifyFunc: func(t *testing.T, c IndexCache) {
-				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)
 
 				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
 				testutil.Equals(t, 1, len(missed))
@@ -131,9 +131,9 @@ func TestFilterCache(t *testing.T) {
 			name:         "only enable series",
 			enabledItems: []string{CacheTypeSeries},
 			verifyFunc: func(t *testing.T, c IndexCache) {
-				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
-				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
-				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)
+				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant, 0)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant, 0)
+				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant, 0)
 
 				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
 				testutil.Equals(t, 1, len(missed))
diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go
index 3a8ddbb86d..523e6bb003 100644
--- a/pkg/store/cache/inmemory.go
+++ b/pkg/store/cache/inmemory.go
@@ -6,6 +6,7 @@ package storecache
 import (
 	"context"
 	"sync"
+	"time"
 	"unsafe"
 
 	"github.com/go-kit/log"
@@ -291,7 +292,7 @@ func copyToKey(l labels.Label) CacheKeyPostings {
 
 // StorePostings sets the postings identified by the ulid and label to the value v,
 // if the postings already exists in the cache it is not mutated.
-func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, _ time.Duration) {
 	c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v)))
 	c.set(CacheTypePostings, CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
 }
@@ -331,7 +332,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
 }
 
 // StoreExpandedPostings stores expanded postings for a set of label matchers.
-func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, _ time.Duration) {
 	c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
 	c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
 }
@@ -354,7 +355,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID
 
 // StoreSeries sets the series identified by the ulid and id to the value v,
 // if the series already exists in the cache it is not mutated.
-func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
+func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, _ time.Duration) {
 	c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v)))
 	c.set(CacheTypeSeries, CacheKey{blockID.String(), CacheKeySeries(id), ""}, v)
 }
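Note that InMemoryIndexCache accepts the new ttl argument only to keep satisfying the interface: each `_ time.Duration` parameter is discarded, and entry lifetime remains governed by LRU eviction and size limits, as the tests below exercise. A compile-time assertion along these lines (a sketch, not part of this patch; it would live in package storecache) is one way to catch an implementation drifting from the widened interface:

	// Each implementation must compile against the TTL-aware signatures.
	var (
		_ IndexCache = (*InMemoryIndexCache)(nil)
		_ IndexCache = (*RemoteIndexCache)(nil)
		_ IndexCache = (*FilteredIndexCache)(nil)
		_ IndexCache = (*TracingIndexCache)(nil)
	)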
diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go
index 688ec51cbc..eff6c6856d 100644
--- a/pkg/store/cache/inmemory_test.go
+++ b/pkg/store/cache/inmemory_test.go
@@ -81,14 +81,14 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) {
 	testutil.Ok(t, err)
 	cache.lru = l
 
-	cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, tenancy.DefaultTenant)
+	cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize)
 	testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(CacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
 
 	// This triggers deadlock logic.
-	cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, tenancy.DefaultTenant)
+	cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize)
 	testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(CacheTypePostings)))
@@ -134,7 +134,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
 	}{
 		{
 			typ: CacheTypePostings,
-			set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant) },
+			set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant, 0) },
 			get: func(id storage.SeriesRef) ([]byte, bool) {
 				hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, tenancy.DefaultTenant)
 				b, ok := hits[lbl]
@@ -144,7 +144,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
 		},
 		{
 			typ: CacheTypeSeries,
-			set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant) },
+			set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant, 0) },
 			get: func(id storage.SeriesRef) ([]byte, bool) {
 				hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}, tenancy.DefaultTenant)
 				b, ok := hits[id]
@@ -155,7 +155,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
 		{
 			typ: CacheTypeExpandedPostings,
 			set: func(id storage.SeriesRef, b []byte) {
-				cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant)
+				cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant, 0)
 			},
 			get: func(id storage.SeriesRef) ([]byte, bool) {
 				return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant)
@@ -222,9 +222,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) {
 
 	id := ulid.MustNew(0, nil)
 
-	cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, tenancy.DefaultTenant)
-	cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, tenancy.DefaultTenant)
-	cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, tenancy.DefaultTenant)
+	cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, tenancy.DefaultTenant, 0)
+	cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, tenancy.DefaultTenant, 0)
+	cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize)
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(CacheTypePostings)))
@@ -260,7 +260,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, []labels.Label{lbls}, pMisses)
 
 	// Add sliceHeaderSize + 2 bytes.
-	cache.StorePostings(id, lbls, []byte{42, 33}, tenancy.DefaultTenant)
+	cache.StorePostings(id, lbls, []byte{42, 33}, tenancy.DefaultTenant, 0)
 	testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
 	testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(CacheTypePostings)))
@@ -286,7 +286,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses)
 
 	// Add sliceHeaderSize + 3 more bytes.
-	cache.StoreSeries(id, 1234, []byte{222, 223, 224}, tenancy.DefaultTenant)
+	cache.StoreSeries(id, 1234, []byte{222, 223, 224}, tenancy.DefaultTenant, 0)
 	testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
 	testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(CacheTypePostings)))
@@ -310,7 +310,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	for i := 0; i < sliceHeaderSize; i++ {
 		v = append(v, 3)
 	}
-	cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant)
+	cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
@@ -338,7 +338,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, emptyPostingsMisses, pMisses)
 
 	// Add same item again.
-	cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant)
+	cache.StorePostings(id, lbls2, v, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
@@ -357,7 +357,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, emptyPostingsMisses, pMisses)
 
 	// Add too big item.
-	cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), tenancy.DefaultTenant)
+	cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), tenancy.DefaultTenant, 0)
 	testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
 	testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(CacheTypePostings)))
@@ -390,7 +390,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 
 	lbls3 := labels.Label{Name: "test", Value: "124"}
 
-	cache.StorePostings(id, lbls3, []byte{}, tenancy.DefaultTenant)
+	cache.StorePostings(id, lbls3, []byte{}, tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize)
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
@@ -410,7 +410,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
 	// nil works and still allocates empty slice.
 	lbls4 := labels.Label{Name: "test", Value: "125"}
 
-	cache.StorePostings(id, lbls4, []byte(nil), tenancy.DefaultTenant)
+	cache.StorePostings(id, lbls4, []byte(nil), tenancy.DefaultTenant, 0)
 
 	testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize)
 	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(CacheTypePostings)))
diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go
index b99babfe03..cc10d5eaf1 100644
--- a/pkg/store/cache/memcached.go
+++ b/pkg/store/cache/memcached.go
@@ -33,7 +33,7 @@ type RemoteIndexCache struct {
 
 	compressionScheme string
 
-	ttl time.Duration
+	maxTTL time.Duration
 
 	// Metrics.
 	requestTotal *prometheus.CounterVec
@@ -45,7 +45,7 @@ type RemoteIndexCache struct {
 // NewRemoteIndexCache makes a new RemoteIndexCache.
 func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *CommonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) {
 	c := &RemoteIndexCache{
-		ttl:               ttl,
+		maxTTL:            ttl,
 		logger:            logger,
 		memcached:         cacheClient,
 		compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions.
@@ -81,10 +81,10 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
 // StorePostings sets the postings identified by the ulid and label to the value v.
 // The function enqueues the request and returns immediately: the entry will be
 // asynchronously stored in the cache.
-func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
 	c.dataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v)))
 	key := CacheKey{blockID.String(), CacheKeyPostings(l), c.compressionScheme}.String()
-	if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
+	if err := c.memcached.SetAsync(key, v, min(ttl, c.maxTTL)); err != nil {
 		level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
 	}
 }
@@ -133,11 +133,11 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
 // StoreExpandedPostings sets the postings identified by the ulid and label to the value v.
 // The function enqueues the request and returns immediately: the entry will be
 // asynchronously stored in the cache.
-func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte, tenant string) {
+func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
 	c.dataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
 	key := CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(keys)), c.compressionScheme}.String()
 
-	if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
+	if err := c.memcached.SetAsync(key, v, min(ttl, c.maxTTL)); err != nil {
 		level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err)
 	}
 }
@@ -167,11 +167,11 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul
 // StoreSeries sets the series identified by the ulid and id to the value v.
 // The function enqueues the request and returns immediately: the entry will be
 // asynchronously stored in the cache.
-func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
+func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
 	c.dataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v)))
 	key := CacheKey{blockID.String(), CacheKeySeries(id), ""}.String()
 
-	if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
+	if err := c.memcached.SetAsync(key, v, min(ttl, c.maxTTL)); err != nil {
 		level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
 	}
 }
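With the rename from ttl to maxTTL, the configured value becomes an upper bound rather than a fixed TTL: each entry is stored for the block-derived TTL, capped at the configured maximum (min here is the Go 1.21+ built-in). A small standalone illustration with assumed values:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		maxTTL := 24 * time.Hour         // configured remote-cache TTL, now a cap
		largeBlockTTL := 336 * time.Hour // e.g. CacheTTL of a two-week block
		smallBlockTTL := 2 * time.Hour   // e.g. CacheTTL of a fresh 2h block

		fmt.Println(min(largeBlockTTL, maxTTL)) // 24h0m0s: large blocks are capped
		fmt.Println(min(smallBlockTTL, maxTTL)) // 2h0m0s: small blocks keep their short TTL
	}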
diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go
index e0ae72ae3d..e5831c8ef7 100644
--- a/pkg/store/cache/memcached_test.go
+++ b/pkg/store/cache/memcached_test.go
@@ -94,7 +94,7 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) {
 			// Store the postings expected before running the test.
 			ctx := context.Background()
 			for _, p := range testData.setup {
-				c.StorePostings(p.block, p.label, p.value, tenancy.DefaultTenant)
+				c.StorePostings(p.block, p.label, p.value, tenancy.DefaultTenant, memcachedDefaultTTL)
 			}
 
 			// Fetch postings from cached and assert on it.
@@ -175,7 +175,7 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) {
 			// Store the postings expected before running the test.
 			ctx := context.Background()
 			for _, p := range testData.setup {
-				c.StoreExpandedPostings(p.block, p.matchers, p.value, tenancy.DefaultTenant)
+				c.StoreExpandedPostings(p.block, p.matchers, p.value, tenancy.DefaultTenant, memcachedDefaultTTL)
 			}
 
 			// Fetch postings from cached and assert on it.
@@ -270,7 +270,7 @@ func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) {
 			// Store the series expected before running the test.
 			ctx := context.Background()
 			for _, p := range testData.setup {
-				c.StoreSeries(p.block, p.id, p.value, tenancy.DefaultTenant)
+				c.StoreSeries(p.block, p.id, p.value, tenancy.DefaultTenant, memcachedDefaultTTL)
 			}
 
 			// Fetch series from cached and assert on it.
diff --git a/pkg/store/cache/tracing_index_cache.go b/pkg/store/cache/tracing_index_cache.go
index 38a0f61822..ec2aee304f 100644
--- a/pkg/store/cache/tracing_index_cache.go
+++ b/pkg/store/cache/tracing_index_cache.go
@@ -5,6 +5,7 @@ package storecache
 
 import (
 	"context"
+	"time"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/labels"
@@ -24,8 +25,8 @@ func NewTracingIndexCache(name string, cache IndexCache) *TracingIndexCache {
 }
 
 // StorePostings stores postings for a single series.
-func (c *TracingIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
-	c.cache.StorePostings(blockID, l, v, tenant)
+func (c *TracingIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string, ttl time.Duration) {
+	c.cache.StorePostings(blockID, l, v, tenant, ttl)
 }
 
 // FetchMultiPostings fetches multiple postings - each identified by a label -
@@ -48,8 +49,8 @@ func (c *TracingIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid
 }
 
 // StoreExpandedPostings stores expanded postings for a set of label matchers.
-func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
-	c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
+func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string, ttl time.Duration) {
+	c.cache.StoreExpandedPostings(blockID, matchers, v, tenant, ttl)
 }
 
 // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
@@ -68,8 +69,8 @@ func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID u
 
 // StoreSeries stores a single series. Skip instrumenting this method
 // excessive spans as a single request can store millions of series.
-func (c *TracingIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
-	c.cache.StoreSeries(blockID, id, v, tenant)
+func (c *TracingIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string, ttl time.Duration) {
+	c.cache.StoreSeries(blockID, id, v, tenant, ttl)
 }
 
 // FetchMultiSeries fetches multiple series - each identified by ID - from the cache