Skip to content

Commit

Permalink
add PacificA consistency test cases and fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Feb 17, 2025
1 parent 05cf8af commit bee8886
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 25 deletions.
24 changes: 12 additions & 12 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,20 @@ Status Context::Init() {

void Context::UpdateAppliedIndex(const LogOffset& offset) {
std::lock_guard l(rwlock_);
LogOffset cur_offset;
applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
if (cur_offset > applied_index_) {
applied_index_ = cur_offset;
StableSave();
}
applied_index_ = offset;
StableSave();
// LogOffset cur_offset;
// applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
// if (cur_offset > applied_index_) {
// applied_index_ = cur_offset;
// StableSave();
// }
}

void Context::Reset(const LogOffset& offset) {
std::lock_guard l(rwlock_);
applied_index_ = offset;
applied_win_.Reset();
//applied_win_.Reset();
StableSave();
}

Expand Down Expand Up @@ -381,9 +383,9 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const
std::lock_guard l(slave_ptr->slave_mu);
slave_ptr->acked_offset = end;
sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset);
LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << "slave acked_offset "
LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset "
<< slave_ptr->acked_offset.ToString();
if (slave_ptr->acked_offset >= slave_ptr->target_offset) {
if (slave_ptr->slave_state != kSlaveBinlogSync && slave_ptr->acked_offset >= slave_ptr->target_offset) {
slave_ptr->slave_state = kSlaveBinlogSync;
LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " << slave_ptr->acked_offset.ToString()
<< ", target_offset: " << slave_ptr->target_offset.ToString();
Expand Down Expand Up @@ -882,11 +884,9 @@ Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_
Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
int index = logs_->FindOffset(logs_->FirstOffset());
int log_size = logs_->Size(); // Cache log size
LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
<< " , m_offset: " << master_committed_id.ToString();
for (int i = index; i < log_size; ++i) {
Log::LogItem log = logs_->At(i);
if (log.offset >= master_committed_id) {
if (master_committed_id >= log.offset) {
LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
<< ", ApplyLog: " << log.offset.ToString();
ApplyBinlog(log.cmd_ptr);
Expand Down
17 changes: 8 additions & 9 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,14 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
slave_db->SetReplState(ReplState::kTryConnect);
return;
}

if(db->GetISConsistency()){
const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id();
LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index()));
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}
}
// empty binlog treated as keepalive packet
if (binlog_res.binlog().empty()) {
continue;
Expand All @@ -156,14 +163,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "DB " << worker->db_name_ << " Not Found";
return;
}
if(db->GetISConsistency()){
const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id();
LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index()));
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}
}
}

if (only_keepalive) {
Expand Down
4 changes: 2 additions & 2 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
db->Logger()->GetProducerStatus(&boffset.filenum, &boffset.offset);
slave_db->SetMasterSessionId(session_id);
LogOffset offset(boffset, logic_last_offset);
LOG(INFO)<<"PacificA first binlog stable offset : "<< offset.ToString();
LOG(INFO)<<"PacificA slave first binlog stable offset : "<< offset.ToString();
if(db->GetISConsistency()){
if (try_sync_response.has_prepared_id()){
const InnerMessage::BinlogOffset& prepared_id = try_sync_response.prepared_id();
Expand All @@ -235,7 +235,7 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
if(master_prepared_id < db->GetPreparedId()){
if(master_prepared_id < db->GetCommittedId()){
slave_db->SetReplState(ReplState::kError);
LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
LOG(WARNING) << "DB: " << db_name << " master preparedId < slave committedId error";
return;
}
db->SetPreparedId(master_prepared_id);
Expand Down
6 changes: 4 additions & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Status SyncMasterDB::WakeUpSlaveBinlogSync() {
std::lock_guard l(slave_ptr->slave_mu);
if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
Status s;
if (coordinator_.GetISConsistency()) {
if (coordinator_.GetISConsistency()&&(slave_ptr->slave_state == SlaveState::kSlaveBinlogSync||slave_ptr->slave_state == SlaveState::KCandidate)) {
s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_);
} else {
s = ReadBinlogFileToWq(slave_ptr);
Expand Down Expand Up @@ -447,7 +447,9 @@ Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, cons
}else {
slave_ptr->slave_state = KCandidate;
}
LOG(INFO)<<"PacificA first binlog slave_state: "<< slave_ptr->slave_state;
if(slave_ptr->slave_state == KCandidate){
LOG(INFO)<<"PacificA first binlog slave_state is Candidate";
}
slave_ptr->sent_offset = offset;
slave_ptr->acked_offset = offset;
slave_ptr->target_offset =GetPreparedId();
Expand Down

0 comments on commit bee8886

Please sign in to comment.