Skip to content

Commit

Permalink
fix: fix replication v1.14 (#2537)
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg authored Feb 4, 2024
1 parent 32bdcd1 commit fd1d0bc
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->transaction = stub_tx.get();

result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal

cntx->transaction = tx;
return OpStatus::OK;
Expand Down
2 changes: 1 addition & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Cont

// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
DCHECK(res->txid > 0 || res->shard_cnt == 1);

auto txid = res->txid;
auto& txdata = current_[txid];
Expand Down
21 changes: 19 additions & 2 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio
// Use squashing mechanism for inline execution of single-shard EVAL
multi_->mode = LOCK_AHEAD;
}

multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);

time_now_ms_ = parent->time_now_ms_;

Expand Down Expand Up @@ -972,6 +974,16 @@ void Transaction::IterateMultiLocks(ShardId sid, std::function<void(const std::s
}
}

void Transaction::FIX_ConcludeJournalExec() {
if (!multi_->shard_journal_write.front())
return;

if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
}
}

void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
Expand Down Expand Up @@ -1506,8 +1518,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
if (multi_ && multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;

if (multi_) {
if (multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;
else
multi_->shard_journal_write[0] = true;
}

bool is_multi = multi_commands || IsAtomicMulti();

Expand Down
3 changes: 3 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ class Transaction {

void IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const;

// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
void FIX_ConcludeJournalExec();

private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
struct LockCnt {
Expand Down
32 changes: 32 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
assert all(v is None for v in res)


@dfly_args({"proactor_threads": 4})
async def test_simple_scripts(df_local_factory: DflyInstanceFactory):
master = df_local_factory.create()
replicas = [df_local_factory.create() for _ in range(2)]
df_local_factory.start_all([master] + replicas)

c_replicas = [replica.client() for replica in replicas]
c_master = master.client()

# Connect replicas and wait for sync to finish
for c_replica in c_replicas:
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await check_all_replicas_finished([c_replica], c_master)

# Generate some scripts and run them
keys = ["a", "b", "c", "d", "e"]
for i in range(len(keys) + 1):
script = ""
subkeys = keys[:i]
for key in subkeys:
script += f"redis.call('INCR', '{key}')"
script += f"redis.call('INCR', '{key}')"

await c_master.eval(script, len(subkeys), *subkeys)

# Wait for replicas
await check_all_replicas_finished([c_replica], c_master)

for c_replica in c_replicas:
assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"]


"""
Test script replication.
Expand Down

0 comments on commit fd1d0bc

Please sign in to comment.