Skip to content

Commit

Permalink
feat:fix pipeline 关闭阻塞问题
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyao-w committed May 15, 2023
1 parent d16abf7 commit 171f042
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 67 deletions.
31 changes: 15 additions & 16 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"io"
"time"

. "github.com/fuyao-w/common-util"
)

var (
Expand Down Expand Up @@ -57,7 +55,7 @@ func (r *Raft) BootstrapCluster(configuration ClusterInfo) defaultFuture {
clusterInfo: configuration,
}
future.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
future.fail(ErrNotStarted)
return future
}
Expand Down Expand Up @@ -89,7 +87,7 @@ func (r *Raft) apiApplyLog(entry *LogEntry, timeout time.Duration) ApplyFuture {
log: entry,
}
applyFuture.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
applyFuture.fail(ErrNotStarted)
return applyFuture
}
Expand All @@ -108,7 +106,7 @@ func (r *Raft) apiApplyLog(entry *LogEntry, timeout time.Duration) ApplyFuture {
func (r *Raft) VerifyLeader() Future[bool] {
vf := &verifyFuture{}
vf.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
vf.fail(ErrNotStarted)
return vf
}
Expand All @@ -135,7 +133,7 @@ func (r *Raft) requestClusterChange(req clusterChangeRequest, timeout time.Durat
req: &req,
}
ccf.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
ccf.fail(ErrNotStarted)
return ccf
}
Expand Down Expand Up @@ -175,7 +173,7 @@ func (r *Raft) UpdateServer(peer ServerInfo, prevIndex uint64, timeout time.Dura
func (r *Raft) SnapShot() Future[OpenSnapShot] {
fu := &apiSnapshotFuture{}
fu.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
fu.fail(ErrNotStarted)
return fu
}
Expand Down Expand Up @@ -204,15 +202,16 @@ func (r *Raft) LastApplied() uint64 {
return r.getLastApplied()
}

func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr) defaultFuture {
func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr, timeout time.Duration) defaultFuture {
tm := genTimeoutCh(timeout)
future := &leadershipTransferFuture{
Peer: ServerInfo{
ID: id,
Addr: address,
},
}
future.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
future.fail(ErrNotStarted)
return future
}
Expand All @@ -225,15 +224,15 @@ func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr) defaultFuture {
return future
case <-r.shutDown.C:
return &errFuture[nilRespFuture]{ErrShutDown}
default:
case <-tm:
return &errFuture[nilRespFuture]{ErrEnqueueTimeout}
}
}

func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
r.confReloadMu.Lock()
defer r.confReloadMu.Unlock()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
return ErrNotStarted
}
oldConf := *r.Conf()
Expand All @@ -260,7 +259,7 @@ func (r *Raft) ReStoreSnapshot(meta *SnapShotMeta, reader io.ReadCloser) error {
reader: reader,
}
fu.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
return ErrNotStarted
}
select {
Expand All @@ -279,7 +278,7 @@ func (r *Raft) ReStoreSnapshot(meta *SnapShotMeta, reader io.ReadCloser) error {

func (r *Raft) ShutDown() defaultFuture {
var resp *shutDownFuture
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
fu := new(defaultDeferResponse)
fu.init()
fu.fail(ErrNotStarted)
Expand All @@ -298,7 +297,7 @@ func (r *Raft) ShutDown() defaultFuture {
func (r *Raft) RaftState() Future[string] {
fu := new(deferResponse[string])
fu.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
fu.fail(ErrNotStarted)
return fu
}
Expand All @@ -313,7 +312,7 @@ func (r *Raft) ReadIndex(timeout time.Duration) Future[uint64] {
tm := genTimeoutCh(timeout)
fu := new(deferResponse[uint64])
fu.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
fu.fail(ErrNotStarted)
return fu
}
Expand All @@ -334,7 +333,7 @@ func (r *Raft) Barrier(readIndex uint64, timeout time.Duration) Future[uint64] {
readIndex: readIndex,
}
fu.init()
if r == nil || Ptr(r.state.Get()).String() == unknown {
if r == nil || r.GetState().String() == unknown {
fu.fail(ErrNotStarted)
return fu
}
Expand Down
8 changes: 4 additions & 4 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *Raft) processCommand(cmd *command) {
if !ok {
panic(fmt.Sprintf("command type :%d not register", cmd.enum))
}
state := r.state.Get()
state := r.GetState()
f, ok := cc[state]
if ok {
f(r, cmd.callback)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (r *Raft) processReloadConfig(item interface{}) {
oldConf = item.(*Config)
newConf = r.Conf()
)
switch r.state.Get() {
switch r.GetState() {
case Follower:
if oldConf.HeartbeatTimeout != newConf.HeartbeatTimeout {
r.heartbeatTimeout = time.After(0)
Expand All @@ -396,7 +396,7 @@ func (r *Raft) processReloadConfig(item interface{}) {
r.electionTimeout = randomTimeout(newConf.ElectionTimeout)
}
default:
panic(fmt.Errorf("except state :%d ", r.state.Get()))
panic(fmt.Errorf("except state :%d ", r.GetState()))
}
}

Expand Down Expand Up @@ -482,7 +482,7 @@ func (r *Raft) processRaftStats(item interface{}) {
"leader_id": leader.ID,
"leader_addr": leader.Addr,
}
switch r.state.Get() {
switch r.GetState() {
case Leader:
var (
lastContact = make(map[ServerID]string, len(r.leaderState.replicate))
Expand Down
17 changes: 9 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ func (r *Raft) processAppendEntry(req *AppendEntryRequest, cmd *RPC) {
var (
succ bool
)

defer func() {
cmd.Response <- &AppendEntryResponse{
RPCHeader: r.buildRPCHeader(),
Expand Down Expand Up @@ -648,7 +647,7 @@ func (r *Raft) processVote(req *VoteRequest, cmd *RPC) {
}
}
func (r *Raft) cycle(state State, f func() (stop bool)) {
for r.state.Get() == state && !f() {
for r.GetState() == state && !f() {
}
}

Expand Down Expand Up @@ -873,13 +872,13 @@ func (r *Raft) reloadReplication() {
fr := &replication{
nextIndex: nextIndex,
peer: NewLockItem(server),
stop: make(chan bool),
stop: make(chan bool, 1),
heartBeatStop: make(chan struct{}),
heartBeatDone: make(chan struct{}),
notifyCh: make(chan struct{}),
notifyCh: make(chan struct{}, 1),
done: make(chan struct{}),
trigger: make(chan *defaultDeferResponse),
lastContact: NewLockItem[time.Time](),
trigger: make(chan *defaultDeferResponse, 1),
lastContact: NewLockItem(time.Now()),
notify: NewLockItem(map[*verifyFuture]struct{}{}),
}
r.leaderState.replicate[server.ID] = fr
Expand All @@ -888,6 +887,8 @@ func (r *Raft) reloadReplication() {
func() { r.heartBeat(fr) },
func() { r.replicate(fr) },
)
// 立即触发心跳,防止领导权检查下台
asyncNotify(fr.notifyCh)
}
// 删除已经不在集群的跟随者线程
for _, repl := range r.leaderState.replicate {
Expand Down Expand Up @@ -1009,7 +1010,7 @@ func (r *Raft) processLeaderCommit() {
r.setCommitIndex(newCommitIndex)

if r.cluster.latestIndex > oldCommitIndex && r.cluster.latestIndex <= newCommitIndex {
r.logger.Infof("cluster stable")
r.logger.Info("cluster stable ,old :", oldCommitIndex, " new:", newCommitIndex)
r.setCommitConfiguration(r.cluster.latestIndex, r.cluster.latest)
if !r.canVote(r.localInfo.ID) { // 集群配置提交后,如果当前领导人不在集群内,则下台
stepDown = true
Expand Down Expand Up @@ -1114,11 +1115,11 @@ func (r *Raft) cycleLeader() {
r.setupLeaderState()
r.reloadReplication()
defer func() {
r.logger.Info("leave leader", r.localInfo.ID)
r.stopReplication()
r.clearLeaderState()
// 因为会把 LastContact 返回给 API 调用者,这里更新下时间避免跟随者状态获取的时间太旧
r.setLastContact()
r.logger.Info("leave leader", r.localInfo.ID)
}()
// 提交一个空日志,用于确认 commitIndex
future := &LogFuture{log: &LogEntry{Type: LogNoop}}
Expand Down
5 changes: 5 additions & 0 deletions mem_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package papillon
import (
"container/list"
"errors"
"fmt"
"io"
"sync"
"time"
Expand Down Expand Up @@ -178,6 +179,10 @@ func (m *memRPC) doRpc(cmdType rpcType, peer *memRPC, request interface{}, reade
case peer.consumerCh <- cmd:
timeout = time.Now().Sub(now)
case <-time.After(timeout):
if cmdType == CmdAppendEntry {
fmt.Println("time out------")
}
return nil, errors.New("time out")
}

select {
Expand Down
12 changes: 8 additions & 4 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ type (
StateChange struct {
Before, After State
}
// readOnly 跟踪只读请求,确保状态机已经应用完制定索引,用于实现线性一致性读
readOnly struct {
notifySet map[*readOnlyFuture]struct{}
request chan *readOnlyFuture
notifySet map[*readOnlyFuture]struct{} // 待响应的只读请求,只由状态机线程处理
request chan *readOnlyFuture // 只读请求
}
)

Expand All @@ -105,9 +106,12 @@ type (
}
)

// observe 将请求添加进 pending 集合
func (r *readOnly) observe(future *readOnlyFuture) {
r.notifySet[future] = struct{}{}
}

// notify 回调集合中达到索引位置的请求
func (r *readOnly) notify(index uint64) {
for future := range r.notifySet {
if future.readIndex <= index {
Expand Down Expand Up @@ -256,8 +260,8 @@ func (r *Raft) runState() {
}

func (r *Raft) setState(state State) {
before := r.state.Get()
r.state.set(state)
before := r.GetState()
r._setState(state)
overrideNotify(r.stateChangeCh, &StateChange{
Before: before,
After: state,
Expand Down
29 changes: 24 additions & 5 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestRaft(t *testing.T) {
http.Handle("/add_peer", &reloadPeerHandle{raftList})
http.Handle("/raft_state", &raftGetHandle{raftList})
http.Handle("/read_only", &readOnlyHandle{raftList})
http.Handle("/read_index", &readIndexHandle{raftList})
http.ListenAndServe("localhost:8080", nil)
}

Expand Down Expand Up @@ -130,7 +131,7 @@ type (

func (g *leaderTransferHandle) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
raft := getLeader(g.raftList...)
fu := raft.LeaderTransfer("", "")
fu := raft.LeaderTransfer("", "", 0)
_, err := fu.Response()
if err == nil {
writer.Write([]byte("succ"))
Expand Down Expand Up @@ -159,7 +160,7 @@ func (s *setHandle) ServeHTTP(writer http.ResponseWriter, request *http.Request)
fu := raft.Apply(kvSchema{}.encode(key, value), time.Second)
_, err := fu.Response()
if err != nil {
writer.Write([]byte("fail" + err.Error()))
writer.Write([]byte("fail set handle " + err.Error()))
} else {
writer.Write([]byte("succ"))
}
Expand All @@ -175,7 +176,7 @@ func (s *verifyHandle) ServeHTTP(writer http.ResponseWriter, request *http.Reque
_, err := fu.Response()

if err != nil {
writer.Write([]byte("fail" + err.Error()))
writer.Write([]byte("fail" + err.Error() + " " + cast.ToString(idx)))
} else {
writer.Write([]byte("succ"))
}
Expand Down Expand Up @@ -272,6 +273,20 @@ func (s *raftGetHandle) ServeHTTP(writer http.ResponseWriter, request *http.Requ
stat, _ := fu.Response()
writer.Write([]byte(stat))
}
func (s *readIndexHandle) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
idx := cast.ToInt(request.URL.Query().Get("idx"))
if idx < 0 || idx >= len(s.raftList) {
writer.Write([]byte("param err"))
return
}
fu := s.raftList[idx].ReadIndex(0)
ri, err := fu.Response()
if err != nil {
writer.Write([]byte(err.Error()))
} else {
writer.Write([]byte(cast.ToString(ri)))
}
}
func (s *userRestoreSnapshotHandle) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
//raft := getLeader(s.raftList...)
//snapshot := raft.snapShotStore
Expand Down Expand Up @@ -303,6 +318,10 @@ func (s *readOnlyHandle) ServeHTTP(writer http.ResponseWriter, request *http.Req
idx := cast.ToUint64(request.URL.Query().Get("idx"))
readIndex := cast.ToUint64(request.URL.Query().Get("index"))
fu := s.raftList[idx].Barrier(readIndex, 0)
i, _ := fu.Response()
writer.Write([]byte(cast.ToString(i)))
i, err := fu.Response()
if err != nil {
writer.Write([]byte(err.Error()))
} else {
writer.Write([]byte(cast.ToString(i)))
}
}
Loading

0 comments on commit 171f042

Please sign in to comment.