Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(P2P): add test for peer time sync validation #2304

Merged
merged 11 commits into from
Jan 7, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mm2src/mm2_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ timed-map = { version = "1.1.1", features = ["rustc-hash"] }
[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.9.3"
chrono = "0.4"
45 changes: 42 additions & 3 deletions mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,23 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec<String> {
rx.await.expect("Tx should be present")
}

async fn validate_peer_time(peer: PeerId, mut response_tx: Sender<PeerId>, rp_sender: RequestResponseSender) {
async fn validate_peer_time(
peer: PeerId,
mut response_tx: Sender<PeerId>,
rp_sender: RequestResponseSender,
#[cfg(test)] local_peer_id: PeerId,
) {
let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp);
let encoded_request = encode_message(&request)
.expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization.");

match request_one_peer(peer, encoded_request, rp_sender).await {
PeerResponse::Ok { res } => {
if let Ok(timestamp) = decode_message::<u64>(&res) {
#[cfg(test)]
let now = mock_timestamp::get_utc_timestamp_for_test(local_peer_id);

#[cfg(not(test))]
onur-ozkan marked this conversation as resolved.
Show resolved Hide resolved
let now = common::get_utc_timestamp();
let now: u64 = now
.try_into()
Expand Down Expand Up @@ -450,12 +459,12 @@ impl AtomicDexBehaviour {
self.spawn(future);
},
AdexBehaviourCmd::SendResponse { res, response_channel } => {
if let Err(response) = self
if let Err(PeerResponse::Err { err }) = self
onur-ozkan marked this conversation as resolved.
Show resolved Hide resolved
.core
.request_response
.send_response(response_channel.into(), res.into())
{
error!("Error sending response: {:?}", response);
error!("Error sending response: {:?}", err);
}
},
AdexBehaviourCmd::GetPeersInfo { result_tx } => {
Expand Down Expand Up @@ -833,6 +842,8 @@ fn start_gossipsub(
*peer_id,
timestamp_tx.clone(),
swarm.behaviour().core.request_response.sender(),
#[cfg(test)]
local_peer_id,
);
swarm.behaviour().spawn(future);
}
Expand Down Expand Up @@ -1249,3 +1260,31 @@ pub async fn spawn_gossipsub(
runtime_c.spawn(fut);
result_rx.await.expect("Fatal error on starting gossipsub")
}

#[cfg(test)]
pub mod mock_timestamp {
use crate::PeerId;
use chrono::Utc;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Mutex;

lazy_static! {
static ref TIMESTAMP_OFFSET: Mutex<HashMap<PeerId, i64>> = Mutex::new(HashMap::new());
}

pub(crate) fn get_utc_timestamp_for_test(local_peer_id: PeerId) -> i64 {
Utc::now().timestamp()
+ TIMESTAMP_OFFSET
.lock()
.unwrap()
.iter()
.find(|(peer, _)| peer == &&local_peer_id)
.map(|(_, offset)| *offset)
.unwrap_or_default()
}

pub(crate) fn set_timestamp_offset_for_test(local_peer_id: PeerId, offset: i64) {
TIMESTAMP_OFFSET.lock().unwrap().insert(local_peer_id, offset);
}
}
97 changes: 76 additions & 21 deletions mm2src/mm2_p2p/src/behaviours/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub(crate) mod request_response;

#[cfg(test)]
mod tests {
use crate::{decode_message, encode_message};
use async_std::task::spawn;
use common::executor::abortable_queue::AbortableQueue;
use futures::channel::{mpsc, oneshot};
Expand All @@ -19,12 +20,13 @@ mod tests {
#[cfg(not(windows))] use std::sync::Mutex;
use std::time::Duration;

use super::atomicdex::GossipsubConfig;
use crate::application::request_response::{network_info::NetworkInfoRequest, P2PRequest};
use crate::behaviours::atomicdex::mock_timestamp;
use crate::behaviours::peers_exchange::{PeerIdSerde, PeersExchange};
use crate::{spawn_gossipsub, AdexBehaviourCmd, AdexBehaviourEvent, AdexResponse, AdexResponseChannel, NetworkInfo,
NetworkPorts, NodeType, RelayAddress, RequestResponseBehaviourEvent, SwarmRuntime};

use super::atomicdex::GossipsubConfig;

static TEST_LISTEN_PORT: AtomicU64 = AtomicU64::new(1);

lazy_static! {
Expand All @@ -41,7 +43,7 @@ mod tests {
impl Node {
async fn spawn<F>(port: u64, seednodes: Vec<u64>, on_event: F) -> Node
where
F: Fn(mpsc::Sender<AdexBehaviourCmd>, AdexBehaviourEvent) + Send + 'static,
F: Fn(mpsc::Sender<AdexBehaviourCmd>, AdexBehaviourEvent, PeerId) + Send + 'static,
{
let spawner = SwarmRuntime::new(SYSTEM.weak_spawner());
let node_type = NodeType::RelayInMemory { port };
Expand All @@ -58,7 +60,7 @@ mod tests {
loop {
let cmd_tx_fut = cmd_tx_fut.clone();
match event_rx.next().await {
Some(r) => on_event(cmd_tx_fut, r),
Some(r) => on_event(cmd_tx_fut, r, peer_id),
_ => {
println!("Finish response future");
break;
Expand All @@ -72,7 +74,7 @@ mod tests {

async fn send_cmd(&mut self, cmd: AdexBehaviourCmd) { self.cmd_tx.send(cmd).await.unwrap(); }

async fn wait_peers(&mut self, number: usize) {
async fn wait_peers(&mut self, number: usize) -> Result<(), String> {
let mut attempts = 0;
loop {
let (tx, rx) = oneshot::channel();
Expand All @@ -83,15 +85,15 @@ mod tests {
match rx.await {
Ok(map) => {
if map.len() >= number {
return;
return Ok(());
}
async_std::task::sleep(Duration::from_millis(500)).await;
},
Err(e) => panic!("{}", e),
}
attempts += 1;
if attempts >= 10 {
panic!("wait_peers {} attempts exceeded", attempts);
return Err(format!("wait_peers {} attempts exceeded", attempts));
}
}
}
Expand All @@ -105,7 +107,7 @@ mod tests {
let request_received_cpy = request_received.clone();

let node1_port = next_port();
let node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| {
let node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event, _| {
let response_channel = match event {
AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
Expand All @@ -126,9 +128,9 @@ mod tests {
})
.await;

let mut node2 = Node::spawn(next_port(), vec![node1_port], |_, _| ()).await;
let mut node2 = Node::spawn(next_port(), vec![node1_port], |_, _, _| ()).await;

node2.wait_peers(1).await;
let _ = node2.wait_peers(1).await;

let (response_tx, response_rx) = oneshot::channel();
node2
Expand Down Expand Up @@ -208,7 +210,7 @@ mod tests {
for _ in 0..3 {
let handler = request_handler.clone();
let receiver_port = next_port();
let receiver = Node::spawn(receiver_port, vec![], move |cmd_tx, event| {
let receiver = Node::spawn(receiver_port, vec![], move |cmd_tx, event, _| {
let mut handler = handler.lock().unwrap();
handler.handle(cmd_tx, event)
})
Expand All @@ -219,11 +221,11 @@ mod tests {
let mut sender = Node::spawn(
next_port(),
receivers.iter().map(|(port, _)| *port).collect(),
|_, _| (),
|_, _, _| (),
)
.await;

sender.wait_peers(3).await;
let _ = sender.wait_peers(3).await;

let (response_tx, response_rx) = oneshot::channel();
sender
Expand All @@ -245,7 +247,7 @@ mod tests {
let request_received_cpy = request_received.clone();

let node1_port = next_port();
let _node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| {
let _node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event, _| {
let response_channel = match event {
AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
Expand All @@ -264,9 +266,9 @@ mod tests {
})
.await;

let mut node2 = Node::spawn(next_port(), vec![node1_port], |_, _| ()).await;
let mut node2 = Node::spawn(next_port(), vec![node1_port], |_, _, _| ()).await;

node2.wait_peers(1).await;
let _ = node2.wait_peers(1).await;

let (response_tx, response_rx) = oneshot::channel();
node2
Expand All @@ -288,7 +290,7 @@ mod tests {
let _ = env_logger::try_init();

let receiver1_port = next_port();
let receiver1 = Node::spawn(receiver1_port, vec![], move |mut cmd_tx, event| {
let receiver1 = Node::spawn(receiver1_port, vec![], move |mut cmd_tx, event, _| {
let response_channel = match event {
AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
Expand All @@ -306,7 +308,7 @@ mod tests {
.await;

let receiver2_port = next_port();
let receiver2 = Node::spawn(receiver2_port, vec![], move |mut cmd_tx, event| {
let receiver2 = Node::spawn(receiver2_port, vec![], move |mut cmd_tx, event, _| {
let response_channel = match event {
AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
Expand All @@ -326,7 +328,7 @@ mod tests {
.await;

let receiver3_port = next_port();
let receiver3 = Node::spawn(receiver3_port, vec![], move |mut cmd_tx, event| {
let receiver3 = Node::spawn(receiver3_port, vec![], move |mut cmd_tx, event, _| {
let response_channel = match event {
AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
Expand All @@ -347,11 +349,11 @@ mod tests {
let mut sender = Node::spawn(
next_port(),
vec![receiver1_port, receiver2_port, receiver3_port],
|_, _| (),
|_, _, _| (),
)
.await;

sender.wait_peers(3).await;
let _ = sender.wait_peers(3).await;

let (response_tx, response_rx) = oneshot::channel();
sender
Expand Down Expand Up @@ -452,4 +454,57 @@ mod tests {
assert_eq!(addresses.len(), 1);
assert!(addresses.contains(&address));
}

#[tokio::test]
async fn test_peer_timestamp_difference() {
let _ = env_logger::try_init();

let event_fn = |mut cmd_tx: mpsc::Sender<AdexBehaviourCmd>, event, local_peer_id| {
if let AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest {
request,
response_channel,
..
}) = event
{
let response_channel = AdexResponseChannel(response_channel);
let request = decode_message::<P2PRequest>(&request.req).unwrap();
let result = match request {
P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp) => {
let timestamp = mock_timestamp::get_utc_timestamp_for_test(local_peer_id);
let timestamp: u64 = timestamp.try_into().unwrap_or_else(|_| {
panic!("`get_utc_timestamp_for_test` returned invalid data: {}", timestamp)
});
encode_message(&timestamp).map_err(|e| e.to_string()).map(Some)
},
_ => return,
};

let res = match result {
Ok(Some(response)) => AdexResponse::Ok { response },
Ok(None) => AdexResponse::None,
Err(e) => AdexResponse::Err { error: e },
};

let cmd = AdexBehaviourCmd::SendResponse { res, response_channel };
cmd_tx.try_send(cmd).unwrap()
}
};

// check with time difference of 21s
let node1_port = next_port();
let mut node1 = Node::spawn(node1_port, vec![], event_fn).await;
let mut node2 = Node::spawn(next_port(), vec![node1_port], event_fn).await;

mock_timestamp::set_timestamp_offset_for_test(node2.peer_id, 21);
assert!(node1.wait_peers(1).await.is_err());
assert!(node2.wait_peers(1).await.is_err());

// check with no time difference
let node3_port = next_port();
let mut node3 = Node::spawn(node3_port, vec![], event_fn).await;
let mut node4 = Node::spawn(next_port(), vec![node3_port], event_fn).await;

assert!(node3.wait_peers(1).await.is_ok());
assert!(node4.wait_peers(1).await.is_ok());
}
}
Loading