Skip to content

Commit

Permalink
node: pass eventbus at construction time (tendermint#8084)
Browse files Browse the repository at this point in the history
* node: pass eventbus at construction time

* remove cruft
  • Loading branch information
tychoish authored Mar 8, 2022
1 parent 4edc8c5 commit c35d6d6
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 103 deletions.
5 changes: 5 additions & 0 deletions internal/blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool/mock"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
Expand Down Expand Up @@ -121,13 +122,17 @@ func (rts *reactorTestSuite) addNode(
require.NoError(t, err)
require.NoError(t, stateStore.Save(state))

eventbus := eventbus.NewDefault(logger)
require.NoError(t, eventbus.Start(ctx))

blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
rts.app[nodeID],
mock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
eventbus,
)

for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
Expand Down
5 changes: 2 additions & 3 deletions internal/consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
require.NoError(t, err)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)

cs.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())

states[i] = cs
Expand Down
10 changes: 5 additions & 5 deletions internal/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ func newStateWithConfigAndBlockStore(
stateStore := sm.NewStore(stateDB)
require.NoError(t, stateStore.Save(state))

blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx))

blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx,
logger.With("module", "consensus"),
thisConfig.Consensus,
Expand All @@ -491,17 +494,14 @@ func newStateWithConfigAndBlockStore(
blockStore,
mempool,
evpool,
eventBus,
)
if err != nil {
t.Fatal(err)
}

cs.SetPrivValidator(ctx, pv)

eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx))

cs.SetEventBus(eventBus)
return cs
}

Expand Down
8 changes: 2 additions & 6 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func NewReactor(
cs *State,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
eventBus *eventbus.EventBus,
waitSync bool,
metrics *Metrics,
) (*Reactor, error) {
Expand Down Expand Up @@ -166,6 +167,7 @@ func NewReactor(
state: cs,
waitSync: waitSync,
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
stateCh: stateCh,
dataCh: dataCh,
Expand Down Expand Up @@ -226,12 +228,6 @@ func (r *Reactor) OnStop() {
}
}

// SetEventBus sets the reactor's event bus.
func (r *Reactor) SetEventBus(b *eventbus.EventBus) {
r.eventBus = b
r.state.SetEventBus(b)
}

// WaitSync returns whether the consensus reactor is waiting for state/block sync.
func (r *Reactor) WaitSync() bool {
r.mtx.RLock()
Expand Down
14 changes: 6 additions & 8 deletions internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,12 @@ func setup(
state,
chCreator(nodeID),
node.MakePeerUpdates(ctx, t),
state.eventBus,
true,
NopMetrics(),
)
require.NoError(t, err)

reactor.SetEventBus(state.eventBus)

blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,
Query: types.EventQueryNewBlock,
Expand Down Expand Up @@ -504,17 +503,16 @@ func TestReactorWithEvidence(t *testing.T) {

evpool2 := sm.EmptyEvidencePool{}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start(ctx))

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)

cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2)
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus)
require.NoError(t, err)
cs.SetPrivValidator(ctx, pv)

eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start(ctx))
cs.SetEventBus(eventBus)

cs.SetTimeoutTicker(tickerFunc())

states[i] = cs
Expand Down
11 changes: 4 additions & 7 deletions internal/consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ type Handshaker struct {
stateStore sm.Store
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
eventBus *eventbus.EventBus
genDoc *types.GenesisDoc
logger log.Logger

Expand All @@ -217,7 +217,7 @@ func NewHandshaker(
stateStore sm.Store,
state sm.State,
store sm.BlockStore,
eventBus types.BlockEventPublisher,
eventBus *eventbus.EventBus,
genDoc *types.GenesisDoc,
) *Handshaker {

Expand Down Expand Up @@ -484,9 +484,7 @@ func (h *Handshaker) replayBlocks(
if i == finalBlock && !mutateState {
// We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus)
appHash, err = sm.ExecCommitBlock(ctx,
blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
Expand Down Expand Up @@ -528,8 +526,7 @@ func (h *Handshaker) replayBlock(

// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus)

var err error
state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block)
Expand Down
8 changes: 3 additions & 5 deletions internal/consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
pb.cs.Wait()

newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, pb.cs.eventBus)
if err != nil {
return err
}
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

if err := pb.fp.Close(); err != nil {
Expand Down Expand Up @@ -349,13 +348,12 @@ func newConsensusStateForReplay(
}

mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore, eventBus)

consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool)
blockStore, mempool, evpool, eventBus)
if err != nil {
return nil, err
}
consensusState.SetEventBus(eventBus)
return consensusState, nil
}
38 changes: 26 additions & 12 deletions internal/consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,9 @@ func testHandshakeReplay(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })

eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))

clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
Expand All @@ -757,7 +760,7 @@ func testHandshakeReplay(
stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState)
require.NoError(t, err)
buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store)
buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, eventBus, nBlocks, mode, store)
}

// Prune block store if requested
Expand All @@ -772,7 +775,7 @@ func testHandshakeReplay(
// now start the app using the handshake - it should sync
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning())
Expand Down Expand Up @@ -822,9 +825,10 @@ func applyBlock(
blk *types.Block,
appClient abciclient.Client,
blockStore *mockBlockStore,
eventBus *eventbus.EventBus,
) sm.State {
testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore, eventBus)

bps, err := blk.MakePartSet(testPartSize)
require.NoError(t, err)
Expand All @@ -843,6 +847,7 @@ func buildAppStateFromChain(
evpool sm.EvidencePool,
state sm.State,
chain []*types.Block,
eventBus *eventbus.EventBus,
nBlocks int,
mode uint,
blockStore *mockBlockStore,
Expand All @@ -864,18 +869,18 @@ func buildAppStateFromChain(
case 0:
for i := 0; i < nBlocks; i++ {
block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus)
}
case 1, 2, 3:
for i := 0; i < nBlocks-1; i++ {
block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus)
}

if mode == 2 || mode == 3 {
// update the kvstore height and apphash
// as if we ran commit but not
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore, eventBus)
}
default:
require.Fail(t, "unknown mode %v", mode)
Expand Down Expand Up @@ -917,23 +922,26 @@ func buildTMStateFromChain(

require.NoError(t, stateStore.Save(state))

eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))

switch mode {
case 0:
// sync right up
for _, block := range chain {
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus)
}

case 1, 2, 3:
// sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] {
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus)
}

// apply the final block to a state copy so we can
// get the right next appHash but keep the state back
applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore)
applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore, eventBus)
default:
require.Fail(t, "unknown mode %v", mode)
}
Expand Down Expand Up @@ -970,6 +978,9 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

logger := log.TestingLogger()

eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))

// 2. Tendermint must panic if app returns wrong hash for the first block
// - RANDOM HASH
// - 0x02
Expand All @@ -983,7 +994,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
t.Cleanup(func() { cancel(); proxyApp.Wait() })

assert.Panics(t, func() {
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
if err = h.Handshake(ctx, proxyApp); err != nil {
t.Log(err)
}
Expand All @@ -1003,7 +1014,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
t.Cleanup(func() { cancel(); proxyApp.Wait() })

assert.Panics(t, func() {
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
if err = h.Handshake(ctx, proxyApp); err != nil {
t.Log(err)
}
Expand Down Expand Up @@ -1235,6 +1246,9 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
client := abciclient.NewLocalClient(logger, app)

eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))

cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(cfg.RootDir) })
Expand All @@ -1252,7 +1266,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)

handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")

Expand Down
8 changes: 2 additions & 6 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,11 @@ func NewState(
blockStore sm.BlockStore,
txNotifier txNotifier,
evpool evidencePool,
eventBus *eventbus.EventBus,
options ...StateOption,
) (*State, error) {
cs := &State{
eventBus: eventBus,
logger: logger,
config: cfg,
blockExec: blockExec,
Expand Down Expand Up @@ -260,12 +262,6 @@ func (cs *State) updateStateFromStore(ctx context.Context) error {
return nil
}

// SetEventBus sets event bus.
func (cs *State) SetEventBus(b *eventbus.EventBus) {
cs.eventBus = b
cs.blockExec.SetEventBus(b)
}

// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
Expand Down
5 changes: 2 additions & 3 deletions internal/consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,12 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr

mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore, eventBus)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
if err != nil {
t.Fatal(err)
}

consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(ctx, privValidator)
}
Expand Down
Loading

0 comments on commit c35d6d6

Please sign in to comment.