From de8e50d9e28259ba98ddf8d5a512584d70b83367 Mon Sep 17 00:00:00 2001 From: Jonathan Behrens Date: Sat, 9 Mar 2024 16:43:14 -0800 Subject: [PATCH 1/3] Parallel encoding --- Cargo.toml | 8 +++ src/compress.rs | 133 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 82 +++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 59910ff..4769c9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,17 @@ repository = "https://github.com/image-rs/fdeflate" homepage = "https://github.com/image-rs/fdeflate" categories = ["compression"] +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [dependencies] +rayon = { version = "1.9.0", optional = true } simd-adler32 = "0.3.4" [dev-dependencies] miniz_oxide = "0.7.1" rand = "0.8.5" + +[features] +rayon = ["dep:rayon"] \ No newline at end of file diff --git a/src/compress.rs b/src/compress.rs index b55116e..43442f9 100644 --- a/src/compress.rs +++ b/src/compress.rs @@ -1,6 +1,9 @@ use simd_adler32::Adler32; use std::io::{self, Seek, SeekFrom, Write}; +#[cfg(feature = "rayon")] +use rayon::prelude::*; + use crate::tables::{ BITMASKS, HUFFMAN_CODES, HUFFMAN_LENGTHS, LENGTH_TO_LEN_EXTRA, LENGTH_TO_SYMBOL, }; @@ -166,6 +169,48 @@ impl Compressor { Ok(()) } + /// Same as `write_data` but compression happens in parallel. 
/// Same as `write_data`, but the chunks yielded by `data` are compressed in parallel.
///
/// Each chunk is compressed independently by a scratch `Compressor` that writes
/// into its own `Vec<u8>` and keeps its own Adler-32 state. The scratch outputs
/// are then spliced, one after another, into this compressor's bit stream, and
/// the running checksum is updated with `concat_adler32` so the input bytes never
/// have to be hashed a second time.
///
/// # Errors
///
/// Returns any error reported by the underlying writer while the compressed
/// chunks are being appended.
#[cfg(feature = "rayon")]
pub fn par_write_data<D, I>(&mut self, data: D) -> io::Result<()>
where
    D: ParallelIterator<Item = I>,
    I: AsRef<[u8]>,
{
    // Phase 1 (parallel): compress every chunk into a private buffer and remember
    // how many input bytes it covered (needed to combine the checksums later).
    let chunks: Vec<_> = data
        .map(|chunk| {
            let bytes = chunk.as_ref();
            let mut c = Compressor {
                checksum: Adler32::new(),
                buffer: 0,
                nbits: 0,
                writer: Vec::new(),
            };
            c.write_data(bytes)
                .expect("writing to an in-memory Vec<u8> cannot fail");
            (c, bytes.len())
        })
        .collect();

    // Phase 2 (sequential): append each chunk's bits to our own stream.
    for (chunk, length) in chunks {
        let mut words = chunk.writer.chunks_exact(8);

        // With no pending bits the carry below must be empty. `wrapping_shr(64)`
        // shifts by 0 (the amount is taken modulo 64) and would hand back the
        // whole word, so that case is masked to zero instead.
        let carry_mask = if self.nbits == 0 { 0 } else { u64::MAX };

        for word in &mut words {
            let bits = u64::from_le_bytes(
                word.try_into()
                    .expect("chunks_exact(8) only yields 8-byte slices"),
            );
            // Fill the free high bits of the pending word, flush it, and keep the
            // bits that did not fit as the new pending word. `nbits` is unchanged.
            self.buffer |= bits << self.nbits;
            self.writer.write_all(&self.buffer.to_le_bytes())?;
            self.buffer = bits.wrapping_shr(64 - u32::from(self.nbits)) & carry_mask;
        }

        // Fewer than eight whole bytes are left over; push them one at a time.
        for byte in words.remainder() {
            self.write_bits(u64::from(*byte), 8)?;
        }

        // Bits the scratch compressor had not yet flushed to its writer.
        self.write_bits(chunk.buffer, chunk.nbits)?;

        // Fold the chunk's checksum into the running checksum.
        let adler1 = self.checksum.finish();
        let adler2 = chunk.checksum.finish();
        self.checksum = Adler32::from_checksum(concat_adler32(adler1, adler2, length));
    }

    Ok(())
}
+// pub fn new(writer: W) -> io::Result { +// let mut compressor = Self { +// checksum: Adler32::new(), +// buffer: 0, +// nbits: 0, +// writer, +// }; +// compressor.write_headers()?; +// Ok(compressor) +// } + +// pub fn write_data(&mut self, impl) -> io::Result<()> { +// let block_header = [236, 192, 3, 160, 36, 89, 150, 198, 241, 255, 119, 238, 141, 200, 204, 167, 114, 75, 99, 174, 109, 219, 182, 109, 219, 182, 109, 219, 182, 109, 105, 140, 158, 150, 74, 175, 158, 50, 51, 34, 238, 249, 118, 183, 106, 122, 166, 135, 59, 107, 213, 15]; +// } +// } + /// Compressor that only writes the stored blocks. /// /// This is useful for writing files that are not compressed, but still need to be wrapped in a @@ -267,17 +337,67 @@ pub fn compress_to_vec(input: &[u8]) -> Vec { compressor.finish().unwrap() } +/// Compute the adler32 checksum of concatenated slices. +/// +/// `a` and `b` are the adler32 checksums of the first and second slice, respectively. `b_len` is +/// the length of the second slice. 
/// Compute the Adler-32 checksum of two concatenated slices from their checksums.
///
/// `a` and `b` are the Adler-32 checksums of the first and second slice,
/// respectively, and `b_len` is the length in bytes of the second slice. The
/// return value equals the Adler-32 checksum of the first slice followed by the
/// second, without touching the data again.
///
/// Writing a checksum as `(hi << 16) | lo`, where `lo` is one plus the byte sum
/// and `hi` is the sum of all intermediate `lo` values (both modulo 65521):
///
/// * `lo(xy) = lo(x) - 1 + lo(y)`
/// * `hi(xy) = hi(x) + hi(y) + len(y) * (lo(x) - 1)`
///
/// All arithmetic fits in `u32`: every factor is reduced below 65521 first, and
/// `65520 * 65520 + 2 * 65535` is smaller than `u32::MAX`.
// In the code visible here this is only called from `par_write_data` (behind the
// `rayon` feature) and from the unit tests, so a default build sees it as dead code.
#[allow(dead_code)]
const fn concat_adler32(a: u32, b: u32, b_len: usize) -> u32 {
    /// Largest prime below 2^16; the modulus used by Adler-32.
    const MODULUS: u32 = 65521;

    let a_lo = a & 0xFFFF;
    let a_hi = a >> 16;
    let b_lo = b & 0xFFFF;
    let b_hi = b >> 16;

    // Only the length modulo MODULUS matters, which also makes the cast lossless.
    let len = (b_len % MODULUS as usize) as u32;

    // `lo(x) - 1` modulo MODULUS, written as an addition so it cannot underflow.
    let a_sum = (a_lo + (MODULUS - 1)) % MODULUS;

    let out_lo = (a_sum + b_lo) % MODULUS;
    let out_hi = (a_hi + b_hi + a_sum * len) % MODULUS;

    (out_hi << 16) | out_lo
}
* [RealTime Data Compression blog](https://fastcompression.blogspot.com/2015/10/huffman-revisited-part-4-multi-bytes.html) #![forbid(unsafe_code)] #![warn(missing_docs)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] mod compress; mod decompress; @@ -121,3 +122,84 @@ const fn compute_codes(lengths: &[u8; NSYMS]) -> Option<[u16 None } } + +// #[cfg(test)] +// mod tests { + +// #[test] +// fn header() { +// use crate::tables::HUFFMAN_LENGTHS; + +// let mut runs = Vec::new(); +// let mut run_start = None; +// for i in 0..HUFFMAN_LENGTHS.len() { +// if run_start.is_none() { +// run_start = Some(i); +// } else if HUFFMAN_LENGTHS[i] != HUFFMAN_LENGTHS[i - 1] { +// runs.push((HUFFMAN_LENGTHS[i - 1], i - run_start.unwrap())); +// run_start = Some(i); +// } +// } +// runs.push(( +// *HUFFMAN_LENGTHS.last().unwrap(), +// HUFFMAN_LENGTHS.len() - run_start.unwrap(), +// )); + +// let mut freqs = [0u64; 14]; +// freqs[1] = 1; + +// let mut symbols = Vec::new(); + +// for (val, mut count) in runs { +// if count < 4 { +// freqs[val as usize] += count as u64; +// for _ in 0..count { +// symbols.push((val, 0)); +// } +// } else { +// symbols.push((val, 0)); +// freqs[val as usize] += 1; +// count -= 1; + +// while count >= 9 { +// freqs[13] += 1; +// count -= 6; +// symbols.push((16, 3)); +// } +// // match count { +// // 0 ..= 2 => unreachable!("Shouldn't happen"), +// // 3 ..= 6 => freqs[13] += 1, +// // 7 ..= 12 => freqs[13] += 2, +// // 13 .. => unreachable!("Shouldn't happen"), +// // } +// match count { +// 0..=2 => unreachable!("Shouldn't happen"), +// 3..=6 => symbols.push((16, count - 3)), +// 7 => { +// symbols.push((16, 0)); +// symbols.push((16, 1)); +// } +// 8 => { +// symbols.push((16, 1)); +// symbols.push((16, 1)); +// } +// 9.. 
=> unreachable!("Shouldn't happen"), +// } +// } +// } +// println!("symbols = {:?}", symbols); + +// let mut lengths = [0u8; 13]; +// crate::compute_code_lengths(&freqs[1..], &[1; 13], &[7; 13], &mut lengths); + +// let mut bits_used = 0; +// for i in 0..13 { +// bits_used += u64::from(freqs[i + 1]) * u64::from(lengths[i]); +// } +// println!("Bits used: {}", bits_used); + +// println!("{:?}", lengths); + +// panic!(); +// } +// } From 4d05f6ab72098c0731d5dd1ccf53438b61bdba92 Mon Sep 17 00:00:00 2001 From: Jonathan Behrens Date: Sat, 9 Mar 2024 17:04:14 -0800 Subject: [PATCH 2/3] cleanup --- Cargo.toml | 2 +- src/compress.rs | 25 ------------------------- 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4769c9f..6941885 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,4 @@ miniz_oxide = "0.7.1" rand = "0.8.5" [features] -rayon = ["dep:rayon"] \ No newline at end of file +rayon = ["dep:rayon"] diff --git a/src/compress.rs b/src/compress.rs index 43442f9..66ac410 100644 --- a/src/compress.rs +++ b/src/compress.rs @@ -226,31 +226,6 @@ impl Compressor { } } -// pub struct ParallelCompressor { -// checksum: Adler32, -// buffer: u64, -// nbits: u8, -// writer: W, -// } - -// impl ParallelCompressor { -// /// Create a new Compressor. -// pub fn new(writer: W) -> io::Result { -// let mut compressor = Self { -// checksum: Adler32::new(), -// buffer: 0, -// nbits: 0, -// writer, -// }; -// compressor.write_headers()?; -// Ok(compressor) -// } - -// pub fn write_data(&mut self, impl) -> io::Result<()> { -// let block_header = [236, 192, 3, 160, 36, 89, 150, 198, 241, 255, 119, 238, 141, 200, 204, 167, 114, 75, 99, 174, 109, 219, 182, 109, 219, 182, 109, 219, 182, 109, 105, 140, 158, 150, 74, 175, 158, 50, 51, 34, 238, 249, 118, 183, 106, 122, 166, 135, 59, 107, 213, 15]; -// } -// } - /// Compressor that only writes the stored blocks. 
/// /// This is useful for writing files that are not compressed, but still need to be wrapped in a From 9403073e64ed5c9d18c1092094f8d229c43c6624 Mon Sep 17 00:00:00 2001 From: Jonathan Behrens Date: Sat, 9 Mar 2024 17:04:52 -0800 Subject: [PATCH 3/3] remove commented code --- src/lib.rs | 81 ------------------------------------------------------ 1 file changed, 81 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00c367f..36832ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -122,84 +122,3 @@ const fn compute_codes(lengths: &[u8; NSYMS]) -> Option<[u16 None } } - -// #[cfg(test)] -// mod tests { - -// #[test] -// fn header() { -// use crate::tables::HUFFMAN_LENGTHS; - -// let mut runs = Vec::new(); -// let mut run_start = None; -// for i in 0..HUFFMAN_LENGTHS.len() { -// if run_start.is_none() { -// run_start = Some(i); -// } else if HUFFMAN_LENGTHS[i] != HUFFMAN_LENGTHS[i - 1] { -// runs.push((HUFFMAN_LENGTHS[i - 1], i - run_start.unwrap())); -// run_start = Some(i); -// } -// } -// runs.push(( -// *HUFFMAN_LENGTHS.last().unwrap(), -// HUFFMAN_LENGTHS.len() - run_start.unwrap(), -// )); - -// let mut freqs = [0u64; 14]; -// freqs[1] = 1; - -// let mut symbols = Vec::new(); - -// for (val, mut count) in runs { -// if count < 4 { -// freqs[val as usize] += count as u64; -// for _ in 0..count { -// symbols.push((val, 0)); -// } -// } else { -// symbols.push((val, 0)); -// freqs[val as usize] += 1; -// count -= 1; - -// while count >= 9 { -// freqs[13] += 1; -// count -= 6; -// symbols.push((16, 3)); -// } -// // match count { -// // 0 ..= 2 => unreachable!("Shouldn't happen"), -// // 3 ..= 6 => freqs[13] += 1, -// // 7 ..= 12 => freqs[13] += 2, -// // 13 .. => unreachable!("Shouldn't happen"), -// // } -// match count { -// 0..=2 => unreachable!("Shouldn't happen"), -// 3..=6 => symbols.push((16, count - 3)), -// 7 => { -// symbols.push((16, 0)); -// symbols.push((16, 1)); -// } -// 8 => { -// symbols.push((16, 1)); -// symbols.push((16, 1)); -// } -// 9.. 
=> unreachable!("Shouldn't happen"), -// } -// } -// } -// println!("symbols = {:?}", symbols); - -// let mut lengths = [0u8; 13]; -// crate::compute_code_lengths(&freqs[1..], &[1; 13], &[7; 13], &mut lengths); - -// let mut bits_used = 0; -// for i in 0..13 { -// bits_used += u64::from(freqs[i + 1]) * u64::from(lengths[i]); -// } -// println!("Bits used: {}", bits_used); - -// println!("{:?}", lengths); - -// panic!(); -// } -// }