Tweak API layout, more docs
sagebind committed May 22, 2019
1 parent aeb0c7a commit e0b94e9
Showing 6 changed files with 56 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -18,5 +18,5 @@ futures-preview = "0.3.0-alpha.16"
 criterion = "0.2"
 
 [[bench]]
-name = "chunked_pipe"
+name = "pipe"
 harness = false
11 changes: 10 additions & 1 deletion README.md
@@ -6,8 +6,17 @@ Asynchronous byte buffers and pipes for concurrent I/O programming.
 [![Documentation](https://docs.rs/sluice/badge.svg)](https://docs.rs/sluice)
 ![License](https://img.shields.io/badge/license-MIT-blue.svg)
 
-[Documentation](https://docs.rs/sluice)
+## [Documentation]
+
+Check the [documentation] for up-to-date usage and examples.
+
+## Requirements
+
+Currently, Sluice requires a nightly Rust compiler, as it uses stdlib futures and async/await in tests and benchmarks.
 
 ## License
 
 This library is licensed under the MIT license. See the [LICENSE](LICENSE) file for details.
+
+
+[Documentation]: https://docs.rs/sluice
9 changes: 5 additions & 4 deletions benches/chunked_pipe.rs → benches/pipe.rs
@@ -4,16 +4,17 @@
 extern crate criterion;
 
 use criterion::Criterion;
+use futures::executor::ThreadPool;
 use futures::prelude::*;
-use futures::executor::block_on;
 use std::io;
 
 fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("pipe_read_write", |b| {
+        let mut pool = ThreadPool::new().unwrap();
         let data = [1; 0x1000];
 
-        b.iter(move || {
-            let (mut reader, mut writer) = sluice::pipe::chunked_pipe();
+        b.iter(|| {
+            let (mut reader, mut writer) = sluice::pipe::pipe();
 
             let producer = async {
                 for _ in 0..0x10 {
@@ -27,7 +28,7 @@ fn criterion_benchmark(c: &mut Criterion) {
                 reader.copy_into(&mut sink).await.unwrap();
             };
 
-            block_on(future::join(producer, consumer));
+            pool.run(future::join(producer, consumer));
         })
     });
 }
8 changes: 8 additions & 0 deletions src/lib.rs
@@ -1,4 +1,12 @@
 //! Asynchronous byte buffers and pipes for concurrent I/O programming.
+//!
+//! ## Pipes
+//!
+//! The primary feature offered by Sluice is _pipes_, which are asynchronous
+//! in-memory byte buffers that allow separate tasks to read and write from the
+//! buffer in parallel.
+//!
+//! See the `pipe` module for details.
 #![cfg_attr(test, feature(async_await))]

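To make the new module docs concrete, here is a minimal usage sketch of the `pipe()` API, adapted from the benchmark in this commit. It assumes nightly Rust with the `async_await` feature, the futures-preview alpha APIs (`ThreadPool`, `AsyncWriteExt::write_all`, `AsyncReadExt::read`), and that dropping the writer signals end-of-stream to the reader:

```rust
#![feature(async_await)]

use futures::executor::ThreadPool;
use futures::prelude::*;

fn main() {
    let mut pool = ThreadPool::new().unwrap();

    // One end writes, the other reads; each half can live on its own task.
    let (mut reader, mut writer) = sluice::pipe::pipe();
    let data = [1u8; 0x1000];

    // Producer: write a few chunks, then drop the writer so the reader
    // (assumed behavior) observes end-of-stream.
    let producer = async move {
        for _ in 0..0x10 {
            writer.write_all(&data).await.unwrap();
        }
    };

    // Consumer: read until the pipe reports EOF (a zero-length read).
    let consumer = async move {
        let mut buf = [0u8; 0x1000];
        loop {
            if reader.read(&mut buf).await.unwrap() == 0 {
                break;
            }
        }
    };

    // Drive both halves to completion concurrently.
    pool.run(future::join(producer, consumer));
}
```

With the default of four chunks, the producer should be able to run ahead of the consumer by a few buffers before it has to wait.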
25 changes: 17 additions & 8 deletions src/pipe/chunked.rs
@@ -4,10 +4,9 @@
 //! amount of memory. Setting a fixed memory limit also gives you a degree of
 //! flow control if the producer ends up being faster than the consumer.
 //!
-//! But for chttp a ring buffer will not work because of how curl's write
-//! callbacks are designed. Curl has its own internal buffer management, which
-//! we borrow a slice of when receiving data. The size of this slice is unknown,
-//! and we must consume all of it at once or none of it.
+//! But for some use cases a ring buffer will not work if an application uses
+//! its own internal buffer management and requires you to consume either all
+//! of a "chunk" of bytes or none of it.
 //!
 //! Because of these constraints, instead we use a quite unique type of buffer
 //! that uses a fixed number of growable buffers that are exchanged back and
@@ -29,12 +28,22 @@ use std::io::{self, Cursor};
 use std::pin::Pin;
 
 /// Create a new chunked pipe with room for a fixed number of chunks.
-pub fn new(size: usize) -> (Reader, Writer) {
-    let (mut buf_pool_tx, buf_pool_rx) = mpsc::channel(size);
-    let (buf_stream_tx, buf_stream_rx) = mpsc::channel(size);
+///
+/// The `count` parameter sets how many buffers are available in the pipe at
+/// once. Smaller values reduce the number of allocations and reallocations
+/// that may be required when writing, and reduce overall memory usage. Larger
+/// values reduce the amount of waiting done between chunks if you have a
+/// producer and consumer that run at different speeds.
+///
+/// If `count` is set to 1, then the pipe is essentially serial: only the
+/// reader or the writer can operate on the single buffer at one time, so they
+/// cannot run in parallel.
+pub fn new(count: usize) -> (Reader, Writer) {
+    let (mut buf_pool_tx, buf_pool_rx) = mpsc::channel(count);
+    let (buf_stream_tx, buf_stream_rx) = mpsc::channel(count);
 
     // Fill up the buffer pool.
-    for _ in 0..size {
+    for _ in 0..count {
         buf_pool_tx.try_send(Vec::new()).expect("buffer pool overflow");
     }
 
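For illustration, here is a simplified, synchronous model of the buffer-exchange scheme these docs describe. It uses `std::sync::mpsc` in place of the asynchronous bounded `mpsc` channels seen in `chunked::new` above, so it is a sketch of the idea rather than sluice's actual implementation:

```rust
// A simplified, synchronous model of the chunked pipe's buffer exchange.
// The real pipe uses async channels so reads and writes can suspend;
// std::sync::mpsc is used here only to keep the sketch self-contained.
use std::sync::mpsc;

fn main() {
    let count = 4;

    // "Pool" channel: carries empty buffers back to the writer for reuse.
    let (pool_tx, pool_rx) = mpsc::sync_channel::<Vec<u8>>(count);
    // "Stream" channel: carries filled buffers from the writer to the reader.
    let (stream_tx, stream_rx) = mpsc::sync_channel::<Vec<u8>>(count);

    // Fill up the buffer pool, as `chunked::new` does.
    for _ in 0..count {
        pool_tx.send(Vec::new()).expect("buffer pool overflow");
    }

    // Writer side: take an empty buffer, write one whole chunk into it
    // (all or nothing), and hand it to the reader.
    let mut buf = pool_rx.recv().unwrap();
    buf.extend_from_slice(b"one whole chunk");
    stream_tx.send(buf).unwrap();

    // Reader side: consume the chunk, then return the cleared buffer to
    // the pool so the writer can reuse its allocation.
    let mut buf = stream_rx.recv().unwrap();
    assert_eq!(&buf[..], b"one whole chunk");
    buf.clear();
    pool_tx.send(buf).unwrap();
}
```

Because both channels are bounded by `count`, a fast writer eventually blocks waiting for the reader to return a buffer, which is the flow control the module docs mention.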
22 changes: 15 additions & 7 deletions src/pipe/mod.rs
@@ -1,18 +1,26 @@
 //! Asynchronous in-memory byte buffers aimed at producer-consumer problems.
+//!
+//! Pipes are like byte-oriented channels that implement I/O traits for reading
+//! and writing.
 use futures::prelude::*;
 use std::io;
 use std::pin::Pin;
 use std::task::*;
 
 mod chunked;
 
-/// Creates a new asynchronous pipe implemented using a pool of growable buffers
-/// that allow writing a single chunk of any size at a time.
+/// How many chunks should be available in a chunked pipe. Default is 4, which
+/// strikes a good balance of low memory usage and throughput.
+const DEFAULT_CHUNK_COUNT: usize = 4;
+
+/// Creates a new asynchronous pipe with the default configuration.
 ///
-/// This implementation guarantees that when writing a slice of bytes, either
-/// the entire slice is written at once or not at all. Slices will never be
-/// partially written.
-pub fn chunked_pipe() -> (PipeReader, PipeWriter) {
-    let (reader, writer) = chunked::new(8);
+/// The default implementation guarantees that when writing a slice of bytes,
+/// either the entire slice is written at once or not at all. Slices will never
+/// be partially written.
+pub fn pipe() -> (PipeReader, PipeWriter) {
+    let (reader, writer) = chunked::new(DEFAULT_CHUNK_COUNT);
 
     (
         PipeReader {
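A small sketch of the all-or-nothing guarantee documented for `pipe()`, assuming nightly Rust, the futures-preview alpha `block_on` executor, and the `AsyncWriteExt`/`AsyncReadExt` traits; the assertion restates the documented guarantee rather than a tested API:

```rust
#![feature(async_await)]

use futures::executor::block_on;
use futures::prelude::*;

fn main() {
    let (mut reader, mut writer) = sluice::pipe::pipe();

    block_on(async {
        // A single write hands the whole slice to the pipe as one chunk...
        let chunk = [7u8; 4096];
        let n = writer.write(&chunk).await.unwrap();
        assert_eq!(n, chunk.len(), "slices are never partially written");

        // ...which the reader can then consume from the other end.
        let mut buf = [0u8; 4096];
        let n = reader.read(&mut buf).await.unwrap();
        assert!(n > 0);
    });
}
```

Since the default chunk count is 4, the single write above does not need to wait for a free buffer, so writing and reading sequentially on one task is safe here.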
