Skip to content

Commit

Permalink
sending heart beat msg from client to server periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
bufrr authored and yilunzhang committed Jul 19, 2023
1 parent 1604365 commit c154e72
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 80 deletions.
8 changes: 5 additions & 3 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (te *TunaEntry) Start(shouldReconnect bool) error {
}
if te.udpConn != nil {
te.startUDPReaderWriter(te.udpConn, nil, &te.bytesExitToEntry, &te.bytesEntryToExit)
go sendPingMsg(te.udpConn, te.udpCloseChan)
}
go func() {
for {
Expand Down Expand Up @@ -311,6 +312,7 @@ func (te *TunaEntry) Close() {

te.isClosed = true
close(te.closeChan)
close(te.udpCloseChan)
for _, listener := range te.tcpListeners {
Close(listener)
}
Expand Down Expand Up @@ -617,14 +619,14 @@ func StartReverse(config *EntryConfiguration, wallet *nkn.Wallet) error {
continue
}
if bytes.Equal(buffer[:PrefixLen], []byte{PrefixLen - 1: 0}) && n > PrefixLen {
if encrypted {
continue
}
connMetadata, err := parseUDPConnMetadata(buffer[PrefixLen:n])
if err != nil {
log.Println("Couldn't read udp metadata from client:", err)
continue
}
if connMetadata.IsPing || encrypted {
continue
}
k := string(append(connMetadata.PublicKey, connMetadata.Nonce...))
readyChan, _ := tcpReady.LoadOrStore(k, make(chan struct{}))
<-readyChan.(chan struct{})
Expand Down
5 changes: 3 additions & 2 deletions exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func (te *TunaExit) StartReverse(shouldReconnect bool) error {
time.Sleep(1 * time.Second)
continue
}
go sendPingMsg(udpConn, te.udpCloseChan)
}

var tcpPorts []uint32
Expand Down Expand Up @@ -705,10 +706,10 @@ func (te *TunaExit) Close() {

te.isClosed = true
close(te.closeChan)
close(te.udpCloseChan)
Close(te.tcpListener)
Close(te.udpConn)
Close(te.Common.tcpConn)
Close(te.Common.udpConn)
Close(te.tcpConn)

te.CloseUDPConn()
te.OnConnect.close()
Expand Down
75 changes: 42 additions & 33 deletions pb/tuna.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pb/tuna.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ message ConnectionMetadata {
bytes nonce = 3;
bool is_measurement = 4;
uint32 measurement_bytes_downlink = 5;
bool is_ping = 6;
}

message ServiceMetadata {
Expand Down
66 changes: 24 additions & 42 deletions tuna.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
Expand Down Expand Up @@ -127,7 +126,6 @@ type Common struct {

sync.RWMutex
udpReadWriteChanLock sync.RWMutex
udpReadyChanLock sync.RWMutex
paymentReceiver string
entryToExitPrice common.Fixed64
exitToEntryPrice common.Fixed64
Expand All @@ -142,7 +140,7 @@ type Common struct {
activeSessions int
linger time.Duration
presetNode *types.Node
connReadyChan map[string]chan struct{}
connReadyChan sync.Map

reverseBytesExitToEntry map[string][]uint64
reverseBytesEntryToExit map[string][]uint64
Expand Down Expand Up @@ -240,6 +238,7 @@ func NewCommon(
curveSecretKey: curveSecretKey,
encryptionAlgo: encryptionAlgo,
closeChan: make(chan struct{}),
udpCloseChan: make(chan struct{}),
sharedKeys: make(map[string]*[sharedKeySize]byte),
measureDelayConcurrentWorkers: measureDelayConcurrentWorkers,
measureBandwidthConcurrentWorkers: measureBandwidthConcurrentWorkers,
Expand All @@ -249,9 +248,8 @@ func NewCommon(
reverseBytesEntryToExit: make(map[string][]uint64),
reverseBytesExitToEntry: make(map[string][]uint64),

udpReadChan: make(chan []byte),
udpWriteChan: make(chan []byte),
connReadyChan: make(map[string]chan struct{}),
udpReadChan: make(chan []byte),
udpWriteChan: make(chan []byte),
}
c.minBalance, err = common.StringToFixed64(minBalance)
if err != nil {
Expand Down Expand Up @@ -322,8 +320,6 @@ func (c *Common) GetServerUDPConn(force bool) (UDPConn, error) {
if err != nil {
return nil, err
}
c.RLock()
defer c.RUnlock()
return c.GetUDPConn(), nil
}

Expand Down Expand Up @@ -408,8 +404,7 @@ func (c *Common) startUDPReaderWriter(conn *EncryptUDPConn, toAddr *net.UDPAddr,
n := 0
encrypted := false
var err error
addrToKey := make(map[string]string)
var addrToKeyLock sync.RWMutex
addrToKey := new(sync.Map)

go func() {
buffer := make([]byte, MaxUDPBufferSize)
Expand All @@ -425,24 +420,19 @@ func (c *Common) startUDPReaderWriter(conn *EncryptUDPConn, toAddr *net.UDPAddr,
}
}
if bytes.Equal(buffer[:PrefixLen], []byte{PrefixLen - 1: 0}) && c.IsServer && n > PrefixLen {
if encrypted {
continue
}
connMetadata, err := parseUDPConnMetadata(buffer[PrefixLen:n])
if err != nil {
log.Println("Couldn't read udp metadata from client:", err)
continue
}
if connMetadata.IsPing || encrypted {
continue
}
connKey := string(append(connMetadata.PublicKey, connMetadata.Nonce...))

c.udpReadyChanLock.Lock()
readyChan, ok := c.connReadyChan[connKey]
if !ok {
readyChan = make(chan struct{}, 1)
c.connReadyChan[connKey] = readyChan
}
c.udpReadyChanLock.Unlock()
<-readyChan
readyChan, _ := c.connReadyChan.LoadOrStore(connKey, make(chan struct{}, 1))
<-readyChan.(chan struct{})

encryptKey, ok := c.encryptKeys.Load(connKey)
if !ok {
log.Println("no encrypt key found")
Expand All @@ -457,9 +447,7 @@ func (c *Common) startUDPReaderWriter(conn *EncryptUDPConn, toAddr *net.UDPAddr,

if in == nil && out == nil {
k := string(append(connMetadata.PublicKey, connMetadata.Nonce...))
addrToKeyLock.Lock()
addrToKey[from.String()] = k
addrToKeyLock.Unlock()
addrToKey.Store(from.String(), k)
}
continue
}
Expand All @@ -477,10 +465,10 @@ func (c *Common) startUDPReaderWriter(conn *EncryptUDPConn, toAddr *net.UDPAddr,
if in != nil {
atomic.AddUint64(in, uint64(n))
} else {
addrToKeyLock.RLock()
k := addrToKey[from.String()]
addrToKeyLock.RUnlock()
atomic.AddUint64(&c.reverseBytesEntryToExit[k][b[2]], uint64(n))
k, ok := addrToKey.Load(from.String())
if ok {
atomic.AddUint64(&c.reverseBytesEntryToExit[k.(string)][b[2]], uint64(n))
}
}
}
}
Expand All @@ -505,10 +493,10 @@ func (c *Common) startUDPReaderWriter(conn *EncryptUDPConn, toAddr *net.UDPAddr,
if out != nil {
atomic.AddUint64(out, uint64(n))
} else {
addrToKeyLock.RLock()
k := addrToKey[from.String()]
addrToKeyLock.RUnlock()
atomic.AddUint64(&c.reverseBytesExitToEntry[k][data[2]], uint64(n))
k, ok := addrToKey.Load(from.String())
if ok {
atomic.AddUint64(&c.reverseBytesExitToEntry[k.(string)][data[2]], uint64(n))
}
}
case <-c.udpCloseChan:
return
Expand Down Expand Up @@ -612,15 +600,9 @@ func (c *Common) wrapConn(conn net.Conn, remotePublicKey []byte, localConnMetada
c.encryptKeys.Store(k, encryptKey)

if c.IsServer {
c.udpReadyChanLock.Lock()
readyChan, ok := c.connReadyChan[k]
if !ok {
readyChan = make(chan struct{}, 1)
c.connReadyChan[k] = readyChan
}
c.udpReadyChanLock.Unlock()
readyChan, _ := c.connReadyChan.LoadOrStore(k, make(chan struct{}, 1))
select {
case readyChan <- struct{}{}:
case readyChan.(chan struct{}) <- struct{}{}:
default:
}
}
Expand Down Expand Up @@ -1583,7 +1565,7 @@ func ConnIDToPort(data []byte) uint16 {
}

func LoadPassword(path string) (string, error) {
content, err := ioutil.ReadFile(path)
content, err := os.ReadFile(path)
if err != nil {
return "", err
}
Expand All @@ -1599,7 +1581,7 @@ func LoadOrCreateAccount(walletFile, passwordFile string) (*vault.Account, error
if _, err = os.Stat(passwordFile); os.IsNotExist(err) {
pswd = base64.StdEncoding.EncodeToString(util.RandomBytes(24))
log.Println("Creating wallet.pswd")
err = ioutil.WriteFile(passwordFile, []byte(pswd), 0644)
err = os.WriteFile(passwordFile, []byte(pswd), 0644)
if err != nil {
return nil, fmt.Errorf("save password to file error: %v", err)
}
Expand Down
22 changes: 22 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"strings"
"sync"
"time"

"github.com/nknorg/nkn-sdk-go"
"github.com/nknorg/nkn/v2/common"
Expand All @@ -24,6 +25,7 @@ const (
nodeRPCPort = 30003
randomIdentifierChars = "abcdefghijklmnopqrstuvwxyz0123456789"
randomIdentifierLength = 8
heartbeatInterval = 30 * time.Second
)

var encryptionAlgoMap = map[string]pb.EncryptionAlgo{
Expand Down Expand Up @@ -191,6 +193,26 @@ func writeUDPConnMetadata(conn UDPConn, addr *net.UDPAddr, connMetadata *pb.Conn
return err
}

func sendPingMsg(conn UDPConn, closeChan chan struct{}) {
pingMsg := new(pb.ConnectionMetadata)
pingMsg.IsPing = true

for {
select {
case <-closeChan:
break
default:
}
err := writeUDPConnMetadata(conn, nil, pingMsg)
if err != nil {
log.Println("write udp ping msg error:", err)
break
}
time.Sleep(heartbeatInterval)
}
return
}

func readStreamMetadata(stream *smux.Stream) (*pb.StreamMetadata, error) {
b, err := ReadVarBytes(stream, maxStreamMetadataSize)
if err != nil {
Expand Down

0 comments on commit c154e72

Please sign in to comment.