Skip to content

Commit

Permalink
p2p/discover: fix ENR filtering (#2770)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatusKysel authored Nov 29, 2024
1 parent 2ae712b commit e2f2111
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 44 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ var (
Aliases: []string{"discv5"},
Usage: "Enables the V5 discovery mechanism",
Category: flags.NetworkingCategory,
Value: true,
Value: false,
}
NetrestrictFlag = &cli.StringFlag{
Name: "netrestrict",
Expand Down
6 changes: 0 additions & 6 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package discover

import (
"net"
"slices"
"sort"
"time"
Expand Down Expand Up @@ -51,11 +50,6 @@ func unwrapNodes(ns []*tableNode) []*enode.Node {
return result
}

//nolint:unused
func (n *tableNode) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP(), Port: n.UDP()}
}

func (n *tableNode) String() string {
return n.Node.String()
}
Expand Down
66 changes: 41 additions & 25 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ type bucket struct {
}

type addNodeOp struct {
node *enode.Node
isInbound bool
forceSetLive bool // for tests
node *enode.Node
isInbound bool
forceSetLive bool // for tests
syncExecution bool // for tests
}

type trackRequestOp struct {
Expand Down Expand Up @@ -320,7 +321,7 @@ func (tab *Table) len() (n int) {
//
// The caller must not hold tab.mutex.
func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool {
op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive}
op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive, syncExecution: true}
select {
case tab.addNodeCh <- op:
return <-tab.addNodeHandled
Expand All @@ -337,9 +338,20 @@ func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool {
// repeatedly.
//
// The caller must not hold tab.mutex.
func (tab *Table) addInboundNode(n *enode.Node) bool {
func (tab *Table) addInboundNode(n *enode.Node) {
op := addNodeOp{node: n, isInbound: true}
select {
case tab.addNodeCh <- op:
return
case <-tab.closeReq:
return
}
}

// Only for testing purposes
func (tab *Table) addInboundNodeSync(n *enode.Node) bool {
op := addNodeOp{node: n, isInbound: true, syncExecution: true}
select {
case tab.addNodeCh <- op:
return <-tab.addNodeHandled
case <-tab.closeReq:
Expand Down Expand Up @@ -387,10 +399,16 @@ loop:
tab.revalidation.handleResponse(tab, r)

case op := <-tab.addNodeCh:
tab.mutex.Lock()
ok := tab.handleAddNode(op)
tab.mutex.Unlock()
tab.addNodeHandled <- ok
// only happens in tests
if op.syncExecution {
ok := tab.handleAddNode(op)
tab.addNodeHandled <- ok
} else {
// async execution as handleAddNode is blocking
go func() {
tab.handleAddNode(op)
}()
}

case op := <-tab.trackRequestCh:
tab.handleTrackRequest(op)
Expand Down Expand Up @@ -468,9 +486,7 @@ func (tab *Table) loadSeedNodes() {
addr, _ := seed.UDPEndpoint()
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age)
}
tab.mutex.Lock()
tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
tab.mutex.Unlock()
go tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
}
}

Expand All @@ -492,16 +508,15 @@ func (tab *Table) bucketAtDistance(d int) *bucket {
return tab.buckets[d-bucketMinDistance-1]
}

//nolint:unused
func (tab *Table) filterNode(n *tableNode) bool {
func (tab *Table) filterNode(n *enode.Node) bool {
if tab.enrFilter == nil {
return false
}
if node, err := tab.net.RequestENR(n.Node); err != nil {
tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err)
if node, err := tab.net.RequestENR(n); err != nil {
tab.log.Debug("ENR request failed", "id", n.ID(), "ipAddr", n.IPAddr(), "updPort", n.UDP(), "err", err)
return false
} else if !tab.enrFilter(node.Record()) {
tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr())
tab.log.Trace("ENR record filter out", "id", n.ID(), "ipAddr", n.IPAddr(), "updPort", n.UDP())
return true
}
return false
Expand Down Expand Up @@ -541,6 +556,13 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
return false
}

if tab.filterNode(req.node) {
return false
}

tab.mutex.Lock()
defer tab.mutex.Unlock()

// For nodes from inbound contact, there is an additional safety measure: if the table
// is still initializing the node is not added.
if req.isInbound && !tab.isInitDone() {
Expand Down Expand Up @@ -570,11 +592,6 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
wn.isValidatedLive = true
}

// TODO(Matus): fix the filterNode feature
// if tab.filterNode(wn) {
// return false
// }

b.entries = append(b.entries, wn)
b.replacements = deleteNode(b.replacements, wn.ID())
tab.nodeAdded(b, wn)
Expand Down Expand Up @@ -705,8 +722,6 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
}

tab.mutex.Lock()
defer tab.mutex.Unlock()

b := tab.bucket(op.node.ID())
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket. This latter
Expand All @@ -715,10 +730,11 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 {
tab.deleteInBucket(b, op.node.ID())
}
tab.mutex.Unlock()

// Add found nodes.
for _, n := range op.foundNodes {
tab.handleAddNode(addNodeOp{n, false, false})
go tab.handleAddNode(addNodeOp{n, false, false, false})
}
}

Expand Down
14 changes: 6 additions & 8 deletions p2p/discover/table_reval.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package discover

import (
"errors"
"fmt"
"math"
"slices"
Expand Down Expand Up @@ -121,13 +120,7 @@ func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) {
if err != nil {
tab.log.Debug("ENR request failed", "id", node.ID(), "err", err)
} else {
if tab.enrFilter != nil && !tab.enrFilter(newrec.Record()) {
tab.log.Trace("ENR record filter out", "id", node.ID(), "err", errors.New("filtered node"))
// TODO: use didRespond to express failure temporarily
resp.didRespond = false
} else {
resp.newRecord = newrec
}
resp.newRecord = newrec
}
}

Expand Down Expand Up @@ -181,6 +174,11 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
tab.log.Debug("Node revalidated", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
var endpointChanged bool
if resp.newRecord != nil {
if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) {
tab.log.Trace("ENR record filter out", "id", n.ID())
tab.deleteInBucket(b, n.ID())
return
}
_, endpointChanged = tab.bumpInBucket(b, resp.newRecord, false)
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestTable_addInboundNode(t *testing.T) {
newrec := n2.Record()
newrec.Set(enr.IP{99, 99, 99, 99})
n2v2 := enode.SignNull(newrec, n2.ID())
tab.addInboundNode(n2v2)
tab.addInboundNodeSync(n2v2)
checkBucketContent(t, tab, []*enode.Node{n1, n2v2})

// Try updating n2 without sequence number change. The update is accepted
Expand All @@ -305,7 +305,7 @@ func TestTable_addInboundNode(t *testing.T) {
newrec.Set(enr.IP{100, 100, 100, 100})
newrec.SetSeq(n2.Seq())
n2v3 := enode.SignNull(newrec, n2.ID())
tab.addInboundNode(n2v3)
tab.addInboundNodeSync(n2v3)
checkBucketContent(t, tab, []*enode.Node{n1, n2v3})
}

Expand Down Expand Up @@ -349,13 +349,13 @@ func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) {
// Add a v4 node.
key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3")
n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000)
tab.addInboundNode(n1)
tab.addInboundNodeSync(n1)
checkBucketContent(t, tab, []*enode.Node{n1})

// Add an updated version with changed IP.
// The update will be accepted because it is inbound.
n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000)
tab.addInboundNode(n1v2)
tab.addInboundNodeSync(n1v2)
checkBucketContent(t, tab, []*enode.Node{n1v2})
}

Expand Down
3 changes: 3 additions & 0 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, s
if node.UDP() <= 1024 {
return nil, errLowPort
}
if t.tab.enrFilter != nil && !t.tab.enrFilter(r) {
return nil, errors.New("filtered by ENR filter")
}
if distances != nil {
nd := enode.LogDist(c.id, node.ID())
if !slices.Contains(distances, uint(nd)) {
Expand Down

0 comments on commit e2f2111

Please sign in to comment.