Skip to content

Commit

Permalink
fix: removed half-backed test
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent 9d051b1 commit f862ece
Showing 1 changed file with 0 additions and 100 deletions.
100 changes: 0 additions & 100 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,104 +1286,4 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tx_pipeline_closed() -> ZResult<()> {
fn schedule(queue: TransmissionPipelineProducer, counter: Arc<AtomicUsize>, id: usize) {
// Make sure to put only one message per batch: set the payload size
// to half of the batch in such a way the serialized zenoh message
// will be larger then half of the batch size (header + payload).
let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize;

// Send reliable messages
let key = "test".into();
let payload = ZBuf::from(vec![0_u8; payload_size]);

let message: NetworkMessage = Push {
wire_expr: key,
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload,
}),
}
.into();

// The last push should block since there shouldn't any more batches
// available for serialization.
let num_msg = 1 + CONFIG_STREAMED.queue_size[0];
for i in 0..num_msg {
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
);
queue.push_network_message(message.clone()).unwrap();
let c = counter.fetch_add(1, Ordering::AcqRel);
println!(
"Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
id, i, c + 1,
payload_size
);
}
}

// Pipeline
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
let priorities = vec![tct];
let (producer, mut consumer) =
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());

let counter = Arc::new(AtomicUsize::new(0));

let c_producer = producer.clone();
let c_counter = counter.clone();
let h1 = task::spawn_blocking(move || {
schedule(c_producer, c_counter, 1);
});

let c_counter = counter.clone();
let h2 = task::spawn_blocking(move || {
schedule(producer, c_counter, 2);
});

// Wait to have sent enough messages and to have blocked
println!(
"Pipeline Blocking [---]: waiting to have {} messages being scheduled",
CONFIG_STREAMED.queue_size[Priority::MAX as usize]
);
let check = async {
while counter.load(Ordering::Acquire)
< CONFIG_STREAMED.queue_size[Priority::MAX as usize]
{
tokio::time::sleep(SLEEP).await;
}
};

timeout(TIMEOUT, check).await?;

// Disable and drain the queue
timeout(
TIMEOUT,
task::spawn_blocking(move || {
println!("Pipeline Blocking [---]: draining the queue");
let _ = consumer.drain();
}),
)
.await??;

// Make sure that the tasks scheduling have been unblocked
println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
timeout(TIMEOUT, h1).await??;
println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
timeout(TIMEOUT, h2).await??;

Ok(())
}
}

0 comments on commit f862ece

Please sign in to comment.