From 57fcf61c593fa027729f79dd1b13ef55989ac195 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 Apr 2024 11:46:49 +0300 Subject: [PATCH 01/11] kad: Fix typo Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/routing_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index 28dd251e..0077c861 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -176,7 +176,7 @@ impl RoutingTable { } } - /// Get `limit` closests peers to `target` from the k-buckets. + /// Get `limit` closest peers to `target` from the k-buckets. pub fn closest(&mut self, target: Key, limit: usize) -> Vec { ClosestBucketsIter::new(self.local_key.distance(&target)) .map(|index| self.buckets[index.get()].closest_iter(&target)) From 1f38a1b3c7952aef5f9c06c629855f023b7c42a8 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 Apr 2024 11:47:06 +0300 Subject: [PATCH 02/11] kad: Introduce put_record_to and try_put_record_to Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/handle.rs | 47 ++++++++++++++++++++++++-- src/protocol/libp2p/kademlia/mod.rs | 32 ++++++++++++++---- 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index d2e3988d..ccee62b5 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -88,6 +88,9 @@ pub(crate) enum KademliaCommand { /// Query ID for the query. query_id: QueryId, + + /// Use the following peers for the put request. + peers: Option>, }, /// Get record from DHT. @@ -202,7 +205,29 @@ impl KademliaHandle { /// Store record to DHT. pub async fn put_record(&mut self, record: Record) -> QueryId { let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; + let _ = self + .cmd_tx + .send(KademliaCommand::PutRecord { + record, + query_id, + peers: None, + }) + .await; + + query_id + } + + /// Store record to DHT to the given peers. + pub async fn put_record_to(&mut self, record: Record, peers: Vec) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::PutRecord { + record, + query_id, + peers: Some(peers), + }) + .await; query_id } @@ -242,7 +267,25 @@ impl KademliaHandle { pub fn try_put_record(&mut self, record: Record) -> Result { let query_id = self.next_query_id(); self.cmd_tx - .try_send(KademliaCommand::PutRecord { record, query_id }) + .try_send(KademliaCommand::PutRecord { + record, + query_id, + peers: None, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, + /// return an error. + pub fn try_put_record_to(&mut self, record: Record, peers: Vec) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecord { + record, + query_id, + peers: Some(peers), + }) .map(|_| query_id) .map_err(|_| ()) } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index e3cee89c..b482f03a 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -45,7 +45,7 @@ use futures::StreamExt; use multiaddr::Multiaddr; use tokio::sync::mpsc::{Receiver, Sender}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, VecDeque}; pub use config::{Config, ConfigBuilder}; pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode}; @@ -738,17 +738,35 @@ impl Kademlia { self.routing_table.closest(Key::from(peer), self.replication_factor).into() ); } - Some(KademliaCommand::PutRecord { record, query_id }) => { + Some(KademliaCommand::PutRecord { record, query_id, peers }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT"); self.store.put(record.clone()); let key = Key::new(record.key.clone()); - self.engine.start_put_record( - query_id, - record, - self.routing_table.closest(key, self.replication_factor).into(), - ); + + if let Some(peers) = peers { + // Put the record to the specified peers. + let peers: VecDeque<_> = peers.into_iter().filter_map(|peer| { + if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { + Some(entry.clone()) + } else { + None + } + }).collect(); + + self.engine.start_put_record( + query_id, + record, + peers + ); + } else { + self.engine.start_put_record( + query_id, + record, + self.routing_table.closest(key, self.replication_factor).into(), + ); + } } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); From 96e827b54f9f937c6d0489bef6a438b48cf50e58 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 Apr 2024 11:50:52 +0300 Subject: [PATCH 03/11] kad: Update local memory store only if the query is put_value Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b482f03a..680d601d 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -741,7 +741,6 @@ impl Kademlia { Some(KademliaCommand::PutRecord { record, query_id, peers }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT"); - self.store.put(record.clone()); let key = Key::new(record.key.clone()); @@ -761,6 +760,8 @@ impl Kademlia { peers ); } else { + self.store.put(record.clone()); + self.engine.start_put_record( query_id, record, From 200a0fddc9aaae2ec845cb569eb521f09e536da1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 Apr 2024 17:37:50 +0300 Subject: [PATCH 04/11] kad: Include peers that are also not connected to the query Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 680d601d..600eb925 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -743,14 +743,15 @@ impl Kademlia { let key = Key::new(record.key.clone()); - if let Some(peers) = peers { // Put the record to the specified peers. let peers: VecDeque<_> = peers.into_iter().filter_map(|peer| { - if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { - Some(entry.clone()) - } else { - None + match self.routing_table.entry(Key::from(peer)) { + // The routing table contains information about the peer address when: + // - Occupied: Established connection + // - Vacant: We'll try to establish the connection later, but the address is known. + KBucketEntry::Occupied(entry) | KBucketEntry::Vacant(entry) => Some(entry.clone()), + _ => None, } }).collect(); From db5d9ef823545be23ad83d5d69cc30905db9a3cb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 Apr 2024 18:18:00 +0300 Subject: [PATCH 05/11] kad: Call peers directly Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 600eb925..56e0c65e 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -745,7 +745,7 @@ impl Kademlia { if let Some(peers) = peers { // Put the record to the specified peers. - let peers: VecDeque<_> = peers.into_iter().filter_map(|peer| { + let peers = peers.into_iter().filter_map(|peer| { match self.routing_table.entry(Key::from(peer)) { // The routing table contains information about the peer address when: // - Occupied: Established connection @@ -755,11 +755,9 @@ impl Kademlia { } }).collect(); - self.engine.start_put_record( - query_id, - record, - peers - ); + if let Err(error) = self.on_query_action(QueryAction::PutRecordToFoundNodes { record, peers }).await { + tracing::debug!(target: LOG_TARGET, ?error, "failed to put record to predefined peers"); + } } else { self.store.put(record.clone()); From 99e88d81459eb49589f0539dc29b28caaf79518b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 13:54:59 +0300 Subject: [PATCH 06/11] kad: Keep only occupied entries, as vacant might have no addr Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 56e0c65e..c1bf8cb3 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -747,10 +747,7 @@ impl Kademlia { // Put the record to the specified peers. let peers = peers.into_iter().filter_map(|peer| { match self.routing_table.entry(Key::from(peer)) { - // The routing table contains information about the peer address when: - // - Occupied: Established connection - // - Vacant: We'll try to establish the connection later, but the address is known. - KBucketEntry::Occupied(entry) | KBucketEntry::Vacant(entry) => Some(entry.clone()), + KBucketEntry::Occupied(entry) => Some(entry.clone()), _ => None, } }).collect(); From ad3ec0634dc42af3c573bd1e1e63cfeae304a6cd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 14:56:50 +0300 Subject: [PATCH 07/11] kad: Add put records to peers command Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/handle.rs | 44 ++++++------ src/protocol/libp2p/kademlia/mod.rs | 43 ++++++------ .../libp2p/kademlia/query/find_many_nodes.rs | 70 +++++++++++++++++++ src/protocol/libp2p/kademlia/query/mod.rs | 10 +++ 4 files changed, 125 insertions(+), 42 deletions(-) create mode 100644 src/protocol/libp2p/kademlia/query/find_many_nodes.rs diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index ccee62b5..6d693cdb 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -88,9 +88,20 @@ pub(crate) enum KademliaCommand { /// Query ID for the query. query_id: QueryId, + }, + + /// Store record to DHT to the given peers. + /// + /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers. + PutRecordToPeers { + /// Record. + record: Record, + + /// Query ID for the query. + query_id: QueryId, /// Use the following peers for the put request. - peers: Option>, + peers: Vec, }, /// Get record from DHT. @@ -205,27 +216,20 @@ impl KademliaHandle { /// Store record to DHT. pub async fn put_record(&mut self, record: Record) -> QueryId { let query_id = self.next_query_id(); - let _ = self - .cmd_tx - .send(KademliaCommand::PutRecord { - record, - query_id, - peers: None, - }) - .await; + let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; query_id } /// Store record to DHT to the given peers. - pub async fn put_record_to(&mut self, record: Record, peers: Vec) -> QueryId { + pub async fn put_record_to_peers(&mut self, record: Record, peers: Vec) -> QueryId { let query_id = self.next_query_id(); let _ = self .cmd_tx - .send(KademliaCommand::PutRecord { + .send(KademliaCommand::PutRecordToPeers { record, query_id, - peers: Some(peers), + peers, }) .await; @@ -267,24 +271,24 @@ impl KademliaHandle { pub fn try_put_record(&mut self, record: Record) -> Result { let query_id = self.next_query_id(); self.cmd_tx - .try_send(KademliaCommand::PutRecord { - record, - query_id, - peers: None, - }) + .try_send(KademliaCommand::PutRecord { record, query_id }) .map(|_| query_id) .map_err(|_| ()) } /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, /// return an error. - pub fn try_put_record_to(&mut self, record: Record, peers: Vec) -> Result { + pub fn try_put_record_to_peers( + &mut self, + record: Record, + peers: Vec, + ) -> Result { let query_id = self.next_query_id(); self.cmd_tx - .try_send(KademliaCommand::PutRecord { + .try_send(KademliaCommand::PutRecordToPeers { record, query_id, - peers: Some(peers), + peers, }) .map(|_| query_id) .map_err(|_| ()) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index c1bf8cb3..eb96e159 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -45,7 +45,7 @@ use futures::StreamExt; use multiaddr::Multiaddr; use tokio::sync::mpsc::{Receiver, Sender}; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; pub use config::{Config, ConfigBuilder}; pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode}; @@ -738,32 +738,31 @@ impl Kademlia { self.routing_table.closest(Key::from(peer), self.replication_factor).into() ); } - Some(KademliaCommand::PutRecord { record, query_id, peers }) => { + Some(KademliaCommand::PutRecord { record, query_id }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT"); let key = Key::new(record.key.clone()); - if let Some(peers) = peers { - // Put the record to the specified peers. - let peers = peers.into_iter().filter_map(|peer| { - match self.routing_table.entry(Key::from(peer)) { - KBucketEntry::Occupied(entry) => Some(entry.clone()), - _ => None, - } - }).collect(); - - if let Err(error) = self.on_query_action(QueryAction::PutRecordToFoundNodes { record, peers }).await { - tracing::debug!(target: LOG_TARGET, ?error, "failed to put record to predefined peers"); - } - } else { - self.store.put(record.clone()); + self.store.put(record.clone()); - self.engine.start_put_record( - query_id, - record, - self.routing_table.closest(key, self.replication_factor).into(), - ); - } + self.engine.start_put_record( + query_id, + record, + self.routing_table.closest(key, self.replication_factor).into(), + ); + } + Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => { + tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers"); + + let key = Key::new(record.key.clone()); + + self.store.put(record.clone()); + + self.engine.start_put_record( + query_id, + record, + self.routing_table.closest(key, self.replication_factor).into(), + ); } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs new file mode 100644 index 00000000..cc0f3de8 --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -0,0 +1,70 @@ +// Copyright 2023 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{ + protocol::libp2p::kademlia::{ + query::{QueryAction, QueryId}, + types::KademliaPeer, + }, + PeerId, +}; + +/// Logging target for the file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_many_nodes"; + +/// Context for multiple `FIND_NODE` queries. +#[derive(Debug)] +pub struct FindManyNodesContext { + /// Local peer ID. + local_peer_id: PeerId, + + /// Query ID. + pub query: QueryId, + + /// The peers we are looking for. + pub peers_to_report: Vec, +} + +impl FindManyNodesContext { + /// Creates a new [`FindManyNodesContext`]. + pub fn new(local_peer_id: PeerId, query: QueryId, peers_to_report: Vec) -> Self { + Self { + local_peer_id, + query, + peers_to_report, + } + } + + /// Register response failure for `peer`. + pub fn register_response_failure(&mut self, _peer: PeerId) {} + + /// Register `FIND_NODE` response from `peer`. + pub fn register_response(&mut self, _peer: PeerId, _peers: Vec) {} + + /// Get next action for `peer`. + pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option { + None + } + + /// Get next action for a `FIND_NODE` query. + pub fn next_action(&mut self) -> Option { + return Some(QueryAction::QuerySucceeded { query: self.query }); + } +} diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 71f47036..d3fbc052 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -35,6 +35,7 @@ use std::collections::{HashMap, VecDeque}; mod find_node; mod get_record; +mod find_many_nodes; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; @@ -63,6 +64,15 @@ enum QueryType { context: FindNodeContext, }, + /// `PUT_VALUE` query to specified peers. + PutRecordToPeers { + /// Record that needs to be stored. + record: Record, + + /// Context for the `FIND_NODE` query + context: FindNodeContext, + }, + /// `GET_VALUE` query. GetRecord { /// Context for the `GET_VALUE` query. From c0b4c8dccdf5b35c366ee92236481d89a41e3400 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 15:24:47 +0300 Subject: [PATCH 08/11] kad: FindManyNodes stub Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 15 +++-- .../libp2p/kademlia/query/find_many_nodes.rs | 11 +--- src/protocol/libp2p/kademlia/query/mod.rs | 57 ++++++++++++++++--- 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index eb96e159..f6f34815 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -754,14 +754,19 @@ impl Kademlia { Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers"); - let key = Key::new(record.key.clone()); - - self.store.put(record.clone()); + // Put the record to the specified peers. + let peers = peers.into_iter().filter_map(|peer| { + if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { + Some(entry.clone()) + } else { + None + } + }).collect(); - self.engine.start_put_record( + self.engine.start_put_record_to_peers( query_id, record, - self.routing_table.closest(key, self.replication_factor).into(), + peers, ); } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs index cc0f3de8..5ac43baf 100644 --- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -26,27 +26,20 @@ use crate::{ PeerId, }; -/// Logging target for the file. -const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_many_nodes"; - /// Context for multiple `FIND_NODE` queries. #[derive(Debug)] pub struct FindManyNodesContext { - /// Local peer ID. - local_peer_id: PeerId, - /// Query ID. pub query: QueryId, /// The peers we are looking for. - pub peers_to_report: Vec, + pub peers_to_report: Vec, } impl FindManyNodesContext { /// Creates a new [`FindManyNodesContext`]. - pub fn new(local_peer_id: PeerId, query: QueryId, peers_to_report: Vec) -> Self { + pub fn new(query: QueryId, peers_to_report: Vec) -> Self { Self { - local_peer_id, query, peers_to_report, } diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index d3fbc052..4bba3e1b 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -33,9 +33,11 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; +use self::find_many_nodes::FindManyNodesContext; + +mod find_many_nodes; mod find_node; mod get_record; -mod find_many_nodes; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; @@ -69,8 +71,8 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query - context: FindNodeContext, + /// Context for finding peers. + context: FindManyNodesContext, }, /// `GET_VALUE` query. @@ -237,6 +239,32 @@ impl QueryEngine { query_id } + /// Start `PUT_VALUE` query to specified peers. + pub fn start_put_record_to_peers( + &mut self, + query_id: QueryId, + record: Record, + peers_to_report: Vec, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + target = ?record.key, + num_peers = ?peers_to_report.len(), + "start `PUT_VALUE` query to peers" + ); + + self.queries.insert( + query_id, + QueryType::PutRecordToPeers { + record, + context: FindManyNodesContext::new(query_id, peers_to_report), + }, + ); + + query_id + } + /// Start `GET_VALUE` query. pub fn start_get_record( &mut self, @@ -290,6 +318,9 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::PutRecordToPeers { context, .. }) => { + context.register_response_failure(peer); + } Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } @@ -317,6 +348,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::PutRecordToPeers { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, Some(QueryType::GetRecord { context }) => match message { KademliaMessage::GetRecord { record, peers, .. } => { context.register_response(peer, record, peers); @@ -333,11 +370,12 @@ impl QueryEngine { match self.queries.get_mut(query) { None => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query"); - return None; + None } - Some(QueryType::FindNode { context }) => return context.next_peer_action(peer), - Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer), - Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer), + Some(QueryType::FindNode { context }) => context.next_peer_action(peer), + Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), + Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), + Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), } } @@ -354,6 +392,10 @@ impl QueryEngine { record, peers: context.responses.into_iter().map(|(_, peer)| peer).collect::>(), }, + QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes { + record, + peers: context.peers_to_report, + }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.query, record: context.found_record(), @@ -375,6 +417,7 @@ impl QueryEngine { let action = match state { QueryType::FindNode { context } => context.next_action(), QueryType::PutRecord { context, .. } => context.next_action(), + QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), }; From 93674dfc07083e58481420db26731ceaaeb2718f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 16:45:12 +0300 Subject: [PATCH 09/11] kad: Link followup issue Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_many_nodes.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs index 5ac43baf..642bdab5 100644 --- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -27,6 +27,8 @@ use crate::{ }; /// Context for multiple `FIND_NODE` queries. +/// +/// TODO: implement https://github.com/paritytech/litep2p/issues/80. #[derive(Debug)] pub struct FindManyNodesContext { /// Query ID. From f7a0bf4e177f43c9c8513cb3b397beca8662ca8b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 16:46:58 +0300 Subject: [PATCH 10/11] kad: Use vacant records with addresses Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index f6f34815..ed21dfb4 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -756,10 +756,10 @@ impl Kademlia { // Put the record to the specified peers. let peers = peers.into_iter().filter_map(|peer| { - if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { - Some(entry.clone()) - } else { - None + match self.routing_table.entry(Key::from(peer)) { + KBucketEntry::Occupied(entry) => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + _ => None, } }).collect(); From 61d3995799c9ca08602cf22e285c72ea47d774d6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 16:49:48 +0300 Subject: [PATCH 11/11] kad: Fix docs Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_many_nodes.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs index 642bdab5..3bc04a55 100644 --- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -27,8 +27,7 @@ use crate::{ }; /// Context for multiple `FIND_NODE` queries. -/// -/// TODO: implement https://github.com/paritytech/litep2p/issues/80. +// TODO: implement https://github.com/paritytech/litep2p/issues/80. #[derive(Debug)] pub struct FindManyNodesContext { /// Query ID.