Skip to content

Commit

Permalink
fix: disable ThreadLocalMutex when big value ser is off (#3521)
Browse files Browse the repository at this point in the history
* fix: disable ThreadLocalMutex when big value ser is off

* refactor: address comments

---------

Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: Borys <[email protected]>
  • Loading branch information
3 people authored and romange committed Aug 15, 2024
1 parent 1e7ca5a commit 0da7bd2
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
25 changes: 18 additions & 7 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ atomic_uint64_t rss_mem_peak(0);

unsigned kernel_version = 0;
size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;

const char* GlobalStateName(GlobalState s) {
switch (s) {
Expand Down Expand Up @@ -462,16 +463,26 @@ ThreadLocalMutex::~ThreadLocalMutex() {
}

void ThreadLocalMutex::lock() {
DCHECK_EQ(EngineShard::tlocal(), shard_);
util::fb2::NoOpLock noop_lk_;
cond_var_.wait(noop_lk_, [this]() { return !flag_; });
flag_ = true;
if (serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_);
util::fb2::NoOpLock noop_lk_;
if (locked_fiber_ != nullptr) {
DCHECK(util::fb2::detail::FiberActive() != locked_fiber_);
}
cond_var_.wait(noop_lk_, [this]() { return !flag_; });
flag_ = true;
DCHECK_EQ(locked_fiber_, nullptr);
locked_fiber_ = util::fb2::detail::FiberActive();
}
}

void ThreadLocalMutex::unlock() {
DCHECK_EQ(EngineShard::tlocal(), shard_);
flag_ = false;
cond_var_.notify_one();
if (serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_);
flag_ = false;
cond_var_.notify_one();
locked_fiber_ = nullptr;
}
}

} // namespace dfly
3 changes: 3 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ class ThreadLocalMutex {
EngineShard* shard_;
util::fb2::CondVarAny cond_var_;
bool flag_ = false;
util::fb2::detail::FiberInterface* locked_fiber_{nullptr};
};

extern size_t serialization_max_chunk_size;

} // namespace dfly
7 changes: 7 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,16 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");

ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");

ABSL_FLAG(size_t, serialization_max_chunk_size, 0,
"Maximum size of a value that may be serialized at once during snapshotting or full "
"sync. Values bigger than this threshold will be serialized using streaming "
"serialization. 0 - to disable streaming mode");

namespace dfly {

#if defined(__linux__)
Expand Down Expand Up @@ -871,6 +877,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("dbfilename");
config_registry.RegisterMutable("table_growth_margin");

serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0 || shard_num > pp_.size()) {
LOG_IF(WARNING, shard_num > pp_.size())
Expand Down
6 changes: 1 addition & 5 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
#include "util/fibers/synchronization.h"

using facade::operator""_MB;
ABSL_FLAG(size_t, serialization_max_chunk_size, 0,
"Maximum size of a value that may be serialized at once during snapshotting or full "
"sync. Values bigger than this threshold will be serialized using streaming "
"serialization. 0 - to disable streaming mode");

namespace dfly {

Expand Down Expand Up @@ -76,7 +72,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
}

const auto flush_threshold = absl::GetFlag(FLAGS_serialization_max_chunk_size);
const auto flush_threshold = serialization_max_chunk_size;
std::function<void(size_t, RdbSerializer::FlushState)> flush_fun;
if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) {
flush_fun = [this, flush_threshold](size_t bytes_serialized,
Expand Down

0 comments on commit 0da7bd2

Please sign in to comment.