Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.

Commit

Permalink
add bridge test
Browse files Browse the repository at this point in the history
  • Loading branch information
rainliu committed May 28, 2021
1 parent 6172a7a commit 7c8364a
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "webrtc-util"
version = "0.1.19"
version = "0.1.20"
authors = ["Rain Liu <[email protected]>"]
edition = "2018"
description = "Utilities for WebRTC.rs stack"
Expand Down
4 changes: 2 additions & 2 deletions src/conn/conn_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl Bridge {

/// Drop drops the specified number of packets from the given offset index
/// of the specified queue.
pub async fn drop(&self, id: usize, offset: usize, n: usize) {
pub async fn drop_offset(&self, id: usize, offset: usize, n: usize) {
let mut queue = self.queue[id].lock().await;
queue.drain(offset..offset + n);
}
Expand Down Expand Up @@ -205,7 +205,7 @@ impl Bridge {
}
}

fn inverse(s: &mut VecDeque<Bytes>) -> bool {
pub(crate) fn inverse(s: &mut VecDeque<Bytes>) -> bool {
if s.len() < 2 {
return false;
}
Expand Down
254 changes: 254 additions & 0 deletions src/conn/conn_bridge_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use super::conn_bridge::*;
use super::*;

use bytes::Bytes;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::mpsc;

const MSG1: Bytes = Bytes::from_static(b"ADC");
const MSG2: Bytes = Bytes::from_static(b"DEFG");

#[tokio::test]
async fn test_bridge_normal() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn0.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn1.recv(&mut buf).await?;
let _ = tx.send(n).await;
Ok::<(), std::io::Error>(())
});

br.process().await;

let n = rx.recv().await.unwrap();
assert_eq!(n, MSG1.len(), "unexpected length");

Ok(())
}

#[tokio::test]
async fn test_bridge_drop_1st_packet_from_conn0() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn0.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn0.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn1.recv(&mut buf).await?;
let _ = tx.send(n).await;
Ok::<(), std::io::Error>(())
});

br.drop_offset(0, 0, 1).await;
br.process().await;

let n = rx.recv().await.unwrap();
assert_eq!(n, MSG2.len(), "unexpected length");

Ok(())
}

#[tokio::test]
async fn test_bridge_drop_2nd_packet_from_conn0() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn0.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn0.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn1.recv(&mut buf).await?;
let _ = tx.send(n).await;
Ok::<(), std::io::Error>(())
});

br.drop_offset(0, 1, 1).await;
br.process().await;

let n = rx.recv().await.unwrap();
assert_eq!(n, MSG1.len(), "unexpected length");

Ok(())
}

#[tokio::test]
async fn test_bridge_drop_1st_packet_from_conn1() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn1.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn1.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn0.recv(&mut buf).await?;
let _ = tx.send(n).await;
Ok::<(), std::io::Error>(())
});

br.drop_offset(1, 0, 1).await;
br.process().await;

let n = rx.recv().await.unwrap();
assert_eq!(n, MSG2.len(), "unexpected length");

Ok(())
}

#[tokio::test]
async fn test_bridge_drop_2nd_packet_from_conn1() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn1.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn1.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn0.recv(&mut buf).await?;
let _ = tx.send(n).await;
Ok::<(), std::io::Error>(())
});

br.drop_offset(1, 1, 1).await;
br.process().await;

let n = rx.recv().await.unwrap();
assert_eq!(n, MSG1.len(), "unexpected length");

Ok(())
}

#[tokio::test]
async fn test_bridge_reorder_packets_from_conn0() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn0.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn0.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn1.recv(&mut buf).await?;
assert_eq!(n, MSG2.len(), "unexpected length");
let n = conn1.recv(&mut buf).await?;
assert_eq!(n, MSG1.len(), "unexpected length");

let _ = rx.recv().await;

Ok::<(), std::io::Error>(())
});

br.reorder(0).await;
br.process().await;

let _ = tx.send(()).await;

Ok(())
}

#[tokio::test]
async fn test_bridge_reorder_packets_from_conn1() -> Result<()> {
let (br, conn0, conn1) = Bridge::new(0, None, None);

let n = conn1.send(&MSG1).await?;
assert_eq!(n, MSG1.len(), "unexpected length");
let n = conn1.send(&MSG2).await?;
assert_eq!(n, MSG2.len(), "unexpected length");

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
let n = conn0.recv(&mut buf).await?;
assert_eq!(n, MSG2.len(), "unexpected length");
let n = conn0.recv(&mut buf).await?;
assert_eq!(n, MSG1.len(), "unexpected length");

let _ = rx.recv().await;

Ok::<(), std::io::Error>(())
});

br.reorder(1).await;
br.process().await;

let _ = tx.send(()).await;

Ok(())
}

#[tokio::test]
async fn test_bridge_inverse_error() -> Result<()> {
let mut q = VecDeque::new();
q.push_back(MSG1.clone());
assert_eq!(inverse(&mut q), false);
Ok(())
}

#[tokio::test]
async fn test_bridge_drop_next_n_packets() -> Result<()> {
for id in 0..2 {
let (br, conn0, conn1) = Bridge::new(0, None, None);
br.drop_next_nwrites(id, 3).await;
let conns: Vec<Arc<dyn Conn + Send + Sync>> = vec![Arc::new(conn0), Arc::new(conn1)];
let src_conn = Arc::clone(&conns[id]);
let dst_conn = Arc::clone(&conns[1 - id]);

let (tx, mut rx) = mpsc::channel(5);

tokio::spawn(async move {
let mut buf = vec![0u8; 256];
for _ in 0..2u8 {
let n = dst_conn.recv(&mut buf).await?;
let _ = tx.send(buf[..n].to_vec()).await;
}

Ok::<(), std::io::Error>(())
});

let mut msgs = vec![];
for i in 0..5u8 {
let msg = format!("msg{}", i);
let n = src_conn.send(msg.as_bytes()).await?;
assert_eq!(n, msg.len(), "[{}] unexpected length", id);
msgs.push(msg);
br.process().await;
}

for i in 0..2 {
if let Some(buf) = rx.recv().await {
assert_eq!(msgs[i + 3].as_bytes(), &buf);
} else {
assert!(false, "{} unexpected number of packets", id);
}
}
}

Ok(())
}
2 changes: 2 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub mod conn_bridge;
pub mod conn_pipe;
pub mod conn_udp;

#[cfg(test)]
mod conn_bridge_test;
#[cfg(test)]
mod conn_pipe_test;
#[cfg(test)]
Expand Down

0 comments on commit 7c8364a

Please sign in to comment.