Parallel encoding #23

Open · wants to merge 3 commits into base: main
Cargo.toml (8 additions, 0 deletions)
@@ -17,9 +17,17 @@ repository = "https://github.com/image-rs/fdeflate"
homepage = "https://github.com/image-rs/fdeflate"
categories = ["compression"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
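# The section above makes docs.rs build with all features and pass
# `--cfg docsrs`, which drives the doc_auto_cfg attribute added in
# src/lib.rs to label feature-gated items in the rendered docs.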

[dependencies]
rayon = { version = "1.9.0", optional = true }
simd-adler32 = "0.3.4"

[dev-dependencies]
miniz_oxide = "0.7.1"
rand = "0.8.5"

[features]
rayon = ["dep:rayon"]
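
With the optional dependency and feature above in place, a downstream crate would opt in through its own manifest. A sketch, with the version number purely illustrative:

```toml
[dependencies]
fdeflate = { version = "0.3", features = ["rayon"] }
```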
src/compress.rs (108 additions, 0 deletions)
@@ -1,6 +1,9 @@
use simd_adler32::Adler32;
use std::io::{self, Seek, SeekFrom, Write};

#[cfg(feature = "rayon")]
use rayon::prelude::*;

use crate::tables::{
    BITMASKS, HUFFMAN_CODES, HUFFMAN_LENGTHS, LENGTH_TO_LEN_EXTRA, LENGTH_TO_SYMBOL,
};
@@ -166,6 +169,48 @@ impl<W: Write> Compressor<W> {
        Ok(())
    }

    /// Same as `write_data`, but the chunks of `data` are compressed in parallel.
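    ///
    /// Each item of `data` is encoded independently on the rayon thread pool;
    /// the resulting bitstreams and Adler-32 checksums are then spliced back
    /// together in order, so the finished stream decodes to the concatenation
    /// of all chunks.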
    #[cfg(feature = "rayon")]
    pub fn par_write_data<D, I>(&mut self, data: D) -> io::Result<()>
    where
        D: ParallelIterator<Item = I>,
        I: AsRef<[u8]>,
    {
        let chunks: Vec<_> = data
            .map(|chunk| {
                let mut c = Compressor {
                    checksum: Adler32::new(),
                    buffer: 0,
                    nbits: 0,
                    writer: Vec::new(),
                };
                c.write_data(chunk.as_ref()).unwrap();
                (c, chunk.as_ref().len())
            })
            .collect();

        for (chunk, length) in chunks {
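            // Splice the chunk's finished bitstream onto ours: shift each
            // 64-bit word of its output left by our current bit offset and
            // carry the overflowing high bits into the next word. The
            // `buffer_mask` below guards the `nbits == 0` case, where
            // `wrapping_shr(64)` wraps to a shift of 0 and would otherwise
            // leak the whole word into the carry.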
            let mut vs = chunk.writer.chunks_exact(8);
            let buffer_mask = if self.nbits == 0 { 0 } else { u64::MAX };
            for v in &mut vs {
                let bits = u64::from_le_bytes(v.try_into().unwrap());
                self.buffer |= bits << self.nbits;
                self.writer.write_all(&self.buffer.to_le_bytes())?;
                self.buffer = bits.wrapping_shr(64 - self.nbits as u32) & buffer_mask;
            }
            for b in vs.remainder() {
                self.write_bits(*b as u64, 8)?;
            }
            self.write_bits(chunk.buffer, chunk.nbits)?;

            let adler1 = self.checksum.finish();
            let adler2 = chunk.checksum.finish();
            self.checksum = Adler32::from_checksum(concat_adler32(adler1, adler2, length));
        }

        Ok(())
    }

    /// Write the remainder of the stream and return the inner writer.
    pub fn finish(mut self) -> io::Result<W> {
        // Write end of block
@@ -267,17 +312,67 @@ pub fn compress_to_vec(input: &[u8]) -> Vec<u8> {
    compressor.finish().unwrap()
}

/// Compute the adler32 checksum of concatenated slices.
///
/// `a` and `b` are the adler32 checksums of the first and second slice, respectively. `b_len` is
/// the length of the second slice.
#[allow(unused)]
fn concat_adler32(a: u32, b: u32, b_len: usize) -> u32 {
    let a_lo = a & 0xFFFF;
    let a_hi = a >> 16;
    let b_lo = b & 0xFFFF;
    let b_hi = b >> 16;
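
    // Adler-32 keeps lo = 1 + sum(bytes) and hi = the sum of the running lo
    // values, both mod 65521. For a concatenation x || y this gives
    //   lo(x || y) = lo(x) + lo(y) - 1
    //   hi(x || y) = hi(x) + hi(y) + |y| * (lo(x) - 1)
    // since each of y's |y| running sums carries x's offset of lo(x) - 1.
    // All intermediates fit in u32: 65520 * 65520 + 2 * 65520 < 2^32.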

    let len = (b_len % 65521) as u32;
    let a_sum = (a_lo + 65520) % 65521;

    let out_lo = (a_sum + b_lo) % 65521;
    let out_hi = (a_hi + b_hi + a_sum * len) % 65521;

    (out_hi << 16) | out_lo
}

#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;

    #[test]
    fn test_concat_adler32() {
        let mut rng = rand::thread_rng();
        let mut data = vec![0u8; 2048];
        for byte in &mut data {
            *byte = rng.gen();
        }

        let adler1 = simd_adler32::adler32(&&data[..1024]);
        let adler2 = simd_adler32::adler32(&&data[1024..]);

        let full = simd_adler32::adler32(&&*data);

        let computed = concat_adler32(adler1, adler2, 1024);

        assert_eq!(full, computed, "full: {:x}, computed: {:x}", full, computed);
    }

    fn roundtrip(data: &[u8]) {
        let compressed = compress_to_vec(data);
        let decompressed = miniz_oxide::inflate::decompress_to_vec_zlib(&compressed).unwrap();
        assert_eq!(&decompressed, data);
    }

    #[cfg(feature = "rayon")]
    fn par_roundtrip(data: &[u8]) {
        let mut compressor = Compressor::new(Vec::with_capacity(data.len() / 4)).unwrap();
        compressor
            .par_write_data(data.par_chunks(1000).map(|chunk| chunk.to_vec()))
            .unwrap();
        let compressed = compressor.finish().unwrap();

        let decompressed = miniz_oxide::inflate::decompress_to_vec_zlib(&compressed).unwrap();
        assert_eq!(&decompressed, data);
    }

    #[test]
    fn it_works() {
        roundtrip(b"Hello world!");
@@ -302,4 +397,17 @@ mod tests {
            roundtrip(&data);
        }
    }

    #[test]
    #[cfg(feature = "rayon")]
    fn par_random() {
        let mut rng = rand::thread_rng();
        let mut data = vec![0; 20480];
        for _ in 0..10 {
            for byte in &mut data {
                *byte = rng.gen();
            }
            par_roundtrip(&data);
        }
    }
}
src/lib.rs (1 addition, 0 deletions)
@@ -20,6 +20,7 @@
//! * [RealTime Data Compression blog](https://fastcompression.blogspot.com/2015/10/huffman-revisited-part-4-multi-bytes.html)
#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

mod compress;
mod decompress;
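
For reference, a minimal sketch (not part of this diff) of how a caller might use the new API. The helper name `compress_parallel` and the 64 KiB chunk size are illustrative choices; `Compressor::new` and `finish` are the crate's existing entry points.

```rust
use rayon::prelude::*;

// Hypothetical helper: compress `data` in parallel, 64 KiB per chunk.
fn compress_parallel(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut compressor = fdeflate::Compressor::new(Vec::new())?;
    // `par_chunks` yields `&[u8]` items, which satisfy the `AsRef<[u8]>` bound.
    compressor.par_write_data(data.par_chunks(64 * 1024))?;
    compressor.finish()
}
```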