Skip to content

Commit

Permalink
Merge branch 'master' into version
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie authored Dec 9, 2024
2 parents e69d3e2 + d5df01a commit 217e328
Show file tree
Hide file tree
Showing 56 changed files with 575 additions and 677 deletions.
2 changes: 1 addition & 1 deletion action/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type (
Nonce() uint64
Gas() uint64
GasPrice() *big.Int
EffectiveGasPrice(*big.Int) *big.Int
TxDynamicGas
AccessList() types.AccessList
TxBlob
Expand All @@ -71,6 +70,7 @@ type (
TxDynamicGas interface {
GasTipCap() *big.Int
GasFeeCap() *big.Int
EffectiveGasPrice(*big.Int) *big.Int
}

TxBlob interface {
Expand Down
4 changes: 2 additions & 2 deletions action/protocol/execution/evm/evm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ func TestConstantinople(t *testing.T) {
},
{
"io1pcg2ja9krrhujpazswgz77ss46xgt88afqlk6y",
41174200,
33730920,
},
// after Vanuatu
{
action.EmptyAddress,
41174201,
33730921,
},
{
"io1pcg2ja9krrhujpazswgz77ss46xgt88afqlk6y",
Expand Down
2 changes: 1 addition & 1 deletion action/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func SplitGas(ctx context.Context, tx action.TxDynamicGas, usedGas uint64) (*big
return priority.Mul(priority, gas), base.Mul(base, gas), nil
}

func EffectiveGasPrice(ctx context.Context, tx action.TxCommon) *big.Int {
func EffectiveGasPrice(ctx context.Context, tx action.TxDynamicGas) *big.Int {
if !MustGetFeatureCtx(ctx).EnableDynamicFeeTx {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions action/protocol/rewarding/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (p *Protocol) deleteStateV2(sm protocol.StateManager, key []byte) error {
func (p *Protocol) settleSystemAction(
ctx context.Context,
sm protocol.StateManager,
act action.TxCommon,
act action.TxDynamicGas,
status uint64,
si int,
logs []*action.Log,
Expand All @@ -399,7 +399,7 @@ func (p *Protocol) settleSystemAction(
func (p *Protocol) settleUserAction(
ctx context.Context,
sm protocol.StateManager,
act action.TxCommon,
act action.TxDynamicGas,
status uint64,
si int,
logs []*action.Log,
Expand All @@ -411,7 +411,7 @@ func (p *Protocol) settleUserAction(
func (p *Protocol) settleAction(
ctx context.Context,
sm protocol.StateManager,
act action.TxCommon,
act action.TxDynamicGas,
status uint64,
si int,
isSystemAction bool,
Expand Down
2 changes: 1 addition & 1 deletion action/protocol/staking/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ const (
func (p *Protocol) settleAction(
ctx context.Context,
sm protocol.StateManager,
act action.TxCommon,
act action.TxDynamicGas,
status uint64,
logs []*action.Log,
tLogs []*action.TransactionLog,
Expand Down
10 changes: 10 additions & 0 deletions action/sealedenvelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,13 @@ func (sealed *SealedEnvelope) VerifySignature() error {
}
return nil
}

// Protected says whether the transaction is replay-protected.
func (sealed *SealedEnvelope) Protected() bool {
switch sealed.encoding {
case iotextypes.Encoding_TX_CONTAINER:
return sealed.Envelope.(*txContainer).tx.Protected()
default:
return sealed.encoding != iotextypes.Encoding_ETHEREUM_UNPROTECTED
}
}
1 change: 1 addition & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ActPool interface {
ReceiveBlock(*block.Block) error

AddActionEnvelopeValidators(...action.SealedEnvelopeValidator)
AddSubscriber(sub Subscriber)
}

// Subscriber is the interface for actpool subscriber
Expand Down
46 changes: 25 additions & 21 deletions actsync/actionsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (as *ActionSync) Stop(ctx context.Context) error {
return err
}
close(as.quit)
close(as.syncChan)
as.wg.Wait()
return nil
}
Expand Down Expand Up @@ -127,28 +126,33 @@ func (as *ActionSync) ReceiveAction(_ context.Context, hash hash.Hash256) {

func (as *ActionSync) sync() {
defer as.wg.Done()
for hash := range as.syncChan {
log.L().Debug("syncing action", log.Hex("hash", hash[:]))
channelFullnessMtc.WithLabelValues("action").Set(float64(len(as.syncChan)) / float64(cap(as.syncChan)))
ctx, cancel := context.WithTimeout(context.Background(), unicaseTimeout)
defer cancel()
msg, ok := as.actions.Load(hash)
if !ok {
log.L().Debug("action not requested or already received", log.Hex("hash", hash[:]))
continue
}
if time.Since(msg.(*actionMsg).lastTime) < as.cfg.Interval {
log.L().Debug("action is recently requested", log.Hex("hash", hash[:]))
continue
}
msg.(*actionMsg).lastTime = time.Now()
// TODO: enhancement, request multiple actions in one message
if err := as.requestFromNeighbors(ctx, hash); err != nil {
log.L().Warn("Failed to request action from neighbors", zap.Error(err))
counterMtc.WithLabelValues("failed").Inc()
for {
select {
case hash := <-as.syncChan:
log.L().Debug("syncing action", log.Hex("hash", hash[:]))
channelFullnessMtc.WithLabelValues("action").Set(float64(len(as.syncChan)) / float64(cap(as.syncChan)))
ctx, cancel := context.WithTimeout(context.Background(), unicaseTimeout)
defer cancel()
msg, ok := as.actions.Load(hash)
if !ok {
log.L().Debug("action not requested or already received", log.Hex("hash", hash[:]))
continue
}
if time.Since(msg.(*actionMsg).lastTime) < as.cfg.Interval {
log.L().Debug("action is recently requested", log.Hex("hash", hash[:]))
continue
}
msg.(*actionMsg).lastTime = time.Now()
// TODO: enhancement, request multiple actions in one message
if err := as.requestFromNeighbors(ctx, hash); err != nil {
log.L().Warn("Failed to request action from neighbors", zap.Error(err))
counterMtc.WithLabelValues("failed").Inc()
}
case <-as.quit:
log.L().Info("quitting action sync")
return
}
}
log.L().Info("quitting action sync")
}

func (as *ActionSync) triggerSync() {
Expand Down
36 changes: 36 additions & 0 deletions actsync/actionsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,40 @@ func TestActionSync(t *testing.T) {
r.False(ok, "action should be removed after received")
}
})
t.Run("requestWhenStopping", func(t *testing.T) {
count := atomic.Int32{}
as := NewActionSync(Config{
Size: 1000,
Interval: 10 * time.Millisecond,
}, &Helper{
P2PNeighbor: func() ([]peer.AddrInfo, error) {
return neighbors, nil
},
UnicastOutbound: func(_ context.Context, p peer.AddrInfo, msg proto.Message) error {
count.Add(1)
return nil
},
})
r.NoError(as.Start(context.Background()))
acts := []hash.Hash256{}
for i := 0; i < 100; i++ {
acts = append(acts, hash.Hash256b([]byte{byte(i)}))
}
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for k := 0; k <= 10; k++ {
idx := i*10 + k
if idx >= len(acts) {
break
}
as.RequestAction(context.Background(), acts[idx])
}
}(i)
}
r.NoError(as.Stop(context.Background()))
wg.Wait()
})
}
93 changes: 93 additions & 0 deletions api/action_radio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package api

import (
"context"
"encoding/hex"

"github.com/iotexproject/iotex-proto/golang/iotextypes"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/pkg/log"
batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher"
)

// ActionRadioOption is the option to create ActionRadio
type ActionRadioOption func(*ActionRadio)

// WithMessageBatch enables message batching
func WithMessageBatch() ActionRadioOption {
return func(ar *ActionRadio) {
ar.messageBatcher = batch.NewManager(func(msg *batch.Message) error {
return ar.broadcastHandler(context.Background(), ar.chainID, msg.Data)
})
}
}

// ActionRadio broadcasts actions to the network
type ActionRadio struct {
broadcastHandler BroadcastOutbound
messageBatcher *batch.Manager
chainID uint32
}

// NewActionRadio creates a new ActionRadio
func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ...ActionRadioOption) *ActionRadio {
ar := &ActionRadio{
broadcastHandler: broadcastHandler,
chainID: chainID,
}
for _, opt := range opts {
opt(ar)
}
return ar
}

// Start starts the action radio
func (ar *ActionRadio) Start() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Start()
}
return nil
}

// Stop stops the action radio
func (ar *ActionRadio) Stop() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Stop()
}
return nil
}

// OnAdded broadcasts the action to the network
func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) {
var (
hasSidecar = selp.BlobTxSidecar() != nil
hash, _ = selp.Hash()
out proto.Message
err error
)
if hasSidecar {
out = &iotextypes.ActionHash{
Hash: hash[:],
}
} else {
out = selp.Proto()
}
if ar.messageBatcher != nil && !hasSidecar { // TODO: batch blobTx
err = ar.messageBatcher.Put(&batch.Message{
ChainID: ar.chainID,
Target: nil,
Data: out,
})
} else {
err = ar.broadcastHandler(context.Background(), ar.chainID, out)
}
if err != nil {
log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:])))
}
}

// OnRemoved does nothing
func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {}
37 changes: 37 additions & 0 deletions api/action_radio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package api

import (
"context"
"math/big"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/test/identityset"
)

func TestActionRadio(t *testing.T) {
r := require.New(t)
broadcastCount := uint64(0)
radio := NewActionRadio(
func(_ context.Context, _ uint32, _ proto.Message) error {
atomic.AddUint64(&broadcastCount, 1)
return nil
},
0)
r.NoError(radio.Start())
defer func() {
r.NoError(radio.Stop())
}()

gas := uint64(100000)
gasPrice := big.NewInt(10)
selp, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, gas, gasPrice)
r.NoError(err)

radio.OnAdded(selp)
r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount))
}
7 changes: 1 addition & 6 deletions api/blockdaoservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,7 @@ func (service *blockDAOService) FooterByHeight(_ context.Context, request *block
if err != nil {
return nil, err
}
footerpb, err := footer.ConvertToBlockFooterPb()
if err != nil {
return nil, err
}

return &blockdaopb.FooterResponse{
Footer: footerpb,
Footer: footer.Proto(),
}, nil
}
Loading

0 comments on commit 217e328

Please sign in to comment.