From 8546fdde4febd77c29560ba9d2c2ccb905e5d6a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 3 Feb 2025 13:38:37 +0200
Subject: [PATCH] store: lock around iterating over s.blocks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Hold a lock around s.blocks when iterating over it. I have experienced a
case where a block had somehow been added to a blockSet twice, and a
concurrent removal from s.blocks is the only way that could have
happened. This is the only "bad" thing I've been able to spot.

Remove the removeBlock() function as IMHO it is too shallow. Better to
hold the lock consistently.

Signed-off-by: Giedrius Statkevičius
---
 pkg/store/bucket.go            | 73 +++++++++++++++++++---------------
 pkg/store/bucket_test.go       |  5 ++-
 test/e2e/store_gateway_test.go | 41 ++++++++++++-------
 3 files changed, 71 insertions(+), 48 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 7e23a388d1..21696a460e 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -742,6 +742,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
 					continue
 				}
 				if err := s.addBlock(ctx, meta); err != nil {
+					level.Warn(s.logger).Log("msg", "adding block failed", "err", err, "id", meta.ULID.String())
 					continue
 				}
 			}
@@ -766,17 +767,32 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
 		return metaFetchErr
 	}
 
+	var cleanupBlocks []*bucketBlock
+	s.mtx.RLock()
+	keys := make([]ulid.ULID, 0, len(s.blocks))
+	for k := range s.blocks {
+		keys = append(keys, k)
+	}
+	s.mtx.RUnlock()
+
 	// Drop all blocks that are no longer present in the bucket.
-	for id := range s.blocks {
+	for _, id := range keys {
 		if _, ok := metas[id]; ok {
 			continue
 		}
-		if err := s.removeBlock(id); err != nil {
-			level.Warn(s.logger).Log("msg", "drop of outdated block failed", "block", id, "err", err)
-			s.metrics.blockDropFailures.Inc()
-		}
-		level.Info(s.logger).Log("msg", "dropped outdated block", "block", id)
+
+		s.mtx.Lock()
+		b := s.blocks[id]
+		lset := labels.FromMap(b.meta.Thanos.Labels)
+		s.blockSets[lset.Hash()].remove(id)
+		delete(s.blocks, id)
+		s.mtx.Unlock()
+
+		s.metrics.blocksLoaded.Dec()
 		s.metrics.blockDrops.Inc()
+		cleanupBlocks = append(cleanupBlocks, b)
+
+		level.Info(s.logger).Log("msg", "dropped outdated block", "block", id)
 	}
 
 	// Sync advertise labels.
@@ -789,6 +805,25 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
 		return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0
 	})
 	s.mtx.Unlock()
+
+	go func() {
+		for _, b := range cleanupBlocks {
+			var errs prometheus.MultiError
+
+			errs.Append(b.Close())
+
+			if b.dir != "" {
+				errs.Append(os.RemoveAll(b.dir))
+			}
+
+			if len(errs) == 0 {
+				return
+			}
+
+			level.Warn(s.logger).Log("msg", "close of outdated block failed", "block", b.meta.ULID.String(), "err", errs.Error())
+			s.metrics.blockDropFailures.Inc()
+		}
+	}()
 	return nil
 }
 
@@ -921,32 +956,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	return nil
 }
 
-func (s *BucketStore) removeBlock(id ulid.ULID) error {
-	s.mtx.Lock()
-	b, ok := s.blocks[id]
-	if ok {
-		lset := labels.FromMap(b.meta.Thanos.Labels)
-		s.blockSets[lset.Hash()].remove(id)
-		delete(s.blocks, id)
-	}
-	s.mtx.Unlock()
-
-	if !ok {
-		return nil
-	}
-
-	s.metrics.blocksLoaded.Dec()
-	if err := b.Close(); err != nil {
-		return errors.Wrap(err, "close block")
-	}
-
-	if b.dir == "" {
-		return nil
-	}
-
-	return os.RemoveAll(b.dir)
-}
-
 // TimeRange returns the minimum and maximum timestamp of data available in the store.
 func (s *BucketStore) TimeRange() (mint, maxt int64) {
 	s.mtx.RLock()
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 05438d7ef0..8616a99239 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1827,7 +1827,10 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 		testutil.Equals(t, numSeries, len(srv.SeriesSet))
 	})
 	t.Run("remove second block. Cache stays. Ask for first again.", func(t *testing.T) {
-		testutil.Ok(t, store.removeBlock(b2.meta.ULID))
+		b, _ := store.blocks[b2.meta.ULID]
+		lset := labels.FromMap(b.meta.Thanos.Labels)
+		store.blockSets[lset.Hash()].remove(b2.meta.ULID)
+		delete(store.blocks, b2.meta.ULID)
 
 		srv := newStoreSeriesServer(context.Background())
 		testutil.Ok(t, store.Series(&storepb.SeriesRequest{
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go
index 8977aa46bb..a36be45359 100644
--- a/test/e2e/store_gateway_test.go
+++ b/test/e2e/store_gateway_test.go
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"github.com/cortexproject/promqlsmith"
+	"github.com/efficientgo/core/backoff"
 	"github.com/efficientgo/core/testutil"
 	"github.com/efficientgo/e2e"
 	e2edb "github.com/efficientgo/e2e/db"
@@ -816,7 +817,13 @@ metafile_content_ttl: 0s`
 	// thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta.
 	for _, st := range []*e2eobs.Observable{store1, store2, store3} {
 		t.Run(st.Name(), func(t *testing.T) {
-			testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced"))
+			testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_blocks_meta_synced"}, e2emon.WaitMissingMetrics(), e2emon.WithWaitBackoff(
+				&backoff.Config{
+					Min:        1 * time.Second,
+					Max:        10 * time.Second,
+					MaxRetries: 30,
+				},
+			)))
 			testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
 
 			testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded"))
@@ -826,23 +833,27 @@ metafile_content_ttl: 0s`
 	}
 
 	t.Run("query with groupcache loading from object storage", func(t *testing.T) {
-		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
-			time.Now, promclient.QueryOptions{
-				Deduplicate: false,
-			},
-			[]model.Metric{
-				{
-					"a":       "1",
-					"b":       "2",
-					"ext1":    "value1",
-					"replica": "1",
+		for i := 0; i < 3; i++ {
+			queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+				time.Now, promclient.QueryOptions{
+					Deduplicate: false,
 				},
-			},
-		)
+				[]model.Metric{
+					{
+						"a":       "1",
+						"b":       "2",
+						"ext1":    "value1",
+						"replica": "1",
+					},
+				},
+			)
+		}
 
 		for _, st := range []*e2eobs.Observable{store1, store2, store3} {
-			testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`}))
-			testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks"))))
+			t.Run(st.Name(), func(t *testing.T) {
+				testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`}))
+				testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks"))))
+			})
 		}
 	})
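
For reviewers, the locking pattern the patch adopts boils down to a small, self-contained sketch: snapshot the map keys under the read lock, delete each stale entry under the write lock, and hand the slow cleanup to a goroutine so it runs outside the lock. The registry and block types below are illustrative stand-ins, not the actual BucketStore and bucketBlock code.

package main

import (
	"fmt"
	"sync"
)

// block is a stand-in for *bucketBlock: an entry that needs cleanup once dropped.
type block struct {
	id string
}

// Close stands in for bucketBlock.Close plus the os.RemoveAll of its directory.
func (b *block) Close() error { return nil }

// registry is a stand-in for the parts of BucketStore that matter here.
type registry struct {
	mtx    sync.RWMutex
	blocks map[string]*block
}

// dropMissing mirrors the SyncBlocks flow from the patch: snapshot the keys
// under RLock, mutate the map under the write lock per key, and defer the
// slow cleanup to a background goroutine.
func (r *registry) dropMissing(keep map[string]struct{}) {
	// 1. Snapshot the keys so the map is never ranged over without a lock held.
	r.mtx.RLock()
	keys := make([]string, 0, len(r.blocks))
	for k := range r.blocks {
		keys = append(keys, k)
	}
	r.mtx.RUnlock()

	var cleanup []*block

	// 2. Delete each stale entry under the write lock.
	for _, id := range keys {
		if _, ok := keep[id]; ok {
			continue
		}
		r.mtx.Lock()
		b := r.blocks[id]
		delete(r.blocks, id)
		r.mtx.Unlock()

		if b != nil { // defensive: the entry could have gone away since the snapshot
			cleanup = append(cleanup, b)
		}
	}

	// 3. Close dropped entries outside the lock so slow I/O never blocks readers.
	go func() {
		for _, b := range cleanup {
			if err := b.Close(); err != nil {
				fmt.Printf("closing %s failed: %v\n", b.id, err)
			}
		}
	}()
}

func main() {
	r := &registry{blocks: map[string]*block{"a": {id: "a"}, "b": {id: "b"}}}
	r.dropMissing(map[string]struct{}{"a": {}}) // keep "a", drop "b"

	r.mtx.RLock()
	fmt.Println("blocks left:", len(r.blocks)) // 1
	r.mtx.RUnlock()
}

Keeping Close and RemoveAll out of the critical section means a slow filesystem cannot stall callers that only need the read lock.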