
feat: Add PacificA data replication consistency scheme #2994

Open · wants to merge 17 commits into base: unstable
Conversation

@buzhimingyonghu (Contributor) commented Jan 13, 2025

PacificA Protocol Overview

The PacificA protocol consists of two parts:

  1. Data replication
  2. Configuration management

Since configuration management in Pika is handled by pika_sentinel, this document focuses on data replication in master-slave mode and its coordination with pika_sentinel.

Application in Pika

The work falls into three parts:

  1. Data-consistency flow for PacificA's master-slave mode
  2. Design of the distributed log-structured storage
  3. Reconciliation state after failure recovery

Starting PacificA

In Pika, a normal master-slave connection is established with:

slaveof <ip> <port>

To enable the PacificA protocol, add the strong argument:

slaveof <ip> <port> strong

When the slave executes this command, it triggers slaveofcmd, which reads the arguments and has pika_server store them; the work is then handed off asynchronously to the PikaAuxiliaryThread (hereafter PAT).
PAT is the core auxiliary thread of the PacificA implementation, responsible for:

- State-machine state transitions
- Sending heartbeats between master and slave and checking for timeouts
- Master-slave synchronization tasks
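A minimal sketch of how the optional strong argument could be parsed (all names here are illustrative; in Pika the real parsing lives in SlaveofCmd::DoInitial):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical argument holder; in the PR this maps to SlaveofCmd's
// master_ip_/master_port_/is_consistency_cmd_ members.
struct SlaveofArgs {
  std::string ip;
  int port = 0;
  bool is_consistency = false;  // true when the "strong" argument is present
};

bool ParseSlaveof(const std::vector<std::string>& argv, SlaveofArgs* out) {
  // Expected shape: {"slaveof", ip, port[, "strong"]}
  if (argv.size() < 3 || argv.size() > 4) return false;
  out->ip = argv[1];
  out->port = std::stoi(argv[2]);
  out->is_consistency = (argv.size() == 4 && argv[3] == "strong");
  return true;
}
```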

Data-Consistency Flow in PacificA Master-Slave Mode

The master-slave connection is established in four phases:

  1. MetaSync: synchronize and check master/slave metadata
  2. TrySync: determine data completeness and choose full or incremental sync
  3. Candidate: the slave, acting as a candidate, appends the complete prepared list
  4. BinlogSync: formally join the cluster and begin data replication
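The slave-side phase transitions above can be sketched as a small state machine (enumerator names are illustrative and modeled on Pika's SlaveState; the PR's actual new enumerator is spelled KCandidate):

```cpp
#include <cassert>

// Illustrative slave connection states for the four phases.
enum class SlaveState { kTryMetaSync, kTryDBSync, kTryConnect, kCandidate, kBinlogSync };

// After MetaSync, a full sync detours through kTryDBSync; an incremental
// sync goes straight to kTryConnect (the TrySync phase), then the slave
// becomes a candidate and finally enters BinlogSync.
SlaveState NextState(SlaveState s, bool need_full_sync) {
  switch (s) {
    case SlaveState::kTryMetaSync:
      return need_full_sync ? SlaveState::kTryDBSync : SlaveState::kTryConnect;
    case SlaveState::kTryDBSync:  return SlaveState::kTryConnect;
    case SlaveState::kTryConnect: return SlaveState::kCandidate;
    case SlaveState::kCandidate:  return SlaveState::kBinlogSync;
    case SlaveState::kBinlogSync: return SlaveState::kBinlogSync;
  }
  return s;
}
```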

[figure: connection-establishment phases]
The basic data structures:
[figure: basic data structures]

MetaSync Phase

[figure: MetaSync flow]

The slave's PAT thread establishes a connection with the master by sending a MetaReq request, which carries an is_consistency field indicating a strong-consistency request.
When the master receives the request and the consistency flag is true, it will:

  1. Set the consistency flag on all databases
  2. Initialize the context
  3. Decide whether to enter the reconciliation state

[figure: master-side MetaSync handling]

The slave then receives the master's MetaSyncRes and performs the following:

  1. Compare the local and master database structures (db_structs) for consistency

  2. If the local replication_id differs from the master's and the local replication_id is empty, perform a full sync; otherwise perform an incremental sync

  3. Update the slave's state according to the sync type:

    • Full sync: set the state to kTryDBSync
    • Incremental sync: set the state to kTryConnect
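The sync-type decision in step 2 can be sketched as follows (a simplified illustration of the rule stated above, not the actual Pika function):

```cpp
#include <cassert>
#include <string>

enum class SyncType { kFull, kIncremental };

// Decision rule from the MetaSync phase: a full sync is chosen only when
// the local replication_id differs from the master's AND the local id is
// still empty (a fresh slave); otherwise an incremental sync is attempted.
SyncType DecideSyncType(const std::string& local_replid,
                        const std::string& master_replid) {
  if (local_replid != master_replid && local_replid.empty()) {
    return SyncType::kFull;        // slave state -> kTryDBSync
  }
  return SyncType::kIncremental;   // slave state -> kTryConnect
}
```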

TrySync Phase

[figure: TrySync flow]

After a full sync completes, the slave updates its committedID and preparedID, then sends a TrySyncReq carrying the committedID to confirm log consistency.
After validation, the master returns a TrySyncRes containing its preparedID; the slave aligns its preparedID with the master's, completing the incremental sync.

Flow summary:

  1. The slave sends TrySyncReq with its committedID.

  2. The master checks the committedID:

    • If the master's committedID is greater than the slave's, sync is proceeding normally.
    • If the slave's committedID is greater than the master's, primary election has failed.
  3. The master returns TrySyncRes containing its preparedID, which the slave must align with.
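The master-side committedID check in step 2 can be sketched like this (Offset is a simplified stand-in for Pika's LogOffset; names are illustrative):

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for Pika's LogOffset (term + index only).
struct Offset { uint64_t term = 0; uint64_t index = 0; };

bool Less(const Offset& a, const Offset& b) {
  return a.term < b.term || (a.term == b.term && a.index < b.index);
}

enum class TrySyncResult { kOk, kElectionFailed };

// A slave whose committed point is ahead of the master's indicates a bad
// primary election, so synchronization must not proceed.
TrySyncResult CheckCommittedId(const Offset& master_committed,
                               const Offset& slave_committed) {
  if (Less(master_committed, slave_committed)) {
    return TrySyncResult::kElectionFailed;
  }
  return TrySyncResult::kOk;
}
```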

BinlogSync Phase

[figure: BinlogSync flow]

When the master receives the slave's first binlog request, it puts the slave into the candidate state and appends the log.
The master notifies slaves via heartbeats and binlog data, and logs are written locally in stages:

  • On receiving a BinlogSync, the slave first writes the binlog, then waits for the master to announce which requests have been committed.
  • Once the master has received acknowledgments from all slaves, it marks the requests as committed and advances the commit point, keeping it consistent with all slaves.
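Because PacificA commits an entry only after every replica has acknowledged it, the new commit point is bounded by the minimum acknowledged index across slaves. A minimal sketch (illustrative names, plain integer indexes instead of LogOffset):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Advance the primary's commit point: an entry is committed once all
// slaves have acked it, so the committed index can move up to the
// minimum acked index, and never moves backward.
uint64_t AdvanceCommitIndex(uint64_t current_committed,
                            const std::vector<uint64_t>& slave_acked) {
  if (slave_acked.empty()) return current_committed;
  uint64_t min_acked =
      *std::min_element(slave_acked.begin(), slave_acked.end());
  return std::max(current_committed, min_acked);
}
```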

Design of the Distributed Log-Structured Storage

PacificA uses logical replication, which involves:

  1. State consistency: all replicas logically hold the same state and can handle the same kinds of updates and queries.
  2. Logging: on receiving an update request, the system first writes it to the log, so data can be recovered from the log even after a failure.
  3. In-memory data-structure updates: once the log record is written, the update is applied to the in-memory data structures.
  4. Periodic checkpoints: to keep memory bounded, data snapshots are periodically persisted to disk as durable checkpoints.
  5. Log truncation: after a checkpoint is created, log entries already covered by the checkpoint are deleted, reducing storage requirements.
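Steps 4 and 5 interact as follows: once a checkpoint covering everything up to some index is on disk, earlier log entries are no longer needed for recovery. A sketch under that assumption (types are illustrative):

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

struct LogEntry { uint64_t index; /* payload omitted */ };

// Drop every log entry whose index is already covered by the checkpoint;
// recovery replays the log only from the checkpoint forward.
void TruncateBeforeCheckpoint(std::deque<LogEntry>* log,
                              uint64_t checkpoint_index) {
  while (!log->empty() && log->front().index <= checkpoint_index) {
    log->pop_front();
  }
}
```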

[figure: log-structured storage design]

Reconciliation State After Failure Recovery

[figure: failover reconciliation]

Initial state

  • A is the primary; B, C, and D are replicas.
  • committedB is a subset of committedA, and committedA is a subset of every replica's prepared list.

Failure recovery

When primary A fails:

  1. The system is reconfigured and B is promoted to the new primary.
  2. After B completes reconciliation, the new committedB equals the old preparedB, and every replica's preparedID is aligned with the new primary's preparedID.
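The reconciliation above can be sketched with plain integer offsets (a heavy simplification; the real implementation truncates or fetches individual log entries):

```cpp
#include <cassert>
#include <cstdint>

// PacificA's invariant is committed <= prepared on every replica, so when
// B is promoted it can safely commit everything it had prepared.
struct ReplicaState { uint64_t prepared = 0; uint64_t committed = 0; };

void Reconcile(ReplicaState* new_primary, ReplicaState* replica) {
  // New committedB equals the old preparedB.
  new_primary->committed = new_primary->prepared;
  // Every replica's prepared point is aligned with the new primary's
  // (entries beyond it are truncated; missing ones are fetched via sync).
  replica->prepared = new_primary->prepared;
}
```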

Operation Flow of a Write Request

  1. Write the binlog: wait for all slaves to append the log, then execute the database write.
  2. The master handles the binlog request:
    • In non-consistency mode, traditional master-slave replication is followed.
    • In consistency mode:
      a. coordinator_ appends the log and records the offset.
      b. Wait for the slaves to sync, then update the master's committedID.
      c. If the sync fails (10-second timeout), log the failure and return.
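The wait in steps b and c can be sketched with a condition variable rather than polling (illustrative class; the PR's actual loop in ConsensusProposeLog polls checkFinished, which a review comment below flags as a busy-wait):

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Block a writer until the committed point reaches the proposed entry's
// offset, or give up after the timeout (10 s in the PR) and report failure.
class CommitWaiter {
 public:
  // Called when slave acks allow the commit point to advance.
  void OnCommitted(uint64_t index) {
    {
      std::lock_guard<std::mutex> l(mu_);
      committed_ = std::max(committed_, index);
    }
    cv_.notify_all();
  }

  // Returns true if `index` committed within `timeout`, false on timeout.
  bool WaitCommitted(uint64_t index, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    return cv_.wait_for(l, timeout, [&] { return committed_ >= index; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  uint64_t committed_ = 0;
};
```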

[figure: write-request flow]

PacificA Consistency Test Cases

Test case 1: basic consistency

Purpose: verify basic master-slave replication and data consistency
Steps:

  1. Write data to the master
  2. Verify that both slaves have synchronized the data
  3. Check the replication state of all nodes

Expected result: data is identical across all nodes and replication state is normal

Test case 2: concurrent-write consistency

Purpose: ensure data consistency under concurrent writes
Steps:

  1. Concurrently write 10 entries to the master
  2. Wait for synchronization to complete
  3. Verify all data on both slaves

Expected result: all concurrently written data is correctly synchronized to the slaves

Test case 3: network-partition recovery

Purpose: test consistency recovery after a network partition
Steps:

  1. Write initial data
  2. Disconnect slave 1 (simulating a network partition)
  3. Write new data to the master
  4. Reconnect slave 1
  5. Verify data consistency

Expected result: after the network recovers, the disconnected slave synchronizes all missed data

Test case 4: dynamic node addition

Purpose: verify data consistency when a new node is added
Steps:

  1. Start with only the master and one slave
  2. Write a batch of initial data
  3. Add a second slave
  4. Write new data
  5. Verify consistency of both old and new data

Expected result: the newly added slave correctly receives all historical and new data

Test case 5: node failure and recovery

Purpose: test system behavior during node failure and recovery
Steps:

  1. Write initial data to all nodes
  2. Simulate a failure of slave 1
  3. Write data during the failure
  4. Recover slave 1
  5. Write new data
  6. Verify the complete data set

Expected results:
  • The system keeps running normally while a node is down
  • The failed node synchronizes all missed data after recovery
  • All nodes eventually reach data consistency

Test environment

  • 1 master (port 9301)
  • 2 slaves (ports 9302, 9303)
  • Strong-consistency mode enabled

Notes

  1. Each test case includes sufficient wait time to ensure synchronization completes
  2. All tests run in strong-consistency mode
  3. Data integrity and consistency are verified throughout


Summary by CodeRabbit

  • New Features
    • Introduced a "strong" command option that activates enhanced consistency mode during replication.
    • Expanded log synchronization capabilities with additional parameters for tracking committed and prepared states.
    • Enhanced error reporting with a dedicated consistency timeout response.
    • Upgraded replication protocols by adding new fields to support state and offset tracking.
    • Improved server role and state management to optimize master-slave transitions.
    • Added methods for managing consistency state and log entries in the replication framework.
    • Introduced new member variables to track log offsets and consistency states in various classes.
    • Added integration tests for verifying consistency in master-slave replication scenarios.
    • Automated cleanup script for Pika and PacificA instances.
    • Enhanced replication consistency checks and error handling in various components.
    • Added new constants for PacificA consistency testing configuration.
    • Enhanced acknowledgment handling during binlog synchronization.
    • Updated methods for handling prepared IDs and consistency checks in replication processes.


coderabbitai bot commented Jan 13, 2025

Walkthrough

This pull request refines the system’s consistency management and log offset handling. It introduces new member variables, method overloads, and API modifications that extend internal state tracking for replication and synchronization. Several classes across administration, consensus, replication, and server components receive enhancements to support consistency flags, improved error handling, and more detailed log operations. Additionally, protocol buffers are updated to carry consistency-related identifiers. Overall, the changes add new methods and modify control flow to better manage consistency in distributed operations without altering the public interface.

Changes

  • include/pika_admin.h: Added private member is_consistency_cmd_ in SlaveofCmd and updated the Clear method to reset related variables.
  • include/pika_binlog.h: Added an overloaded Put method accepting a string, log offset pointer, and binlog output; updated existing Put signature with additional parameters (LogOffset* and bool is_consistency).
  • include/pika_command.h: Added new enum value kConsistencyTimeout in CmdRes::CmdRet and updated the message() method to handle consistency timeout errors.
  • include/pika_consensus.h: Introduced new Log class with log management methods; added multiple new public methods in ConsensusCoordinator for appending, truncating, and committing binlogs; and introduced new member variables for consistency and committed IDs.
  • include/pika_define.h: Added enumerator KCandidate to the SlaveState enum and updated the WriteTask struct with a new member committed_id_ and an associated constructor.
  • include/pika_rm.h: Added several new public methods in SyncMasterDB (e.g., ConsensusLastIndex(), SetConsistency(), AppendSlaveEntries(), etc.) and in PikaReplicaManager (added BuildBinlogOffset()), enhancing log offset handling and coordination functions.
  • include/pika_server.h: Added new member variables last_role_ and is_consistency_; updated SetMaster signature to include a consistency flag; and added new methods (last_role(), IsConsistency(), SetConsistency()) to track server role and consistency state.
  • include/pika_slave_node.h: Updated sent_offset and acked_offset to have default initializations and added a new member target_offset for improved offset tracking.
  • src/pika_admin.cc: Extended SlaveofCmd command handling in DoInitial() to support a new "strong" argument and updated the SetMaster() call to include the consistency flag.
  • src/pika_binlog.cc: Added a new Put method for handling string parameters with binlog encoding and updated the existing Put method to include additional parameters for log offsets and consistency checks, ensuring thread-safe operations with a DEFER mechanism.
  • src/pika_command.cc: Modified InternalProcessCommand to change the control flow based on the server's consistency state, invoking DoBinlog() and DoCommand() in different orders and enhancing error handling for binlog timeouts in consistency mode.
  • src/pika_consensus.cc: Added numerous new methods in ConsensusCoordinator (e.g., SetConsistency(), AppendEntries(), CommitAppLog(), etc.), enhanced log management via the new Log class, and implemented functions to append, truncate, and search log entries.
  • src/pika_db.cc: Modified DB::TryUpdateMasterOffset to set and log the master database's prepared and committed IDs after updating the master offset.
  • src/pika_inner_message.proto: Extended the protocol buffer definitions by adding optional fields: is_consistency in MetaSync, committed_id in TrySync (request), and prepared_id plus committed_id in the corresponding response messages, thereby enhancing state tracking in synchronization messages.
  • src/pika_repl_bgworker.cc: Added a new variable ack_end and updated binlog write handling to check for database consistency, conditionally committing the application log based on the consistency state.
  • src/pika_repl_client.cc: Enhanced SendMetaSync() and SendTrySync() methods to include consistency checks and populate the committed_id field when the master database is in a consistent state.
  • src/pika_repl_client_conn.cc: Modified HandleTrySyncResponse to check and update the database's prepared ID against the incoming prepared_id, triggering log truncation if required under consistency conditions.
  • src/pika_repl_server.cc: Updated BuildBinlogSyncResp to include the committed_id field in responses when the server is in a consistency state.
  • src/pika_repl_server_conn.cc: Enhanced replication connection methods by incorporating consistency state checks in handling MetaSync, TrySync, and BinlogSync requests, and improved error responses when there is a mismatch in committed offsets.
  • src/pika_rm.cc: Added new methods in SyncMasterDB (e.g., checkFinished(), SetConsistency(), UpdateCommittedID(), etc.) and in PikaReplicaManager (added BuildBinlogOffset()); refined logic in methods like SyncBinlogToWq, WakeUpSlaveBinlogSync, and ConsensusProposeLog to include consistency checks and timeout mechanisms.
  • src/pika_server.cc: Extended PikaServer with a new method last_role(), updated SetMaster() to accept a consistency flag, and added IsConsistency() and SetConsistency() methods, along with updates in role management during master/slave transitions.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Cmd
    participant Server
    Client->>Cmd: Send command
    Cmd->>Server: Check IsConsistency()
    alt Consistent Mode
        Server-->>Cmd: returns true
        Cmd->>Cmd: Call DoBinlog()
        Cmd->>Cmd: Evaluate binlog result
        Cmd->>Cmd: Call DoCommand()
    else Inconsistent Mode
        Server-->>Cmd: returns false
        Cmd->>Cmd: Call DoCommand()
        Cmd->>Cmd: Call DoBinlog()
    end
    Cmd->>Client: Return response
sequenceDiagram
    participant AdminCmd
    participant SlaveofCmd
    participant PikaServer
    AdminCmd->>SlaveofCmd: Execute DoInitial("strong")
    SlaveofCmd->>SlaveofCmd: Set is_consistency_cmd_ = true
    SlaveofCmd->>PikaServer: Call SetMaster(master_ip, master_port, true)
    PikaServer->>PikaServer: Update is_consistency_ and role info
    PikaServer-->>SlaveofCmd: Acknowledge master setting
    SlaveofCmd->>AdminCmd: Return command response

Suggested reviewers

  • AlexStocks
  • Mixficsol
  • dingxiaoshuai123



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fccadca and 70be8a2.

📒 Files selected for processing (2)
  • src/pika_consensus.cc (4 hunks)
  • src/pika_rm.cc (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_centos
🔇 Additional comments (1)
src/pika_consensus.cc (1)

229-229: Initialize is_consistency_ member variable.

The is_consistency_ member variable is not explicitly initialized in the constructor.


@github-actions github-actions bot added the ✏️ Feature New feature or request label Jan 13, 2025
@buzhimingyonghu (Contributor, Author) commented Jan 13, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 18

🔭 Outside diff range comments (1)
src/pika_command.cc (1)

Line range hint 860-885: Inconsistent method calls after updating InternalProcessCommand signature

The method InternalProcessCommand now requires a bool is_consistency parameter, but the call in ProcessCommand does not include it, leading to a mismatch and potential compilation errors.

Apply this diff to update the method call:

 void Cmd::ProcessCommand(const HintKeys& hint_keys) {
   if (stage_ == kNone) {
-    InternalProcessCommand(hint_keys);
+    InternalProcessCommand(hint_keys, g_pika_server->IsConsistency());
   } else {
     if (stage_ == kBinlogStage) {
       DoBinlog();
     } else if (stage_ == kExecuteStage) {
       DoCommand(hint_keys);
     }
   }
 }

Ensure that g_pika_server->IsConsistency() correctly provides the consistency flag.

🧹 Nitpick comments (23)
src/pika_repl_bgworker.cc (2)

91-92: Initialize ack_end to avoid potential undefined behavior.

The variable ack_end is declared but not initialized. While it may be assigned later in the code, it's good practice to initialize variables upon declaration to prevent potential undefined behavior.

Apply this diff to initialize ack_end:

 LogOffset ack_start;
+ LogOffset ack_end = LogOffset();

214-218: Handle error codes consistently in function returns.

The function HandleWriteBinlog returns an int, but in the null check added, a -1 is returned. Ensure that all error codes are handled consistently according to the function's expected return values.

Apply this diff to return the appropriate error code:

 if (!db) {
   LOG(WARNING) << worker->db_name_ << " Not found.";
-  return -1;
+  return -2;  // Use a consistent error code as per project conventions
 }

Ensure that -2 (or the chosen error code) is documented and handled appropriately by the caller.

include/pika_consensus.h (2)

28-28: Prefer constructor initialization lists for member variables.

Initializing applied_index_ within the class definition is acceptable, but it's better practice to use the constructor's initialization list for consistent initialization, especially if the class grows in complexity.

Apply this diff to move the initialization to the constructor:

- LogOffset applied_index_ = LogOffset();
+ LogOffset applied_index_;
...
+ Context(std::string path)
+     : path_(std::move(path)), applied_index_(LogOffset()) {}

252-296: Ensure consistent naming conventions for member variables and methods.

Some member variables use a trailing underscore (e.g., is_consistency_), while others don't (e.g., prepared_id__rwlock_). Additionally, prepared_id__rwlock_ has a double underscore. For readability and maintainability, adhere to a consistent naming convention.

Apply this diff to correct the variable names:

- std::shared_mutex prepared_id__rwlock_;
+ std::shared_mutex prepared_id_rwlock_;

Also, review other member variables and methods for consistent naming.

src/pika_binlog.cc (2)

187-196: Possible redundant call to GetProducerStatus.

The method GetProducerStatus is called inside both Binlog::Put methods. Ensure that this is necessary and consider avoiding redundant calls to improve performance.

If GetProducerStatus is called earlier or its results are already known, consider removing the redundant call.


Line range hint 235-284: Improve error handling and code readability in Binlog::Put.

The method mixes logging, status checks, and business logic, which can make it hard to read and maintain. Consider refactoring the method to separate concerns and improve error handling.

Extract sections into helper methods or adjust the structure for clarity.

src/pika_repl_server_conn.cc (2)

40-42: Avoid redundant slave size check and potential inconsistency.

The variable slave_size is set but not used effectively in the subsequent code. Additionally, relying on slave_size at this point may not reflect the actual state due to concurrency.

Consider removing slave_size or using it appropriately to enhance code clarity:

- int slave_size = g_pika_server->slave_size();

If necessary, ensure thread-safe access to shared data.


234-252: Ensure consistent error handling and prevent code duplication.

The code within the consistency check duplicates logic that exists later in the method for seeking the binlog reader and handling errors.

Refactor the code to eliminate duplication:

  • Merge the consistency-specific logic with the existing binlog seek operation.
  • Consolidate error handling to a single block.

This enhances maintainability and reduces the risk of inconsistency between different code paths.

src/pika_consensus.cc (5)

343-344: Remove redundant log statements or refine log levels.

The log statements at lines 343-344 might be redundant or could be combined for clarity.

Consider refining the logging:

LOG(WARNING) << DBInfo(db_name_).ToString()
             << "Drop log from leader logic_id " << attribute.logic_id()
             << ", current last index " << last_index.l_offset.index;

This provides the same information in a single, concise log entry.


597-598: Avoid logging sensitive information.

The log statements at lines 597-598 include potentially sensitive details that might not be necessary for log output.

Review the logging content and consider reducing verbosity or masking sensitive data:

// Example: Log only essential information
LOG(INFO) << DBInfo(db_name_).ToString() << "Searching binlog with hint offset";

This enhances security and complies with best practices for logging.


646-647: Correct typographical error in log message.

There is a typo in the log message: "GetBInlogOffset res" should be "GetBinlogOffset res".

Correct the typo for clarity:

- LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString() << " possible_offset "
+ LOG(INFO) << DBInfo(db_name_).ToString() << "GetBinlogOffset res: " << s.ToString() << " possible_offset "

795-806: Ensure consistent naming conventions for methods.

The methods SetIsConsistency and GetISConsistency have inconsistent naming, with one using Is and the other using IS.

Standardize the method names:

- void SetIsConsistency(bool is_consistency);
- bool GetISConsistency();
+ void SetIsConsistency(bool is_consistency);
+ bool GetIsConsistency();

This improves code readability and maintains consistent naming conventions.


815-837: Avoid potential memory leaks with binlog strings.

In the PersistAppendBinlog method, the binlog string is passed by value, which could be inefficient for large strings.

Pass binlog by reference to avoid unnecessary copying:

Status s = stable_logger_->Logger()->Put(content, &offset, &binlog);

This improves performance by preventing unnecessary memory allocations.

src/pika_rm.cc (1)

488-495: Potential busy-wait loop in ConsensusProposeLog

In the ConsensusProposeLog method, the loop may cause high CPU usage due to frequent checks without adequate waiting. Consider using a condition variable or increasing the sleep duration to prevent a busy-wait.

Apply this diff to improve the loop:

 while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
   // Check if consensus has been achieved for the given log offset
   if (checkFinished(offset)) {
     return Status::OK(); 
   }
-  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  std::this_thread::sleep_for(std::chrono::milliseconds(200));
 }
src/pika_command.cc (1)

972-981: Improper formatting and missing braces in DoBinlog

In the DoBinlog method, the if-else statements lack braces {} around their code blocks, which can lead to errors or misunderstandings during maintenance.

Apply this diff to add braces and improve readability:

 if (g_pika_server->IsConsistency() && s.IsTimeout()) {
+  res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency");
+  LOG(WARNING) << sync_db_->SyncDBInfo().ToString()
+               << " Slave node consistency timeout: " << s.ToString();
 } else {
+  LOG(WARNING) << sync_db_->SyncDBInfo().ToString()
+               << " Writing binlog failed, maybe no space left on device: " << s.ToString();
+  res().SetRes(CmdRes::kErrOther, s.ToString());
 }
src/pika_repl_server.cc (1)

25-25: Consider adding cleanup logic in destructor.

While the simplified destructor logs the exit, consider adding explicit cleanup for any resources that might need proper termination.

include/pika_rm.h (1)

216-216: Consider adding documentation for BuildBinlogOffset.

While the implementation looks correct, adding documentation would help explain the role of this method in the replication process.

src/pika_repl_client_conn.cc (1)

213-214: Consider enhancing log messages for better debugging.

While the logging is helpful, consider adding more context like node IDs or connection details to help with distributed debugging.

-  LOG(INFO)<<"PacificA master TrySync Response master_prepared_id: "<<master_prepared_id.ToString();
-  LOG(INFO)<<"PacificA slave cur_prepared_id: "<<db->GetPreparedId().ToString();
+  LOG(INFO)<<"PacificA [node="<<g_pika_server->host()<<":"<<g_pika_server->port()<<"] master TrySync Response master_prepared_id: "<<master_prepared_id.ToString();
+  LOG(INFO)<<"PacificA [node="<<g_pika_server->host()<<":"<<g_pika_server->port()<<"] slave cur_prepared_id: "<<db->GetPreparedId().ToString();
include/pika_server.h (2)

156-159: Consider thread safety implications.

While the method correctly uses a lock guard, consider documenting the thread safety guarantees in the method comment.

+  /**
+   * @brief Get the current number of connected slaves
+   * @thread_safety Thread-safe through slave_mutex_ protection
+   * @return The number of connected slaves
+   */
   int slave_size(){
     std::lock_guard l(slave_mutex_);
     return slaves_.size();
   }

581-581: Consider initialization in constructor.

The is_consistency_ member should be explicitly initialized in the constructor to maintain consistency with other member initializations.

src/pika_db.cc (1)

529-532: LGTM: Proper consistency state update after DB sync.

The code correctly:

  1. Creates a master offset combining binlog and logic offsets
  2. Updates both prepared and committed IDs
  3. Logs the state change for debugging

Consider enhancing the log message to include the DB name for better context in multi-DB setups.

-  LOG(INFO)<<"PacificA write DB finished slave_comitted: "<<master_db->GetCommittedId().ToString()<<" prepared: "<<master_db->GetPreparedId().ToString();
+  LOG(INFO)<<"PacificA write DB finished [db="<<db_name_<<"] slave_comitted: "<<master_db->GetCommittedId().ToString()<<" prepared: "<<master_db->GetPreparedId().ToString();
include/pika_command.h (1)

592-592: Consider documenting the consistency parameter.

While the signature change is correct, consider adding documentation for the is_consistency parameter to clarify its usage.

-  void InternalProcessCommand(const HintKeys& hint_key,bool is_consistency = false);
+  /**
+   * @brief Process a command internally
+   * @param hint_key The hint keys for command routing
+   * @param is_consistency If true, ensures strong consistency through PacificA protocol
+   */
+  void InternalProcessCommand(const HintKeys& hint_key, bool is_consistency = false);
src/pika_inner_message.proto (1)

152-152: Add committed_id to BinlogSync response

The committed_id field is added to track committed transaction states in the BinlogSync response.

Consider documenting the relationship between prepared_id and committed_id in comments, as they form part of the transaction state management system.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0704c3 and 21e1e9e.

📒 Files selected for processing (23)
  • .gitignore (0 hunks)
  • conf/pika.conf (8 hunks)
  • include/pika_admin.h (1 hunks)
  • include/pika_binlog.h (2 hunks)
  • include/pika_command.h (3 hunks)
  • include/pika_consensus.h (6 hunks)
  • include/pika_define.h (2 hunks)
  • include/pika_rm.h (3 hunks)
  • include/pika_server.h (5 hunks)
  • include/pika_slave_node.h (1 hunks)
  • src/pika_admin.cc (4 hunks)
  • src/pika_binlog.cc (3 hunks)
  • src/pika_command.cc (3 hunks)
  • src/pika_consensus.cc (15 hunks)
  • src/pika_db.cc (1 hunks)
  • src/pika_inner_message.proto (3 hunks)
  • src/pika_repl_bgworker.cc (3 hunks)
  • src/pika_repl_client.cc (3 hunks)
  • src/pika_repl_client_conn.cc (3 hunks)
  • src/pika_repl_server.cc (2 hunks)
  • src/pika_repl_server_conn.cc (7 hunks)
  • src/pika_rm.cc (7 hunks)
  • src/pika_server.cc (6 hunks)
💤 Files with no reviewable changes (1)
  • .gitignore
✅ Files skipped from review due to trivial changes (1)
  • conf/pika.conf
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_centos
🔇 Additional comments (27)
include/pika_consensus.h (1)

67-70: ⚠️ Potential issue

Thread-safety concern with match_index_ updates.

The AddMatchIndex method updates the match_index_ without a thread-safe guarantee if other methods access it concurrently. Ensure that access to match_index_ is properly synchronized.

Consider adding synchronization mechanisms when accessing match_index_:

 void AddMatchIndex(const std::string& ip, int port, const LogOffset& offset) {
-  std::lock_guard l(rwlock_);
+  std::unique_lock l(rwlock_);
   match_index_[ip + std::to_string(port)] = offset;
 }

Ensure that all accesses to match_index_ are protected by the same mutex.

Likely invalid or redundant comment.

src/pika_binlog.cc (1)

173-202: Check for binlog I/O errors consistently.

In the new Put method, if the Put operation fails, binlog_io_error_ is set to true. Ensure that this error state is handled appropriately elsewhere in the code to prevent unexpected behavior.

Please verify that binlog_io_error_ is checked after calling this method and that appropriate actions are taken when it's true.

src/pika_repl_server_conn.cc (1)

399-403: Adjust logic for slave binlog synchronization based on consistency state.

In the HandleBinlogSyncRequest method, when is_first_send is true, the code conditionally calls AppendCandidateBinlog or ActivateSlaveBinlogSync based on master_db->GetISConsistency(). Ensure that both methods handle the slave's synchronization state appropriately.

Confirm that AppendCandidateBinlog and ActivateSlaveBinlogSync correctly manage the slave's state and that there are no side effects when switching between consistency modes.

If needed, review the implementations:

This helps ensure that the correct synchronization process is followed.

✅ Verification successful

Verified: Slave binlog synchronization logic is correctly implemented

The implementations properly handle slave states:

  • AppendCandidateBinlog safely activates sync only when offset criteria are met
  • ActivateSlaveBinlogSync directly activates sync and updates offsets
  • Both methods maintain consistency guarantees appropriate for their respective modes
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

# Check the implementations of the methods
rg 'AppendCandidateBinlog' --files-with-matches -t cpp
rg 'ActivateSlaveBinlogSync' --files-with-matches -t cpp

Length of output: 306


Script:

#!/bin/bash
# Check the implementation of both methods in pika_rm.cc
echo "=== AppendCandidateBinlog implementation ==="
rg -A 10 "AppendCandidateBinlog.*\(" src/pika_rm.cc

echo -e "\n=== ActivateSlaveBinlogSync implementation ==="
rg -A 10 "ActivateSlaveBinlogSync.*\(" src/pika_rm.cc

Length of output: 1173

src/pika_consensus.cc (2)

872-916: Clarify loop logic in CommitAppLog method.

In the CommitAppLog method, the loop uses log_size captured before the loop. If logs_->TruncateFrom modifies logs_, this could lead to inconsistency.

Ensure that truncating logs within the loop does not affect iteration. Consider capturing logs_->Size() after potential modifications, or restructure the loop to account for dynamic size changes.

Additionally, verify that ApplyBinlog is correctly applying the logs and that SetCommittedId updates the state as intended.


472-472: Handle potential race conditions when truncating logs.

In the TruncateTo method, the code performs operations that might be affected by concurrent modifications to the logs. Ensure that necessary locks are held during truncation.

Check if adequate synchronization is in place to prevent race conditions:

// Ensure that stable_logger_ and mem_logger_ are properly locked
stable_logger_->Logger()->Lock();
// Truncate operations...
stable_logger_->Logger()->Unlock();

This helps maintain data integrity during truncation.

include/pika_binlog.h (2)

55-55: LGTM: New Put method signature enhances consistency tracking.

The new overload with LogOffset tracking and binlog output parameter aligns well with PacificA's requirements for strong consistency in replication.


82-82: Verify error handling for consistency-enabled operations.

The modified Put method adds consistency support, but we should ensure proper error handling when consistency checks fail.

Run this script to check error handling patterns:

✅ Verification successful

Error handling is properly implemented for consistency operations

The review revealed that the is_consistency parameter is used for metadata tracking (updating cur_logoffset) rather than validation. The method already implements comprehensive error handling that covers all operations, including proper propagation of errors through Status returns from file operations and produce calls.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for error handling patterns in consistency-related code
rg -A 3 "is_consistency.*true" 
rg -A 3 "Status::.*Error.*consistency"

Length of output: 282


Script:

#!/bin/bash
# Find Put method implementation
ast-grep --pattern 'Status Put(const char* $_, int $_, LogOffset* $_ = nullptr, bool $_ = false) {
  $$$
}'

# Search for any consistency-related error handling in the implementation file
rg -B 2 -A 5 "Put.*is_consistency" src/pika_binlog.cc

Length of output: 412


Script:

#!/bin/bash
# Get the complete Put method implementation with more context
rg -B 2 -A 30 "Status Binlog::Put.*is_consistency" src/pika_binlog.cc

Length of output: 1005


Script:

#!/bin/bash
# Get the complete Put method implementation with much more context
rg -B 2 -A 50 "Status Binlog::Put.*is_consistency" src/pika_binlog.cc

Length of output: 1513

src/pika_repl_server.cc (1)

104-105: LGTM: Proper handling of committed_id in sync response.

The addition of committed_id handling in BuildBinlogSyncResp is essential for maintaining consistency across replicas in the PacificA protocol.

include/pika_rm.h (2)

102-114: LGTM: Comprehensive implementation of PacificA protocol methods.

The added methods provide a complete implementation of PacificA's consistency requirements:

  • Log management through PreparedId and CommittedId
  • Consistency verification through checkFinished
  • Coordination handling through ProcessCoordination

96-101: Ensure thread safety for context initialization.

The InitContext method modifies coordinator state. Verify thread safety mechanisms are in place.

Run this script to check synchronization patterns:

✅ Verification successful

Thread safety is properly implemented for context initialization

The ConsensusCoordinator class already implements comprehensive thread safety using multiple shared mutexes to protect its internal state. The InitContext() implementation follows a simple delegation pattern that leverages these existing synchronization mechanisms.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for mutex/lock usage around context initialization
rg -B 5 "InitContext.*\{" 
ast-grep --pattern 'lock_guard|unique_lock|shared_lock'

Length of output: 594


Script:

#!/bin/bash
# Check ConsensusCoordinator implementation and usage
rg -B 3 -A 3 "class ConsensusCoordinator"
rg "mutex|atomic" include/pika_consensus.h
rg "InitContext.*\{" -A 10 "src/*.cc"

Length of output: 752


Script:

#!/bin/bash
# Find implementation files and check InitContext
fd -e cc -e cpp
rg -B 2 -A 10 "InitContext.*\{" 

Length of output: 9003

include/pika_define.h (1)

278-283: LGTM: Enhanced WriteTask with consistency tracking.

The addition of committed_id and its constructor properly integrates with PacificA's consistency requirements.

src/pika_repl_client_conn.cc (3)

203-204: LGTM: Proper extraction and conversion of prepared_id.

The code correctly extracts the prepared_id from try_sync_response and creates a LogOffset object combining both binlog and logic offsets.


229-233: LGTM: Proper handling of consistency mode.

The code correctly handles the offset calculation based on consistency mode.


238-247: Verify error handling in consistency check.

The consistency check logic is critical for data integrity. While the implementation looks correct, consider:

  1. The error state might need more detailed logging
  2. The comment on line 245 is in Chinese and should be translated to English
   if(master_prepared_id<db->GetPreparedId()){
     if(master_prepared_id<db->GetCommittedId()){
       slave_db->SetReplState(ReplState::kError);
-      LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
+      LOG(WARNING) << "DB: " << db_name << " Consistency violation: master's committed ID (" 
+                  << master_prepared_id.ToString() << ") is less than slave's committed ID ("
+                  << db->GetCommittedId().ToString() << ")";
       return;
     }
     db->SetPreparedId(master_prepared_id);
-    // 向主的preparedid看齐,多余的裁剪掉
+    // Align with master's prepared ID by truncating excess data
     db->Truncate(master_prepared_id);
   }
✅ Verification successful

The review suggestions are valid and align with codebase patterns.

  1. The enhanced error logging provides better debugging context by including both master and slave IDs, consistent with error logging patterns in the codebase.
  2. The Chinese comment translation is accurate and maintains the original meaning while improving code readability.
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Check for ToString() method implementation
ast-grep --pattern 'ToString() {
  $$$
}'

# Look for similar logging patterns
rg "LOG\(WARNING\)" -A 2

# Look for Chinese comments
rg -P "[\p{Han}]+" --color always

# Find similar consistency checks
rg "master.*prepared.*id" -A 3

Length of output: 67323

include/pika_server.h (2)

93-93: LGTM: Role tracking enhancement.

The addition of last_role() method and last_role_ member variable properly tracks the server's previous role, which is essential for role transition management in the PacificA protocol.

Also applies to: 583-583


171-171: LGTM: Consistency control methods.

The addition of consistency-related methods and parameters aligns with the PacificA protocol requirements.

Also applies to: 184-185

include/pika_command.h (1)

337-339: LGTM: Proper error handling for consistency timeout.

The addition of kConsistencyTimeout and its corresponding error message properly handles consistency timeout scenarios in the PacificA protocol.

Also applies to: 438-439

include/pika_admin.h (1)

38-38: LGTM! Well-placed member variable for consistency tracking.

The new is_consistency_cmd_ member variable is appropriately initialized to false by default and correctly placed in the private section.

src/pika_server.cc (3)

272-275: LGTM! Thread-safe role tracking implementation.

The last_role() method is properly synchronized using a shared lock to safely access the role state.


692-704: LGTM! Enhanced SetMaster with consistency support.

The SetMaster method is updated to handle consistency mode while maintaining proper synchronization and role state management.


755-762: LGTM! Thread-safe consistency state management.

The IsConsistency and SetIsConsistency methods provide synchronized access to the consistency state.

src/pika_admin.cc (3)

99-99: Documentation update for new command option

The comment has been updated to include the new "strong" option for the slaveof command.


168-168: Propagate consistency flag to SetMaster

The SetMaster call is updated to include the is_consistency_cmd_ parameter, ensuring the consistency mode is properly propagated.

Let's verify the SetMaster method signature:

✅ Verification successful

SetMaster consistency flag propagation verified

The SetMaster method signature in pika_server.h correctly accepts the consistency flag parameter, matching the updated call site.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check SetMaster method declaration
rg "SetMaster.*bool.*consistency" include/

Length of output: 152


138-139: Implementation of strong consistency mode

The code adds support for strong consistency mode by setting is_consistency_cmd_ when the "strong" argument is provided.

Let's verify the initialization of is_consistency_cmd_:

✅ Verification successful

Implementation verified: Strong consistency mode is properly implemented

The is_consistency_cmd_ flag is correctly declared, initialized, and used throughout the codebase:

  • Properly declared as a private boolean member with default initialization to false
  • Correctly set when "strong" argument is provided
  • Appropriately used in server configuration via SetMaster
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if is_consistency_cmd_ is properly initialized in the header file
rg -A 2 "class SlaveofCmd" include/

Length of output: 205


Script:

#!/bin/bash
# Get full SlaveofCmd class definition
ast-grep --pattern 'class SlaveofCmd : public Cmd {
  $$$
}'

# Find constructor implementation and usages
rg "is_consistency_cmd_" -A 2

Length of output: 1721

src/pika_inner_message.proto (3)

63-63: Add consistency flag to MetaSync message

The is_consistency boolean field is added to support strong consistency mode in metadata synchronization.


69-71: Update TrySync message structure

The TrySync message has been updated to:

  1. Use slot instead of db_name for better slot-based replication
  2. Add committed_id for tracking transaction state

138-138: Add prepared_id to TrySync response

The prepared_id field is added to track prepared transaction states in the TrySync response.

Comment on lines 156 to 161
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (db->GetISConsistency()) {
  Status s = db->CommitAppLog(master_committed_id);
  if (!s.ok()) {
    return;
  }

⚠️ Potential issue

Add null check for db to prevent null pointer dereference.

After retrieving the db instance with GetSyncMasterDBByName, ensure that db is not null before using it. This prevents potential crashes if the database is not found.

Apply this diff to add the null check:

 db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
+ if (!db) {
+   LOG(WARNING) << "DB " << worker->db_name_ << " Not Found";
+   return;
+ }
 if (db->GetISConsistency()) {
   Status s = db->CommitAppLog(master_committed_id);
   if (!s.ok()) {
     return;
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (db->GetISConsistency()) {
  Status s = db->CommitAppLog(master_committed_id);
  if (!s.ok()) {
    return;
  }
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (!db) {
  LOG(WARNING) << "DB " << worker->db_name_ << " Not Found";
  return;
}
if (db->GetISConsistency()) {
  Status s = db->CommitAppLog(master_committed_id);
  if (!s.ok()) {
    return;
  }

Comment on lines 214 to 218
if (db->GetISConsistency()) {
  db->AppendSlaveEntries(c_ptr, worker->binlog_item_);
} else {
  db->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
}

⚠️ Potential issue

Ensure db is valid before accessing its methods to prevent null pointer dereference.

The variable db might be null if the database is not found. Before calling db->GetISConsistency(), add a null check to prevent potential crashes.

Apply this diff to add the null check:

 if (!db) {
   LOG(WARNING) << worker->db_name_ << " Not found.";
+  return -1;
 }
 if (db->GetISConsistency()) {
   db->AppendSlaveEntries(c_ptr, worker->binlog_item_);
 } else {
   db->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 154 to 161
const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id();
LogOffset master_committed_id(BinlogOffset(committed_id.filenum(), committed_id.offset()),
                              LogicOffset(committed_id.term(), committed_id.index()));
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (db->GetISConsistency()) {
  Status s = db->CommitAppLog(master_committed_id);
  if (!s.ok()) {
    return;
  }

🛠️ Refactor suggestion

Avoid variable shadowing by reusing the existing db variable.

The variable db is redeclared in line 156, which shadows the db declared earlier in the function. This can lead to confusion and potential errors. Consider reusing the existing db variable instead of redeclaring it.

Apply this diff to reuse the existing db variable:

- std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
+ db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 123 to 150
class Log {
 public:
  struct LogItem {
    LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::string _binlog)
        : offset(_offset), cmd_ptr(std::move(_cmd_ptr)), binlog_(std::move(_binlog)) {}
    LogOffset offset;
    std::shared_ptr<Cmd> cmd_ptr;
    std::string binlog_;
  };

  Log();
  int Size();
  void AppendLog(const LogItem& item);
  LogOffset LastOffset();
  LogOffset FirstOffset();
  LogItem At(int index);
  int FindOffset(const LogOffset& send_offset);
  pstd::Status Truncate(const LogOffset& offset);
  pstd::Status TruncateFrom(const LogOffset& offset);

 private:
  int FindLogIndex(const LogOffset& offset);
  std::shared_mutex logs_mutex_;
  std::vector<LogItem> logs_;
  LogOffset last_index_;
  LogOffset first_index_;
};


🛠️ Refactor suggestion

Consider thread safety for the new Log class.

The Log class uses a std::shared_mutex named logs_mutex_, but some methods may not adequately protect shared resources. Review the methods to ensure proper locking and prevent data races.

For example, wrap accesses to logs_ with appropriate locks:

 int Size() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return logs_.size();
 }

 LogOffset LastOffset() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return last_index_;
 }

Ensure all public methods accessing shared data are thread-safe.

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +267 to +270
if (is_consistency && cur_logoffset != nullptr) {
  cur_logoffset->b_offset.filenum = version_->pro_num_;
  cur_logoffset->b_offset.offset = version_->pro_offset_;
}

⚠️ Potential issue

Potential null pointer dereference of cur_logoffset.

In the condition if(is_consistency && cur_logoffset != nullptr), if is_consistency is true but cur_logoffset is nullptr, the code will skip setting cur_logoffset, which might be unintended. Ensure that cur_logoffset is not null when is_consistency is true.

Apply this diff to enforce that cur_logoffset should not be nullptr when is_consistency is true:

 if (is_consistency) {
+  assert(cur_logoffset != nullptr);
   cur_logoffset->b_offset.filenum = version_->pro_num_;
   cur_logoffset->b_offset.offset = version_->pro_offset_;
 }

Alternatively, handle the case where cur_logoffset may be nullptr.

📝 Committable suggestion


Suggested change
if (is_consistency && cur_logoffset != nullptr) {
  cur_logoffset->b_offset.filenum = version_->pro_num_;
  cur_logoffset->b_offset.offset = version_->pro_offset_;
}
if (is_consistency) {
  assert(cur_logoffset != nullptr);
  cur_logoffset->b_offset.filenum = version_->pro_num_;
  cur_logoffset->b_offset.offset = version_->pro_offset_;
}

Comment on lines +998 to +1009
Log::Log() = default;

int Log::Size() {
  std::shared_lock lock(logs_mutex_);
  return static_cast<int>(logs_.size());
}

void Log::AppendLog(const LogItem& item) {
  std::lock_guard lock(logs_mutex_);
  logs_.push_back(item);
  last_index_ = item.offset;
}

⚠️ Potential issue

Initialize first_index_ member variable in Log class.

In the Log constructor, first_index_ is not explicitly initialized, which might result in undefined behavior.

Initialize first_index_ in the constructor:

Log::Log() : first_index_(LogOffset()), last_index_(LogOffset()) {}

This ensures that both first_index_ and last_index_ have known initial states.

src/pika_rm.cc Outdated
Comment on lines 409 to 412
bool SyncMasterDB::GetISConsistency(){
  coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){

⚠️ Potential issue

Missing return statement in GetISConsistency method

The method GetISConsistency is declared to return a bool, but it does not return any value. This can lead to undefined behavior.

Apply this diff to fix the issue:

 bool SyncMasterDB::GetISConsistency(){
-  coordinator_.GetISConsistency();
+  return coordinator_.GetISConsistency();
 }
📝 Committable suggestion


Suggested change
bool SyncMasterDB::GetISConsistency(){
  coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){
bool SyncMasterDB::GetISConsistency(){
  return coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){

src/pika_rm.cc Outdated
Comment on lines 406 to 408
void SyncMasterDB::SetConsistency(bool is_consistenct){
  coordinator_.SetIsConsistency(is_consistenct);
}

⚠️ Potential issue

Typo in parameter name is_consistenct in SetConsistency method

The parameter name is_consistenct is misspelled. It should be is_consistent.

Apply this diff to correct the typo:

- void SyncMasterDB::SetConsistency(bool is_consistenct){
-   coordinator_.SetIsConsistency(is_consistenct);
+ void SyncMasterDB::SetConsistency(bool is_consistent){
+   coordinator_.SetIsConsistency(is_consistent);
  }

Committable suggestion skipped: line range outside the PR's diff.

@@ -69,6 +69,7 @@ class SlaveNode : public RmNode {
   SyncWindow sync_win;
   LogOffset sent_offset;
   LogOffset acked_offset;
+  LogOffset target_offset;

⚠️ Potential issue

Uninitialized member variable target_offset in SlaveNode

The newly added member variable target_offset in the SlaveNode class should be properly initialized to prevent undefined behavior.

Apply this diff to initialize target_offset in the constructor:

 SlaveNode::SlaveNode(const std::string& ip, int port, const std::string& db_name, int session_id)
     : RmNode(ip, port, db_name, session_id) {
     ...
+    target_offset = LogOffset();
 }

Ensure that target_offset is initialized to a valid state.

Committable suggestion skipped: line range outside the PR's diff.

@@ -160,6 +160,7 @@ enum SlaveState {
   kSlaveNotSync = 0,
   kSlaveDbSync = 1,
   kSlaveBinlogSync = 2,
+  KCandidate = 3,

⚠️ Potential issue

Update SlaveStateMsg array to include KCandidate state.

The KCandidate state was added to the SlaveState enum, but the corresponding debug message array SlaveStateMsg needs to be updated.

Apply this diff:

-const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
+const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (7)
src/pika_command.cc (1)

873-885: Refactor duplicated code for duration measurement

The duration measurement code is duplicated in both branches of the if-else statement. To improve maintainability and reduce code duplication, consider moving the duration measurement outside the if-else block.

Apply this diff to refactor the code:

  if(g_pika_server->IsConsistency()){
    DoBinlog();
    if(res().ok()){
        DoCommand(hint_keys);
    }
  }else{
    DoCommand(hint_keys);
    DoBinlog();
  }
+ if (g_pika_conf->slowlog_slower_than() >= 0) {
+     do_duration_ += pstd::NowMicros() - start_us;
+ }
include/pika_consensus.h (1)

253-254: Ensure consistent naming for getter and setter methods

The method GetISConsistency() has inconsistent naming with its corresponding setter SetConsistency(bool is_consistency). Consider renaming it to GetConsistency() or IsConsistency() to align with naming conventions and improve readability.

src/pika_repl_client_conn.cc (1)

227-248: LGTM! PacificA consistency implementation looks solid.

The implementation correctly:

  • Validates prepared_id from master
  • Handles consistency checks
  • Implements proper error handling
  • Includes debug logging

However, consider adding more detailed error messages to help with debugging:

-          LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
+          LOG(WARNING) << "DB: " << db_name << " master committedId (" << master_prepared_id.ToString() 
+                      << ") is less than slave committedId (" << db->GetCommittedId().ToString() << ")";
include/pika_server.h (1)

567-569: Document the role tracking variables.

The role tracking variables need documentation to explain their purpose and valid states.

Add documentation above the variables:

  /*
   * Consistency and role management
   * is_consistency_: Indicates if the server is running in consistency mode
   * role_: Current role of the server (SINGLE/MASTER/SLAVE)
   * last_role_: Previous role of the server, used for role transition handling
   */
  bool is_consistency_ = false;
  int role_ = PIKA_ROLE_SINGLE;
  int last_role_ = PIKA_ROLE_SINGLE;
src/pika_consensus.cc (1)

796-803: Improve thread safety in consistency operations.

The consistency operations need better synchronization and documentation:

+/**
+ * Sets the consistency mode for the coordinator.
+ * Thread-safe operation protected by is_consistency_rwlock_.
+ */
 void ConsensusCoordinator::SetConsistency(bool is_consistency) {
   std::lock_guard l(is_consistency_rwlock_);
   is_consistency_ = is_consistency;
+  LOG(INFO) << "Consistency mode set to: " << is_consistency;
 }

+/**
+ * Gets the current consistency mode.
+ * Thread-safe operation protected by is_consistency_rwlock_.
+ * @return Current consistency mode
+ */
 bool ConsensusCoordinator::GetISConsistency() {
   std::shared_lock l(is_consistency_rwlock_);
   return is_consistency_;
 }
src/pika_rm.cc (1)

817-822: Improve error handling in UpdateSyncBinlogStatus.

The consistency update needs better error handling:

 if(db->GetISConsistency()){
   s = db->UpdateCommittedID();
   if (!s.ok()) {
+    LOG(WARNING) << "Failed to update committed ID: " << s.ToString();
     return s;
   }
 }
src/pika_server.cc (1)

Add thread safety documentation for consistency methods. Note also that SetConsistency mutates is_consistency_ while holding only a std::shared_lock on state_protector_; a write should take an exclusive lock.

+/**
+ * Checks if the server is in consistency mode.
+ * Thread-safe operation protected by state_protector_.
+ * @return Current consistency mode
+ */
 bool PikaServer::IsConsistency() {
   std::shared_lock sp_l(state_protector_);
   return is_consistency_;
 }

+/**
+ * Sets the server's consistency mode.
+ * Thread-safe operation protected by state_protector_.
+ * @param is_consistency New consistency mode value
+ */
 void PikaServer::SetConsistency(bool is_consistency) {
-  std::shared_lock sp_l(state_protector_);
+  std::lock_guard sp_l(state_protector_);
   is_consistency_ = is_consistency;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21e1e9e and 2afafc0.

📒 Files selected for processing (15)
  • include/pika_command.h (2 hunks)
  • include/pika_consensus.h (4 hunks)
  • include/pika_define.h (2 hunks)
  • include/pika_server.h (12 hunks)
  • include/pika_slave_node.h (1 hunks)
  • src/pika_binlog.cc (3 hunks)
  • src/pika_command.cc (2 hunks)
  • src/pika_consensus.cc (19 hunks)
  • src/pika_repl_bgworker.cc (3 hunks)
  • src/pika_repl_client.cc (3 hunks)
  • src/pika_repl_client_conn.cc (2 hunks)
  • src/pika_repl_server.cc (1 hunks)
  • src/pika_repl_server_conn.cc (13 hunks)
  • src/pika_rm.cc (7 hunks)
  • src/pika_server.cc (30 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • include/pika_command.h
  • include/pika_slave_node.h
  • src/pika_repl_bgworker.cc
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/pika_server.cc

[performance] 1639-1639: Prefer prefix ++/-- operators for non-primitive types.

(postfixOperator)


[performance] 1760-1760: Prefer prefix ++/-- operators for non-primitive types.

(postfixOperator)

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_centos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_macos
🔇 Additional comments (15)
src/pika_command.cc (1)

972-981: LGTM!

The enhanced error handling logic for binlog write failures correctly differentiates between consistency timeouts and other errors, providing appropriate responses and logging.

src/pika_repl_server.cc (1)

106-109: LGTM!

The addition of the committed_id to the binlog synchronization response when the server is in consistency mode appropriately enhances consistency handling.

include/pika_consensus.h (1)

120-147: Consider thread safety for the new Log class

The Log class uses a pstd::Mutex named logs_mutex_, but some methods may not adequately protect shared resources. Please ensure that all methods accessing shared data are properly synchronized to prevent data races.

include/pika_define.h (3)

163-163: LGTM! The KCandidate state addition aligns with PacificA protocol.

The addition of KCandidate state to the SlaveState enum is essential for implementing the PacificA protocol's leader election mechanism.


167-167: LGTM! SlaveStateMsg array correctly updated.

The SlaveStateMsg array has been properly updated to include the "Candidate" string representation for the new KCandidate state.


278-278: LGTM! WriteTask enhancements support transaction tracking.

The addition of committed_id_ field and the new constructor in WriteTask struct properly support tracking committed transactions in the distributed log system, which is crucial for maintaining consistency in the PacificA protocol.

Also applies to: 282-283

src/pika_binlog.cc (2)

173-202: LGTM! New Put method properly implements log offset tracking.

The new Put method correctly handles:

  • Log offset tracking
  • Error handling
  • Lock management using DEFER
  • Binlog encoding with proper parameters

267-270: ⚠️ Potential issue

Add null pointer check for cur_logoffset when is_consistency is true.

When is_consistency is true, we should ensure cur_logoffset is not null before dereferencing it.

Apply this diff:

-    if(is_consistency && cur_logoffset != nullptr){
+    if(is_consistency) {
+      if (cur_logoffset == nullptr) {
+        return Status::InvalidArgument("cur_logoffset cannot be null when is_consistency is true");
+      }
       cur_logoffset->b_offset.filenum=version_->pro_num_;
       cur_logoffset->b_offset.offset=version_->pro_offset_;
     }

Likely invalid or redundant comment.

src/pika_repl_client.cc (1)

263-268: ⚠️ Potential issue

Add null pointer check for master_db.

The code assumes master_db is not null, which could lead to crashes.

Apply this diff:

   std::shared_ptr<SyncMasterDB> master_db =g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
+  if (!master_db) {
+    LOG(WARNING) << "Master DB not found for db_name: " << db_name;
+    return Status::NotFound("Master DB not found");
+  }
   if(master_db->GetISConsistency()){
     InnerMessage::BinlogOffset* committed_id = try_sync->mutable_committed_id();
     LogOffset master_committed_id = master_db->GetCommittedId();
     g_pika_rm->BuildBinlogOffset(master_committed_id,committed_id);
   }

Likely invalid or redundant comment.

include/pika_server.h (1)

118-122: LGTM! DB locking helpers improve thread safety.

The addition of DB locking helper methods improves code readability and ensures proper lock management.

src/pika_repl_server_conn.cc (3)

31-31: ⚠️ Potential issue

Validate the consistency flag before use.

Add a check to ensure the consistency flag is properly set in the request:

-bool is_consistency = meta_sync_request.is_consistency();
+bool is_consistency = meta_sync_request.has_is_consistency() ? meta_sync_request.is_consistency() : false;

Likely invalid or redundant comment.


228-261: 🛠️ Refactor suggestion

Improve committed ID validation and error handling.

The committed ID handling needs better validation and error messages:

 if(db->GetISConsistency()){
-  if(try_sync_request.has_committed_id()){
+  if(!try_sync_request.has_committed_id()) {
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    LOG(WARNING) << "Committed ID not provided in TrySync request from " << node.ip() << ":" << node.port();
+    return false;
+  }
   const InnerMessage::BinlogOffset& slave_committed_id = try_sync_request.committed_id();
   LogOffset committed_id(BinlogOffset(slave_committed_id.filenum(), slave_committed_id.offset()), 
                         LogicOffset(slave_committed_id.term(), slave_committed_id.index()));
   if (db->GetCommittedId() < committed_id) {
     try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
-    LOG(WARNING) << "DB Name: " << db_name << "Slave CommittedId Greater than master";
+    LOG(WARNING) << "Slave committed ID (" << committed_id.ToString() 
+                 << ") is greater than master's (" << db->GetCommittedId().ToString() 
+                 << ") for DB: " << db_name;
     return false;
   }

Likely invalid or redundant comment.


54-68: 🛠️ Refactor suggestion

Enhance error handling in consistency processing.

The coordinator processing lacks proper error handling and logging. Consider:

 if (is_consistency) {
   auto master_dbs = g_pika_rm->GetSyncMasterDBs();
   g_pika_server->SetConsistency(is_consistency);
   for (auto& db : master_dbs) {
     if (slave_size == 0) {
       db.second->SetConsistency(is_consistency);
       db.second->InitContext();
+      LOG(INFO) << "Processing coordination for DB: " << db.first;
       Status s = db.second->ProcessCoordination();
       if (!s.ok()) {
         response.set_code(InnerMessage::kError);
-        response.set_reply("master ProcessCoordination error");
+        response.set_reply("Master ProcessCoordination error: " + s.ToString());
+        LOG(WARNING) << "ProcessCoordination failed for DB: " << db.first << ", Error: " << s.ToString();
+        break;  // Exit on first error
       }
     }
   }
 }

Likely invalid or redundant comment.

src/pika_consensus.cc (2)

220-226: ⚠️ Potential issue

Initialize member variables in constructor.

The constructor should initialize all member variables:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) 
-    : db_name_(db_name) {
+    : db_name_(db_name)
+    , is_consistency_(false)
+    , committed_id_(LogOffset())
+    , prepared_id_(LogOffset()) {
   std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/";
   std::string log_path = db_log_path;
   context_ = std::make_shared<Context>(log_path + kContext);
   stable_logger_ = std::make_shared<StableLog>(db_name, log_path);
   mem_logger_ = std::make_shared<MemLog>();
   logs_ = std::make_shared<Log>();
 }

Likely invalid or redundant comment.


1000-1067: 🛠️ Refactor suggestion

Enhance thread safety and error handling in Log class.

The Log class needs better synchronization and error handling:

  1. Initialize member variables in constructor
  2. Add proper synchronization for critical operations
  3. Improve error handling
-Log::Log() = default;
+Log::Log()
+    : first_index_(LogOffset()),
+      last_index_(LogOffset()) {}

 void Log::AppendLog(const LogItem& item) {
   std::lock_guard lock(logs_mutex_);
   logs_.push_back(item);
   last_index_ = item.offset;
+  if (logs_.size() == 1) {
+    first_index_ = item.offset;
+  }
 }

 Status Log::Truncate(const LogOffset& offset) {
   std::lock_guard lock(logs_mutex_);
   int index = FindLogIndex(offset);
   if (index < 0) {
-    return Status::Corruption("Can't find correct index");
+    return Status::Corruption("Failed to find index for offset: " + offset.ToString());
   }
   last_index_ = logs_[index].offset;
   logs_.erase(logs_.begin() + index + 1, logs_.end());
   return Status::OK();
 }

Likely invalid or redundant comment.

Comment on lines +183 to +194
bool is_consistency = g_pika_server->IsConsistency();
meta_sync->set_is_consistency(is_consistency);
if (is_consistency) {
  auto master_dbs = g_pika_rm->GetSyncMasterDBs();
  for (auto& db : master_dbs) {
    if (g_pika_server->slaves_.size() == 0) {
      db.second->SetConsistency(is_consistency);
      db.second->InitContext();
      Status s = db.second->ProcessCoordination();
    }
  }
}

⚠️ Potential issue

Add error handling for ProcessCoordination.

The ProcessCoordination call should include error handling.

Apply this diff:

       if (g_pika_server->slaves_.size() == 0) {
         db.second->SetConsistency(is_consistency);
         db.second->InitContext();
        Status s = db.second->ProcessCoordination();
+        if (!s.ok()) {
+          LOG(WARNING) << "Failed to process coordination for DB: " << db.first 
+                      << ", error: " << s.ToString();
+          return s;
+        }
       }

Comment on lines +407 to +411
if (master_db->GetISConsistency()) {
  Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
} else {
  Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
}

🛠️ Refactor suggestion

Enhance error handling in binlog sync processing.

The binlog sync handling needs better error handling and status validation:

 if (master_db->GetISConsistency()) {
   Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    return;
+  }
 } else {
   Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to activate slave binlog sync for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    return;
+  }
 }

Comment on lines 874 to 891
Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
  int index = logs_->FindOffset(logs_->FirstOffset());
  int log_size = logs_->Size();  // Cache log size
  LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
            << " , m_offset: " << master_committed_id.ToString();
  for (int i = index; i < log_size; ++i) {
    Log::LogItem log = logs_->At(i);
    if (log.offset >= master_committed_id) {
      LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                << ", ApplyLog: " << log.offset.ToString();
      ApplyBinlog(log.cmd_ptr);
    }
  }

  logs_->TruncateFrom(master_committed_id);  // Truncate logs
  SetCommittedId(master_committed_id);       // Update committed ID
  return Status::OK();
}

🛠️ Refactor suggestion

Enhance error handling in commit log operations.

The commit log operations need better error handling and logging:

 Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
+  if (master_committed_id == LogOffset()) {
+    return Status::InvalidArgument("Invalid master committed ID");
+  }
+
   int index = logs_->FindOffset(logs_->FirstOffset());
   int log_size = logs_->Size();
   LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index 
             << " log_size: " << log_size
             << " , m_offset: " << master_committed_id.ToString();
+
+  if (index < 0) {
+    return Status::Corruption("Failed to find starting offset");
+  }
+
   for (int i = index; i < log_size; ++i) {
     Log::LogItem log = logs_->At(i);
     if (log.offset >= master_committed_id) {
       LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                 << ", ApplyLog: " << log.offset.ToString();
-      ApplyBinlog(log.cmd_ptr);
+      Status s = ApplyBinlog(log.cmd_ptr);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
+        return s;
+      }
     }
   }

Comment on lines +690 to +703
bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
  if (master_ip == "127.0.0.1") {
    master_ip = host_;
  }
  std::lock_guard l(state_protector_);
  if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
    master_ip_ = master_ip;
    master_port_ = master_port;
    last_role_ = role_;
    role_ |= PIKA_ROLE_SLAVE;
    repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
    is_consistency_ = is_consistency;
    LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
              << " consistency: " << is_consistency_;

🛠️ Refactor suggestion

Improve SetMaster method with better validation.

The SetMaster method needs better validation and error handling:

 bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
+  if (master_port <= 0 || master_port > 65535) {
+    LOG(WARNING) << "Invalid master port: " << master_port;
+    return false;
+  }
+
   if (master_ip == "127.0.0.1") {
     master_ip = host_;
   }
+
   std::lock_guard l(state_protector_);
   if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
     master_ip_ = master_ip;
     master_port_ = master_port;
     last_role_ = role_;
     role_ |= PIKA_ROLE_SLAVE;
     repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
     is_consistency_ = is_consistency;
     LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
               << " consistency: " << is_consistency_;
     return true;
   }
   return false;
 }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
include/pika_consensus.h (3)

120-146: Add documentation for the Log class methods

The Log class implementation looks solid, but lacks documentation. Consider adding:

  • Method descriptions
  • Parameter explanations
  • Return value descriptions
  • Thread safety guarantees

Example documentation format:

/**
 * Appends a new log entry to the log.
 * @param item The log item to append
 * @thread-safety Thread-safe through logs_mutex_
 */
void AppendLog(const LogItem& item);

248-262: Consider adding error handling for edge cases

The PacificA-related methods should handle potential edge cases:

  • Network failures during coordination
  • Timeouts during consistency checks
  • Recovery from partial failures

Consider adding error handling:

pstd::Status ProcessCoordination() {
  try {
    // ... existing code ...
  } catch (const std::exception& e) {
    return pstd::Status::IOError("Coordination failed: " + std::string(e.what()));
  }
  return pstd::Status::OK();
}

284-291: Consider initializing member variables in constructor

Move the initialization of member variables to the constructor for better encapsulation:

ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) 
  : is_consistency_(false)
  , committed_id_(LogOffset())
  , prepared_id_(LogOffset())
  , logs_(std::make_shared<Log>()) {
  // ... rest of constructor
}
src/pika_repl_server_conn.cc (1)

228-263: Consider extracting consistency check logic

The consistency check logic could be extracted into a separate method for better maintainability:

bool ValidateConsistencyState(const std::shared_ptr<SyncMasterDB>& db,
                            const InnerMessage::TrySync& try_sync_request,
                            InnerMessage::TrySync* try_sync_response) {
  if (!try_sync_request.has_committed_id()) {
    try_sync_response->set_reply_code(InnerMessage::TrySync::kError);
    LOG(WARNING) << "TrySync no slave committedID";
    return false;
  }
  // ... rest of consistency validation logic
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2afafc0 and 7317faf.

📒 Files selected for processing (5)
  • include/pika_consensus.h (4 hunks)
  • include/pika_server.h (7 hunks)
  • src/pika_repl_client_conn.cc (2 hunks)
  • src/pika_repl_server_conn.cc (5 hunks)
  • src/pika_server.cc (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/pika_repl_client_conn.cc
  • include/pika_server.h
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_centos
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
🔇 Additional comments (8)
include/pika_consensus.h (3)

28-28: LGTM: Proper initialization of applied_index_

The initialization of applied_index_ with LogOffset() ensures a clean initial state.


55-70: LGTM: Thread-safe implementation of binlog state management

The implementation correctly uses mutex locks to ensure thread-safe operations on shared state.

Also applies to: 76-76


263-279: LGTM: Thread-safe ID management

The implementation correctly uses mutex locks for thread-safe access to committed and prepared IDs.

src/pika_repl_server_conn.cc (2)

31-68: LGTM: Proper handling of consistency in meta sync

The implementation correctly:

  • Handles consistency flags
  • Initializes master databases
  • Processes coordination with proper error handling

410-414: ⚠️ Potential issue

Add error handling for AppendCandidateBinlog

The status returned by AppendCandidateBinlog is not being checked:

 if (master_db->GetISConsistency()) {
   Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    conn->NotifyClose();
+    return;
+  }
 } else {
   Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to activate slave binlog sync for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    conn->NotifyClose();
+    return;
+  }
 }

Likely invalid or redundant comment.

src/pika_server.cc (3)

271-274: LGTM: Thread-safe role access

The method correctly uses shared locking to access the last role state.


755-762: LGTM: Thread-safe consistency state management

The implementation correctly uses shared locks to manage consistency state.


691-704: ⚠️ Potential issue

Add port validation in SetMaster

The method should validate the master port before proceeding:

 bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
+  if (master_port <= 0 || master_port > 65535) {
+    LOG(WARNING) << "Invalid master port: " << master_port;
+    return false;
+  }
   if (master_ip == "127.0.0.1") {
     master_ip = host_;
   }
   // ... rest of the method
 }

Likely invalid or redundant comment.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/pika_consensus.cc (3)

229-229: ⚠️ Potential issue

Initialize is_consistency_ member variable in constructor.

The is_consistency_ member variable is not initialized in the constructor, which could lead to undefined behavior.

Initialize is_consistency_ in the constructor:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name)
-    : db_name_(db_name) {
+    : db_name_(db_name), is_consistency_(false) {
   logs_ = std::make_shared<Log>();

375-397: 🛠️ Refactor suggestion

⚠️ Potential issue

Ensure thread safety and improve error handling.

  1. Thread safety concerns:

The code accesses shared data structures without proper synchronization.

  1. Error handling improvements needed:

The error handling in the non-consistency mode could be more robust:

 } else {
   LogOffset committed_index;
   Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
   if (!s.ok()) {
+    LOG(WARNING) << "Failed to update sync progress: " << s.ToString();
     return s;
   }
+  LOG(INFO) << "Successfully updated sync progress for " << ip << ":" << port;
 }

1008-1019: ⚠️ Potential issue

Initialize Log class member variables properly.

The Log class members need proper initialization:

-Log::Log() = default;
+Log::Log() : first_index_(LogOffset()), last_index_(LogOffset()) {}

Additionally, consider adding invariant checks in AppendLog:

 void Log::AppendLog(const LogItem& item) {
   std::lock_guard lock(logs_mutex_);
+  if (!logs_.empty() && item.offset <= last_index_) {
+    LOG(WARNING) << "Attempting to append out-of-order log entry";
+    return;
+  }
   logs_.push_back(item);
   last_index_ = item.offset;
 }
🧹 Nitpick comments (4)
src/pika_repl_server_conn.cc (1)

229-263: Refactor consistency check logic for better readability.

The nested conditions and multiple early returns make the code harder to follow. Consider extracting the consistency check into a separate method.

Suggested refactor:

+bool PikaReplServerConn::ValidateConsistencyState(
+    const std::shared_ptr<SyncMasterDB>& db,
+    const InnerMessage::InnerRequest::TrySync& try_sync_request,
+    InnerMessage::InnerResponse::TrySync* try_sync_response) {
+  if (!db->GetISConsistency()) {
+    return true;  // Skip consistency check
+  }
+  
+  if (!try_sync_request.has_committed_id()) {
+    LOG(WARNING) << "TrySync no slave committedID";
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    return false;
+  }
+  
+  const InnerMessage::BinlogOffset& slave_committed_id = try_sync_request.committed_id();
+  LogOffset committed_id(
+      BinlogOffset(slave_committed_id.filenum(), slave_committed_id.offset()),
+      LogicOffset(slave_committed_id.term(), slave_committed_id.index()));
+  
+  if (db->GetCommittedId() < committed_id) {
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    LOG(WARNING) << "DB Name: " << db->DBName() << " Slave CommittedId Greater than master"
+                 << " master_id: " << db->GetCommittedId().ToString()
+                 << " slave_id: " << committed_id.ToString();
+    return false;
+  }
+  
+  // Set prepared ID in response
+  InnerMessage::BinlogOffset* master_prepared_id = try_sync_response->mutable_prepared_id();
+  g_pika_rm->BuildBinlogOffset(db->GetPreparedId(), master_prepared_id);
+  return true;
+}

Then use it in TrySyncOffsetCheck:

 if(db->GetISConsistency()){
-  if(try_sync_request.has_committed_id()){ ... }
-  else { ... }
+  if (!ValidateConsistencyState(db, try_sync_request, try_sync_response)) {
+    return false;
+  }
 }
src/pika_consensus.cc (3)

1031-1034: Improve method documentation and translate comments to English.

 Log::LogItem Log::At(int index) {
   std::shared_lock lock(logs_mutex_);
-  return logs_.at(index);  // 使用 at() 确保边界安全
+  return logs_.at(index);  // Using at() for bounds checking
 }

Consider adding method documentation:

/**
 * @brief Retrieves a log item at the specified index with bounds checking
 * @param index The index of the log item to retrieve
 * @return The log item at the specified index
 * @throws std::out_of_range if the index is out of bounds
 */

1068-1075: Consider performance optimization and improve documentation for FindLogIndex.

  1. The current linear search implementation could be inefficient for large logs. Consider using a more efficient data structure or indexing mechanism.
  2. Add clear documentation about return value semantics.
+/**
+ * @brief Finds the index of a log entry with the specified offset
+ * @param offset The offset to search for
+ * @return The index of the log entry if found, -1 otherwise
+ */
 int Log::FindLogIndex(const LogOffset& offset) {
+  // TODO: Consider using a more efficient data structure (e.g., map or binary search)
+  // for better performance with large logs
   for (size_t i = 0; i < logs_.size(); ++i) {
     if (logs_[i].offset == offset) {
       return static_cast<int>(i);
     }
   }
   return -1;
 }

802-1075: Consider architectural improvements for better maintainability and performance.

  1. Separation of Concerns:

    • Consider splitting the Log class into separate components for storage and indexing
    • Extract the consistency checking logic into a dedicated class
  2. Performance Optimizations:

    • Consider using more efficient data structures for log storage and indexing
    • Implement batch processing for log operations
    • Add caching mechanisms for frequently accessed log entries
  3. Error Recovery:

    • Consider implementing a recovery mechanism for partial failures
    • Add periodic state validation
  4. Monitoring:

    • Add metrics for monitoring replication lag
    • Implement health checks for consensus state

Would you like me to provide more detailed suggestions for any of these areas?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7317faf and 03d4a6e.

📒 Files selected for processing (4)
  • include/pika_server.h (6 hunks)
  • src/pika_consensus.cc (4 hunks)
  • src/pika_repl_server_conn.cc (4 hunks)
  • src/pika_server.cc (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/pika_server.h
  • src/pika_server.cc
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_centos
  • GitHub Check: build_on_macos
  • GitHub Check: build_on_ubuntu
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
src/pika_repl_server_conn.cc (3)

410-414: ⚠️ Potential issue

Add error handling for AppendCandidateBinlog.

The status returned by AppendCandidateBinlog is not checked in the consistency case, which could lead to silent failures.

Fix the error handling:

     if (master_db->GetISConsistency()) {
       Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                     << ", Error: " << s.ToString();
+        conn->NotifyClose();
+        return;
+      }
     } else {
       Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
     }

Likely invalid or redundant comment.


61-65: ⚠️ Potential issue

Improve error handling for ProcessCoordination.

The error from ProcessCoordination is not properly propagated. The loop continues even after an error, which could leave the system in an inconsistent state.

Add proper error propagation:

           Status s = db.second->ProcessCoordination();
           if (!s.ok()) {
             response.set_code(InnerMessage::kError);
-            response.set_reply("master ProcessCoordination error");
+            response.set_reply("Master ProcessCoordination error: " + s.ToString());
+            LOG(WARNING) << "ProcessCoordination failed for DB: " << db.first 
+                        << ", Error: " << s.ToString();
+            break;  // Exit the loop on error
           }

Likely invalid or redundant comment.


58-66: Add synchronization around slave size check.

There's a potential race condition between checking slave_size and setting consistency. Another thread could add/remove slaves between the check and the initialization.

Consider adding proper synchronization:

-      if (slave_size == 0) {
+      std::lock_guard<std::mutex> guard(slave_mutex_);  // Add appropriate mutex
+      if (g_pika_server->slave_size() == 0) {
src/pika_consensus.cc (1)

882-899: ⚠️ Potential issue

Improve error handling and state management in CommitAppLog.

The method needs better error handling and state management:

  1. Add input validation
  2. Handle ApplyBinlog errors
  3. Ensure atomic state updates
 Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
+  if (master_committed_id == LogOffset()) {
+    return Status::InvalidArgument("Invalid master committed ID");
+  }
+
   int index = logs_->FindOffset(logs_->FirstOffset());
   int log_size = logs_->Size();
+  if (index < 0) {
+    return Status::Corruption("Failed to find starting offset");
+  }
+
   LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
             << " , m_offset: " << master_committed_id.ToString();
   for (int i = index; i < log_size; ++i) {
     Log::LogItem log = logs_->At(i);
     if (log.offset >= master_committed_id) {
       LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                 << ", ApplyLog: " << log.offset.ToString();
-      ApplyBinlog(log.cmd_ptr);
+      Status s = ApplyBinlog(log.cmd_ptr);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
+        return s;
+      }
     }
   }
-  logs_->TruncateFrom(master_committed_id);
-  SetCommittedId(master_committed_id);
+  Status s = logs_->TruncateFrom(master_committed_id);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to truncate logs: " << s.ToString();
+    return s;
+  }
+  SetCommittedId(master_committed_id);
   return Status::OK();
 }

Likely invalid or redundant comment.

Comment on lines +963 to +989
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
  std::vector<WriteTask> tasks;

  // Check if there are new log entries that need to be sent to the slave
  if (logs_->LastOffset() >= slave_ptr->acked_offset) {
    // Find the index of the log entry corresponding to the slave's acknowledged offset
    int index = logs_->FindOffset(slave_ptr->acked_offset);
    if (index < logs_->Size()) {
      for (int i = index; i < logs_->Size(); ++i) {
        const Log::LogItem& item = logs_->At(i);

        slave_ptr->SetLastSendTime(pstd::NowMicros());

        RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
        WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
        tasks.emplace_back(std::move(task));

        slave_ptr->sent_offset = item.offset;
      }
    }
  }

  if (!tasks.empty()) {
    g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
  }
  return Status::OK();
}

🛠️ Refactor suggestion

Enhance error handling and performance in SendBinlog.

The method needs improvements in several areas:

  1. Add input validation
  2. Improve error handling
  3. Optimize vector operations
 Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
+  if (!slave_ptr) {
+    return Status::InvalidArgument("Invalid slave pointer");
+  }
+  if (db_name.empty()) {
+    return Status::InvalidArgument("Empty database name");
+  }
+
   std::vector<WriteTask> tasks;
+  tasks.reserve(logs_->Size());  // Pre-allocate space to avoid reallocations
 
   if (logs_->LastOffset() >= slave_ptr->acked_offset) {
     int index = logs_->FindOffset(slave_ptr->acked_offset);
+    if (index < 0) {
+      return Status::Corruption("Failed to find offset");
+    }
     if (index < logs_->Size()) {
       for (int i = index; i < logs_->Size(); ++i) {
         const Log::LogItem& item = logs_->At(i);
         slave_ptr->SetLastSendTime(pstd::NowMicros());
 
         RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
         WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
         tasks.emplace_back(std::move(task));
 
         slave_ptr->sent_offset = item.offset;
       }
     }
   }
 
   if (!tasks.empty()) {
-    g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
+    Status s = g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
+    if (!s.ok()) {
+      LOG(WARNING) << "Failed to produce write queue: " << s.ToString();
+      return s;
+    }
   }
   return Status::OK();
 }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
tests/integration/consensus_test.go (3)

33-54: Consider implementing a more robust sync verification mechanism.

Instead of using a fixed sleep duration, consider implementing a retry mechanism with timeout to verify synchronization. This would make the test more reliable and less dependent on timing.

-			// Wait for sync
-			time.Sleep(2 * time.Second)
+			// Wait for sync with a retry mechanism
+			deadline := time.Now().Add(5 * time.Second)
+			for time.Now().Before(deadline) {
+				allSynced := true
+				for i := 0; i < 100; i++ {
+					key := fmt.Sprintf("key%d", i)
+					get := slaveClient.Get(ctx, key)
+					if get.Err() != nil {
+						allSynced = false
+						break
+					}
+				}
+				if allSynced {
+					break
+				}
+				time.Sleep(100 * time.Millisecond)
+			}

56-81: Enhance partition recovery test coverage.

While the test verifies basic recovery, consider adding these checks:

  1. Verify initial data is preserved after recovery
  2. Add multiple writes during partition
  3. Check replication lag metrics if available
 			// Verify data consistency
+			// Verify the initial data still exists
+			get = slaveClient.Get(ctx, "test_key")
+			Expect(get.Err()).NotTo(HaveOccurred())
+			Expect(get.Val()).To(Equal("test_value"))
+
+			// Verify the multiple writes made during the partition
+			for i := 0; i < 5; i++ {
+				key := fmt.Sprintf("partition_key_%d", i)
+				get = slaveClient.Get(ctx, key)
+				Expect(get.Err()).NotTo(HaveOccurred())
+			}

113-134: Add performance monitoring to large dataset sync test.

Consider measuring and asserting on sync performance metrics:

  1. Time taken for initial sync
  2. Memory usage during sync
  3. Network bandwidth utilization
+			// Record the sync start time
+			syncStart := time.Now()
+
 			// Write a large volume of data
 			for i := 0; i < 10000; i++ {
 				key := fmt.Sprintf("large_key_%d", i)
 				value := fmt.Sprintf("large_value_%d", i)
 				set := masterClient.Set(ctx, key, value, 0)
 				Expect(set.Err()).NotTo(HaveOccurred())
 			}
 
-			// Wait for sync to complete
-			time.Sleep(5 * time.Second)
+			// Wait for sync to complete with a retry mechanism and record performance metrics
+			deadline := time.Now().Add(10 * time.Second)
+			synced := false
+			var syncDuration time.Duration
+			for time.Now().Before(deadline) && !synced {
+				synced = true
+				for i := 0; i < 10000; i++ {
+					key := fmt.Sprintf("large_key_%d", i)
+					get := slaveClient.Get(ctx, key)
+					if get.Err() != nil {
+						synced = false
+						break
+					}
+				}
+				if synced {
+					syncDuration = time.Since(syncStart)
+					break
+				}
+				time.Sleep(100 * time.Millisecond)
+			}
+			
+			// Verify sync performance
+			Expect(synced).To(BeTrue(), "Sync timed out")
+			Expect(syncDuration).To(BeNumerically("<", 5*time.Second), "Sync took too long")
src/pika_consensus.cc (2)

945-948: Define log gap limit as a configurable constant.

The magic number kMaxLogGap should be defined as a configurable constant with documentation explaining its purpose and impact.

+// Maximum allowed gap between current log index and committed index
+// This prevents excessive memory usage from uncommitted logs
+constexpr uint64_t kMaxLogGap = 1000;  // Configurable via config file

1062-1070: Optimize log offset search.

The FindOffset method performs a linear search. Consider using a binary search for better performance with large logs.

 int Log::FindOffset(const LogOffset& send_offset) {
   std::shared_lock lock(logs_mutex_);
-  for (size_t i = 0; i < logs_.size(); ++i) {
-    if (logs_[i].offset > send_offset) {
-      return i;
-    }
-  }
-  return static_cast<int>(logs_.size());
+  auto it = std::lower_bound(logs_.begin(), logs_.end(), send_offset,
+    [](const LogItem& item, const LogOffset& offset) {
+      return item.offset < offset;
+    });
+  return static_cast<int>(std::distance(logs_.begin(), it));
 }
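The original linear `FindOffset` returns the index of the first entry whose offset is strictly greater than `send_offset` (or the log size if none). The same lookup has a stdlib binary-search analogue in Go; a minimal sketch, where `LogItem` and the integer offsets are simplified stand-ins for Pika's types, not the real structures:

```go
package main

import (
	"fmt"
	"sort"
)

// LogItem is a simplified stand-in for Log::LogItem: offsets are plain ints.
type LogItem struct {
	Offset int
}

// FindOffset returns the index of the first log entry whose offset is
// strictly greater than sendOffset, or len(logs) if there is none — the
// same contract as the linear version, but in O(log n) on the sorted slice.
func FindOffset(logs []LogItem, sendOffset int) int {
	return sort.Search(len(logs), func(i int) bool {
		return logs[i].Offset > sendOffset
	})
}

func main() {
	logs := []LogItem{{10}, {20}, {30}, {40}}
	fmt.Println(FindOffset(logs, 20)) // first entry with offset > 20 -> index 2
	fmt.Println(FindOffset(logs, 45)) // no such entry -> len(logs) == 4
}
```

Note the suggested C++ `lower_bound` predicate (`item.offset < offset`) finds the first entry with offset greater than or equal to `send_offset`, which subtly differs from the original strict-greater contract sketched here; whichever is chosen should be applied consistently at the call sites.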
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 03d4a6e and a799c88.

📒 Files selected for processing (2)
  • src/pika_consensus.cc (4 hunks)
  • tests/integration/consensus_test.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build_on_centos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_macos
🔇 Additional comments (3)
tests/integration/consensus_test.go (1)

1-32: Well-structured test setup!

The test initialization follows best practices with proper setup and teardown, ensuring a clean state for each test case.

src/pika_consensus.cc (2)

222-230: ⚠️ Potential issue

Initialize member variables in constructor.

The is_consistency_ and first_index_ member variables are not explicitly initialized, which could lead to undefined behavior.

Apply this diff to properly initialize all member variables:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name)
-    : db_name_(db_name) {
+    : db_name_(db_name), is_consistency_(false) {
   std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/";
   std::string log_path = db_log_path;
   context_ = std::make_shared<Context>(log_path + kContext);
   stable_logger_ = std::make_shared<StableLog>(db_name, log_path);
   mem_logger_ = std::make_shared<MemLog>();
-  logs_ = std::make_shared<Log>();
+  logs_ = std::make_shared<Log>();
+  logs_->first_index_ = LogOffset();  // Initialize first_index_

375-398: Thread safety properly implemented.

The code now correctly uses mutex locks when accessing shared data structures, addressing the previous thread safety concerns.

Comment on lines 926 to 944
Status ConsensusCoordinator::ProcessCoordination() {
  // Check that consistency mode is enabled
  if (!is_consistency_) {
    return Status::InvalidArgument("Consistency mode not enabled");
  }

  LogOffset offset = LogOffset();
  Status s = stable_logger_->Logger()->GetProducerStatus(&(offset.b_offset.filenum),
                                                         &(offset.b_offset.offset),
                                                         &(offset.l_offset.term),
                                                         &(offset.l_offset.index));
  if (!s.ok()) {
    LOG(WARNING) << "Failed to get producer status: " << s.ToString();
    return s;
  }

  LogOffset stable_committed_id = context_->applied_index_;

  // Check the log gap
  if (offset.l_offset.index > stable_committed_id.l_offset.index + kMaxLogGap) {
    LOG(WARNING) << "Log gap too large: " << offset.l_offset.index - stable_committed_id.l_offset.index;
    return Status::Aborted("Log gap exceeds limit");
  }

  if (stable_committed_id == LogOffset() ||
      stable_committed_id.l_offset.index + 10 < offset.l_offset.index) {
    SetCommittedId(offset);
  } else {
    SetCommittedId(stable_committed_id);
  }

  SetPreparedId(offset);

  // Handle the master/slave role switch
  if (g_pika_server->role() & PIKA_ROLE_MASTER &&
      g_pika_server->last_role() & PIKA_ROLE_SLAVE) {
    Status s = CommitAppLog(GetPreparedId());
    if (!s.ok()) {
      LOG(WARNING) << "Failed to commit logs during role change: " << s.ToString();
      return s;
    }
  }

  return Status::OK();
}

🛠️ Refactor suggestion

Enhance error handling in ProcessCoordination.

The method needs more comprehensive error handling and logging for better debugging.

 Status ConsensusCoordinator::ProcessCoordination() {
+  LOG(INFO) << "Starting coordination process...";
+
   // Check that consistency mode is enabled
   if (!is_consistency_) {
+    LOG(WARNING) << "Coordination skipped: consistency mode not enabled";
     return Status::InvalidArgument("Consistency mode not enabled");
   }

   LogOffset offset = LogOffset();
   Status s = stable_logger_->Logger()->GetProducerStatus(&(offset.b_offset.filenum), 
                                                       &(offset.b_offset.offset),
                                                       &(offset.l_offset.term), 
                                                       &(offset.l_offset.index));
   if (!s.ok()) {
-    LOG(WARNING) << "Failed to get producer status: " << s.ToString();
+    LOG(ERROR) << "Failed to get producer status: " << s.ToString() 
+               << ", current term: " << term_
+               << ", committed_id: " << GetCommittedId().ToString();
     return s;
   }

+  LOG(INFO) << "Current producer status - offset: " << offset.ToString()
+            << ", committed_id: " << stable_committed_id.ToString();

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
src/pika_consensus.cc (1)

222-230: ⚠️ Potential issue

Initialize all member variables in the constructor.

The is_consistency_ member variable is not initialized in the constructor, which could lead to undefined behavior.

Apply this diff to initialize all member variables:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name)
-    : db_name_(db_name) {
+    : db_name_(db_name)
+    , is_consistency_(false) {
   std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/";
   std::string log_path = db_log_path;
   context_ = std::make_shared<Context>(log_path + kContext);
   stable_logger_ = std::make_shared<StableLog>(db_name, log_path);
   mem_logger_ = std::make_shared<MemLog>();
   logs_ = std::make_shared<Log>();
 }
♻️ Duplicate comments (3)
src/pika_consensus.cc (3)

882-899: 🛠️ Refactor suggestion

Optimize CommitAppLog performance and error handling.

The method has several areas for improvement:

  1. The log size is accessed in each iteration.
  2. Error handling for ApplyBinlog is missing.

Apply this diff to improve performance and error handling:

 Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
+  if (master_committed_id == LogOffset()) {
+    return Status::InvalidArgument("Invalid master committed ID");
+  }
+
   int index = logs_->FindOffset(logs_->FirstOffset());
   int log_size = logs_->Size();  // Cache log size
   LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
             << " , m_offset: " << master_committed_id.ToString();
+
+  if (index < 0) {
+    return Status::Corruption("Failed to find starting offset");
+  }
+
   for (int i = index; i < log_size; ++i) {
     Log::LogItem log = logs_->At(i);
     if (log.offset >= master_committed_id) {
       LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                 << ", ApplyLog: " << log.offset.ToString();
-      ApplyBinlog(log.cmd_ptr);
+      Status s = ApplyBinlog(log.cmd_ptr);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
+        return s;
+      }
     }
   }
   return Status::OK();
 }

1008-1019: ⚠️ Potential issue

Initialize member variables and enhance error handling in Log class.

The Log class has several issues:

  1. Member variables are not properly initialized.
  2. No bounds checking in AppendLog.

Apply this diff to improve initialization and error handling:

-Log::Log() = default;
+Log::Log() : first_index_(LogOffset()), last_index_(LogOffset()) {}

 int Log::Size() {
   std::shared_lock lock(logs_mutex_);
   return static_cast<int>(logs_.size());
 }

 void Log::AppendLog(const LogItem& item) {
+  if (item.offset == LogOffset()) {
+    LOG(WARNING) << "Attempting to append invalid log item";
+    return;
+  }
   std::lock_guard lock(logs_mutex_);
+  if (logs_.empty()) {
+    first_index_ = item.offset;
+  }
   logs_.push_back(item);
   last_index_ = item.offset;
 }

373-398: ⚠️ Potential issue

Enhance thread safety and error handling in UpdateSlave.

The method has several issues:

  1. Shared data structures are accessed without proper synchronization.
  2. Error handling could be more robust.

Apply this diff to improve thread safety and error handling:

 Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const LogOffset& start,
                                          const LogOffset& end) {
+  if (ip.empty() || port <= 0) {
+    return Status::InvalidArgument("Invalid ip or port");
+  }
+
+  std::lock_guard<std::mutex> lock(update_slave_mutex_);  // Add mutex for thread safety
   if (is_consistency_) {
     std::shared_ptr<SlaveNode> slave_ptr = sync_pros_.GetSlaveNode(ip, port);
     if (!slave_ptr) {
       return Status::NotFound("ip " + ip + " port " + std::to_string(port));
     }
     {
       std::lock_guard l(slave_ptr->slave_mu);
       slave_ptr->acked_offset = end;
       sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset);
+      LOG(INFO) << "PacificA slave ip: " << ip << ", port: " << port 
+                << " updated acked_offset to " << slave_ptr->acked_offset.ToString();
       if (slave_ptr->acked_offset >= slave_ptr->target_offset) {
         slave_ptr->slave_state = kSlaveBinlogSync;
         LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " 
                  << slave_ptr->acked_offset.ToString()
                  << ", target_offset: " << slave_ptr->target_offset.ToString();
       }
     }
   } else {
     LogOffset committed_index;
     Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
     if (!s.ok()) {
+      LOG(WARNING) << "Failed to update slave: " << s.ToString();
       return s;
     }
   }
   return Status::OK();
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a799c88 and c0ae54a.

📒 Files selected for processing (1)
  • src/pika_consensus.cc (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build_on_macos
  • GitHub Check: build_on_centos
  • GitHub Check: Analyze (go)

Comment on lines +1031 to +1034
Log::LogItem Log::At(int index) {
  std::shared_lock lock(logs_mutex_);
  return logs_.at(index);  // use at() to ensure bounds safety
}

🛠️ Refactor suggestion

Improve error handling in Log::At method.

The method uses at() for bounds checking but doesn't handle the potential exception.

Apply this diff to add proper error handling:

 Log::LogItem Log::At(int index) {
   std::shared_lock lock(logs_mutex_);
-  return logs_.at(index);  // use at() to ensure bounds safety
+  try {
+    return logs_.at(index);
+  } catch (const std::out_of_range& e) {
+    LOG(WARNING) << "Index out of range: " << index;
+    return LogItem();  // Return empty item or handle error appropriately
+  }
 }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
include/pika_define.h (1)

163-163: Fix naming convention for enum value.

The enum value KCandidate uses a different capitalization style (K prefix) compared to other enum values in SlaveState. Consider renaming to maintain consistency.

-  KCandidate = 3,
+  kCandidate = 3,
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0ae54a and eab9f37.

📒 Files selected for processing (2)
  • include/pika_define.h (2 hunks)
  • src/pika_admin.cc (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/pika_admin.cc
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build_on_centos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_macos
🔇 Additional comments (2)
include/pika_define.h (2)

167-167: LGTM! The SlaveStateMsg array is correctly updated.

The array has been updated to include the string representation for the new state.


278-283: LGTM! Well-structured implementation for tracking committed log offsets.

The changes to WriteTask are well-implemented:

  • Default initialization of committed_id_
  • Proper use of const references in the constructor
  • Clear member initialization list

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (5)
tests/integration/consistency_test.go (3)

59-88: Consider Using Gomega’s Eventually Instead of Fixed Sleeps

Relying on time.Sleep (e.g., 3 seconds in line 77) can lead to fragile tests if replication is slow. Replacing it with Gomega’s Eventually or a similar retry-based assertion is more resilient.


89-117: Concurrent Writes Test

The loop captures the iteration variable correctly with i := i inside the goroutine. However, using time-bound sleeps (line 102) for synchronization can cause flakiness. Consider employing a sync.WaitGroup or a Gomega Eventually-style pattern to ensure all goroutines finish before verification.
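A `sync.WaitGroup` makes "all writers have finished" a deterministic point instead of a timed guess; a minimal sketch, with an in-memory map standing in for the Redis client and `concurrentFill` as a hypothetical helper:

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentFill writes n keys from n goroutines and returns the resulting
// map. wg.Wait() blocks until every writer has called wg.Done(), so the
// caller can verify the data without any fixed sleep.
func concurrentFill(n int) map[string]string {
	var (
		mu    sync.Mutex
		store = make(map[string]string) // stand-in for the master client
		wg    sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mu.Lock()
			store[fmt.Sprintf("key%d", i)] = fmt.Sprintf("value%d", i)
			mu.Unlock()
		}(i)
	}
	wg.Wait() // every write above has completed here
	return store
}

func main() {
	store := concurrentFill(10)
	fmt.Println(len(store)) // 10
}
```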


264-387: Large, Comprehensive Failure and Recovery Test

This scenario-based test covers multiple stages (failure, partial data writes, recovery checks). It’s very detailed. As a small improvement, consider splitting it into smaller, more focused tests for maintainability and quicker debugging.

src/pika_consensus.cc (2)

477-477: Minor Formatting Issue

Chaining stream output with a semicolon immediately followed by a new local variable definition can reduce readability:

LOG(INFO) << ...;  LogOffset committed = committed_index();

Consider separating them into distinct statements for clarity.


806-813: Naming Consistency for Member Accessors

SetConsistency and GetISConsistency are mismatched in naming style. A more descriptive “IsConsistencyEnabled” or “GetConsistency” would keep the naming conventions consistent.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eab9f37 and cfd3fed.

📒 Files selected for processing (10)
  • include/pika_define.h (2 hunks)
  • src/pika_admin.cc (4 hunks)
  • src/pika_consensus.cc (5 hunks)
  • src/pika_repl_bgworker.cc (4 hunks)
  • src/pika_repl_client_conn.cc (2 hunks)
  • src/pika_rm.cc (7 hunks)
  • tests/integration/cleanup_test.sh (1 hunks)
  • tests/integration/consistency_test.go (1 hunks)
  • tests/integration/options.go (1 hunks)
  • tests/integration/start_master_and_slave.sh (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • include/pika_define.h
  • src/pika_admin.cc
  • src/pika_repl_client_conn.cc
  • src/pika_repl_bgworker.cc
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build_on_centos
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
🔇 Additional comments (11)
tests/integration/consistency_test.go (4)

1-11: Use of Dot-Style Imports in Ginkgo Tests Looks Good

Using dot imports for Ginkgo and Gomega is common for test readability. No issues here.


13-28: Initialization of Clients and Slave Setup

The test setup is clear and ensures a clean state by flushing the DB. This approach provides a predictable environment for subsequent test cases.


42-57: Robust Lifecycle Management

Using BeforeEach and AfterEach to create and close clients in every test context is a good practice that prevents test cross-contamination.


170-261: Thorough Dynamic Node Addition Test

This test rigorously checks both historical and newly added data. The step-by-step approach with verification enhances confidence in replication correctness. No immediate issues found.

tests/integration/options.go (1)

24-31: PacificA Test Constants

Defining these addresses and ports distinctly helps keep the test environment organized. No concerns regarding correctness or naming.

src/pika_consensus.cc (4)

67-69: Correct Synchronization for Updating applied_index_

Using std::lock_guard and then calling StableSave() is appropriate to ensure thread-safe updates.


375-395: Conditional Logic in UpdateSlave

The code differentiates between is_consistency_ and the older update path. Ensure edge cases (e.g., large offsets or invalid ranges) are correctly handled or logged if they appear.

Would you like a script to search for offset boundary checks in other areas of the codebase?


1031-1034: Duplicate: Handle Potential std::out_of_range in Log::At

This method uses logs_.at(index), which can throw if index is invalid. A prior review suggested adding a try/catch block. Revisit that recommendation if unhandled exceptions pose a risk.


881-899: Commit Logic Seems Correct

“CommitAppLog” processes each log entry up to the master’s offset, then truncates the committed portion. The flow matches typical raft-like designs. No major concerns.
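The apply-then-truncate flow described above can be sketched in miniature; the `entry` type and integer offsets below are simplified stand-ins for Pika's Log/LogOffset, under the assumption that entries are sorted by offset:

```go
package main

import "fmt"

// entry is a simplified log record keyed by an integer offset.
type entry struct {
	offset int
	cmd    string
}

// commitUpTo applies every entry with offset <= committed and returns the
// remaining (uncommitted) suffix — the commit-then-truncate shape that
// CommitAppLog follows for the master's committed offset.
func commitUpTo(log []entry, committed int, apply func(entry)) []entry {
	i := 0
	for ; i < len(log) && log[i].offset <= committed; i++ {
		apply(log[i]) // replay the command into the state machine
	}
	return log[i:] // truncate the applied prefix
}

func main() {
	log := []entry{{1, "set a"}, {2, "set b"}, {3, "set c"}}
	applied := 0
	rest := commitUpTo(log, 2, func(e entry) { applied++ })
	fmt.Println(applied, len(rest)) // 2 1
}
```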

src/pika_rm.cc (2)

400-421: LGTM! New consistency-related methods look good.

The new methods provide a clean interface for managing consistency state and offsets. The implementation correctly delegates to the coordinator.


1175-1180: LGTM! BuildBinlogOffset implementation is clean and straightforward.

The method correctly maps the LogOffset fields to the protobuf message fields.

Comment on lines 472 to 496
Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
  return coordinator_.ProposeLog(cmd_ptr);
  // If consistency is not required, directly propose the log without waiting for consensus
  if (!coordinator_.GetISConsistency()) {
    return coordinator_.ProposeLog(cmd_ptr);
  }

  auto start = std::chrono::steady_clock::now();
  LogOffset offset;
  Status s = coordinator_.AppendEntries(cmd_ptr, offset);  // Append the log entry to the coordinator

  if (!s.ok()) {
    return s;
  }

  // Wait for consensus to be achieved within 10 seconds
  while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
    // Check if consensus has been achieved for the given log offset
    if (checkFinished(offset)) {
      return Status::OK();
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }

  return Status::Timeout("No consistency achieved within 10 seconds");
}

🛠️ Refactor suggestion

Enhance timeout handling in ConsensusProposeLog.

The current implementation has a few issues:

  1. Hard-coded timeout value of 10 seconds
  2. Fixed sleep interval of 50ms
  3. No early exit on error conditions

Apply this diff to improve the implementation:

Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
    if (!coordinator_.GetISConsistency()) {
        return coordinator_.ProposeLog(cmd_ptr);
    }

    auto start = std::chrono::steady_clock::now();
+   const int kConsensusTimeoutSec = 10;
+   const int kSleepIntervalMs = 50;
    LogOffset offset;
    Status s = coordinator_.AppendEntries(cmd_ptr, offset);

    if (!s.ok()) {
        return s;
    }

-   while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
+   while (std::chrono::duration_cast<std::chrono::seconds>(
+          std::chrono::steady_clock::now() - start).count() < kConsensusTimeoutSec) {
        if (checkFinished(offset)) {
            return Status::OK(); 
        }
+       s = coordinator_.ProcessCoordination();
+       if (!s.ok()) {
+           return s;
+       }
-       std::this_thread::sleep_for(std::chrono::milliseconds(50));
+       std::this_thread::sleep_for(std::chrono::milliseconds(kSleepIntervalMs));
    }

-   return Status::Timeout("No consistency achieved within 10 seconds");
+   return Status::Timeout("No consistency achieved within " + 
+                         std::to_string(kConsensusTimeoutSec) + " seconds");
}

src/pika_rm.cc Outdated
Comment on lines 231 to 258
Status SyncMasterDB::WakeUpSlaveBinlogSync() {
  std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
  std::vector<std::shared_ptr<SlaveNode>> to_del;
  for (auto& slave_iter : slaves) {
    std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;
    std::lock_guard l(slave_ptr->slave_mu);
    if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
      Status s;
      if (coordinator_.GetISConsistency() &&
          (slave_ptr->slave_state == SlaveState::kSlaveBinlogSync || slave_ptr->slave_state == SlaveState::KCandidate)) {
        s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_);
      } else {
        s = ReadBinlogFileToWq(slave_ptr);
      }
      if (!s.ok()) {
        to_del.push_back(slave_ptr);
        LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: "
                     << slave_ptr->ToStringStatus() << " - " << s.ToString();
      }
    }
  }

  for (const auto& to_del_slave : to_del) {
    RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port());
    LOG(INFO) << "Removed slave: " << to_del_slave->ToStringStatus();
  }

  return Status::OK();
}

🛠️ Refactor suggestion

Improve error handling in WakeUpSlaveBinlogSync.

The error handling could be enhanced:

  1. The method continues processing other slaves even after encountering errors
  2. Errors are only logged but not propagated to the caller

Apply this diff to improve error handling:

Status PikaReplicaManager::WakeUpSlaveBinlogSync() {
    std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
    std::vector<std::shared_ptr<SlaveNode>> to_del;
+   Status final_status = Status::OK();
    for (auto& slave_iter : slaves) {
        std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;
        std::lock_guard l(slave_ptr->slave_mu);
        if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
          Status s;
          if (coordinator_.GetISConsistency()&&(slave_ptr->slave_state == SlaveState::kSlaveBinlogSync||slave_ptr->slave_state == SlaveState::KCandidate)) {
            s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_);
          } else {
            s = ReadBinlogFileToWq(slave_ptr);
          }
          if (!s.ok()) {
            to_del.push_back(slave_ptr);
            LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: "
                             << slave_ptr->ToStringStatus() << " - " << s.ToString();
+           final_status = s;
          }
        }
    }

    for (const auto& to_del_slave : to_del) {
        RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port());
        LOG(INFO) << "Removed slave: " << to_del_slave->ToStringStatus();
    }

-   return Status::OK();
+   return final_status;
}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +1 to +83
#!/bin/bash

# Kill all pika instances
echo "Stopping all Pika instances..."
pkill -f pika

# Wait for services to completely stop
sleep 2

# Clean up configuration files
echo "Cleaning up configuration files..."
rm -f pika_single.conf
rm -f pika_master.conf
rm -f pika_slave.conf
rm -f pika_rename.conf
rm -f pika_acl_both_password.conf
rm -f pika_acl_only_admin_password.conf
rm -f pika_has_other_acl_user.conf
rm -f pacifica_master.conf
rm -f pacifica_slave1.conf
rm -f pacifica_slave2.conf

# Clean up backup files
echo "Cleaning up backup files..."
rm -f *.conf.bak

# Clean up data directories
echo "Cleaning up data directories..."
rm -rf master_data
rm -rf slave_data
rm -rf rename_data
rm -rf acl1_data
rm -rf acl2_data
rm -rf acl3_data
rm -rf pacifica_test

# Clean up PacificA test directories
echo "Cleaning up PacificA test directories..."
rm -rf pacifica_test/master
rm -rf pacifica_test/slave1
rm -rf pacifica_test/slave2

# Clean up log directories
echo "Cleaning up log files..."
rm -rf */log/*
rm -rf log/*
rm -rf *.log

# Clean up dump files
echo "Cleaning up dump files..."
rm -rf */dump/*
rm -rf dump/*

# Clean up db files
echo "Cleaning up database files..."
rm -rf */db/*
rm -rf db/*

# Clean up pid files
echo "Cleaning up pid files..."
rm -rf *.pid

# Clean up dbsync files
echo "Cleaning up dbsync files..."
rm -rf */dbsync/*
rm -rf dbsync/*

# Verify cleanup
echo "Verifying cleanup..."
if pgrep -f pika > /dev/null; then
    echo "Warning: Some Pika instances are still running"
    pgrep -f pika
else
    echo "All Pika instances have been stopped"
fi

if [ -d "pacifica_test" ] || [ -d "master_data" ] || [ -d "slave_data" ]; then
    echo "Warning: Some data directories still exist"
else
    echo "All data directories have been removed"
fi

echo "Cleanup completed"

🛠️ Refactor suggestion

Enhance cleanup script robustness and safety.

While the script is well-structured, it could benefit from several improvements:

Apply this diff to make the script more robust:

 #!/bin/bash
+set -euo pipefail
+
+# Define cleanup directories
+CLEANUP_DIRS=(
+    "master_data"
+    "slave_data"
+    "rename_data"
+    "acl1_data"
+    "acl2_data"
+    "acl3_data"
+    "pacifica_test"
+)
+
+# Define cleanup patterns
+CLEANUP_PATTERNS=(
+    "*.conf"
+    "*.conf.bak"
+    "*.log"
+    "*.pid"
+)

 # Kill all pika instances
 echo "Stopping all pika instances..."
-pkill -f pika
+pkill -f pika || true
 
 # Wait for services to completely stop
 sleep 2
 
-# Clean up configuration files
-echo "Cleaning up configuration files..."
-rm -f pika_single.conf
-rm -f pika_master.conf
-# ... (other rm commands)
+# Clean up files
+echo "Cleaning up files..."
+for pattern in "${CLEANUP_PATTERNS[@]}"; do
+    find . -maxdepth 1 -name "$pattern" -type f -exec rm -f {} +
+done
 
-# Clean up data directories
-echo "Cleaning up data directories..."
-rm -rf master_data
-rm -rf slave_data
-# ... (other rm commands)
+# Clean up directories
+echo "Cleaning up directories..."
+for dir in "${CLEANUP_DIRS[@]}"; do
+    if [ -d "$dir" ]; then
+        rm -rf "$dir"
+    fi
+done
 
 # Verify cleanup
 echo "Verifying cleanup..."
-if pgrep -f pika > /dev/null; then
+PIKA_PROCS=$(pgrep -f pika || true)
+if [ -n "$PIKA_PROCS" ]; then
     echo "Warning: Some Pika instances are still running"
-    pgrep -f pika
+    echo "$PIKA_PROCS"
 else
     echo "All Pika instances have been stopped"
 fi
 
-if [ -d "pacifica_test" ] || [ -d "master_data" ] || [ -d "slave_data" ]; then
-    echo "Warning: Some data directories still exist"
+EXISTING_DIRS=()
+for dir in "${CLEANUP_DIRS[@]}"; do
+    if [ -d "$dir" ]; then
+        EXISTING_DIRS+=("$dir")
+    fi
+done
+
+if [ ${#EXISTING_DIRS[@]} -gt 0 ]; then
+    echo "Warning: Following directories still exist:"
+    printf '%s\n' "${EXISTING_DIRS[@]}"
 else
     echo "All data directories have been removed"
 fi
 
 echo "Cleanup completed"

Key improvements:

  1. Added error handling with set -euo pipefail
  2. Defined arrays for cleanup targets
  3. Used find for safer file cleanup
  4. Added more detailed verification
  5. Improved error messages
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#!/bin/bash
set -euo pipefail

# Define cleanup directories
CLEANUP_DIRS=(
    "master_data"
    "slave_data"
    "rename_data"
    "acl1_data"
    "acl2_data"
    "acl3_data"
    "pacifica_test"
)

# Define cleanup patterns
CLEANUP_PATTERNS=(
    "*.conf"
    "*.conf.bak"
    "*.log"
    "*.pid"
)

# Kill all pika instances
echo "Stopping all Pika instances..."
pkill -f pika || true

# Wait for services to completely stop
sleep 2

# Clean up files
echo "Cleaning up files..."
for pattern in "${CLEANUP_PATTERNS[@]}"; do
    find . -maxdepth 1 -name "$pattern" -type f -exec rm -f {} +
done

# Clean up directories
echo "Cleaning up directories..."
for dir in "${CLEANUP_DIRS[@]}"; do
    if [ -d "$dir" ]; then
        rm -rf "$dir"
    fi
done

# Verify cleanup
echo "Verifying cleanup..."
PIKA_PROCS=$(pgrep -f pika || true)
if [ -n "$PIKA_PROCS" ]; then
    echo "Warning: Some Pika instances are still running"
    echo "$PIKA_PROCS"
else
    echo "All Pika instances have been stopped"
fi

EXISTING_DIRS=()
for dir in "${CLEANUP_DIRS[@]}"; do
    if [ -d "$dir" ]; then
        EXISTING_DIRS+=("$dir")
    fi
done

if [ ${#EXISTING_DIRS[@]} -gt 0 ]; then
    echo "Warning: Following directories still exist:"
    printf '%s\n' "${EXISTING_DIRS[@]}"
else
    echo "All data directories have been removed"
fi

echo "Cleanup completed"
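The two patterns this refactor leans on — tolerating "no match" exit codes under `set -e`, and scoping deletions with `find` instead of bare `rm` globs — can be shown in a small standalone sketch. The directory and file names below are made up for illustration; they are not part of the PR.

```shell
#!/bin/bash
# Standalone sketch; directory/file names are hypothetical.
set -euo pipefail

# Pattern 1: pgrep/pkill exit 1 when nothing matches, which would abort
# the script under `set -e`; `|| true` keeps the cleanup going.
pgrep -f "definitely_not_running_xyz" || true
echo "survived a no-match pgrep"

# Pattern 2: delete by pattern with find, restricted to the top level,
# so files inside data directories are never touched by accident.
workdir=$(mktemp -d)
mkdir -p "$workdir/master_data"
touch "$workdir/pika.pid" "$workdir/master_data/inner.pid" "$workdir/notes.txt"
find "$workdir" -maxdepth 1 -name "*.pid" -type f -exec rm -f {} +
```

After this runs, the top-level `pika.pid` is gone while `master_data/inner.pid` survives, which is exactly the containment the review is asking for.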

Comment on lines +107 to +154
# Create data directories for the PacificA consistency test
mkdir -p pacifica_test/master
mkdir -p pacifica_test/slave1
mkdir -p pacifica_test/slave2

# Create config files for the PacificA test
cp ../conf/pika.conf ./pacifica_master.conf
cp ../conf/pika.conf ./pacifica_slave1.conf
cp ../conf/pika.conf ./pacifica_slave2.conf

# Configure the master node
sed -i.bak \
-e 's|port : 9221|port : 9301|' \
-e 's|log-path : ./log/|log-path : ./pacifica_test/master/log/|' \
-e 's|db-path : ./db/|db-path : ./pacifica_test/master/db/|' \
-e 's|dump-path : ./dump/|dump-path : ./pacifica_test/master/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./pacifica_test/master/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./pacifica_test/master/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pacifica_master.conf

# Configure slave node 1
sed -i.bak \
-e 's|port : 9221|port : 9302|' \
-e 's|log-path : ./log/|log-path : ./pacifica_test/slave1/log/|' \
-e 's|db-path : ./db/|db-path : ./pacifica_test/slave1/db/|' \
-e 's|dump-path : ./dump/|dump-path : ./pacifica_test/slave1/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./pacifica_test/slave1/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./pacifica_test/slave1/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pacifica_slave1.conf

# Configure slave node 2
sed -i.bak \
-e 's|port : 9221|port : 9303|' \
-e 's|log-path : ./log/|log-path : ./pacifica_test/slave2/log/|' \
-e 's|db-path : ./db/|db-path : ./pacifica_test/slave2/db/|' \
-e 's|dump-path : ./dump/|dump-path : ./pacifica_test/slave2/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./pacifica_test/slave2/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./pacifica_test/slave2/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pacifica_slave2.conf

# Start the PacificA test nodes
echo "Starting PacificA consistency test nodes..."
./pika -c ./pacifica_master.conf
./pika -c ./pacifica_slave1.conf
./pika -c ./pacifica_slave2.conf

# Wait for nodes to start
sleep 10

🛠️ Refactor suggestion

Enhance PacificA test setup robustness and validation.

While the setup is functional, it could benefit from several improvements:

Apply this diff to enhance the setup:

+# PacificA test configuration
+PACIFICA_BASE_DIR="pacifica_test"
+PACIFICA_NODES=(
+    "master:9301"
+    "slave1:9302"
+    "slave2:9303"
+)
+
 # Create PacificA test directories
-mkdir -p pacifica_test/master
-mkdir -p pacifica_test/slave1
-mkdir -p pacifica_test/slave2
+echo "Creating PacificA test directories..."
+for node in "${PACIFICA_NODES[@]}"; do
+    node_name=${node%%:*}
+    node_dir="$PACIFICA_BASE_DIR/$node_name"
+    mkdir -p "$node_dir"
+    for subdir in log db dump dbsync; do
+        mkdir -p "$node_dir/$subdir"
+    done
+done
 
 # Create PacificA test config files
+echo "Creating PacificA test config files..."
 cp ../conf/pika.conf ./pacifica_master.conf
 cp ../conf/pika.conf ./pacifica_slave1.conf
 cp ../conf/pika.conf ./pacifica_slave2.conf
 
-# Configure master node
-sed -i.bak   \
-  -e 's|port : 9221|port : 9301|'  \
-  # ... (other sed commands)
+# Configure nodes
+for node in "${PACIFICA_NODES[@]}"; do
+    node_name=${node%%:*}
+    node_port=${node#*:}
+    config_file="./pacifica_${node_name}.conf"
+    
+    echo "Configuring $node_name node on port $node_port..."
+    sed -i.bak \
+        -e "s|port : 9221|port : $node_port|" \
+        -e "s|log-path : ./log/|log-path : ./$PACIFICA_BASE_DIR/$node_name/log/|" \
+        -e "s|db-path : ./db/|db-path : ./$PACIFICA_BASE_DIR/$node_name/db/|" \
+        -e "s|dump-path : ./dump/|dump-path : ./$PACIFICA_BASE_DIR/$node_name/dump/|" \
+        -e "s|pidfile : ./pika.pid|pidfile : ./$PACIFICA_BASE_DIR/$node_name/pika.pid|" \
+        -e "s|db-sync-path : ./dbsync/|db-sync-path : ./$PACIFICA_BASE_DIR/$node_name/dbsync/|" \
+        -e 's|#daemonize : yes|daemonize : yes|' \
+        "$config_file"
+done
 
 # Start PacificA test nodes
-echo "启动PacificA一致性测试节点..."
-./pika -c ./pacifica_master.conf
-./pika -c ./pacifica_slave1.conf
-./pika -c ./pacifica_slave2.conf
+echo "Starting PacificA consistency test nodes..."
+for node in "${PACIFICA_NODES[@]}"; do
+    node_name=${node%%:*}
+    config_file="./pacifica_${node_name}.conf"
+    
+    echo "Starting $node_name node..."
+    ./pika -c "$config_file"
+    
+    # Verify node startup
+    node_port=${node#*:}
+    timeout=30
+    while [ $timeout -gt 0 ]; do
+        if nc -z localhost "$node_port"; then
+            echo "$node_name node is up on port $node_port"
+            break
+        fi
+        sleep 1
+        ((timeout--))
+    done
+    
+    if [ $timeout -eq 0 ]; then
+        echo "Error: $node_name node failed to start on port $node_port"
+        exit 1
+    fi
+done
 
-# Wait for nodes to start
-sleep 10
+echo "All PacificA nodes are running"

Key improvements:

  1. Added node configuration array for better maintainability
  2. Added proper directory structure creation
  3. Improved configuration file handling
  4. Added startup verification for each node
  5. Added timeout-based waiting instead of fixed sleep
  6. Added error handling for node startup failures
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# PacificA test configuration
PACIFICA_BASE_DIR="pacifica_test"
PACIFICA_NODES=(
    "master:9301"
    "slave1:9302"
    "slave2:9303"
)

# Create PacificA test directories
echo "Creating PacificA test directories..."
for node in "${PACIFICA_NODES[@]}"; do
    node_name=${node%%:*}
    node_dir="$PACIFICA_BASE_DIR/$node_name"
    mkdir -p "$node_dir"
    for subdir in log db dump dbsync; do
        mkdir -p "$node_dir/$subdir"
    done
done

# Create PacificA test config files
echo "Creating PacificA test config files..."
cp ../conf/pika.conf ./pacifica_master.conf
cp ../conf/pika.conf ./pacifica_slave1.conf
cp ../conf/pika.conf ./pacifica_slave2.conf

# Configure nodes
for node in "${PACIFICA_NODES[@]}"; do
    node_name=${node%%:*}
    node_port=${node#*:}
    config_file="./pacifica_${node_name}.conf"

    echo "Configuring $node_name node on port $node_port..."
    sed -i.bak \
        -e "s|port : 9221|port : $node_port|" \
        -e "s|log-path : ./log/|log-path : ./$PACIFICA_BASE_DIR/$node_name/log/|" \
        -e "s|db-path : ./db/|db-path : ./$PACIFICA_BASE_DIR/$node_name/db/|" \
        -e "s|dump-path : ./dump/|dump-path : ./$PACIFICA_BASE_DIR/$node_name/dump/|" \
        -e "s|pidfile : ./pika.pid|pidfile : ./$PACIFICA_BASE_DIR/$node_name/pika.pid|" \
        -e "s|db-sync-path : ./dbsync/|db-sync-path : ./$PACIFICA_BASE_DIR/$node_name/dbsync/|" \
        -e 's|#daemonize : yes|daemonize : yes|' \
        "$config_file"
done

# Start PacificA test nodes
echo "Starting PacificA consistency test nodes..."
for node in "${PACIFICA_NODES[@]}"; do
    node_name=${node%%:*}
    config_file="./pacifica_${node_name}.conf"

    echo "Starting $node_name node..."
    ./pika -c "$config_file"

    # Verify node startup
    node_port=${node#*:}
    timeout=30
    while [ $timeout -gt 0 ]; do
        if nc -z localhost "$node_port"; then
            echo "$node_name node is up on port $node_port"
            break
        fi
        sleep 1
        ((timeout--))
    done

    if [ $timeout -eq 0 ]; then
        echo "Error: $node_name node failed to start on port $node_port"
        exit 1
    fi
done

echo "All PacificA nodes are running"
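The load-bearing pieces of this suggestion are the `${node%%:*}` / `${node#*:}` parameter expansions and the bounded readiness loop. A minimal sketch of both follows; the readiness probe is stubbed to always fail (no real pika node is assumed to be running), so the timeout path is the one exercised.

```shell
#!/bin/bash
# Minimal sketch of the name:port parsing and bounded-wait patterns.
set -euo pipefail

PACIFICA_NODES=("master:9301" "slave1:9302" "slave2:9303")

for node in "${PACIFICA_NODES[@]}"; do
    node_name=${node%%:*}   # strip the longest ':*' suffix -> text before ':'
    node_port=${node#*:}    # strip the shortest '*:' prefix -> text after ':'
    echo "$node_name -> $node_port"
done

# Bounded wait instead of a fixed `sleep 10`. check_ready is a stand-in
# for the real probe, which would be `nc -z localhost "$node_port"`.
check_ready() { return 1; }

timeout=5
while [ "$timeout" -gt 0 ]; do
    if check_ready; then
        echo "node is up"
        break
    fi
    timeout=$((timeout - 1))
done

if [ "$timeout" -eq 0 ]; then
    echo "gave up waiting: node not ready within the retry budget"
fi
```

The loop terminates with `timeout` at 0 when the probe never succeeds, which is what lets the real script report a startup failure instead of proceeding against a dead node.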

Labels: ✏️ Feature (New feature or request)