From d8554a055935a4cbc3339df9089a569cdc31ff18 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 5 Feb 2025 14:05:01 +0200 Subject: [PATCH 1/8] fix: race when bumping items while loading a snapshot Signed-off-by: kostas --- src/server/rdb_load.cc | 8 +------- src/server/replica.cc | 15 +++++++++++++++ src/server/server_family.cc | 10 ++++++++++ 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 192df90b041f..d49555009b57 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2034,9 +2034,6 @@ error_code RdbLoader::Load(io::Source* src) { auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); }); - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); - }); while (!stop_early_.load(memory_order_relaxed)) { if (pause_) { ThisFiber::SleepFor(100ms); @@ -2226,10 +2223,7 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { FlushShardAsync(i); // Send sentinel callbacks to ensure that all previous messages have been processed. - shard_set->Add(i, [bc]() mutable { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); - bc->Dec(); - }); + shard_set->Add(i, [bc]() mutable { bc->Dec(); }); } bc->Wait(); // wait for sentinels to report. diff --git a/src/server/replica.cc b/src/server/replica.cc index 5a627777d596..f1166f1f5c5b 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -542,6 +542,21 @@ error_code Replica::InitiateDflySync() { // Lock to prevent the error handler from running instantly // while the flows are in a mixed state. 
lock_guard lk{flows_op_mu_}; + absl::Cleanup clean = [this]() { + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); + }); + }; + + // See issue #4554 + // We really need two dispatches here because StartSyncFlow executes on different shards. + // If the sync fiber on one of the threads, starts, loads the snapshots and starts flushing + // the loaded data to the shards asynchronously via LoadItemsBuffer() while the other thread + // did not even spawned the Sync flow fiber, it will start Bumping items because + // SetLoadInProgress will still be false. + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); + }); shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); size_t num_full_flows = diff --git a/src/server/server_family.cc b/src/server/server_family.cc index a1cd3b5feef1..014f449b28e2 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1100,6 +1100,11 @@ std::optional> ServerFamily::Load(string_view load_pat return immediate(expand_result.error()); } + // See issue #4554 + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); + }); + auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); if (new_state != GlobalState::LOADING) { LOG(WARNING) << new_state << " in progress, ignored"; @@ -1155,6 +1160,11 @@ std::optional> ServerFamily::Load(string_view load_pat service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); future.Resolve(*(aggregated_result->first_error)); + // See issue #4554 + // Once we are done we need to clean the state + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); + }); }; pool.GetNextProactor()->Dispatch(std::move(load_join_func)); 
From ab451c2cf03368c119729fbefb9fc797ca0fba2e Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 6 Feb 2025 12:18:05 +0200 Subject: [PATCH 2/8] add test Signed-off-by: kostas --- src/server/rdb_test.cc | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index f2a23ce62126..c4feca4b9de4 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -735,4 +735,19 @@ TEST_F(RdbTest, HugeKeyIssue4497) { EXPECT_EQ(Run({"flushall"}), "OK"); } +TEST_F(RdbTest, HugeKeyIssue4554) { + SetTestFlag("cache_mode", "true"); + // We need to stress one flow/shard such that the others finish early. Lock on hashtags allows + // that. + SetTestFlag("lock_on_hashtags", "true"); + ResetService(); + + EXPECT_EQ( + Run({"debug", "populate", "20", "{tmp}", "20", "rand", "type", "set", "elements", "10000"}), + "OK"); + EXPECT_EQ(Run({"save", "df", "hugekey"}), "OK"); + EXPECT_EQ(Run({"dfly", "load", "hugekey-summary.dfs"}), "OK"); + EXPECT_EQ(Run({"flushall"}), "OK"); +} + } // namespace dfly From ac9943c61efbae2e989881f40fa852476c08e3b8 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 6 Feb 2025 13:11:28 +0200 Subject: [PATCH 3/8] use sequence numbers --- src/server/db_slice.cc | 1 - src/server/db_slice.h | 12 ++++++++---- src/server/rdb_load.cc | 16 ++++++++++++++++ src/server/replica.cc | 14 -------------- src/server/server_family.cc | 10 ---------- 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 4e2c57577819..94755e9b423b 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -283,7 +283,6 @@ DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) cache_mode_(cache_mode), owner_(owner), client_tracking_map_(owner->memory_resource()) { - load_in_progress_ = false; db_arr_.emplace_back(); CreateDb(0); expire_base_[0] = expire_base_[1] = 0; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 
0cae9d6f0f38..8f14c03d609c 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -473,11 +473,15 @@ class DbSlice { bool IsCacheMode() const { // During loading time we never bump elements. - return cache_mode_ && !load_in_progress_; + return cache_mode_ && (load_in_progress_ == 0); } - void SetLoadInProgress(bool in_progress) { - load_in_progress_ = in_progress; + void IncrLoadInProgress() { + ++load_in_progress_; + } + + void DecrLoadInProgress() { + --load_in_progress_; } // Test hook to inspect last locked keys. @@ -585,7 +589,6 @@ class DbSlice { ShardId shard_id_; uint8_t cache_mode_ : 1; - uint8_t load_in_progress_ : 1; EngineShard* owner_; @@ -598,6 +601,7 @@ class DbSlice { size_t soft_budget_limit_ = 0; size_t table_memory_ = 0; uint64_t entries_count_ = 0; + size_t load_in_progress_ = 0; mutable SliceEvents events_; // we may change this even for const operations. diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index d49555009b57..224527f9ca6e 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2034,6 +2034,11 @@ error_code RdbLoader::Load(io::Source* src) { auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); }); + // Increment local one if it exists + if (EngineShard* es = EngineShard::tlocal(); es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); + } + while (!stop_early_.load(memory_order_relaxed)) { if (pause_) { ThisFiber::SleepFor(100ms); @@ -2226,6 +2231,10 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { shard_set->Add(i, [bc]() mutable { bc->Dec(); }); } bc->Wait(); // wait for sentinels to report. 
+ // Decrement local one if it exists + if (EngineShard* es = EngineShard::tlocal(); es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); + } absl::Duration dur = absl::Now() - start_time; load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000; @@ -2509,7 +2518,12 @@ void RdbLoader::FlushShardAsync(ShardId sid) { return; auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] { + // Before we start loading, increment LoadInProgress. + // This is required because FlushShardAsync dispatches to multiple shards, and those shards + // might not yet have had their state (load in progress) incremented. + namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); this->LoadItemsBuffer(indx, ib); + namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress(); // Block, if tiered storage is active, but can't keep up while (EngineShard::tlocal()->ShouldThrottleForTiering()) { @@ -2548,6 +2562,8 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()}; DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id()); + DCHECK(!db_slice.IsCacheMode()); + auto error_msg = [](const auto* item, auto db_ind) { return absl::StrCat("Found empty key: ", item->key, " in DB ", db_ind, " rdb_type ", item->val.rdb_type); diff --git a/src/server/replica.cc b/src/server/replica.cc index f1166f1f5c5b..5f8438a5df91 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -542,21 +542,7 @@ error_code Replica::InitiateDflySync() { // Lock to prevent the error handler from running instantly // while the flows are in a mixed state. 
lock_guard lk{flows_op_mu_}; - absl::Cleanup clean = [this]() { - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); - }); - }; - // See issue #4554 - // We really need two dispatches here because StartSyncFlow executes on different shards. - // If the sync fiber on one of the threads, starts, loads the snapshots and starts flushing - // the loaded data to the shards asynchronously via LoadItemsBuffer() while the other thread - // did not even spawned the Sync flow fiber, it will start Bumping items because - // SetLoadInProgress will still be false. - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); - }); shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); size_t num_full_flows = diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 014f449b28e2..a1cd3b5feef1 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1100,11 +1100,6 @@ std::optional> ServerFamily::Load(string_view load_pat return immediate(expand_result.error()); } - // See issue #4554 - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); - }); - auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); if (new_state != GlobalState::LOADING) { LOG(WARNING) << new_state << " in progress, ignored"; @@ -1160,11 +1155,6 @@ std::optional> ServerFamily::Load(string_view load_pat service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); future.Resolve(*(aggregated_result->first_error)); - // See issue #4554 - // Once we are done we need to clean the state - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); - }); }; pool.GetNextProactor()->Dispatch(std::move(load_join_func)); 
From da658d59f57136c7d4f81ab2c23369531aa72175 Mon Sep 17 00:00:00 2001 From: kostas Date: Fri, 7 Feb 2025 10:24:35 +0200 Subject: [PATCH 4/8] comments Signed-off-by: kostas --- src/server/db_slice.cc | 3 ++- src/server/db_slice.h | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 94755e9b423b..cd4e91ba46fd 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -794,7 +794,8 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks); } - CHECK(fetched_items_.empty()); + LOG_IF(DFATAL, fetched_items_.empty()) + << "Some operation might bumped up items outside of a transaction"; auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable { flush_db_arr.clear(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 8f14c03d609c..4e4a992b8d45 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -473,15 +473,15 @@ class DbSlice { bool IsCacheMode() const { // During loading time we never bump elements. - return cache_mode_ && (load_in_progress_ == 0); + return cache_mode_ && (load_ref_count_ == 0); } void IncrLoadInProgress() { - ++load_in_progress_; + ++load_ref_count_; } void DecrLoadInProgress() { - --load_in_progress_; + --load_ref_count_; } // Test hook to inspect last locked keys. @@ -601,7 +601,7 @@ class DbSlice { size_t soft_budget_limit_ = 0; size_t table_memory_ = 0; uint64_t entries_count_ = 0; - size_t load_in_progress_ = 0; + unsigned load_ref_count_ = 0; mutable SliceEvents events_; // we may change this even for const operations. 
From b31f119aaf68e70366d105da5df43bc0fb5be5cf Mon Sep 17 00:00:00 2001 From: kostas Date: Fri, 7 Feb 2025 10:53:09 +0200 Subject: [PATCH 5/8] revert log if condition --- src/server/db_slice.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cd4e91ba46fd..7266a6453a7a 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -794,7 +794,7 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks); } - LOG_IF(DFATAL, fetched_items_.empty()) + LOG_IF(DFATAL, !fetched_items_.empty()) << "Some operation might bumped up items outside of a transaction"; auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable { From 3af2133a32cf85de7594f52b88ea6be81abff05d Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 13 Feb 2025 11:12:01 +0200 Subject: [PATCH 6/8] add dcheck --- src/server/db_slice.h | 4 ++++ src/server/main_service.cc | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 4e4a992b8d45..ed910df524bc 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -526,6 +526,10 @@ class DbSlice { return &block_counter_; } + bool IsFetchedItemsEmpty() const { + return fetched_items_.empty(); + } + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cc705a38c9d6..1b74c89b2f0e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2508,7 +2508,14 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { VLOG(1) << "Switching state from " << from << " to " << to; global_state_ = to; - pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); }); + pp_.Await([&](ProactorBase*) { + 
ServerState::tlocal()->set_gstate(to); + auto* es = EngineShard::tlocal(); + if (es && to == GlobalState::ACTIVE) { + DbSlice& db = namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()); + DCHECK(db.IsFetchedItemsEmpty()); + } + }); return to; } From 084ce9162d3c57b1ceb67a79f91d5051760a568f Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 13 Feb 2025 11:24:47 +0200 Subject: [PATCH 7/8] comments --- src/server/db_slice.h | 11 +++++++---- src/server/main_service.cc | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/server/db_slice.h b/src/server/db_slice.h index ed910df524bc..2a1f0b824256 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -484,6 +484,13 @@ class DbSlice { --load_ref_count_; } + bool IsLoadInProgressZeroInCacheMode() const { + if (!cache_mode_) { + return true; + } + return IsCacheMode(); + } + // Test hook to inspect last locked keys. const auto& TEST_GetLastLockedFps() const { return uniq_fps_; @@ -526,10 +533,6 @@ class DbSlice { return &block_counter_; } - bool IsFetchedItemsEmpty() const { - return fetched_items_.empty(); - } - private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 1b74c89b2f0e..cbb5c0f19ff4 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2513,7 +2513,7 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { auto* es = EngineShard::tlocal(); if (es && to == GlobalState::ACTIVE) { DbSlice& db = namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()); - DCHECK(db.IsFetchedItemsEmpty()); + DCHECK(db.IsLoadInProgressZeroInCacheMode()); } }); return to; From 72bd0fa13eb1aa5c1133de22ab6200e651c17266 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 13 Feb 2025 13:03:08 +0200 Subject: [PATCH 8/8] fixes --- src/server/db_slice.h | 7 ++----- src/server/main_service.cc 
| 2 +- src/server/rdb_load.cc | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 2a1f0b824256..94ed5ed1d231 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -484,11 +484,8 @@ class DbSlice { --load_ref_count_; } - bool IsLoadInProgressZeroInCacheMode() const { - if (!cache_mode_) { - return true; - } - return IsCacheMode(); + bool IsLoadRefCountZero() const { + return load_ref_count_ == 0; } // Test hook to inspect last locked keys. diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cbb5c0f19ff4..b3851b4b94fa 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2513,7 +2513,7 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { auto* es = EngineShard::tlocal(); if (es && to == GlobalState::ACTIVE) { DbSlice& db = namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()); - DCHECK(db.IsLoadInProgressZeroInCacheMode()); + DCHECK(db.IsLoadRefCountZero()); } }); return to; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 224527f9ca6e..4189332e747f 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2233,7 +2233,7 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { bc->Wait(); // wait for sentinels to report. // Decrement local one if it exists if (EngineShard* es = EngineShard::tlocal(); es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); + namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress(); } absl::Duration dur = absl::Now() - start_time;