diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d30d3d851d..3a668db9e9 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -974,7 +974,7 @@ var ( Aliases: []string{"discv5"}, Usage: "Enables the V5 discovery mechanism", Category: flags.NetworkingCategory, - Value: true, + Value: false, } NetrestrictFlag = &cli.StringFlag{ Name: "netrestrict", diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 8454ed9b61..ac34b7c5b2 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -17,7 +17,6 @@ package discover import ( - "net" "slices" "sort" "time" @@ -51,11 +50,6 @@ func unwrapNodes(ns []*tableNode) []*enode.Node { return result } -//nolint:unused -func (n *tableNode) addr() *net.UDPAddr { - return &net.UDPAddr{IP: n.IP(), Port: n.UDP()} -} - func (n *tableNode) String() string { return n.Node.String() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 4076e42c1d..7c95fffd43 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -112,9 +112,10 @@ type bucket struct { } type addNodeOp struct { - node *enode.Node - isInbound bool - forceSetLive bool // for tests + node *enode.Node + isInbound bool + forceSetLive bool // for tests + syncExecution bool // for tests } type trackRequestOp struct { @@ -320,7 +321,7 @@ func (tab *Table) len() (n int) { // // The caller must not hold tab.mutex. func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool { - op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive} + op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive, syncExecution: true} select { case tab.addNodeCh <- op: return <-tab.addNodeHandled @@ -337,9 +338,20 @@ func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool { // repeatedly. // // The caller must not hold tab.mutex. -func (tab *Table) addInboundNode(n *enode.Node) bool { +func (tab *Table) addInboundNode(n *enode.Node) { op := addNodeOp{node: n, isInbound: true} select { + case tab.addNodeCh <- op: + return + case <-tab.closeReq: + return + } +} + +// Only for testing purposes +func (tab *Table) addInboundNodeSync(n *enode.Node) bool { + op := addNodeOp{node: n, isInbound: true, syncExecution: true} + select { case tab.addNodeCh <- op: return <-tab.addNodeHandled case <-tab.closeReq: @@ -387,10 +399,16 @@ loop: tab.revalidation.handleResponse(tab, r) case op := <-tab.addNodeCh: - tab.mutex.Lock() - ok := tab.handleAddNode(op) - tab.mutex.Unlock() - tab.addNodeHandled <- ok + // only happens in tests + if op.syncExecution { + ok := tab.handleAddNode(op) + tab.addNodeHandled <- ok + } else { + // async execution as handleAddNode is blocking + go func() { + tab.handleAddNode(op) + }() + } case op := <-tab.trackRequestCh: tab.handleTrackRequest(op) @@ -468,9 +486,7 @@ func (tab *Table) loadSeedNodes() { addr, _ := seed.UDPEndpoint() tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age) } - tab.mutex.Lock() - tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) - tab.mutex.Unlock() + go tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) } } @@ -492,16 +508,15 @@ func (tab *Table) bucketAtDistance(d int) *bucket { return tab.buckets[d-bucketMinDistance-1] } -//nolint:unused -func (tab *Table) filterNode(n *tableNode) bool { +func (tab *Table) filterNode(n *enode.Node) bool { if tab.enrFilter == nil { return false } - if node, err := tab.net.RequestENR(n.Node); err != nil { - tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err) + if node, err := tab.net.RequestENR(n); err != nil { + tab.log.Debug("ENR request failed", "id", n.ID(), "ipAddr", n.IPAddr(), "updPort", n.UDP(), "err", err) return false } else if !tab.enrFilter(node.Record()) { - tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr()) + tab.log.Trace("ENR record filter out", "id", n.ID(), "ipAddr", n.IPAddr(), "updPort", n.UDP()) return true } return false @@ -541,6 +556,13 @@ func (tab *Table) handleAddNode(req addNodeOp) bool { return false } + if tab.filterNode(req.node) { + return false + } + + tab.mutex.Lock() + defer tab.mutex.Unlock() + // For nodes from inbound contact, there is an additional safety measure: if the table // is still initializing the node is not added. if req.isInbound && !tab.isInitDone() { @@ -570,11 +592,6 @@ func (tab *Table) handleAddNode(req addNodeOp) bool { wn.isValidatedLive = true } - // TODO(Matus): fix the filterNode feature - // if tab.filterNode(wn) { - // return false - // } - b.entries = append(b.entries, wn) b.replacements = deleteNode(b.replacements, wn.ID()) tab.nodeAdded(b, wn) @@ -705,8 +722,6 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) { } tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.bucket(op.node.ID()) // Remove the node from the local table if it fails to return anything useful too // many times, but only if there are enough other nodes in the bucket. This latter @@ -715,10 +730,11 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) { if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 { tab.deleteInBucket(b, op.node.ID()) } + tab.mutex.Unlock() // Add found nodes. for _, n := range op.foundNodes { - tab.handleAddNode(addNodeOp{n, false, false}) + go tab.handleAddNode(addNodeOp{n, false, false, false}) } } diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index 9e9abc5aa4..d4326fbcab 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -17,7 +17,6 @@ package discover import ( - "errors" "fmt" "math" "slices" @@ -121,13 +120,7 @@ func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) { if err != nil { tab.log.Debug("ENR request failed", "id", node.ID(), "err", err) } else { - if tab.enrFilter != nil && !tab.enrFilter(newrec.Record()) { - tab.log.Trace("ENR record filter out", "id", node.ID(), "err", errors.New("filtered node")) - // TODO: use didRespond to express failure temporarily - resp.didRespond = false - } else { - resp.newRecord = newrec - } + resp.newRecord = newrec } } @@ -181,6 +174,11 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons tab.log.Debug("Node revalidated", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name) var endpointChanged bool if resp.newRecord != nil { + if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) { + tab.log.Trace("ENR record filter out", "id", n.ID()) + tab.deleteInBucket(b, n.ID()) + return + } _, endpointChanged = tab.bumpInBucket(b, resp.newRecord, false) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index a6666fdcbf..ae981f8b28 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -296,7 +296,7 @@ func TestTable_addInboundNode(t *testing.T) { newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) n2v2 := enode.SignNull(newrec, n2.ID()) - tab.addInboundNode(n2v2) + tab.addInboundNodeSync(n2v2) checkBucketContent(t, tab, []*enode.Node{n1, n2v2}) // Try updating n2 without sequence number change. The update is accepted @@ -305,7 +305,7 @@ func TestTable_addInboundNode(t *testing.T) { newrec.Set(enr.IP{100, 100, 100, 100}) newrec.SetSeq(n2.Seq()) n2v3 := enode.SignNull(newrec, n2.ID()) - tab.addInboundNode(n2v3) + tab.addInboundNodeSync(n2v3) checkBucketContent(t, tab, []*enode.Node{n1, n2v3}) } @@ -349,13 +349,13 @@ func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) { // Add a v4 node. key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3") n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000) - tab.addInboundNode(n1) + tab.addInboundNodeSync(n1) checkBucketContent(t, tab, []*enode.Node{n1}) // Add an updated version with changed IP. // The update will be accepted because it is inbound. n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000) - tab.addInboundNode(n1v2) + tab.addInboundNodeSync(n1v2) checkBucketContent(t, tab, []*enode.Node{n1v2}) } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 4453d10695..474c8badee 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -437,6 +437,9 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, s if node.UDP() <= 1024 { return nil, errLowPort } + if t.tab.enrFilter != nil && !t.tab.enrFilter(r) { + return nil, errors.New("filtered by ENR filter") + } if distances != nil { nd := enode.LogDist(c.id, node.ID()) if !slices.Contains(distances, uint(nd)) {