Skip to content

Commit

Permalink
fix: race when bumping items while loading a snapshot (#4564)
Browse files Browse the repository at this point in the history
The original issue was submitted in #4497 and we supplied a fix in #4507. However, the fix ignored that the RdbLoader::Load() function is run per flow/shard thread and the "poison pill" of updating the loading state at the end of RdbLoader::Load() introduced a race condition. This PR fixes the race.

Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim authored Feb 13, 2025
1 parent 5849313 commit 82d905c
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
cache_mode_(cache_mode),
owner_(owner),
client_tracking_map_(owner->memory_resource()) {
load_in_progress_ = false;
db_arr_.emplace_back();
CreateDb(0);
expire_base_[0] = expire_base_[1] = 0;
Expand Down Expand Up @@ -795,7 +794,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks);
}

CHECK(fetched_items_.empty());
LOG_IF(DFATAL, !fetched_items_.empty())
<< "Some operation might bumped up items outside of a transaction";

auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
Expand Down
16 changes: 12 additions & 4 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,19 @@ class DbSlice {

bool IsCacheMode() const {
  // During loading time we never bump elements: any in-flight snapshot load
  // (tracked by load_ref_count_) temporarily disables cache-mode behavior,
  // because bumping items while a load populates the table races with it.
  // NOTE: the original text contained a stale duplicate return statement
  // (`return cache_mode_ && !load_in_progress_;`) left over from the diff;
  // only the ref-count form below is live code.
  return cache_mode_ && (load_ref_count_ == 0);
}

void SetLoadInProgress(bool in_progress) {
load_in_progress_ = in_progress;
void IncrLoadInProgress() {
++load_ref_count_;
}

void DecrLoadInProgress() {
--load_ref_count_;
}

bool IsLoadRefCountZero() const {
return load_ref_count_ == 0;
}

// Test hook to inspect last locked keys.
Expand Down Expand Up @@ -585,7 +593,6 @@ class DbSlice {

ShardId shard_id_;
uint8_t cache_mode_ : 1;
uint8_t load_in_progress_ : 1;

EngineShard* owner_;

Expand All @@ -598,6 +605,7 @@ class DbSlice {
size_t soft_budget_limit_ = 0;
size_t table_memory_ = 0;
uint64_t entries_count_ = 0;
unsigned load_ref_count_ = 0;

mutable SliceEvents events_; // we may change this even for const operations.

Expand Down
9 changes: 8 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2509,7 +2509,14 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
VLOG(1) << "Switching state from " << from << " to " << to;
global_state_ = to;

pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); });
pp_.Await([&](ProactorBase*) {
ServerState::tlocal()->set_gstate(to);
auto* es = EngineShard::tlocal();
if (es && to == GlobalState::ACTIVE) {
DbSlice& db = namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id());
DCHECK(db.IsLoadRefCountZero());
}
});
return to;
}

Expand Down
24 changes: 17 additions & 7 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2034,9 +2034,11 @@ error_code RdbLoader::Load(io::Source* src) {

auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });

shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true);
});
// Increment local one if it exists
if (EngineShard* es = EngineShard::tlocal(); es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress();
}

while (!stop_early_.load(memory_order_relaxed)) {
if (pause_) {
ThisFiber::SleepFor(100ms);
Expand Down Expand Up @@ -2226,12 +2228,13 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
FlushShardAsync(i);

// Send sentinel callbacks to ensure that all previous messages have been processed.
shard_set->Add(i, [bc]() mutable {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false);
bc->Dec();
});
shard_set->Add(i, [bc]() mutable { bc->Dec(); });
}
bc->Wait(); // wait for sentinels to report.
// Decrement local one if it exists
if (EngineShard* es = EngineShard::tlocal(); es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress();
}

absl::Duration dur = absl::Now() - start_time;
load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
Expand Down Expand Up @@ -2515,7 +2518,12 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
return;

auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
// Before we start loading, increment LoadInProgress.
// This is required because FlushShardAsync dispatches to multiple shards, and those shards
// might not yet have had their state (load in progress) incremented.
namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress();
this->LoadItemsBuffer(indx, ib);
namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress();

// Block, if tiered storage is active, but can't keep up
while (EngineShard::tlocal()->ShouldThrottleForTiering()) {
Expand Down Expand Up @@ -2554,6 +2562,8 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id());

DCHECK(!db_slice.IsCacheMode());

auto error_msg = [](const auto* item, auto db_ind) {
return absl::StrCat("Found empty key: ", item->key, " in DB ", db_ind, " rdb_type ",
item->val.rdb_type);
Expand Down
15 changes: 15 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,4 +735,19 @@ TEST_F(RdbTest, HugeKeyIssue4497) {
EXPECT_EQ(Run({"flushall"}), "OK");
}

// Regression test for issue #4554: items must not be bumped (cache mode)
// while another flow/shard thread is still loading snapshot data.
TEST_F(RdbTest, HugeKeyIssue4554) {
// Cache mode is the precondition for the bug: only in cache mode are items
// bumped on access.
SetTestFlag("cache_mode", "true");
// We need to stress one flow/shard such that the others finish early. Lock on hashtags allows
// that.
SetTestFlag("lock_on_hashtags", "true");
// Restart the service so the flags above take effect.
ResetService();

// Populate 20 large set keys sharing the {tmp} hashtag so they all land on
// one shard, keeping that shard busy during load while the others idle.
EXPECT_EQ(
Run({"debug", "populate", "20", "{tmp}", "20", "rand", "type", "set", "elements", "10000"}),
"OK");
// Save a Dragonfly-format snapshot, then reload it; the reload exercises the
// per-shard load-in-progress accounting.
EXPECT_EQ(Run({"save", "df", "hugekey"}), "OK");
EXPECT_EQ(Run({"dfly", "load", "hugekey-summary.dfs"}), "OK");
EXPECT_EQ(Run({"flushall"}), "OK");
}

} // namespace dfly
1 change: 1 addition & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ error_code Replica::InitiateDflySync() {
// Lock to prevent the error handler from running instantly
// while the flows are in a mixed state.
lock_guard lk{flows_op_mu_};

shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));

size_t num_full_flows =
Expand Down

0 comments on commit 82d905c

Please sign in to comment.