Skip to content

Commit

Permalink
feat:bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyao-w committed May 6, 2023
1 parent aaf7af8 commit cf5a13f
Show file tree
Hide file tree
Showing 15 changed files with 1,767 additions and 1,630 deletions.
25 changes: 9 additions & 16 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *Raft) BootstrapCluster(configuration Configuration) defaultFuture {
select {
case <-r.shutDown.C:
future.fail(ErrShutDown)
case r.commandCh <- &command{typ: commandBootstrap, item: future}:
case r.commandCh <- &command{enum: commandBootstrap, callback: future}:
}
return future
}
Expand Down Expand Up @@ -77,11 +77,7 @@ func (r *Raft) apiApplyLog(entry *LogEntry, timeout time.Duration) ApplyFuture {
case <-r.shutDown.C:
return &errFuture[nilRespFuture]{ErrShutDown}
case r.apiLogApplyCh <- applyFuture: //batch apply
return applyFuture
case r.commandCh <- &command{ // 正常提交
typ: commandLogApply,
item: applyFuture,
}:
case r.commandCh <- &command{enum: commandLogApply, callback: applyFuture}: // 正常提交
}
return applyFuture
}
Expand All @@ -94,7 +90,7 @@ func (r *Raft) VerifyLeader() Future[bool] {
case <-r.shutDown.C:
vf.fail(ErrShutDown)
return &vf.deferResponse
case r.commandCh <- &command{typ: commandVerifyLeader, item: vf}:
case r.commandCh <- &command{enum: commandVerifyLeader, callback: vf}:
return &vf.deferResponse
}
}
Expand All @@ -118,7 +114,7 @@ func (r *Raft) requestClusterChange(req configurationChangeRequest, timeout time
return &errFuture[nilRespFuture]{err: errors.New("apply log time out")}
case <-r.shutDown.C:
return &errFuture[nilRespFuture]{err: ErrShutDown}
case r.commandCh <- &command{typ: commandClusterChange, item: ccf}:
case r.commandCh <- &command{enum: commandClusterChange, callback: ccf}:
return ccf
}
}
Expand Down Expand Up @@ -157,7 +153,8 @@ func (r *Raft) SnapShot() Future[OpenSnapShot] {
}
}

func (r *Raft) StateCh() <-chan State {
// StateCh 状态变化的通知
func (r *Raft) StateCh() <-chan *StateChange {
return r.stateChangeCh
}

Expand Down Expand Up @@ -186,7 +183,7 @@ func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr) defaultFuture {
}
future.init()
select {
case r.commandCh <- &command{typ: commandLeadershipTransfer, item: future}:
case r.commandCh <- &command{enum: commandLeadershipTransfer, callback: future}:
return future
case <-r.shutDown.C:
return &errFuture[nilRespFuture]{ErrShutDown}
Expand All @@ -211,11 +208,7 @@ func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
select {
case <-r.shutDown.C:
return ErrShutDown
case r.commandCh <- &command{
typ: commandConfigReload,
item: newConf,
}:

case r.commandCh <- &command{enum: commandConfigReload, callback: &newConf}:
}
return nil
}
Expand All @@ -227,7 +220,7 @@ func (r *Raft) ReStoreSnapshot(meta *SnapShotMeta, reader io.ReadCloser) error {
}
fu.init()
select {
case r.commandCh <- &command{typ: commandSnapshotRestore, item: fu}:
case r.commandCh <- &command{enum: commandSnapshotRestore, callback: fu}:
case <-r.shutDown.C:
return ErrShutDown
}
Expand Down
88 changes: 64 additions & 24 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ import (
)

type (
commandTyp int
command struct {
typ commandTyp
item interface{}
reject interface {
reject(state State)
}
commandMap map[commandTyp]map[State]func(*Raft, interface{})

commandEnum int
command struct {
enum commandEnum
callback reject
}
commandMap map[commandEnum]map[State]func(*Raft, any)
)

// testFuture 测试用
type testFuture struct {
deferResponse[string]
}

const (
commandTest commandTyp = iota + 1 // 用于单测
commandTest commandEnum = iota + 1 // 用于单测
commandClusterGet
commandBootstrap
commandLogApply
Expand Down Expand Up @@ -70,18 +76,53 @@ func init() {
},
}
}
func (d *deferResponse[_]) reject(state State) {
if state == ShutDown {
d.fail(ErrShutDown)
return
}
d.fail(fmt.Errorf("current state %s can't process", state.String()))
}

func (c *Config) reject(State) {}

// processCommand 处理 raft 的命令,需要在 channelCommand 注册 commandEnum + 对应 State 的处理函数,
// 如果该请求当前没有对应的 State 处理函数,则会强制转成 reject interface 执行拒绝回调
// 如果未在 channelCommand 注册,则会直接 panic
func (r *Raft) processCommand(cmd *command) {
cc, ok := channelCommand[cmd.typ]
cc, ok := channelCommand[cmd.enum]
if !ok {
panic(fmt.Sprintf("command type :%d not register", cmd.typ))
panic(fmt.Sprintf("command type :%d not register", cmd.enum))
}
state := r.state.Get()
f, ok := cc[state]
if ok {
f(r, cmd.item)
f(r, cmd.callback)
} else {
cmd.item.(reject).reject(state)
cmd.callback.reject(state)
}
}

// processHeartBeatTimeout 处理心跳超时判断,只在 cycleFollower 中调用!
func (r *Raft) processHeartBeatTimeout(warn func(...interface{})) {
tm := r.Conf().HeartbeatTimeout
r.heartbeatTimeout = randomTimeout(tm)
if time.Now().Before(r.getLastContact().Add(tm)) {
return
}
oldLeaderInfo := r.leaderInfo.Get()
r.clearLeaderInfo() // 必须清掉,否则在投票时候选人会被拒绝
// 区分下不能开始选举的原因
switch {
case r.cluster.latestIndex == 0:
warn("no cluster members")
case r.cluster.stable() && !r.canVote(r.localInfo.ID):
warn("no part of stable Configuration, aborting election")
case r.canVote(r.localInfo.ID):
warn("heartbeat abortCh reached, starting election", "last-leader-addr", oldLeaderInfo.Addr, "last-leader-id", oldLeaderInfo.ID)
r.setCandidate()
default:
warn("heartbeat abortCh reached, not part of a stable Configuration or a non-voter, not triggering a leader election")
}
}

Expand Down Expand Up @@ -115,7 +156,7 @@ BREAK:
break BREAK
}
}
r.applyLog(futures)
r.applyLog(futures...)
}

// processBootstrap 引导集群启动,节点必须是干净的(日志、快照、任期都是 0 )
Expand Down Expand Up @@ -169,7 +210,7 @@ func (r *Raft) processSnapshotRestore(item interface{}) {
fu.fail(ErrLeadershipTransferInProgress)
return
}
if r.cluster.commitIndex != r.cluster.latestIndex {
if !r.cluster.stable() {
fu.fail(fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
r.cluster.latestIndex, r.cluster.commitIndex))
return
Expand All @@ -179,7 +220,8 @@ func (r *Raft) processSnapshotRestore(item interface{}) {
r.leaderState.inflight.Remove(e)
}
index := r.getLatestIndex() + 1
sink, err := r.snapshotStore.Create(SnapShotVersionDefault, index, r.getCurrentTerm(), r.cluster.latest, r.cluster.latestIndex, nil)
sink, err := r.snapshotStore.Create(SnapShotVersionDefault, index, r.getCurrentTerm(),
r.cluster.commit, r.cluster.commitIndex, r.rpc)
if err != nil {
fu.fail(err)
return
Expand Down Expand Up @@ -226,13 +268,12 @@ func (r *Raft) pickLatestPeer() *replication {
var (
latest *replication
latestIndex uint64
rep = r.leaderState.replicate
)
for _, info := range r.getLatestConfiguration() {
if !info.isVoter() {
continue
}
fr, ok := rep[info.ID]
fr, ok := r.leaderState.replicate[info.ID]
if !ok {
continue
}
Expand All @@ -241,7 +282,6 @@ func (r *Raft) pickLatestPeer() *replication {
latest = fr
}
}

return latest
}

Expand All @@ -266,7 +306,7 @@ func (r *Raft) leadershipTransfer(fr *replication) error {
}
}
}
if i >= rounds {
if i > rounds {
return errors.New("reach the maximum number of catch-up rounds")
}
resp, err := r.rpc.FastTimeout(Ptr(fr.peer.Get()), &FastTimeoutRequest{
Expand Down Expand Up @@ -348,6 +388,7 @@ func (r *Raft) processVerifyLeader(item interface{}) {
fu.voteGranted = 0
fu.reportOnce = new(sync.Once)
fu.stepDown = r.leaderState.stepDown
// 先投自己一票
fu.vote(true)
for _, repl := range r.leaderState.replicate {
repl.observe(fu)
Expand All @@ -360,15 +401,15 @@ func (r *Raft) processClusterChange(item interface{}) {
var (
fu = item.(*configurationChangeFuture)
)
if r.cluster.commitIndex != r.cluster.latestIndex {
fu.fail(errors.New("no stable configuration"))
if !r.cluster.stable() {
fu.fail(errors.New("no stable cluster"))
return
}
if r.leaderState.getLeadershipTransfer() {
fu.fail(ErrLeadershipTransferInProgress)
return
}
if r.cluster.latestIndex != fu.req.pervIndex {
if fu.req.pervIndex > 0 && r.cluster.latestIndex != fu.req.pervIndex {
fu.fail(errors.New("configuration index not match"))
return
}
Expand All @@ -384,10 +425,9 @@ func (r *Raft) processClusterChange(item interface{}) {
},
}
logFu.init()
r.applyLog([]*LogFuture{logFu})

r.cluster.setLatest(logFu.Index(), newConfiguration)
r.applyLog(logFu)
r.setLatestConfiguration(logFu.Index(), newConfiguration)
r.onConfigurationUpdate()
r.reloadReplication()

fu.success()
}
5 changes: 4 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

const (
minCheckInterval = 10 * time.Millisecond
minCheckInterval = 10 * time.Millisecond
minHeartBeatInterval = time.Millisecond * 10
)

type Config struct {
Expand All @@ -22,7 +23,9 @@ type Config struct {
Logger Logger
LocalID string
LeadershipCatchUpRounds uint
LeadershipLostShutDown bool
}

type ReloadableConfig struct {
TrailingLogs uint64
SnapshotInterval time.Duration
Expand Down
4 changes: 4 additions & 0 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (c *cluster) Clone() cluster {
}
}

// stable 集群配置是否处在变更阶段
func (c *cluster) stable() bool {
return c.commitIndex > 0 && c.latestIndex == c.commitIndex
}
func DecodeConfiguration(data []byte) (c Configuration) {
if err := json.Unmarshal(data, &c); err != nil {
panic(fmt.Errorf("failed to decode Configuration: %s ,%s", err, data))
Expand Down
10 changes: 5 additions & 5 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "io"

type FSM interface {
Apply(*LogEntry) interface{}
ReStore(reader io.ReadCloser) error
ReStore(reader io.ReadCloser) error // 从快照恢复,需要自行实现觅等
Snapshot() (FsmSnapshot, error)
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func (r *Raft) runFSM() {
}
snapshot, err := r.fsm.Snapshot()
if err != nil {
r.logger.Errorf("")
r.logger.Errorf("fsm generate snap shot err :%s", err)
}
fu.responded(&SnapShotFutureResp{
term: lastAppliedTerm,
Expand All @@ -105,10 +105,10 @@ func (r *Raft) runFSM() {
}
case fu := <-r.fsmRestoreCh:
meta, err := r.recoverSnapshotByID(fu.ID)
lastAppliedIdx = meta.Index
lastAppliedTerm = meta.Term
if err == nil {
lastAppliedIdx, lastAppliedTerm = meta.Index, meta.Term
}
fu.responded(nil, err)

case fu := <-r.fsmSnapshotCh:
snapshot(fu)
}
Expand Down
13 changes: 0 additions & 13 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package papillon

import (
"errors"
"fmt"
common_util "github.com/fuyao-w/common-util"
"io"
"sync"
Expand Down Expand Up @@ -30,10 +29,6 @@ type defaultFuture = Future[nilRespFuture]

type defaultDeferResponse = deferResponse[nilRespFuture]

type reject interface {
reject(state State)
}

type deferResponse[T any] struct {
err error
once *sync.Once
Expand All @@ -42,14 +37,6 @@ type deferResponse[T any] struct {
ShutdownCh <-chan struct{}
}

func (d *deferResponse[_]) reject(state State) {
if state == ShutDown {
d.fail(ErrShutDown)
return
}
d.fail(fmt.Errorf("current state %s can't process", state.String()))
}

func (d *deferResponse[_]) init() {
d.errCh = make(chan error, 1)
d.once = new(sync.Once)
Expand Down
Loading

0 comments on commit cf5a13f

Please sign in to comment.