diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 4e2c57577819..7266a6453a7a 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -283,7 +283,6 @@ DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) cache_mode_(cache_mode), owner_(owner), client_tracking_map_(owner->memory_resource()) { - load_in_progress_ = false; db_arr_.emplace_back(); CreateDb(0); expire_base_[0] = expire_base_[1] = 0; @@ -795,7 +794,8 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks); } - CHECK(fetched_items_.empty()); + LOG_IF(DFATAL, !fetched_items_.empty()) << "Some operation might have bumped up items outside of a transaction"; auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable { flush_db_arr.clear(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 0cae9d6f0f38..94ed5ed1d231 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -473,11 +473,19 @@ class DbSlice { bool IsCacheMode() const { // During loading time we never bump elements. - return cache_mode_ && !load_in_progress_; + return cache_mode_ && (load_ref_count_ == 0); } - void SetLoadInProgress(bool in_progress) { - load_in_progress_ = in_progress; + void IncrLoadInProgress() { + ++load_ref_count_; + } + + void DecrLoadInProgress() { + --load_ref_count_; + } + + bool IsLoadRefCountZero() const { + return load_ref_count_ == 0; } // Test hook to inspect last locked keys. @@ -585,7 +593,6 @@ class DbSlice { ShardId shard_id_; uint8_t cache_mode_ : 1; - uint8_t load_in_progress_ : 1; EngineShard* owner_; @@ -598,6 +605,7 @@ size_t soft_budget_limit_ = 0; size_t table_memory_ = 0; uint64_t entries_count_ = 0; + unsigned load_ref_count_ = 0; mutable SliceEvents events_; // we may change this even for const operations. 
diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cc705a38c9d6..b3851b4b94fa 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2508,7 +2508,14 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { VLOG(1) << "Switching state from " << from << " to " << to; global_state_ = to; - pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); }); + pp_.Await([&](ProactorBase*) { + ServerState::tlocal()->set_gstate(to); + auto* es = EngineShard::tlocal(); + if (es && to == GlobalState::ACTIVE) { + DbSlice& db = namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()); + DCHECK(db.IsLoadRefCountZero()); + } + }); return to; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 192df90b041f..4189332e747f 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2034,9 +2034,11 @@ error_code RdbLoader::Load(io::Source* src) { auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); }); - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); - }); + // Increment local one if it exists + if (EngineShard* es = EngineShard::tlocal(); es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); + } + while (!stop_early_.load(memory_order_relaxed)) { if (pause_) { ThisFiber::SleepFor(100ms); @@ -2226,12 +2228,13 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { FlushShardAsync(i); // Send sentinel callbacks to ensure that all previous messages have been processed. - shard_set->Add(i, [bc]() mutable { - namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); - bc->Dec(); - }); + shard_set->Add(i, [bc]() mutable { bc->Dec(); }); } bc->Wait(); // wait for sentinels to report. 
+ // Decrement local one if it exists + if (EngineShard* es = EngineShard::tlocal(); es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress(); + } absl::Duration dur = absl::Now() - start_time; load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000; @@ -2515,7 +2518,12 @@ void RdbLoader::FlushShardAsync(ShardId sid) { return; auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] { + // Before we start loading, increment LoadInProgress. + // This is required because FlushShardAsync dispatches to multiple shards, and those shards + // might not yet have had their state (load in progress) incremented. + namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress(); this->LoadItemsBuffer(indx, ib); + namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress(); // Block, if tiered storage is active, but can't keep up while (EngineShard::tlocal()->ShouldThrottleForTiering()) { @@ -2554,6 +2562,8 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()}; DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id()); + DCHECK(!db_slice.IsCacheMode()); + auto error_msg = [](const auto* item, auto db_ind) { return absl::StrCat("Found empty key: ", item->key, " in DB ", db_ind, " rdb_type ", item->val.rdb_type); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index f2a23ce62126..c4feca4b9de4 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -735,4 +735,19 @@ TEST_F(RdbTest, HugeKeyIssue4497) { EXPECT_EQ(Run({"flushall"}), "OK"); } +TEST_F(RdbTest, HugeKeyIssue4554) { + SetTestFlag("cache_mode", "true"); + // We need to stress one flow/shard such that the others finish early. Lock on hashtags allows + that. 
+ SetTestFlag("lock_on_hashtags", "true"); + ResetService(); + + EXPECT_EQ( + Run({"debug", "populate", "20", "{tmp}", "20", "rand", "type", "set", "elements", "10000"}), + "OK"); + EXPECT_EQ(Run({"save", "df", "hugekey"}), "OK"); + EXPECT_EQ(Run({"dfly", "load", "hugekey-summary.dfs"}), "OK"); + EXPECT_EQ(Run({"flushall"}), "OK"); +} + } // namespace dfly diff --git a/src/server/replica.cc b/src/server/replica.cc index 5a627777d596..5f8438a5df91 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -542,6 +542,7 @@ error_code Replica::InitiateDflySync() { // Lock to prevent the error handler from running instantly // while the flows are in a mixed state. lock_guard lk{flows_op_mu_}; + shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); size_t num_full_flows =