diff --git a/src/server/common.cc b/src/server/common.cc index 3f4d9ebb6914..732907ead879 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -117,6 +117,7 @@ atomic_uint64_t rss_mem_peak(0); unsigned kernel_version = 0; size_t max_memory_limit = 0; +size_t serialization_max_chunk_size = 0; const char* GlobalStateName(GlobalState s) { switch (s) { @@ -462,16 +463,26 @@ ThreadLocalMutex::~ThreadLocalMutex() { } void ThreadLocalMutex::lock() { - DCHECK_EQ(EngineShard::tlocal(), shard_); - util::fb2::NoOpLock noop_lk_; - cond_var_.wait(noop_lk_, [this]() { return !flag_; }); - flag_ = true; + if (serialization_max_chunk_size != 0) { + DCHECK_EQ(EngineShard::tlocal(), shard_); + util::fb2::NoOpLock noop_lk_; + if (locked_fiber_ != nullptr) { + DCHECK(util::fb2::detail::FiberActive() != locked_fiber_); + } + cond_var_.wait(noop_lk_, [this]() { return !flag_; }); + flag_ = true; + DCHECK_EQ(locked_fiber_, nullptr); + locked_fiber_ = util::fb2::detail::FiberActive(); + } } void ThreadLocalMutex::unlock() { - DCHECK_EQ(EngineShard::tlocal(), shard_); - flag_ = false; - cond_var_.notify_one(); + if (serialization_max_chunk_size != 0) { + DCHECK_EQ(EngineShard::tlocal(), shard_); + flag_ = false; + cond_var_.notify_one(); + locked_fiber_ = nullptr; + } } } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 33efbcb35e73..d3a9c2ce9b75 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -379,6 +379,9 @@ class ThreadLocalMutex { EngineShard* shard_; util::fb2::CondVarAny cond_var_; bool flag_ = false; + util::fb2::detail::FiberInterface* locked_fiber_{nullptr}; }; +extern size_t serialization_max_chunk_size; + } // namespace dfly diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 16e79e7df9dc..a262867c6d7d 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -99,10 +99,16 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{}, "Limit on maximum-memory that is used by the database. " "0 - means the program will automatically determine its maximum memory usage. " "default: 0"); + ABSL_FLAG(double, oom_deny_ratio, 1.1, "commands with flag denyoom will return OOM when the ratio between maxmemory and used " "memory is above this value"); +ABSL_FLAG(size_t, serialization_max_chunk_size, 0, + "Maximum size of a value that may be serialized at once during snapshotting or full " + "sync. Values bigger than this threshold will be serialized using streaming " + "serialization. 0 - to disable streaming mode"); + namespace dfly { #if defined(__linux__) @@ -871,6 +877,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("dbfilename"); config_registry.RegisterMutable("table_growth_margin"); + serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size); uint32_t shard_num = GetFlag(FLAGS_num_shards); if (shard_num == 0 || shard_num > pp_.size()) { LOG_IF(WARNING, shard_num > pp_.size()) diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index d78b331de3c4..1b448a8cfca1 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -22,10 +22,6 @@ #include "util/fibers/synchronization.h" using facade::operator""_MB; -ABSL_FLAG(size_t, serialization_max_chunk_size, 0, - "Maximum size of a value that may be serialized at once during snapshotting or full " - "sync. Values bigger than this threshold will be serialized using streaming " - "serialization. 0 - to disable streaming mode"); namespace dfly { @@ -76,7 +72,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); } - const auto flush_threshold = absl::GetFlag(FLAGS_serialization_max_chunk_size); + const auto flush_threshold = serialization_max_chunk_size; std::function flush_fun; if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) { flush_fun = [this, flush_threshold](size_t bytes_serialized,