Skip to content

Commit

Permalink
Add pin sigchain protocol to reduce relay ram usage (stage 1/2)
Browse files Browse the repository at this point in the history
When most nodes have updated, sigChainElemCacheExpiration will be
reduced in the next stage to reduce ram usage.

Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Apr 24, 2020
1 parent 09c8c1b commit e13663a
Show file tree
Hide file tree
Showing 7 changed files with 711 additions and 189 deletions.
32 changes: 20 additions & 12 deletions common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,51 @@ import (

// Cache is an anstract cache layer
type Cache interface {
Add([]byte, interface{}) error
Get([]byte) (interface{}, bool)
Set([]byte, interface{}) error
Add(key []byte, value interface{}) error
Get(key []byte) (value interface{}, ok bool)
Set(key []byte, value interface{}) error
SetWithExpiration(key []byte, value interface{}, expiration time.Duration) error
}

// GoCache is the caching layer implemented by go-cache.
type GoCache struct {
cache *gocache.Cache
*gocache.Cache
}

// NewGoCache creates a go-cache cache with a given default expiration duration
// and cleanup interval.
func NewGoCache(defaultExpiration, cleanupInterval time.Duration) *GoCache {
return &GoCache{
cache: gocache.New(defaultExpiration, cleanupInterval),
Cache: gocache.New(defaultExpiration, cleanupInterval),
}
}

func (gc *GoCache) byteKeyToStringKey(byteKey []byte) string {
return string(byteKey)
}

// Add adds an item to the cache only if an item doesn't already exist for the
// given key, or if the existing item has expired. Returns an error otherwise.
func (gc *GoCache) Add(key []byte, value interface{}) error {
return gc.cache.Add(gc.byteKeyToStringKey(key), value, 0)
return gc.Cache.Add(byteKeyToStringKey(key), value, gocache.DefaultExpiration)
}

// Get gets an item from the cache. Returns the item or nil, and a bool
// indicating whether the key was found.
func (gc *GoCache) Get(key []byte) (interface{}, bool) {
return gc.cache.Get(gc.byteKeyToStringKey(key))
return gc.Cache.Get(byteKeyToStringKey(key))
}

// Set adds an item to the cache, replacing any existing item, using the default
// expiration.
func (gc *GoCache) Set(key []byte, value interface{}) error {
gc.cache.SetDefault(gc.byteKeyToStringKey(key), value)
gc.Cache.SetDefault(byteKeyToStringKey(key), value)
return nil
}

// SetWithExpiration adds an item to the cache, replacing any existing item,
// with a given expiration.
func (gc *GoCache) SetWithExpiration(key []byte, value interface{}, expiration time.Duration) error {
gc.Cache.Set(byteKeyToStringKey(key), value, expiration)
return nil
}

func byteKeyToStringKey(byteKey []byte) string {
return string(byteKey)
}
1 change: 1 addition & 0 deletions event/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ const (
NewBlockProduced
SendInboundMessageToClient
BacktrackSigChain
PinSigChain
)
112 changes: 97 additions & 15 deletions node/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ func NewRelayService(wallet vault.Wallet, localNode *LocalNode) *RelayService {
func (rs *RelayService) Start() error {
event.Queue.Subscribe(event.NewBlockProduced, rs.populateVRFCache)
event.Queue.Subscribe(event.NewBlockProduced, rs.flushSigChain)
event.Queue.Subscribe(event.PinSigChain, rs.startPinSigChain)
event.Queue.Subscribe(event.BacktrackSigChain, rs.backtrackDestSigChain)
rs.localNode.AddMessageHandler(pb.RELAY, rs.relayMessageHandler)
rs.localNode.AddMessageHandler(pb.PIN_SIGNATURE_CHAIN, rs.pinSigChainMessageHandler)
rs.localNode.AddMessageHandler(pb.BACKTRACK_SIGNATURE_CHAIN, rs.backtrackSigChainMessageHandler)
return nil
}
Expand Down Expand Up @@ -68,24 +70,30 @@ func NewRelayMessage(srcIdentifier string, srcPubkey, destID, payload, blockHash
return msg, nil
}

// relayMessageHandler handles a RELAY message
func (rs *RelayService) relayMessageHandler(remoteMessage *RemoteMessage) ([]byte, bool, error) {
msgBody := &pb.Relay{}
err := proto.Unmarshal(remoteMessage.Message, msgBody)
// NewPinSigChainMessage creates a PIN_SIGNATURE_CHAIN message
func NewPinSigChainMessage(hash []byte) (*pb.UnsignedMessage, error) {
msgBody := &pb.PinSignatureChain{
Hash: hash,
}

buf, err := proto.Marshal(msgBody)
if err != nil {
return nil, false, err
return nil, err
}

event.Queue.Notify(event.SendInboundMessageToClient, msgBody)
msg := &pb.UnsignedMessage{
MessageType: pb.PIN_SIGNATURE_CHAIN,
Message: buf,
}

return nil, false, nil
return msg, nil
}

// NewBacktrackSigChainMessage creates a BACKTRACK_SIGNATURE_CHAIN message
func NewBacktrackSigChainMessage(sigChainElems []*pb.SigChainElem, prevHash []byte) (*pb.UnsignedMessage, error) {
func NewBacktrackSigChainMessage(sigChainElems []*pb.SigChainElem, hash []byte) (*pb.UnsignedMessage, error) {
msgBody := &pb.BacktrackSignatureChain{
SigChainElems: sigChainElems,
PrevHash: prevHash,
Hash: hash,
}

buf, err := proto.Marshal(msgBody)
Expand All @@ -101,6 +109,35 @@ func NewBacktrackSigChainMessage(sigChainElems []*pb.SigChainElem, prevHash []by
return msg, nil
}

// relayMessageHandler handles a RELAY message
func (rs *RelayService) relayMessageHandler(remoteMessage *RemoteMessage) ([]byte, bool, error) {
msgBody := &pb.Relay{}
err := proto.Unmarshal(remoteMessage.Message, msgBody)
if err != nil {
return nil, false, err
}

event.Queue.Notify(event.SendInboundMessageToClient, msgBody)

return nil, false, nil
}

// pinSigChainMessageHandler handles a PIN_SIGNATURE_CHAIN message
func (rs *RelayService) pinSigChainMessageHandler(remoteMessage *RemoteMessage) ([]byte, bool, error) {
msgBody := &pb.PinSignatureChain{}
err := proto.Unmarshal(remoteMessage.Message, msgBody)
if err != nil {
return nil, false, err
}

err = rs.pinSigChain(msgBody.Hash, remoteMessage.Sender.PublicKey)
if err != nil {
return nil, false, err
}

return nil, false, nil
}

// backtrackSigChainMessageHandler handles a BACKTRACK_SIGNATURE_CHAIN message
func (rs *RelayService) backtrackSigChainMessageHandler(remoteMessage *RemoteMessage) ([]byte, bool, error) {
msgBody := &pb.BacktrackSignatureChain{}
Expand All @@ -109,14 +146,49 @@ func (rs *RelayService) backtrackSigChainMessageHandler(remoteMessage *RemoteMes
return nil, false, err
}

err = rs.backtrackSigChain(msgBody.SigChainElems, msgBody.PrevHash, remoteMessage.Sender.PublicKey)
err = rs.backtrackSigChain(msgBody.SigChainElems, msgBody.Hash, remoteMessage.Sender.PublicKey)
if err != nil {
return nil, false, err
}

return nil, false, nil
}

func (rs *RelayService) pinSigChain(hash, senderPubkey []byte) error {
prevHash, prevNodeID, err := rs.porServer.PinSigChain(hash, senderPubkey)
if err != nil {
return err
}

if prevNodeID == nil {
return nil
}

nextHop := rs.localNode.GetNbrNode(chordIDToNodeID(prevNodeID))
if nextHop == nil {
return fmt.Errorf("cannot find next hop with id %x", prevNodeID)
}

nextMsg, err := NewPinSigChainMessage(prevHash)
if err != nil {
return err
}

buf, err := rs.localNode.SerializeMessage(nextMsg, false)
if err != nil {
return err
}

err = nextHop.SendBytesAsync(buf)
if err != nil {
return err
}

rs.porServer.PinSigChainSuccess(hash)

return nil
}

func (rs *RelayService) backtrackSigChain(sigChainElems []*pb.SigChainElem, hash, senderPubkey []byte) error {
sigChainElems, prevHash, prevNodeID, err := rs.porServer.BacktrackSigChain(sigChainElems, hash, senderPubkey)
if err != nil {
Expand Down Expand Up @@ -201,18 +273,28 @@ func (rs *RelayService) broadcastSigChain(sigChain *pb.SigChain) error {
return nil
}

func (rs *RelayService) startPinSigChain(v interface{}) {
sigChainInfo, ok := v.(*por.PinSigChainInfo)
if !ok {
log.Error("Decode pin sigchain info failed")
return
}

err := rs.pinSigChain(sigChainInfo.PrevHash, nil)
if err != nil {
log.Errorf("Pin sigchain error: %v", err)
}
}

func (rs *RelayService) backtrackDestSigChain(v interface{}) {
sigChainInfo, ok := v.(*por.BacktrackSigChainInfo)
if !ok {
log.Error("Decode backtrack sigchain info failed")
return
}

err := rs.backtrackSigChain(
[]*pb.SigChainElem{sigChainInfo.DestSigChainElem},
sigChainInfo.PrevHash,
nil,
)
sigChainElems := []*pb.SigChainElem{sigChainInfo.DestSigChainElem}
err := rs.backtrackSigChain(sigChainElems, sigChainInfo.PrevHash, nil)
if err != nil {
log.Errorf("Backtrack sigchain error: %v", err)
}
Expand Down
Loading

0 comments on commit e13663a

Please sign in to comment.