Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel encoding #23

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Parallel encoding
fintelia committed Mar 10, 2024
commit de8e50d9e28259ba98ddf8d5a512584d70b83367
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,9 +17,17 @@ repository = "https://github.com/image-rs/fdeflate"
homepage = "https://github.com/image-rs/fdeflate"
categories = ["compression"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
rayon = { version = "1.9.0", optional = true }
simd-adler32 = "0.3.4"

[dev-dependencies]
miniz_oxide = "0.7.1"
rand = "0.8.5"

[features]
rayon = ["dep:rayon"]
133 changes: 133 additions & 0 deletions src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use simd_adler32::Adler32;
use std::io::{self, Seek, SeekFrom, Write};

#[cfg(feature = "rayon")]
use rayon::prelude::*;

use crate::tables::{
BITMASKS, HUFFMAN_CODES, HUFFMAN_LENGTHS, LENGTH_TO_LEN_EXTRA, LENGTH_TO_SYMBOL,
};
@@ -166,6 +169,48 @@ impl<W: Write> Compressor<W> {
Ok(())
}

/// Same as `write_data` but compression happens in parallel.
#[cfg(feature = "rayon")]
pub fn par_write_data<D, I>(&mut self, data: D) -> io::Result<()>
where
    D: ParallelIterator<Item = I>,
    I: AsRef<[u8]>,
{
    // Compress every chunk independently on the rayon pool. Each worker gets a
    // throwaway Compressor writing into its own Vec<u8>, so no state is shared
    // until the sequential stitching below. Writing into a Vec cannot fail,
    // hence the unwrap. The chunk's input length is kept alongside the result
    // so the per-chunk adler32 checksums can be combined afterwards.
    let chunks: Vec<_> = data
        .map(|chunk| {
            let mut c = Compressor {
                checksum: Adler32::new(),
                buffer: 0,
                nbits: 0,
                writer: Vec::new(),
            };
            c.write_data(chunk.as_ref()).unwrap();
            (c, chunk.as_ref().len())
        })
        .collect();

    // Splice the per-chunk bitstreams together in order. The output is a
    // bitstream, so each chunk's bytes must be shifted left by the number of
    // bits (`self.nbits`) still pending in our own bit buffer.
    for (chunk, length) in chunks {
        let mut vs = chunk.writer.chunks_exact(8);
        // When nbits == 0 the shift amount below would be 64; `wrapping_shr`
        // then shifts by 0 and would leak `bits` unchanged into the carry, so
        // mask the carry to zero in that case.
        let buffer_mask = if self.nbits == 0 { 0 } else { u64::MAX };
        for v in &mut vs {
            // Consume 8 chunk bytes at a time: append them after the pending
            // bits, flush a full little-endian 64-bit word, keep the overflow.
            let bits = u64::from_le_bytes(v.try_into().unwrap());
            self.buffer |= bits << self.nbits;
            self.writer.write_all(&self.buffer.to_le_bytes())?;
            self.buffer = bits.wrapping_shr(64 - self.nbits as u32) & buffer_mask;
        }
        // Remaining (< 8) whole bytes go through the ordinary bit writer...
        for b in vs.remainder() {
            self.write_bits(*b as u64, 8)?;
        }
        // ...followed by the chunk's partial trailing byte, if any
        // (the bits that chunk's own compressor had not flushed).
        self.write_bits(chunk.buffer, chunk.nbits)?;

        // Fold the chunk's adler32 into the running checksum; `length` is the
        // number of uncompressed input bytes the chunk covered.
        let adler1 = self.checksum.finish();
        let adler2 = chunk.checksum.finish();
        self.checksum = Adler32::from_checksum(concat_adler32(adler1, adler2, length));
    }

    Ok(())
}

/// Write the remainder of the stream and return the inner writer.
pub fn finish(mut self) -> io::Result<W> {
// Write end of block
@@ -181,6 +226,31 @@ impl<W: Write> Compressor<W> {
}
}

// pub struct ParallelCompressor<W: Write> {
// checksum: Adler32,
// buffer: u64,
// nbits: u8,
// writer: W,
// }

// impl<W: Write> ParallelCompressor<W> {
// /// Create a new Compressor.
// pub fn new(writer: W) -> io::Result<Self> {
// let mut compressor = Self {
// checksum: Adler32::new(),
// buffer: 0,
// nbits: 0,
// writer,
// };
// compressor.write_headers()?;
// Ok(compressor)
// }

// pub fn write_data(&mut self, impl) -> io::Result<()> {
// let block_header = [236, 192, 3, 160, 36, 89, 150, 198, 241, 255, 119, 238, 141, 200, 204, 167, 114, 75, 99, 174, 109, 219, 182, 109, 219, 182, 109, 219, 182, 109, 105, 140, 158, 150, 74, 175, 158, 50, 51, 34, 238, 249, 118, 183, 106, 122, 166, 135, 59, 107, 213, 15];
// }
// }

/// Compressor that only writes the stored blocks.
///
/// This is useful for writing files that are not compressed, but still need to be wrapped in a
@@ -267,17 +337,67 @@ pub fn compress_to_vec(input: &[u8]) -> Vec<u8> {
compressor.finish().unwrap()
}

/// Compute the adler32 checksum of concatenated slices.
///
/// `a` and `b` are the adler32 checksums of the first and second slice, respectively. `b_len` is
/// the length of the second slice.
#[allow(unused)]
fn concat_adler32(a: u32, b: u32, b_len: usize) -> u32 {
    // adler32 works modulo the largest prime below 2^16.
    const MOD: u32 = 65521;

    // Each checksum packs the sum-of-sums in the high half and the byte sum in
    // the low half.
    let (a_lo, a_hi) = (a & 0xFFFF, a >> 16);
    let (b_lo, b_hi) = (b & 0xFFFF, b >> 16);

    // Only the second slice's length modulo 65521 matters for the high half.
    let len = (b_len % MOD as usize) as u32;

    // adler32 seeds its byte sum with 1, so one of the two offsets has to be
    // dropped when merging; adding MOD - 1 subtracts 1 without underflow.
    let lo_minus_one = (a_lo + MOD - 1) % MOD;

    // Merged byte sum, and merged sum-of-sums: every byte of the second slice
    // also advanced the first slice's byte sum `len` more times.
    // Note: lo_minus_one * len <= 65520^2, which still fits in u32.
    let lo = (lo_minus_one + b_lo) % MOD;
    let hi = (a_hi + b_hi + lo_minus_one * len) % MOD;

    (hi << 16) | lo
}

#[cfg(test)]
mod tests {
use super::*;
use rand::Rng;

#[test]
fn test_concat_adler32() {
    // Checksum two random halves separately and verify that merging the two
    // checksums reproduces the checksum of the whole buffer.
    let mut rng = rand::thread_rng();
    let data: Vec<u8> = (0..2048).map(|_| rng.gen()).collect();

    let (first, second) = data.split_at(1024);
    let adler1 = simd_adler32::adler32(&first);
    let adler2 = simd_adler32::adler32(&second);
    let full = simd_adler32::adler32(&&*data);

    let computed = concat_adler32(adler1, adler2, second.len());
    assert_eq!(full, computed, "full: {:x}, computed: {:x}", full, computed);
}

// Compress `data` and check that an independent zlib implementation
// decompresses it back to the original bytes.
fn roundtrip(data: &[u8]) {
    let encoded = compress_to_vec(data);
    let round = miniz_oxide::inflate::decompress_to_vec_zlib(&encoded).unwrap();
    assert_eq!(&round, data);
}

// Feed `data` in 1000-byte chunks through the parallel writer, then check that
// an independent zlib implementation reproduces the original bytes.
#[cfg(feature = "rayon")]
fn par_roundtrip(data: &[u8]) {
    let sink = Vec::with_capacity(data.len() / 4);
    let mut compressor = Compressor::new(sink).unwrap();
    let chunks = data.par_chunks(1000).map(|chunk| chunk.to_vec());
    compressor.par_write_data(chunks).unwrap();
    let encoded = compressor.finish().unwrap();

    let round = miniz_oxide::inflate::decompress_to_vec_zlib(&encoded).unwrap();
    assert_eq!(&round, data);
}

#[test]
fn it_works() {
roundtrip(b"Hello world!");
@@ -302,4 +422,17 @@ mod tests {
roundtrip(&data);
}
}

#[test]
#[cfg(feature = "rayon")]
fn par_random() {
    // Ten rounds of 20 KiB of random input through the parallel path.
    let mut rng = rand::thread_rng();
    let mut data = vec![0u8; 20480];
    for _ in 0..10 {
        rng.fill(&mut data[..]);
        par_roundtrip(&data);
    }
}
}
82 changes: 82 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
//! * [RealTime Data Compression blog](https://fastcompression.blogspot.com/2015/10/huffman-revisited-part-4-multi-bytes.html)
#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

mod compress;
mod decompress;
@@ -121,3 +122,84 @@ const fn compute_codes<const NSYMS: usize>(lengths: &[u8; NSYMS]) -> Option<[u16
None
}
}

// #[cfg(test)]
// mod tests {

// #[test]
// fn header() {
// use crate::tables::HUFFMAN_LENGTHS;

// let mut runs = Vec::new();
// let mut run_start = None;
// for i in 0..HUFFMAN_LENGTHS.len() {
// if run_start.is_none() {
// run_start = Some(i);
// } else if HUFFMAN_LENGTHS[i] != HUFFMAN_LENGTHS[i - 1] {
// runs.push((HUFFMAN_LENGTHS[i - 1], i - run_start.unwrap()));
// run_start = Some(i);
// }
// }
// runs.push((
// *HUFFMAN_LENGTHS.last().unwrap(),
// HUFFMAN_LENGTHS.len() - run_start.unwrap(),
// ));

// let mut freqs = [0u64; 14];
// freqs[1] = 1;

// let mut symbols = Vec::new();

// for (val, mut count) in runs {
// if count < 4 {
// freqs[val as usize] += count as u64;
// for _ in 0..count {
// symbols.push((val, 0));
// }
// } else {
// symbols.push((val, 0));
// freqs[val as usize] += 1;
// count -= 1;

// while count >= 9 {
// freqs[13] += 1;
// count -= 6;
// symbols.push((16, 3));
// }
// // match count {
// // 0 ..= 2 => unreachable!("Shouldn't happen"),
// // 3 ..= 6 => freqs[13] += 1,
// // 7 ..= 12 => freqs[13] += 2,
// // 13 .. => unreachable!("Shouldn't happen"),
// // }
// match count {
// 0..=2 => unreachable!("Shouldn't happen"),
// 3..=6 => symbols.push((16, count - 3)),
// 7 => {
// symbols.push((16, 0));
// symbols.push((16, 1));
// }
// 8 => {
// symbols.push((16, 1));
// symbols.push((16, 1));
// }
// 9.. => unreachable!("Shouldn't happen"),
// }
// }
// }
// println!("symbols = {:?}", symbols);

// let mut lengths = [0u8; 13];
// crate::compute_code_lengths(&freqs[1..], &[1; 13], &[7; 13], &mut lengths);

// let mut bits_used = 0;
// for i in 0..13 {
// bits_used += u64::from(freqs[i + 1]) * u64::from(lengths[i]);
// }
// println!("Bits used: {}", bits_used);

// println!("{:?}", lengths);

// panic!();
// }
// }