Skip to content

Commit

Permalink
chore(mempool): refactor TxRepear into ConsensusBlockDataReaper
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Aug 11, 2022
1 parent 9a65a02 commit f4a1f96
Show file tree
Hide file tree
Showing 16 changed files with 40 additions and 72 deletions.
6 changes: 1 addition & 5 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,7 @@ func TestMempoolRmBadTx(t *testing.T) {
t.Errorf("error getting reaper: %s", err)
return
}
txs, err := reaper.Txs(ctx)
if err != nil {
t.Errorf("error getting txs from reaper: %s", err)
return
}
txs := reaper.Txs
if len(txs) == 0 {
emptyMempoolCh <- struct{}{}
return
Expand Down
4 changes: 2 additions & 2 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (e emptyMempool) PrepBlockFinality(_ context.Context) (func(), error) {
return func() {}, nil
}

func (e emptyMempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.TxReaper, error) {
return types.Txs{}, nil
func (e emptyMempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.Data, error) {
return types.Data{}, nil
}

func (e emptyMempool) TxsAvailable() <-chan struct{} {
Expand Down
14 changes: 7 additions & 7 deletions mempool/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,23 @@ func (a *ABCI) PrepBlockFinality(_ context.Context) (func(), error) {
}

// Reap calls the underlying pool's Reap method with the given options.
func (a *ABCI) Reap(ctx context.Context, opts ...ReapOptFn) (types.TxReaper, error) {
func (a *ABCI) Reap(ctx context.Context, opts ...ReapOptFn) (types.Data, error) {
opt := CoalesceReapOpts(opts...)
reaper, err := a.pool.Reap(ctx, opt)
data, err := a.pool.Reap(ctx, opt)
if err != nil {
return nil, err
return types.Data{}, err
}

if opt.Verify {
return a.verifyReaper(ctx, reaper)
return a.verifyReaper(ctx, data)
}

return reaper, nil
return data, nil
}

func (a *ABCI) verifyReaper(ctx context.Context, reaper types.TxReaper) (types.TxReaper, error) {
func (a *ABCI) verifyReaper(ctx context.Context, data types.Data) (types.Data, error) {
// TODO(berg): wire this up, for now just a no op, all txs are good...
return reaper, nil
return data, nil
}

// Remove removes txs from the cache and underlying pool.
Expand Down
4 changes: 1 addition & 3 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ func BenchmarkReap(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
reaper, err := mp.Reap(ctx, ReapBytes(100000000), ReapGas(10000000))
_, err := mp.Reap(ctx, ReapBytes(100000000), ReapGas(10000000))
require.NoError(b, err)

reaper.Txs(ctx)
}
}

Expand Down
6 changes: 2 additions & 4 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
reaper, err := mp.Reap(ctx, ReapBytes(tt.maxBytes), ReapGas(tt.maxGas))
require.NoError(t, err)

got, err := reaper.Txs(ctx)
require.NoError(t, err)
got := reaper.Txs
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
len(got), tt.expectedNumTxs, tcIndex)
require.NoError(t, mp.Flush(ctx))
Expand Down Expand Up @@ -415,8 +414,7 @@ func TestSerialReap(t *testing.T) {
reaper, err := mp.Reap(ctx)
require.NoError(t, err)

txs, err := reaper.Txs(ctx)
require.NoError(t, err)
txs := reaper.Txs
require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
}

Expand Down
4 changes: 2 additions & 2 deletions mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func (m Mempool) PrepBlockFinality(_ context.Context) (func(), error) {
return func() {}, nil
}

func (m Mempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.TxReaper, error) {
return types.Txs{}, nil
func (m Mempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.Data, error) {
return types.Data{}, nil
}
20 changes: 11 additions & 9 deletions mempool/pool_clist.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,20 +273,22 @@ func (mem *PoolCList) CheckTxPrep(ctx context.Context, tx types.Tx) error {
return nil
}

func (mem *PoolCList) Reap(ctx context.Context, opt ReapOption) (types.TxReaper, error) {
func (mem *PoolCList) Reap(ctx context.Context, opt ReapOption) (types.Data, error) {
if opt.NumTxs > -1 && (opt.GasLimit > -1 || opt.BlockSizeLimit > -1) {
return nil, fmt.Errorf("reaping by num txs and one of gas limit or block size limit is not supported")
return types.Data{}, fmt.Errorf("reaping by num txs and one of gas limit or block size limit is not supported")
}

if opt.NumTxs == -1 && opt.GasLimit == -1 && opt.BlockSizeLimit == -1 {
return mem.reapMaxTxs(-1), nil
var txs types.Txs
switch {
case opt.NumTxs == -1 && opt.GasLimit == -1 && opt.BlockSizeLimit == -1:
txs = mem.reapMaxTxs(-1)
case opt.NumTxs > -1:
txs = mem.reapMaxTxs(opt.NumTxs)
default:
txs = mem.reapMaxBytesMaxGas(opt.BlockSizeLimit, opt.GasLimit)
}

if opt.NumTxs > -1 {
return mem.reapMaxTxs(opt.NumTxs), nil
}

return mem.reapMaxBytesMaxGas(opt.BlockSizeLimit, opt.GasLimit), nil
return types.Data{Txs: txs}, nil
}

func (mem *PoolCList) Recheck(ctx context.Context, appConn proxy.AppConnMempool) (OpResult, error) {
Expand Down
4 changes: 1 addition & 3 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorI
reaper, err := mp.Reap(ctx, ReapTxs(len(txs)))
require.NoError(t, err)

reapedTxs, err := reaper.Txs(ctx)
require.NoError(t, err)

reapedTxs := reaper.Txs
for i, tx := range txs {
assert.Equalf(t, tx, reapedTxs[i],
"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
Expand Down
2 changes: 1 addition & 1 deletion mempool/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Pool interface {

// Reap returns Txs from the given pool. It is up to the pool implementation to define
// how they handle the possible predicates from option combinations.
Reap(ctx context.Context, opts ReapOption) (types.TxReaper, error)
Reap(ctx context.Context, opts ReapOption) (types.Data, error)

// Recheck should trigger a recheck of the uncommitted txs within the mempool. Note
// that not all mempools make use of this. For example, the narwhal mempool does no
Expand Down
3 changes: 1 addition & 2 deletions rpc/client/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ func TestBroadcastTxSync(t *testing.T) {
reaper, err := mempool.Reap(ctx, mempl.ReapTxs(len(tx)))
require.NoError(err)

txs, err := reaper.Txs(ctx)
require.NoError(err)
txs := reaper.Txs
require.EqualValues(tx, txs[0])
require.NoError(mempool.Flush(ctx))
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Mempool interface {
CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo mempl.TxInfo) error
Flush(ctx context.Context) error
PoolMeta() mempl.PoolMeta
Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.TxReaper, error)
Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.Data, error)
}

//----------------------------------------------
Expand Down
5 changes: 1 addition & 4 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi
return nil, err
}

txs, err := reaper.Txs(ctx.Context())
if err != nil {
return nil, err
}
txs := reaper.Txs
meta := env.Mempool.PoolMeta()
return &ctypes.ResultUnconfirmedTxs{
Count: len(txs),
Expand Down
6 changes: 3 additions & 3 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Mempool interface {
) error
NewHydratedBlock(ctx context.Context, block *types.Block) (*types.Block, error)
PrepBlockFinality(_ context.Context) (func(), error)
Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.TxReaper, error)
Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.Data, error)
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -119,7 +119,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
// Fetch a limited amount of valid txs
maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size())

reaper, err := blockExec.mempool.Reap(ctx,
data, err := blockExec.mempool.Reap(ctx,
mempl.ReapBytes(maxDataBytes),
mempl.ReapGas(maxGas),
// TODO(berg): we can do the verification as an option to the Reap. This is actually pretty
Expand All @@ -132,7 +132,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
return nil, nil, err
}

bl, ps := state.MakeBlockV2(height, reaper.BlockData(), commit, evidence, proposerAddr)
bl, ps := state.MakeBlockV2(height, data, commit, evidence, proposerAddr)
return bl, ps, nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/maverick/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (e emptyMempool) PrepBlockFinality(_ context.Context) (func(), error) {
return func() {}, nil
}

func (e emptyMempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.TxReaper, error) {
return types.Txs{}, nil
func (e emptyMempool) Reap(ctx context.Context, opts ...mempl.ReapOptFn) (types.Data, error) {
return types.Data{}, nil
}

func (e emptyMempool) TxsAvailable() <-chan struct{} {
Expand Down
4 changes: 4 additions & 0 deletions types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"bytes"
"crypto/sha256"
"errors"
"fmt"

Expand All @@ -11,6 +12,9 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

// TxKey is the fixed length array key used as an index.
type TxKey [sha256.Size]byte

// Tx is an arbitrary byte array.
// NOTE: Tx has no types at this level, so when wire encoded it's just length-prefixed.
// Might we want types here ?
Expand Down
24 changes: 0 additions & 24 deletions types/tx_v2.go

This file was deleted.

0 comments on commit f4a1f96

Please sign in to comment.