Skip to content

Commit 1e25be6

Browse files
PokIsemainePragmaTwicegit-hulk
authored
refactor: hoist key mutexes to ExecuteCommands (#2620)
Co-authored-by: Twice <[email protected]> Co-authored-by: hulk <[email protected]>
1 parent d66e827 commit 1e25be6

21 files changed

+107
-254
lines changed

src/commands/blocking_commander.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#pragma once
2222

2323
#include "commander.h"
24+
#include "common/lock_manager.h"
2425
#include "event_util.h"
2526
#include "server/redis_connection.h"
2627

@@ -44,6 +45,10 @@ class BlockingCommander : public Commander,
4445
// in other words, returning true indicates ending the blocking
4546
virtual bool OnBlockingWrite() = 0;
4647

48+
// GetLocks() locks the keys of the BlockingCommander with MultiLockGuard.
49+
// When OnWrite() is triggered, BlockingCommander needs to relock the keys.
50+
virtual MultiLockGuard GetLocks() = 0;
51+
4752
// to start the blocking process
4853
// usually put to the end of the Execute method
4954
Status StartBlocking(int64_t timeout, std::string *output) {
@@ -63,7 +68,11 @@ class BlockingCommander : public Commander,
6368
}
6469

6570
void OnWrite(bufferevent *bev) {
66-
bool done = OnBlockingWrite();
71+
bool done{false};
72+
{
73+
auto guard = GetLocks();
74+
done = OnBlockingWrite();
75+
}
6776

6877
if (!done) {
6978
// The connection may be waked up but can't pop from the datatype.

src/commands/cmd_list.cc

+29
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,16 @@ class CommandBPop : public BlockingCommander {
313313
return s;
314314
}
315315

316+
MultiLockGuard GetLocks() override {
317+
std::vector<std::string> lock_keys;
318+
lock_keys.reserve(keys_.size());
319+
for (const auto &key : keys_) {
320+
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
321+
lock_keys.emplace_back(std::move(ns_key));
322+
}
323+
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
324+
}
325+
316326
bool OnBlockingWrite() override {
317327
engine::Context ctx(srv_->storage);
318328
auto s = TryPopFromList(ctx);
@@ -436,6 +446,16 @@ class CommandBLMPop : public BlockingCommander {
436446
}
437447
}
438448

449+
MultiLockGuard GetLocks() override {
450+
std::vector<std::string> lock_keys;
451+
lock_keys.reserve(keys_.size());
452+
for (const auto &key : keys_) {
453+
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
454+
lock_keys.emplace_back(std::move(ns_key));
455+
}
456+
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
457+
}
458+
439459
bool OnBlockingWrite() override {
440460
engine::Context ctx(srv_->storage);
441461
auto s = ExecuteUnblocked(ctx);
@@ -767,6 +787,15 @@ class CommandBLMove : public BlockingCommander {
767787

768788
void UnblockKeys() override { srv_->UnblockOnKey(args_[1], conn_); }
769789

790+
MultiLockGuard GetLocks() override {
791+
std::vector<std::string> lock_keys{
792+
ComposeNamespaceKey(conn_->GetNamespace(), args_[1], srv_->storage->IsSlotIdEncoded())};
793+
if (args_[1] != args_[2]) {
794+
lock_keys.emplace_back(ComposeNamespaceKey(conn_->GetNamespace(), args_[2], srv_->storage->IsSlotIdEncoded()));
795+
}
796+
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
797+
}
798+
770799
bool OnBlockingWrite() override {
771800
redis::List list_db(srv_->storage, conn_->GetNamespace());
772801
std::string elem;

src/commands/cmd_stream.cc

+8
Original file line numberDiff line numberDiff line change
@@ -1635,6 +1635,14 @@ class CommandXReadGroup : public Commander,
16351635
redis::Stream stream_db(srv_->storage, conn_->GetNamespace());
16361636

16371637
std::vector<StreamReadResult> results;
1638+
1639+
std::vector<std::string> lock_keys;
1640+
lock_keys.reserve(streams_.size());
1641+
for (auto &stream_name : streams_) {
1642+
auto ns_key = stream_db.AppendNamespacePrefix(stream_name);
1643+
lock_keys.emplace_back(std::move(ns_key));
1644+
}
1645+
MultiLockGuard guard(srv_->storage->GetLockManager(), lock_keys);
16381646
engine::Context ctx(srv_->storage);
16391647
for (size_t i = 0; i < streams_.size(); ++i) {
16401648
redis::StreamRangeOptions options;

src/commands/cmd_zset.cc

+20
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,16 @@ class CommandBZPop : public BlockingCommander {
366366
conn_->Reply(output);
367367
}
368368

369+
MultiLockGuard GetLocks() override {
370+
std::vector<std::string> lock_keys;
371+
lock_keys.reserve(keys_.size());
372+
for (const auto &key : keys_) {
373+
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
374+
lock_keys.emplace_back(std::move(ns_key));
375+
}
376+
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
377+
}
378+
369379
bool OnBlockingWrite() override {
370380
std::string user_key;
371381
std::vector<MemberScore> member_scores;
@@ -548,6 +558,16 @@ class CommandBZMPop : public BlockingCommander {
548558

549559
std::string NoopReply(const Connection *conn) override { return conn->NilString(); }
550560

561+
MultiLockGuard GetLocks() override {
562+
std::vector<std::string> lock_keys;
563+
lock_keys.reserve(keys_.size());
564+
for (const auto &key : keys_) {
565+
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
566+
lock_keys.emplace_back(std::move(ns_key));
567+
}
568+
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
569+
}
570+
551571
bool OnBlockingWrite() override {
552572
std::string user_key;
553573
std::vector<MemberScore> member_scores;

src/server/redis_connection.cc

+16-2
Original file line numberDiff line numberDiff line change
@@ -489,9 +489,24 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
489489

490490
SetLastCmd(cmd_name);
491491
{
492+
std::optional<MultiLockGuard> guard;
493+
if (cmd_flags & kCmdWrite) {
494+
std::vector<std::string> lock_keys;
495+
attributes->ForEachKeyRange(
496+
[&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
497+
key_range.ForEachKey(
498+
[&, this](const std::string &key) {
499+
auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
500+
lock_keys.emplace_back(std::move(ns_key));
501+
},
502+
args);
503+
},
504+
cmd_tokens);
505+
506+
guard.emplace(srv_->storage->GetLockManager(), lock_keys);
507+
}
492508
engine::Context ctx(srv_->storage);
493509

494-
// TODO: transaction support for index recording
495510
std::vector<GlobalIndexer::RecordResult> index_records;
496511
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
497512
!config->cluster_enabled) {
@@ -512,7 +527,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
512527
}
513528

514529
s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
515-
// TODO: transaction support for index updating
516530
for (const auto &record : index_records) {
517531
auto s = GlobalIndexer::Update(ctx, record);
518532
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {

src/storage/redis_db.cc

+8-12
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui
112112

113113
std::string value;
114114
Metadata metadata(kRedisNone, false);
115-
LockGuard guard(storage_->GetLockManager(), ns_key);
115+
116116
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
117117
if (!s.ok()) return s;
118118

@@ -150,7 +150,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
150150
std::string ns_key = AppendNamespacePrefix(user_key);
151151

152152
std::string value;
153-
LockGuard guard(storage_->GetLockManager(), ns_key);
153+
154154
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
155155
if (!s.ok()) return s;
156156
Metadata metadata(kRedisNone, false);
@@ -165,13 +165,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
165165
rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &keys, uint64_t *deleted_cnt) {
166166
*deleted_cnt = 0;
167167

168-
std::vector<std::string> lock_keys;
169-
lock_keys.reserve(keys.size());
168+
std::vector<std::string> ns_keys;
169+
ns_keys.reserve(keys.size());
170170
for (const auto &key : keys) {
171171
std::string ns_key = AppendNamespacePrefix(key);
172-
lock_keys.emplace_back(std::move(ns_key));
172+
ns_keys.emplace_back(std::move(ns_key));
173173
}
174-
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
175174

176175
auto batch = storage_->GetWriteBatchBase();
177176
WriteBatchLogData log_data(kRedisNone);
@@ -181,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
181180
}
182181

183182
std::vector<Slice> slice_keys;
184-
slice_keys.reserve(lock_keys.size());
185-
for (const auto &ns_key : lock_keys) {
183+
slice_keys.reserve(ns_keys.size());
184+
for (const auto &ns_key : ns_keys) {
186185
slice_keys.emplace_back(ns_key);
187186
}
188187

@@ -202,7 +201,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
202201
if (!s.ok()) continue;
203202
if (metadata.Expired()) continue;
204203

205-
s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
204+
s = batch->Delete(metadata_cf_handle_, ns_keys[i]);
206205
if (!s.ok()) return s;
207206
*deleted_cnt += 1;
208207
}
@@ -652,9 +651,6 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R
652651

653652
rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, const std::string &new_key, bool nx,
654653
bool delete_old, CopyResult *res) {
655-
std::vector<std::string> lock_keys = {key, new_key};
656-
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
657-
658654
RedisType type = kRedisNone;
659655
auto s = typeInternal(ctx, key, &type);
660656
if (!s.ok()) return s;

src/types/redis_bitmap.cc

+1-9
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
184184
std::string raw_value;
185185
std::string ns_key = AppendNamespacePrefix(user_key);
186186

187-
LockGuard guard(storage_->GetLockManager(), ns_key);
188187
BitmapMetadata metadata;
189188
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
190189
if (!s.ok() && !s.IsNotFound()) return s;
@@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
461460
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
462461
std::string raw_value;
463462
std::string ns_key = AppendNamespacePrefix(user_key);
464-
LockGuard guard(storage_->GetLockManager(), ns_key);
465463

466464
std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
467465
uint64_t max_bitmap_size = 0;
@@ -824,15 +822,9 @@ template <bool ReadOnly>
824822
rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, const std::vector<BitfieldOperation> &ops,
825823
std::vector<std::optional<BitfieldValue>> *rets) {
826824
std::string ns_key = AppendNamespacePrefix(user_key);
827-
828-
std::optional<LockGuard> guard;
829-
if constexpr (!ReadOnly) {
830-
guard = LockGuard(storage_->GetLockManager(), ns_key);
831-
}
832-
833825
BitmapMetadata metadata;
834826
std::string raw_value;
835-
// TODO(mwish): maintain snapshot for read-only bitfield.
827+
836828
auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
837829
if (!s.ok() && !s.IsNotFound()) {
838830
return s;

src/types/redis_bloom_chain.cc

-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx, const Slice &user_key,
126126
uint16_t expansion) {
127127
std::string ns_key = AppendNamespacePrefix(user_key);
128128

129-
LockGuard guard(storage_->GetLockManager(), ns_key);
130129
BloomChainMetadata bloom_chain_metadata;
131130
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &bloom_chain_metadata);
132131
if (!s.ok() && !s.IsNotFound()) return s;
@@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user
156155
const BloomFilterInsertOptions &insert_options,
157156
std::vector<BloomFilterAddResult> *rets) {
158157
std::string ns_key = AppendNamespacePrefix(user_key);
159-
LockGuard guard(storage_->GetLockManager(), ns_key);
160158

161159
BloomChainMetadata metadata;
162160
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);

src/types/redis_hash.cc

+1-4
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const
6666

6767
std::string ns_key = AppendNamespacePrefix(user_key);
6868

69-
LockGuard guard(storage_->GetLockManager(), ns_key);
7069
HashMetadata metadata;
7170
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
7271
if (!s.ok() && !s.IsNotFound()) return s;
@@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c
117116

118117
std::string ns_key = AppendNamespacePrefix(user_key);
119118

120-
LockGuard guard(storage_->GetLockManager(), ns_key);
121119
HashMetadata metadata;
122120
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
123121
if (!s.ok() && !s.IsNotFound()) return s;
@@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
211209
WriteBatchLogData log_data(kRedisHash);
212210
auto s = batch->PutLogData(log_data.Encode());
213211
if (!s.ok()) return s;
214-
LockGuard guard(storage_->GetLockManager(), ns_key);
212+
215213
s = GetMetadata(ctx, ns_key, &metadata);
216214
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
217215

@@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
245243
*added_cnt = 0;
246244
std::string ns_key = AppendNamespacePrefix(user_key);
247245

248-
LockGuard guard(storage_->GetLockManager(), ns_key);
249246
HashMetadata metadata;
250247
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
251248
if (!s.ok() && !s.IsNotFound()) return s;

src/types/redis_hyperloglog.cc

-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
112112
*ret = 0;
113113
std::string ns_key = AppendNamespacePrefix(user_key);
114114

115-
LockGuard guard(storage_->GetLockManager(), ns_key);
116115
HyperLogLogMetadata metadata{};
117116
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
118117
if (!s.ok() && !s.IsNotFound()) {
@@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
238237
}
239238

240239
std::string dest_key = AppendNamespacePrefix(dest_user_key);
241-
LockGuard guard(storage_->GetLockManager(), dest_key);
242240
std::vector<std::string> registers;
243241
HyperLogLogMetadata metadata;
244242

0 commit comments

Comments
 (0)