diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index d2e3988d..6d693cdb 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -90,6 +90,20 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Store record to DHT to the given peers. + /// + /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers. + PutRecordToPeers { + /// Record. + record: Record, + + /// Query ID for the query. + query_id: QueryId, + + /// Use the following peers for the put request. + peers: Vec, + }, + /// Get record from DHT. GetRecord { /// Record key. @@ -207,6 +221,21 @@ impl KademliaHandle { query_id } + /// Store record to DHT to the given peers. + pub async fn put_record_to_peers(&mut self, record: Record, peers: Vec) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + }) + .await; + + query_id + } + /// Get record from DHT. pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); @@ -247,6 +276,24 @@ impl KademliaHandle { .map_err(|_| ()) } + /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, + /// return an error. + pub fn try_put_record_to_peers( + &mut self, + record: Record, + peers: Vec, + ) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { let query_id = self.next_query_id(); diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index e3cee89c..ed21dfb4 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -741,15 +741,34 @@ impl Kademlia { Some(KademliaCommand::PutRecord { record, query_id }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT"); - self.store.put(record.clone()); let key = Key::new(record.key.clone()); + self.store.put(record.clone()); + self.engine.start_put_record( query_id, record, self.routing_table.closest(key, self.replication_factor).into(), ); } + Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => { + tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers"); + + // Put the record to the specified peers. + let peers = peers.into_iter().filter_map(|peer| { + match self.routing_table.entry(Key::from(peer)) { + KBucketEntry::Occupied(entry) => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + _ => None, + } + }).collect(); + + self.engine.start_put_record_to_peers( + query_id, + record, + peers, + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs new file mode 100644 index 00000000..3bc04a55 --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -0,0 +1,64 @@ +// Copyright 2023 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{ + protocol::libp2p::kademlia::{ + query::{QueryAction, QueryId}, + types::KademliaPeer, + }, + PeerId, +}; + +/// Context for multiple `FIND_NODE` queries. +// TODO: implement https://github.com/paritytech/litep2p/issues/80. +#[derive(Debug)] +pub struct FindManyNodesContext { + /// Query ID. + pub query: QueryId, + + /// The peers we are looking for. + pub peers_to_report: Vec, +} + +impl FindManyNodesContext { + /// Creates a new [`FindManyNodesContext`]. + pub fn new(query: QueryId, peers_to_report: Vec) -> Self { + Self { + query, + peers_to_report, + } + } + + /// Register response failure for `peer`. + pub fn register_response_failure(&mut self, _peer: PeerId) {} + + /// Register `FIND_NODE` response from `peer`. + pub fn register_response(&mut self, _peer: PeerId, _peers: Vec) {} + + /// Get next action for `peer`. + pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option { + None + } + + /// Get next action for a `FIND_NODE` query. + pub fn next_action(&mut self) -> Option { + return Some(QueryAction::QuerySucceeded { query: self.query }); + } +} diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 71f47036..4bba3e1b 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -33,6 +33,9 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; +use self::find_many_nodes::FindManyNodesContext; + +mod find_many_nodes; mod find_node; mod get_record; @@ -63,6 +66,15 @@ enum QueryType { context: FindNodeContext, }, + /// `PUT_VALUE` query to specified peers. + PutRecordToPeers { + /// Record that needs to be stored. + record: Record, + + /// Context for finding peers. + context: FindManyNodesContext, + }, + /// `GET_VALUE` query. GetRecord { /// Context for the `GET_VALUE` query. @@ -227,6 +239,32 @@ impl QueryEngine { query_id } + /// Start `PUT_VALUE` query to specified peers. + pub fn start_put_record_to_peers( + &mut self, + query_id: QueryId, + record: Record, + peers_to_report: Vec, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + target = ?record.key, + num_peers = ?peers_to_report.len(), + "start `PUT_VALUE` query to peers" + ); + + self.queries.insert( + query_id, + QueryType::PutRecordToPeers { + record, + context: FindManyNodesContext::new(query_id, peers_to_report), + }, + ); + + query_id + } + /// Start `GET_VALUE` query. pub fn start_get_record( &mut self, @@ -280,6 +318,9 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::PutRecordToPeers { context, .. }) => { + context.register_response_failure(peer); + } Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } @@ -307,6 +348,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::PutRecordToPeers { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, Some(QueryType::GetRecord { context }) => match message { KademliaMessage::GetRecord { record, peers, .. } => { context.register_response(peer, record, peers); @@ -323,11 +370,12 @@ impl QueryEngine { match self.queries.get_mut(query) { None => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query"); - return None; + None } - Some(QueryType::FindNode { context }) => return context.next_peer_action(peer), - Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer), - Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer), + Some(QueryType::FindNode { context }) => context.next_peer_action(peer), + Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), + Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), + Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), } } @@ -344,6 +392,10 @@ impl QueryEngine { record, peers: context.responses.into_iter().map(|(_, peer)| peer).collect::>(), }, + QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes { + record, + peers: context.peers_to_report, + }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.query, record: context.found_record(), @@ -365,6 +417,7 @@ impl QueryEngine { let action = match state { QueryType::FindNode { context } => context.next_action(), QueryType::PutRecord { context, .. } => context.next_action(), + QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), }; diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index 28dd251e..0077c861 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -176,7 +176,7 @@ impl RoutingTable { } } - /// Get `limit` closests peers to `target` from the k-buckets. + /// Get `limit` closest peers to `target` from the k-buckets. pub fn closest(&mut self, target: Key, limit: usize) -> Vec { ClosestBucketsIter::new(self.local_key.distance(&target)) .map(|index| self.buckets[index.get()].closest_iter(&target))