diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 8b2c3bfdef..0fcead7f13 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -407,7 +407,7 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) { defer func() { if errors.Is(err, context.Canceled) { err = ErrReplayRequestAborted - } else if errors.Is(err, commontypes.ErrFinalityViolated) { + } else if errors.Is(err, ErrFinalityViolated) { // Replay only declares finality violation and does not resolve it, as it's possible that [fromBlock, savedFinalizedBlockNumber] // does not contain the violation. lp.lggr.Criticalw("Replay failed due to finality violation", "fromBlock", fromBlock, "err", err) @@ -633,7 +633,7 @@ func (lp *logPoller) run() { } err := lp.BackupPollAndSaveLogs(ctx) switch { - case errors.Is(err, commontypes.ErrFinalityViolated): + case errors.Is(err, ErrFinalityViolated): // BackupPoll only declares finality violation and does not resolve it, as it's possible that processed range // does not contain the violation. lp.lggr.Criticalw("Backup poll failed due to finality violation", "err", err) @@ -816,7 +816,7 @@ func (lp *logPoller) blocksFromFinalizedLogs(ctx context.Context, logs []types.L for i, log := range logs { if log.BlockHash != blocks[i].BlockHash { - return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, commontypes.ErrFinalityViolated) + return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, ErrFinalityViolated) } } @@ -949,7 +949,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren // conditions this would be equal to lastProcessed.BlockNumber + 1. func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) { err := lp.pollAndSaveLogs(ctx, currentBlockNumber) - if errors.Is(err, commontypes.ErrFinalityViolated) { + if errors.Is(err, ErrFinalityViolated) { lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err) lp.finalityViolated.Store(true) lp.SvcErrBuffer.Append(err) @@ -1107,7 +1107,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He } lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber) - return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number) + return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number) } // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block. @@ -1320,13 +1320,13 @@ func (lp *logPoller) fillRemainingBlocksFromRPC( logPollerBlocks := make(map[uint64]LogPollerBlock) for _, head := range evmBlocks { - logPollerBlocks[uint64(head.Number)] = LogPollerBlock{ - EvmChainId: head.EVMChainID, - BlockHash: head.Hash, - BlockNumber: head.Number, - BlockTimestamp: head.Timestamp, - FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock() - CreatedAt: head.Timestamp, + logPollerBlocks[uint64(head.BlockNumber)] = LogPollerBlock{ + EvmChainId: head.EvmChainId, + BlockHash: head.BlockHash, + BlockNumber: head.BlockNumber, + BlockTimestamp: head.BlockTimestamp, + FinalizedBlockNumber: head.BlockNumber, // always finalized; only matters if this block is returned by LatestBlock() + CreatedAt: head.BlockTimestamp, } } return logPollerBlocks, nil @@ -1382,7 +1382,7 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []uint64, if rpcChainReference.Hash != chainReference.BlockHash { return nil, fmt.Errorf("expected RPC's finalized block hash at hegiht %d to be %s but got %s: %w", - chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, commontypes.ErrFinalityViolated) + chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, ErrFinalityViolated) } reqs = reqs[:len(reqs)-1] // no need to include chain reference into results @@ -1489,7 +1489,7 @@ func ensureIdenticalBlocksBatches(fetched1, fetched2 map[uint64]*evmtypes.Head) } if head1.Hash != head2.Hash { - return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, commontypes.ErrFinalityViolated) + return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, ErrFinalityViolated) } } diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index 3850ecac0d..58ee52b8a3 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -709,7 +709,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { require.NoError(t, orm.InsertBlock(tests.Context(t), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2)) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) lp.PollAndSaveLogs(tests.Context(t), 4) - require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) + require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated) }) t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) { lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel) @@ -729,7 +729,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { }) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) lp.PollAndSaveLogs(tests.Context(t), 4) - require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) + require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated) }) t.Run("Log's hash does not match block's", func(t *testing.T) { lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel) @@ -746,7 +746,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) lp.PollAndSaveLogs(tests.Context(t), 4) - require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) + require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated) }) t.Run("Happy path", func(t *testing.T) { lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 9761b27dc6..20ed4f5951 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -993,9 +993,9 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim func (o *DSORM) SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error) { var b LogPollerBlock if err := o.ds.GetContext(ctx, &b, - blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= ( + `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= ( SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1 - ) ORDER BY block_number DESC LIMIT 1`), ubig.New(o.chainID), + ) ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID), ); err != nil { return nil, err }