Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add PacificA data replication consistency scheme #2994

Open
wants to merge 17 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ src/build_version.cc
.cache

.idea/


#build
build/
buildtrees
Expand Down
25 changes: 14 additions & 11 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ slow-cmd-thread-pool-size : 1
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
Expand Down Expand Up @@ -94,7 +94,7 @@ write-buffer-size : 256M
# If <= 0, a proper value is automatically calculated.
# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB)
# Supported Units [K|M|G], arena-block-size default unit is in [bytes].
arena-block-size :
arena-block-size :

# Timeout of Pika's connection, counting down starts When there are no requests
# on a connection (it enters sleep state), when the countdown reaches 0, the connection
Expand All @@ -108,12 +108,12 @@ timeout : 60
# [NOTICE] If this admin password is the same as user password (including both being empty),
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
# PS: "user password" refers to value of the parameter below: userpass.
requirepass :
requirepass :

# Password for replication verify, used for authentication when a slave
# connects to a master to request replication.
# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
masterauth :
masterauth :

# The [password of user], which is empty by default.
# [NOTICE] If this user password is the same as admin password (including both being empty),
Expand Down Expand Up @@ -154,7 +154,7 @@ consensus-level : 0

# The Prefix of dump file's name.
# All the files that generated by command "bgsave" will be name with this prefix.
dump-prefix :
dump-prefix :

# daemonize [yes | no].
#daemonize : yes
Expand Down Expand Up @@ -568,7 +568,7 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash, bit
cache-type : string, set, zset, list, hash, bit

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
Expand Down Expand Up @@ -600,10 +600,10 @@ cache-maxmemory : 10737418240
cache-maxmemory-policy : 1

# cache-maxmemory-samples
cache-maxmemory-samples: 5
cache-maxmemory-samples : 5

# cache-lfu-decay-time
cache-lfu-decay-time: 1
cache-lfu-decay-time : 1


# is possible to manage access to Pub/Sub channels with ACL rules as well. The
Expand Down Expand Up @@ -657,12 +657,12 @@ cache-lfu-decay-time: 1
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
internal-used-unfinished-full-sync :

# for wash data from 4.0.0 to 4.0.1
# https://github.com/OpenAtomFoundation/pika/issues/2886
# default value: true
wash-data: true
wash-data : true

# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
Expand Down Expand Up @@ -696,4 +696,7 @@ dont-compact-sst-created-in-seconds : 20
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
best-delete-min-ratio : 10
# Generated by ReplicationID CONFIG REWRITE
replication-id : 6573021a3fdc3550d4e6a9bec5967726486d139b164b57b33d
run-id : ad00e48e2067e5a9d6e7893f83f83ab08cf5c86b
1 change: 1 addition & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SlaveofCmd : public Cmd {
private:
std::string master_ip_;
int64_t master_port_ = -1;
bool is_consistency_cmd_ = false;
bool is_none_ = false;
void DoInitial() override;
void Clear() override {
Expand Down
3 changes: 2 additions & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Binlog : public pstd::noncopyable {
void Unlock() { mutex_.unlock(); }

pstd::Status Put(const std::string& item);
pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog);
pstd::Status IsOpened();
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
/*
Expand All @@ -78,7 +79,7 @@ class Binlog : public pstd::noncopyable {
void Close();

private:
pstd::Status Put(const char* item, int len);
pstd::Status Put(const char* item, int len,LogOffset *cur_logoffset = nullptr, bool is_consistency = false);
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
void InitLogFile();
Expand Down
5 changes: 4 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ class CmdRes {
kTxnAbort,
kMultiKey,
kNoExists,
kConsistencyTimeout, // consistency time out
};

CmdRes() = default;
Expand Down Expand Up @@ -434,6 +435,8 @@ class CmdRes {
break;
case kNoExists:
return message_;
case kConsistencyTimeout:
return "-ERR consistency timeout\r\n";
default:
break;
}
Expand Down Expand Up @@ -586,7 +589,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// enable copy, used default copy
// Cmd(const Cmd&);
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key);
void InternalProcessCommand(const HintKeys& hint_key,bool is_consistency = false);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache();
void LogCommand() const;
Expand Down
114 changes: 105 additions & 9 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

#include <utility>

#include "include/pika_define.h"
#include "pstd/include/env.h"
#include "include/pika_binlog_transverter.h"
#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "pstd/include/env.h"

class Context : public pstd::noncopyable {
public:
Expand All @@ -25,7 +25,7 @@ class Context : public pstd::noncopyable {
void Reset(const LogOffset& offset);

std::shared_mutex rwlock_;
LogOffset applied_index_;
LogOffset applied_index_ = LogOffset();
SyncWindow applied_win_;

std::string ToString() {
Expand All @@ -50,21 +50,41 @@ class SyncProgress {
pstd::Status AddSlaveNode(const std::string& ip, int port, const std::string& db_name, int session_id);
pstd::Status RemoveSlaveNode(const std::string& ip, int port);
pstd::Status Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
LogOffset* committed_index);
LogOffset* committed_index);
int SlaveSize();
int SlaveBinlogStateSize() {
std::shared_lock l(rwlock_);
return slave_binlog_state_size;
}
void AddSlaveBinlogStateSize() {
std::lock_guard l(rwlock_);
slave_binlog_state_size++;
}
void SubSlaveBinlogStateSize() {
std::lock_guard l(rwlock_);
slave_binlog_state_size--;
}
void AddMatchIndex(const std::string& ip, int port, const LogOffset& offset) {
std::lock_guard l(rwlock_);
match_index_[ip + std::to_string(port)] = offset;
}

private:
std::shared_mutex rwlock_;
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
std::unordered_map<std::string, LogOffset> match_index_;
int slave_binlog_state_size = 0;
};

class MemLog {
public:
struct LogItem {
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::shared_ptr<PikaClientConn> _conn_ptr,
std::shared_ptr<std::string> _resp_ptr)
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), conn_ptr(std::move(_conn_ptr)), resp_ptr(std::move(_resp_ptr)) {}
: offset(_offset),
cmd_ptr(std::move(_cmd_ptr)),
conn_ptr(std::move(_conn_ptr)),
resp_ptr(std::move(_resp_ptr)) {}
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
Expand Down Expand Up @@ -100,6 +120,34 @@ class MemLog {
LogOffset last_offset_;
};

class Log {
public:
struct LogItem {
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::string _binlog)
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), binlog_(std::move(_binlog)) {}
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::string binlog_;
};

Log();
int Size();
void AppendLog(const LogItem& item);
LogOffset LastOffset();
LogOffset FirstOffset();
LogItem At(int index);
int FindOffset(const LogOffset& send_offset);
pstd::Status Truncate(const LogOffset& offset);
pstd::Status TruncateFrom(const LogOffset& offset);

private:
int FindLogIndex(const LogOffset& offset);
std::shared_mutex logs_mutex_;
std::vector<LogItem> logs_;
LogOffset last_index_;
LogOffset first_index_;
};

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider thread safety for the new Log class.

The Log class uses a std::shared_mutex named logs_mutex_, but some methods may not adequately protect shared resources. Review the methods to ensure proper locking and prevent data races.

For example, wrap accesses to logs_ with appropriate locks:

 int Size() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return logs_.size();
 }

 LogOffset LastOffset() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return last_index_;
 }

Ensure all public methods accessing shared data are thread-safe.

Committable suggestion skipped: line range outside the PR's diff.

class ConsensusCoordinator {
public:
ConsensusCoordinator(const std::string& db_name);
Expand Down Expand Up @@ -174,11 +222,11 @@ class ConsensusCoordinator {

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
std::vector<LogOffset>* log_offset);
pstd::Status FindBinlogFileNum(const std::map<uint32_t, std::string>& binlogs, uint64_t target_index, uint32_t start_filenum,
uint32_t* founded_filenum);
std::vector<LogOffset>* log_offset);
pstd::Status FindBinlogFileNum(const std::map<uint32_t, std::string>& binlogs, uint64_t target_index,
uint32_t start_filenum, uint32_t* founded_filenum);
pstd::Status FindLogicOffsetBySearchingBinlog(const BinlogOffset& hint_offset, uint64_t target_index,
LogOffset* found_offset);
LogOffset* found_offset);
pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

Expand All @@ -199,5 +247,53 @@ class ConsensusCoordinator {
SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;

// pacificA
public:
void InitContext(){
context_->Init();
}
bool checkFinished(const LogOffset& offset);
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_logoffset);
void SetIsConsistency(bool is_consistency);
bool GetISConsistency();
pstd::Status SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name);
pstd::Status Truncate(const LogOffset& offset);
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
pstd::Status UpdateCommittedID();
pstd::Status ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ProcessCoordination();

LogOffset GetCommittedId() {
std::lock_guard l(committed_id_rwlock_);
return committed_id_;
}
LogOffset GetPreparedId() {
std::lock_guard l(prepared_id__rwlock_);
return prepared_id_;
}
void SetPreparedId(const LogOffset& offset) {
std::lock_guard l(prepared_id__rwlock_);
prepared_id_ = offset;
}
void SetCommittedId(const LogOffset& offset) {
std::lock_guard l(committed_id_rwlock_);
committed_id_ = offset;
context_->UpdateAppliedIndex(committed_id_);
}

private:
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);

private:
std::shared_mutex is_consistency_rwlock_;
bool is_consistency_ = false;
std::shared_mutex committed_id_rwlock_;
LogOffset committed_id_ = LogOffset();
std::shared_mutex prepared_id__rwlock_;
LogOffset prepared_id_ = LogOffset();
std::shared_ptr<Log> logs_;
};

#endif // INCLUDE_PIKA_CONSENSUS_H_
4 changes: 4 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ enum SlaveState {
kSlaveNotSync = 0,
kSlaveDbSync = 1,
kSlaveBinlogSync = 2,
KCandidate = 3,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update SlaveStateMsg array to include KCandidate state.

The KCandidate state was added to the SlaveState enum, but the corresponding debug message array SlaveStateMsg needs to be updated.

Apply this diff:

-const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
+const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

Committable suggestion skipped: line range outside the PR's diff.

};

// debug only
Expand Down Expand Up @@ -274,9 +275,12 @@ class RmNode : public Node {
struct WriteTask {
struct RmNode rm_node_;
struct BinlogChip binlog_chip_;
LogOffset committed_id_ = LogOffset();
LogOffset prev_offset_;
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset)
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset) {}
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset, const LogOffset& committed_id)
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset), committed_id_(committed_id) {}
};

// slowlog define
Expand Down
24 changes: 24 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SyncMasterDB : public SyncDB {
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
LogOffset ConsensusCommittedIndex();

LogOffset ConsensusLastIndex();

std::shared_ptr<StableLog> StableLogger() { return coordinator_.StableLogger(); }
Expand All @@ -92,6 +93,27 @@ class SyncMasterDB : public SyncDB {
pstd::Mutex session_mu_;
int32_t session_id_ = 0;
ConsensusCoordinator coordinator_;

//pacificA public:
public:
void InitContext(){
coordinator_.InitContext();
}
bool checkFinished(const LogOffset& offset);
void SetConsistency(bool is_consistenct);
bool GetISConsistency();
pstd::Status ProcessCoordination();
void SetPreparedId(const LogOffset& offset);
void SetCommittedId(const LogOffset& offset);
LogOffset GetPreparedId();
LogOffset GetCommittedId();
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status AppendCandidateBinlog(const std::string& ip, int port, const LogOffset& offset);
pstd::Status UpdateCommittedID();
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
pstd::Status Truncate(const LogOffset& offset);


};

class SyncSlaveDB : public SyncDB {
Expand Down Expand Up @@ -191,6 +213,8 @@ class PikaReplicaManager {

std::shared_mutex& GetDBLock() { return dbs_rw_; }

void BuildBinlogOffset(const LogOffset& offset, InnerMessage::BinlogOffset* boffset);

void DBLock() {
dbs_rw_.lock();
}
Expand Down
Loading
Loading