Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool: improve Add() logic, handle edge case #2754

Merged
merged 5 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 8 additions & 28 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (pool *LegacyPool) Stats() (int, int) {
return pool.stats()
}

func (pool *LegacyPool) statsOverflowPool() int {
func (pool *LegacyPool) statsOverflowPool() uint64 {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand Down Expand Up @@ -907,25 +907,14 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

func (pool *LegacyPool) addToOverflowPool(drop types.Transactions, isLocal bool) {
// calculate total number of slots in drop. Accordingly add them to OverflowPool (if there is space)
availableSlotsOverflowPool := pool.availableSlotsOverflowPool()
if availableSlotsOverflowPool > 0 {
// transfer availableSlotsOverflowPool number of transactions slots from drop to OverflowPool
currentSlotsUsed := 0
for i, tx := range drop {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsOverflowPool {
from, _ := types.Sender(pool.signer, tx)
pool.localBufferPool.Add(tx)
log.Debug("adding to OverflowPool", "transaction", tx.Hash().String(), "from", from.String())
currentSlotsUsed += txSlots
} else {
log.Debug("not all got added to OverflowPool", "totalAdded", i+1)
return
}
for _, tx := range drop {
added := pool.localBufferPool.Add(tx)
if added {
from, _ := types.Sender(pool.signer, tx)
log.Debug("Added to OverflowPool", "transaction", tx.Hash().String(), "from", from.String())
} else {
log.Debug("Failed to add transaction to OverflowPool", "transaction", tx.Hash().String())
}
} else {
log.Debug("adding to OverflowPool unsuccessful", "availableSlotsOverflowPool", availableSlotsOverflowPool)
}
}

Expand Down Expand Up @@ -2108,15 +2097,6 @@ func (pool *LegacyPool) transferTransactions() {
pool.Add(txs, true, false)
}

func (pool *LegacyPool) availableSlotsOverflowPool() int {
maxOverflowPoolSize := int(pool.config.OverflowPoolSlots)
availableSlots := maxOverflowPoolSize - pool.localBufferPool.Size()
if availableSlots > 0 {
return availableSlots
}
return 0
}

func (pool *LegacyPool) PrintTxStats() {
for _, l := range pool.pending {
for _, transaction := range l.txs.items {
Expand Down
8 changes: 4 additions & 4 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,25 +2268,25 @@ func TestTransferTransactions(t *testing.T) {

assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")

tx2 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[1])
pool.addToOverflowPool([]*types.Transaction{tx2}, true)
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
pending, queue = pool.Stats()

assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 1, queue, "queued transactions mismatched")
assert.Equal(t, 0, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(0), pool.statsOverflowPool(), "OverflowPool size unexpected")

tx3 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[2])
pool.addToOverflowPool([]*types.Transaction{tx3}, true)
pending, queue = pool.Stats()

assert.Equal(t, 1, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")
}

// Tests that the pool rejects replacement dynamic fee transactions that don't
Expand Down
43 changes: 31 additions & 12 deletions core/txpool/legacypool/tx_overflowpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

// txHeapItem implements the Interface interface (https://pkg.go.dev/container/heap#Interface) of heap so that it can be heapified
Expand Down Expand Up @@ -65,8 +66,8 @@ type TxOverflowPool struct {
txHeap txHeap
index map[common.Hash]*txHeapItem
mu sync.RWMutex
maxSize uint64
totalSize int
maxSize uint64 // Maximum slots
totalSize uint64 // Total number of slots currently
}

func NewTxOverflowPoolHeap(estimatedMaxSize uint64) *TxOverflowPool {
Expand All @@ -77,34 +78,52 @@ func NewTxOverflowPoolHeap(estimatedMaxSize uint64) *TxOverflowPool {
}
}

func (tp *TxOverflowPool) Add(tx *types.Transaction) {
func (tp *TxOverflowPool) Add(tx *types.Transaction) bool {
tp.mu.Lock()
defer tp.mu.Unlock()

if _, exists := tp.index[tx.Hash()]; exists {
// Transaction already in pool, ignore
return
return false
}

txSlots := uint64(numSlots(tx))

// If the transaction is too big to ever fit (and the pool isn't empty right now), reject it
if (txSlots > tp.maxSize) || (txSlots == tp.maxSize && tp.totalSize != 0) {
log.Warn("Transaction too large to fit in OverflowPool", "transaction", tx.Hash().String(), "requiredSlots", txSlots, "maxSlots", tp.maxSize)
return false
}

if uint64(len(tp.txHeap)) >= tp.maxSize {
// Remove the oldest transaction to make space
// Remove transactions until there is room for the new transaction
for tp.totalSize+txSlots > tp.maxSize {
if tp.txHeap.Len() == 0 {
// No transactions left to remove, cannot make room
log.Warn("Not enough space in OverflowPool even after clearing", "transaction", tx.Hash().String())
return false
}
// Remove the oldest transaction
oldestItem, ok := heap.Pop(&tp.txHeap).(*txHeapItem)
if !ok || oldestItem == nil {
return
log.Error("Failed to pop from txHeap during Add")
return false
}
delete(tp.index, oldestItem.tx.Hash())
tp.totalSize -= numSlots(oldestItem.tx)
tp.totalSize -= uint64(numSlots(oldestItem.tx))
OverflowPoolGauge.Dec(1)
}

// Add the new transaction
item := &txHeapItem{
tx: tx,
timestamp: time.Now().UnixNano(),
}
heap.Push(&tp.txHeap, item)
tp.index[tx.Hash()] = item
tp.totalSize += numSlots(tx)
tp.totalSize += txSlots
OverflowPoolGauge.Inc(1)

return true
}

func (tp *TxOverflowPool) Get(hash common.Hash) (*types.Transaction, bool) {
Expand All @@ -122,7 +141,7 @@ func (tp *TxOverflowPool) Remove(hash common.Hash) {
if item, ok := tp.index[hash]; ok {
heap.Remove(&tp.txHeap, item.index)
delete(tp.index, hash)
tp.totalSize -= numSlots(item.tx)
tp.totalSize -= uint64(numSlots(item.tx))
OverflowPoolGauge.Dec(1)
}
}
Expand All @@ -141,7 +160,7 @@ func (tp *TxOverflowPool) Flush(n int) []*types.Transaction {
}
txs[i] = item.tx
delete(tp.index, item.tx.Hash())
tp.totalSize -= numSlots(item.tx)
tp.totalSize -= uint64(numSlots(item.tx))
}

OverflowPoolGauge.Dec(int64(n))
Expand All @@ -154,7 +173,7 @@ func (tp *TxOverflowPool) Len() int {
return tp.txHeap.Len()
}

func (tp *TxOverflowPool) Size() int {
func (tp *TxOverflowPool) Size() uint64 {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.totalSize
Expand Down
77 changes: 77 additions & 0 deletions core/txpool/legacypool/tx_overflowpool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package legacypool

import (
rand3 "crypto/rand"
"math/big"
rand2 "math/rand"
"testing"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/cometbft/cometbft/libs/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
)

// Helper function to create a test transaction
Expand Down Expand Up @@ -157,6 +159,59 @@ func TestTxOverflowPoolHeapLen(t *testing.T) {
}
}

func TestTxOverflowPoolSlotCalculation(t *testing.T) {
// Initialize the pool with a maximum size of 2
pool := NewTxOverflowPoolHeap(2)

// Create two transactions with different slot requirements
tx1 := createTestTx(1, big.NewInt(1000)) // tx1 takes 1 slot
tx2 := createTestTx(2, big.NewInt(2000)) // tx2 takes 1 slot

// Add both transactions to fill the pool
pool.Add(tx1)
pool.Add(tx2)

if pool.Len() != 2 {
t.Fatalf("Expected pool size 2, but got %d", pool.Len())
}

dataSize := 40000
tx3 := createLargeTestTx(
3, // nonce
big.NewInt(100000000000), // gasPrice: 100 Gwei
dataSize,
) // takes 2 slots

// Create a third transaction with more slots than tx1
tx3Added := pool.Add(tx3)
assert.Equal(t, false, tx3Added)
assert.Equal(t, uint64(2), pool.totalSize)

// Verify that the pool length remains at 2
assert.Equal(t, 2, pool.Len(), "Expected pool size 2 after overflow")

tx4 := createTestTx(4, big.NewInt(3000)) // tx4 takes 1 slot
// Add tx4 to the pool
assert.True(t, pool.Add(tx4), "Failed to add tx4")

// The pool should evict the oldest transaction (tx1) to make room for tx4
// Verify that tx1 is no longer in the pool
_, exists := pool.Get(tx1.Hash())
assert.False(t, exists, "Expected tx1 to be evicted from the pool")
}

func TestBiggerTx(t *testing.T) {
// Create a transaction with 40KB of data (which should take 2 slots)
dataSize := 40000
tx := createLargeTestTx(
0, // nonce
big.NewInt(100000000000), // gasPrice: 100 Gwei
dataSize,
)
numberOfSlots := numSlots(tx)
assert.Equal(t, 2, numberOfSlots)
}

// Helper function to create a random test transaction
func createRandomTestTx() *types.Transaction {
nonce := uint64(rand.Intn(1000000))
Expand All @@ -176,6 +231,28 @@ func createRandomTestTxs(n int) []*types.Transaction {
return txs
}

// createLargeTestTx creates a transaction with a large data payload
func createLargeTestTx(nonce uint64, gasPrice *big.Int, dataSize int) *types.Transaction {
// Generate random data of specified size
data := make([]byte, dataSize)
rand3.Read(data)

to := common.HexToAddress("0x1234567890123456789012345678901234567890")

// Calculate gas needed for the data
// Gas costs: 21000 (base) + 16 (per non-zero byte) or 4 (per zero byte)
gasLimit := uint64(21000 + (16 * len(data)))

return types.NewTransaction(
nonce,
to,
big.NewInt(1000),
gasLimit,
gasPrice,
data,
)
}

// goos: darwin
// goarch: arm64
// pkg: github.com/ethereum/go-ethereum/core/txpool/legacypool
Expand Down
Loading