Skip to content

Commit

Permalink
core/filtermaps: two dimensional log filter
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Sep 15, 2024
1 parent ae70744 commit 9ad34e5
Show file tree
Hide file tree
Showing 10 changed files with 1,991 additions and 12 deletions.
582 changes: 582 additions & 0 deletions core/filtermaps/filtermaps.go

Large diffs are not rendered by default.

618 changes: 618 additions & 0 deletions core/filtermaps/indexer.go

Large diffs are not rendered by default.

500 changes: 500 additions & 0 deletions core/filtermaps/matcher.go

Large diffs are not rendered by default.

206 changes: 206 additions & 0 deletions core/rawdb/accessors_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package rawdb

import (
"bytes"
"encoding/binary"
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -179,3 +181,207 @@ func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
log.Crit("Failed to delete bloom bits", "err", it.Error())
}
}

// emptyRow is the shared zero length row returned for entries that are not
// present in the database.
var emptyRow = []uint32{}

// ReadFilterMapRow loads the filter map row stored under the given mapRowIndex
// (see filtermaps.mapRowIndex for the storage index encoding).
//
// Rows of zero length are never written to the database, so a missing entry is
// not an error: it is reported as an empty row. A stored row is a sequence of
// 4-byte little endian values; an entry whose length is not a multiple of four
// is rejected with an error.
//
// The mapRowIndex indexing scheme is the same as the one proposed in EIP-7745
// for tree-hashing the filter map structure and for the same data proximity
// reasons it is also suitable for database representation.
// See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func ReadFilterMapRow(db ethdb.KeyValueReader, mapRowIndex uint64) ([]uint32, error) {
	key := filterMapRowKey(mapRowIndex)
	if exists, err := db.Has(key); err != nil {
		return nil, err
	} else if !exists {
		return emptyRow, nil
	}
	enc, err := db.Get(key)
	if err != nil {
		return nil, err
	}
	if len(enc)%4 != 0 {
		return nil, errors.New("Invalid encoded filter row length")
	}
	// Consume the encoded row four bytes at a time.
	row := make([]uint32, 0, len(enc)/4)
	for ; len(enc) > 0; enc = enc[4:] {
		row = append(row, binary.LittleEndian.Uint32(enc))
	}
	return row, nil
}

// WriteFilterMapRow persists a filter map row under the given mapRowIndex.
// A non-empty row is stored as the concatenation of its entries, each encoded
// as a 4-byte little endian value. An empty row is never stored; instead any
// existing entry at that index is deleted. A failure of either database
// operation is reported through log.Crit.
func WriteFilterMapRow(db ethdb.KeyValueWriter, mapRowIndex uint64, row []uint32) {
	key := filterMapRowKey(mapRowIndex)

	var err error
	if len(row) == 0 {
		err = db.Delete(key)
	} else {
		enc := make([]byte, 4*len(row))
		for i := range row {
			binary.LittleEndian.PutUint32(enc[4*i:], row[i])
		}
		err = db.Put(key, enc)
	}
	if err != nil {
		log.Crit("Failed to store filter map row", "err", err)
	}
}

// ReadFilterMapBlockPtr looks up the block pointer of the given map, which is
// the number of the block that generated the first log value entry of that map.
// Database read errors are passed through to the caller, and an entry that is
// not exactly 8 bytes long (uint64 big endian) is rejected.
func ReadFilterMapBlockPtr(db ethdb.KeyValueReader, mapIndex uint32) (uint64, error) {
	enc, err := db.Get(filterMapBlockPtrKey(mapIndex))
	switch {
	case err != nil:
		return 0, err
	case len(enc) != 8:
		return 0, errors.New("Invalid block number encoding")
	}
	return binary.BigEndian.Uint64(enc), nil
}

// WriteFilterMapBlockPtr records the block pointer of the given map, which is
// the number of the block that generated the first log value entry of that map.
// The number is stored as a uint64 in big endian form; a database write failure
// is reported through log.Crit.
func WriteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32, blockNumber uint64) {
	enc := make([]byte, 8)
	binary.BigEndian.PutUint64(enc, blockNumber)

	err := db.Put(filterMapBlockPtrKey(mapIndex), enc)
	if err != nil {
		log.Crit("Failed to store filter map block pointer", "err", err)
	}
}

// DeleteFilterMapBlockPtr removes the block pointer entry of the given map
// (the number of the block that generated the first log value entry of the
// map). A database failure is reported through log.Crit.
func DeleteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32) {
	err := db.Delete(filterMapBlockPtrKey(mapIndex))
	if err == nil {
		return
	}
	log.Crit("Failed to delete filter map block pointer", "err", err)
}

// ReadBlockLvPointer looks up the log value pointer of the given block, which
// is the starting log value index where the log values generated by that block
// are located. Database read errors are passed through to the caller, and an
// entry that is not exactly 8 bytes long (uint64 big endian) is rejected.
func ReadBlockLvPointer(db ethdb.KeyValueReader, blockNumber uint64) (uint64, error) {
	enc, err := db.Get(blockLVKey(blockNumber))
	switch {
	case err != nil:
		return 0, err
	case len(enc) != 8:
		return 0, errors.New("Invalid log value pointer encoding")
	}
	return binary.BigEndian.Uint64(enc), nil
}

// WriteBlockLvPointer records the log value pointer of the given block, which
// is the starting log value index where the log values generated by that block
// are located. The pointer is stored as a uint64 in big endian form; a database
// write failure is reported through log.Crit.
func WriteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber, lvPointer uint64) {
	enc := make([]byte, 8)
	binary.BigEndian.PutUint64(enc, lvPointer)

	err := db.Put(blockLVKey(blockNumber), enc)
	if err != nil {
		log.Crit("Failed to store block log value pointer", "err", err)
	}
}

// DeleteBlockLvPointer removes the log value pointer entry of the given block
// (the starting log value index where the log values generated by the block
// are located). A database failure is reported through log.Crit.
func DeleteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber uint64) {
	err := db.Delete(blockLVKey(blockNumber))
	if err == nil {
		return
	}
	log.Crit("Failed to delete block log value pointer", "err", err)
}

// FilterMapsRange is the storage representation of the block range covered by
// the filter maps structure and the corresponding log value index range.
//
// The structure is stored RLP encoded (see ReadFilterMapsRange and
// WriteFilterMapsRange), so the order of the fields is part of the database
// format and must not be changed.
type FilterMapsRange struct {
	Initialized     bool
	HeadLvPointer   uint64
	TailLvPointer   uint64
	HeadBlockNumber uint64
	TailBlockNumber uint64
	HeadBlockHash   common.Hash
	TailParentHash  common.Hash
}

// ReadFilterMapsRange loads and RLP decodes the filter maps range data.
// A missing database entry is a valid, non-initialized state: in that case a
// blank range structure is returned together with a nil error. Database and
// decoding errors are returned alongside a blank range structure.
func ReadFilterMapsRange(db ethdb.KeyValueReader) (FilterMapsRange, error) {
	var fmRange FilterMapsRange

	exists, err := db.Has(filterMapsRangeKey)
	if err != nil || !exists {
		// fmRange is still blank here; err is nil if the entry is simply absent.
		return fmRange, err
	}
	enc, err := db.Get(filterMapsRangeKey)
	if err != nil {
		return FilterMapsRange{}, err
	}
	if err := rlp.DecodeBytes(enc, &fmRange); err != nil {
		return FilterMapsRange{}, err
	}
	return fmRange, nil
}

// WriteFilterMapsRange RLP encodes the filter maps range data and stores it in
// the database. Encoding and database write failures are both reported through
// log.Crit.
func WriteFilterMapsRange(db ethdb.KeyValueWriter, fmRange FilterMapsRange) {
	enc, encErr := rlp.EncodeToBytes(&fmRange)
	if encErr != nil {
		log.Crit("Failed to encode filter maps range", "err", encErr)
	}
	putErr := db.Put(filterMapsRangeKey, enc)
	if putErr != nil {
		log.Crit("Failed to store filter maps range", "err", putErr)
	}
}

// DeleteFilterMapsRange removes the filter maps range data from the database,
// which is interpreted as reverting to the un-initialized state. A database
// failure is reported through log.Crit.
func DeleteFilterMapsRange(db ethdb.KeyValueWriter) {
	err := db.Delete(filterMapsRangeKey)
	if err == nil {
		return
	}
	log.Crit("Failed to delete filter maps range", "err", err)
}

// RevertPoint is the storage representation of a filter maps revert point.
// It is stored RLP encoded under a key derived from a block number (see
// ReadRevertPoint and WriteRevertPoint), so the order of the fields is part of
// the database format.
type RevertPoint struct {
// presumably the hash of the block the revert point belongs to — TODO confirm
BlockHash common.Hash
// presumably the index of the filter map being written at that block — TODO confirm
MapIndex uint32
// presumably the length of each row of that map at the revert point — TODO confirm
RowLength []uint
}

// ReadRevertPoint loads and RLP decodes the revert point stored for the given
// block number. Revert points may or may not exist for any block number, so a
// missing entry is not an error: it yields a nil revert point and a nil error.
// Database and decoding errors are returned with a nil revert point.
func ReadRevertPoint(db ethdb.KeyValueReader, blockNumber uint64) (*RevertPoint, error) {
	key := revertPointKey(blockNumber)

	exists, err := db.Has(key)
	if err != nil || !exists {
		return nil, err
	}
	enc, err := db.Get(key)
	if err != nil {
		return nil, err
	}
	var rp RevertPoint
	if err := rlp.DecodeBytes(enc, &rp); err != nil {
		return nil, err
	}
	return &rp, nil
}

// WriteRevertPoint RLP encodes the revert point and stores it under the given
// block number. Encoding and database write failures are both reported through
// log.Crit.
func WriteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64, rp *RevertPoint) {
	enc, encErr := rlp.EncodeToBytes(rp)
	if encErr != nil {
		log.Crit("Failed to encode revert point", "err", encErr)
	}
	putErr := db.Put(revertPointKey(blockNumber), enc)
	if putErr != nil {
		log.Crit("Failed to store revert point", "err", putErr)
	}
}

// DeleteRevertPoint removes the revert point stored for the given block
// number. A database failure is reported through log.Crit.
func DeleteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64) {
	err := db.Delete(revertPointKey(blockNumber))
	if err == nil {
		return
	}
	log.Crit("Failed to delete revert point", "err", err)
}
31 changes: 31 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ var (
FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash
SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee

// Filter maps database entries. Every key below is FilterMapsPrefix followed
// by a one byte entry type.
// NOTE(review): each append below only yields an independent slice if
// FilterMapsPrefix has no spare capacity — confirm, otherwise the derived
// keys would share (and overwrite) one backing array.
FilterMapsPrefix = []byte("fT5-") //TODO fm- (presumably the intended final prefix — confirm)
filterMapsRangeKey = append(FilterMapsPrefix, byte('R')) // filterMapsRangeKey -> RLP encoded FilterMapsRange
filterMapRowPrefix = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
blockLVPrefix = append(FilterMapsPrefix, byte('p')) // blockLVPrefix + num (uint64 big endian) -> log value pointer (uint64 big endian)
revertPointPrefix = append(FilterMapsPrefix, byte('v')) // revertPointPrefix + num (uint64 big endian) -> revert data

preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
)
Expand Down Expand Up @@ -346,3 +353,27 @@ func IsStorageTrieNode(key []byte) bool {
ok, _, _ := ResolveStorageTrieNode(key)
return ok
}

// filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian)
//
// The key is assembled in a freshly allocated buffer so the shared prefix slice
// is never written to, and the index is placed right after the full prefix.
// The previous version wrote the index at offset 1, which overwrote most of the
// prefix and left the trailing bytes zero.
//
// NOTE(review): this changes the produced key bytes, so rows written with the
// old, malformed keys will not be found under the corrected ones.
func filterMapRowKey(mapRowIndex uint64) []byte {
	prefixLen := len(filterMapRowPrefix)
	key := make([]byte, prefixLen+8)
	copy(key, filterMapRowPrefix)
	binary.BigEndian.PutUint64(key[prefixLen:], mapRowIndex)
	return key
}

// filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian)
//
// The key is assembled in a freshly allocated buffer so the shared prefix slice
// is never written to, and the index is placed right after the full prefix.
// The previous version wrote the index at offset 1, which overwrote most of the
// prefix and left the trailing bytes zero.
//
// NOTE(review): this changes the produced key bytes, so pointers written with
// the old, malformed keys will not be found under the corrected ones.
func filterMapBlockPtrKey(mapIndex uint32) []byte {
	prefixLen := len(filterMapBlockPtrPrefix)
	key := make([]byte, prefixLen+4)
	copy(key, filterMapBlockPtrPrefix)
	binary.BigEndian.PutUint32(key[prefixLen:], mapIndex)
	return key
}

// blockLVKey = blockLVPrefix + num (uint64 big endian)
func blockLVKey(number uint64) []byte {
return append(blockLVPrefix, encodeBlockNumber(number)...)
}

// revertPointKey = revertPointPrefix + num (uint64 big endian)
func revertPointKey(number uint64) []byte {
return append(revertPointPrefix, encodeBlockNumber(number)...)
}
2 changes: 2 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
Expand All @@ -44,6 +45,7 @@ import (

// EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes
type EthAPIBackend struct {
*filtermaps.FilterMapsMatcherBackend
extRPCEnabled bool
allowUnprotectedTxs bool
eth *Ethereum
Expand Down
13 changes: 12 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
Expand Down Expand Up @@ -83,6 +84,8 @@ type Ethereum struct {
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}

filterMaps *filtermaps.FilterMaps

APIBackend *EthAPIBackend

miner *miner.Miner
Expand Down Expand Up @@ -221,6 +224,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)

if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
Expand Down Expand Up @@ -255,7 +259,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
eth.APIBackend = &EthAPIBackend{
FilterMapsMatcherBackend: (*filtermaps.FilterMapsMatcherBackend)(eth.filterMaps),
extRPCEnabled: stack.Config().ExtRPCEnabled(),
allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
eth: eth,
gpo: nil,
}
if eth.APIBackend.allowUnprotectedTxs {
log.Info("Unprotected transactions allowed")
}
Expand Down Expand Up @@ -407,6 +417,7 @@ func (s *Ethereum) Stop() error {

// Then stop everything else.
s.bloomIndexer.Close()
s.filterMaps.Close()
close(s.closeBloomHandler)
s.txPool.Close()
s.blockchain.Stop()
Expand Down
41 changes: 30 additions & 11 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ package filters
import (
"context"
"errors"
"fmt"
"math/big"

//"reflect"
"slices"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
Expand All @@ -35,8 +40,9 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash

block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
bbMatchCount uint64

matcher *bloombits.Matcher
}
Expand Down Expand Up @@ -148,16 +154,28 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return nil, err
}

logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
return logs, err
start := time.Now()
logs, err := filtermaps.GetPotentialMatches(ctx, f.sys.backend, uint64(f.begin), uint64(f.end), f.addresses, f.topics)
fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics)
fmt.Println("filtermaps (new) runtime", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs))

//TODO remove
/*f.bbMatchCount = 0
start = time.Now()
logChan, errChan := f.rangeLogsAsync(ctx)
var bbLogs []*types.Log
loop:
for {
select {
case log := <-logChan:
bbLogs = append(bbLogs, log)
case <-errChan:
break loop
}
}
}
fmt.Println("bloombits (old) runtime", time.Since(start), "true matches", len(bbLogs), "false positives", f.bbMatchCount-uint64(len(bbLogs)))
fmt.Println("DeepEqual", reflect.DeepEqual(fmLogs, bbLogs))*/
return fmLogs, err
}

// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
Expand Down Expand Up @@ -218,6 +236,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *type
for {
select {
case number, ok := <-matches:
f.bbMatchCount++
// Abort if all matches have been fulfilled
if !ok {
err := session.Error()
Expand Down
Loading

0 comments on commit 9ad34e5

Please sign in to comment.