Skip to content

Commit

Permalink
redist
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsk committed Feb 25, 2024
1 parent 35a7004 commit 206a6b1
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cmd/ba_storageserv/d_redist/redist_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (m *Manager) Start() {
continue
} else if status == redistst.StatsAbort {
// resume from next mshard id
rb.RestoretSnapShotState(st)
rb.RestoreSnapShotState(st)
mshardid = st.GetMShardId()
if mshardid != 0 {
mshardid++
Expand Down
49 changes: 48 additions & 1 deletion cmd/ba_storageserv/d_redist/replicator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package redist

import (
"errors"
"github.com/golang/glog"
proto "junodb_lite/pkg/ac_proto"
shard "junodb_lite/pkg/b_shard"
etcd "junodb_lite/pkg/c_etcd"
io "junodb_lite/pkg/y_conn_mgr"
Expand Down Expand Up @@ -37,6 +40,50 @@ func (r *Replicator) GetShardId() shard.ID {
return r.shardId
}

func (r *Replicator) RestoretSnapShotState(s *redistst.Stats) {
func (r *Replicator) RestoreSnapShotState(s *redistst.Stats) {
r.snapshotStats.Restore(s)
}

func (r *Replicator) SendRequest(msg *proto.RawMessage, params ...bool) error {
glog.Infof("redist:ReplicateRequest: proc=%v, rb=%v", r.processor, r)
if r.processor == nil {
return errors.New("outbound processor is not available")
}

//default
realtime := true
cntOnFailure := true
if len(params) > 0 {
realtime = params[0]
}

if len(params) > 1 {
cntOnFailure = params[1]
}

var stats *redistst.Stats = &r.snapshotStats
if realtime {
stats = &r.realtimeStats
}

reqctx := NewRedistRequestContext(msg, r.processor.GetRequestCh(), stats)
var err error

if realtime {
err = r.processor.SendRequest(reqctx)
} else {
//err = r.processor.SendRequestLowPriority(reqctx)
}

if err == nil {
//stats.IncreaseTotalCnt()
return nil
}

// forwarding queue is full or not ready
if cntOnFailure {
//stats.IncreaseTotalCnt()
//stats.IncreaseDropCnt()
}
return errors.New("Forwarding queue is either full or not ready, drop req")
}
128 changes: 128 additions & 0 deletions cmd/ba_storageserv/d_redist/req_ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package redist

import (
"context"
"io"
proto "junodb_lite/pkg/ac_proto"
. "junodb_lite/pkg/y_conn_mgr"
redistst "junodb_lite/pkg/y_stats/redist"
util "junodb_lite/pkg/y_util"
"time"
)

type RedistRequestContext struct {
util.QueItemBase
message proto.RawMessage
retry_cnt uint16
timeReceived time.Time
reqCh chan IRequestContext // channel for retry
stats *redistst.Stats
}

func NewRedistRequestContext(msg *proto.RawMessage,
reqCh chan IRequestContext, stats *redistst.Stats) *RedistRequestContext {
r := &RedistRequestContext{
retry_cnt: 0,
timeReceived: time.Now(),
reqCh: reqCh,
stats: stats,
}
//r.SetQueTimeout(RedistConfig.RedistRespTimeout.Duration)
//r.message.DeepCopy(msg)
return r
}

func (r *RedistRequestContext) OnCleanup() {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) OnExpiration() {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) Deadline() (deadline time.Time) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) ResetDeadline() {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) SetId(id uint32) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) GetId() uint32 {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) SetInUse(flag bool) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) SetQueTimeout(t time.Duration) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) GetQueTimeout() (t time.Duration) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) IsInUse() bool {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) GetMessage() *proto.RawMessage {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) GetCtx() context.Context {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) Cancel() {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) Read(re io.Reader) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) WriteWithOpaque(opaque uint32, w io.Writer) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) Reply(resp IResponseContext) {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) OnComplete() {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) GetReceiveTime() time.Time {
//TODO implement me
panic("implement me")
}

func (r *RedistRequestContext) SetTimeout(parent context.Context, duration time.Duration) {
//TODO implement me
panic("implement me")
}
4 changes: 4 additions & 0 deletions pkg/y_conn_mgr/da_outboundprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (p *OutboundProcessor) connect(connCh chan *OutboundConnector, id int, conn
}
}

func (p *OutboundProcessor) GetRequestCh() chan IRequestContext {
return p.reqCh
}

func NewOutboundProcessor(connInfo string, config *OutboundConfig, enableBounce bool) *OutboundProcessor {
return NewOutbProcessor(ServiceEndpoint{Addr: connInfo}, config, enableBounce)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/y_util/ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ type RingBuffer struct {
func (rb *RingBuffer) CleanAll() {

}

type QueItemBase struct {
id uint32
flag uint32
timeout time.Duration
deadline time.Time
}

0 comments on commit 206a6b1

Please sign in to comment.