diff --git a/benches/benches.rs b/benches/benches.rs index a3d0af3..6b24547 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -1,7 +1,8 @@ use cabac::perf::{ - h265_get_pattern, h265_get_pattern_bypass, h265_put_pattern, h265_put_pattern_bypass, - rans32_get_pattern, rans32_get_pattern_bypass, rans32_put_pattern, rans32_put_pattern_bypass, - vp8_get_pattern, vp8_get_pattern_bypass, vp8_put_pattern, vp8_put_pattern_bypass, + fpaq_get_pattern, fpaq_put_pattern, h265_get_pattern, h265_get_pattern_bypass, + h265_put_pattern, h265_put_pattern_bypass, rans32_get_pattern, rans32_get_pattern_bypass, + rans32_put_pattern, rans32_put_pattern_bypass, vp8_get_pattern, vp8_get_pattern_bypass, + vp8_put_pattern, vp8_put_pattern_bypass, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -15,6 +16,7 @@ fn criterion_benchmark(c: &mut Criterion) { let rans_pattern = rans32_put_pattern(&pattern); let vp8_pattern = vp8_put_pattern(&pattern); let h265_pattern = h265_put_pattern(&pattern); + let fpaq_pattern = fpaq_put_pattern(&pattern); let rans_pattern_bypass = rans32_put_pattern_bypass(&pattern); let vp8_pattern_bypass = vp8_put_pattern_bypass(&pattern); @@ -56,6 +58,18 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("Fpaq write", |b| { + b.iter(|| { + fpaq_put_pattern(&pattern); + }) + }); + + c.bench_function("Fpaq read", |b| { + b.iter(|| { + fpaq_get_pattern(&pattern, &fpaq_pattern); + }) + }); + c.bench_function("H265 read", |b| { b.iter(|| { h265_get_pattern(&pattern, &h265_pattern); diff --git a/src/fpaq0.rs b/src/fpaq0.rs new file mode 100644 index 0000000..3758bff --- /dev/null +++ b/src/fpaq0.rs @@ -0,0 +1,147 @@ +use crate::{ + traits::{CabacReader, CabacWriter, GetInnerBuffer}, + vp8::VP8Context, +}; +use std::io::{Read, Result, Write}; + +pub struct Fpaq0Decoder { + inner_reader: R, + xl: u32, + xr: u32, + x: u32, +} + +impl Fpaq0Decoder { + pub fn new(mut reader: R) -> Result { + let mut x: u32 = 0; + for _ in 0..4 { + let mut b = [0u8]; + let _ = reader.read(&mut b)?; + + x = (x << 8) | u32::from(b[0]); + } + + Ok(Fpaq0Decoder { + inner_reader: reader, + xl: 0, + xr: 0xffff_ffff, + x, + }) + } + + fn fill_bits(&mut self) -> Result<()> { + while 0 == ((self.xl ^ self.xr) & 0xFF00_0000) { + self.xl <<= 8; + self.xr = (self.xr << 8) | 0x0000_00FF; + + let mut b = [0u8]; + let _ = self.inner_reader.read(&mut b)?; + + self.x = (self.x << 8) | u32::from(b[0]); + } + Ok(()) + } +} + +impl CabacReader for Fpaq0Decoder { + fn get_bypass(&mut self) -> Result { + let xm = self.xl + (((self.xr - self.xl) & 0xffffff00) >> 1); + let mut bit = true; + if self.x <= xm { + bit = false; + self.xr = xm; + } else { + self.xl = xm + 1; + } + + self.fill_bits()?; + + Ok(bit) + } + + fn get(&mut self, cur_ctx: &mut VP8Context) -> Result { + let xm = self.xl + ((self.xr - self.xl) >> 8) * u32::from(cur_ctx.get_probability().get()); + let mut bit = true; + if self.x <= xm { + bit = false; + self.xr = xm; + } else { + self.xl = xm + 1; + } + + cur_ctx.record_and_update_bit(bit); + + self.fill_bits()?; + + Ok(bit) + } +} + +pub struct Fpaq0Encoder { + inner_writer: W, + xl: u32, + xr: u32, +} + +impl GetInnerBuffer for Fpaq0Encoder> { + fn inner_buffer(&self) -> &[u8] { + &self.inner_writer + } +} + +impl Fpaq0Encoder { + pub fn new(writer: W) -> Self { + Fpaq0Encoder { + inner_writer: writer, + xl: 0, + xr: 0xffff_ffff, + } + } + + fn flush_bits(&mut self) -> Result<()> { + while 0 == ((self.xl ^ self.xr) & 0xFF00_0000) { + let byte = (self.xr >> 24) as u8; + self.inner_writer.write_all(&[byte])?; + self.xl <<= 8; + self.xr = (self.xr << 8) | 0x0000_00FF; + } + Ok(()) + } +} + +impl CabacWriter for Fpaq0Encoder { + fn put(&mut self, bit: bool, branch: &mut VP8Context) -> Result<()> { + let xm = self.xl + ((self.xr - self.xl) >> 8) * u32::from(branch.get_probability().get()); + + // left/lower part of the interval corresponds to zero + + if !bit { + self.xr = xm; + } else { + self.xl = xm + 1; + } + + branch.record_and_update_bit(bit); + + self.flush_bits() + } + + fn put_bypass(&mut self, bit: bool) -> Result<()> { + let xm = self.xl + (((self.xr - self.xl) & 0xffffff00) >> 1); + + // left/lower part of the interval corresponds to zero + + if !bit { + self.xr = xm; + } else { + self.xl = xm + 1; + } + + self.flush_bits() + } + + fn finish(&mut self) -> Result<()> { + let byte = (self.xr >> 24) as u8; + self.inner_writer.write_all(&[byte]) + } +} diff --git a/src/fpaq0parallel.rs b/src/fpaq0parallel.rs new file mode 100644 index 0000000..7a736fc --- /dev/null +++ b/src/fpaq0parallel.rs @@ -0,0 +1,330 @@ +/// Special version of FPaq0 that allows for parallel encoding and decoding. There is some overhead on the encoding +/// side since we need to track the future output byte locations so that the reader can read them back without any +/// special signalling. +/// +/// Original algorithm developed by Matt Mahoney https://mattmahoney.net/dc/fpaq0.cpp +/// +/// I like this implementation since it has no carry processing compared to other arithmetic encoders and the bytes +/// align exactly with reads and writes. This makes it especially suitable for this kind of parallel encoding and decoding. +/// +/// As long as you exactly match your puts and gets, you can even put bytes in the middle of the stream, as long +/// as you read them back in the same order. +/// +/// This gives you many of the advantages of rANS decoding without the need to do everything in reverse, and also +/// the encoding doesn't require any divide/mod ops like rANS does. +/// +/// Parallelization implements the idea from: +/// P. G. Howard, "Interleaving entropy codes," Proceedings. Compression and Complexity of SEQUENCES 1997 +/// (Cat. No.97TB100171), Salerno, Italy, 1997, pp. 45-55, doi: 10.1109/SEQUEN.1997.666902. +use crate::vp8::VP8Context; +use std::{ + collections::VecDeque, + io::{Read, Result, Write}, +}; + +/// Decodes a byte stream encoded by Fpaq0EncoderParallel +pub struct Fpaq0DecoderParallel { + xl: u32, + xr: u32, + x: u32, +} + +impl std::fmt::Debug for Fpaq0DecoderParallel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Fpaq0Decoder {{ xl: {:x}, xr: {:x}, x: {:x} }}", + self.xl, self.xr, self.x + ) + } +} + +impl Fpaq0DecoderParallel { + pub fn new(reader: &mut impl Read) -> Result { + let mut x: u32 = 0; + for _ in 0..4 { + let mut b = [0u8]; + let _ = reader.read_exact(&mut b)?; + + x = (x << 8) | u32::from(b[0]); + } + + Ok(Fpaq0DecoderParallel { + xl: 0, + xr: 0xffff_ffff, + x, + }) + } + + fn fill_bits(&mut self, reader: &mut impl Read) -> Result<()> { + while 0 == ((self.xl ^ self.xr) & 0xFF00_0000) { + self.xl <<= 8; + self.xr = (self.xr << 8) | 0x0000_00FF; + + let mut b = [0u8]; + let _ = reader.read_exact(&mut b)?; + + self.x = (self.x << 8) | u32::from(b[0]); + } + Ok(()) + } + + /// reads a bit from the stream given a certain probability context + pub fn get(&mut self, cur_ctx: &mut VP8Context, reader: &mut impl Read) -> Result { + let xm = self.xl + ((self.xr - self.xl) >> 8) * u32::from(cur_ctx.get_probability().get()); + let mut bit = true; + if self.x <= xm { + bit = false; + self.xr = xm; + } else { + self.xl = xm + 1; + } + + cur_ctx.record_and_update_bit(bit); + + self.fill_bits(reader)?; + + Ok(bit) + } +} + +#[derive(Clone, PartialEq, Eq)] +enum FutureOutputType { + Reserved(u8), + Commit(u8), +} + +/// This holds the output and stitches together the future output in the right +/// order so that the reader can read it back without any special signalling. +pub struct EncoderOutput { + future_output: VecDeque, + output: W, +} + +impl EncoderOutput { + pub fn new(output: W) -> Self { + EncoderOutput { + future_output: VecDeque::new(), + output, + } + } + + /// writes a byte to the output stream in such a position that it can be + /// read back by the decoder without any special signalling as long + /// as it is done in the same order as it was written + pub fn write_bypass_byte(&mut self, byte: u8) -> Result<()> { + self.future_output.push_back(FutureOutputType::Commit(byte)); + + self.write_ready_bytes()?; + + Ok(()) + } + + fn write_ready_bytes(&mut self) -> Result<()> { + while let Some(FutureOutputType::Commit(v)) = self.future_output.front() { + self.output.write_all(&[*v])?; + let _ = self.future_output.pop_front().unwrap(); + } + + Ok(()) + } +} + +pub struct Fpaq0EncoderParallel { + xl: u32, + xr: u32, + id: u8, +} + +impl std::fmt::Debug for Fpaq0EncoderParallel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Fpaq0Encoder {{ xl: {:x}, xr: {:x} }}", self.xl, self.xr) + } +} + +impl Fpaq0EncoderParallel { + pub fn new(output: &mut EncoderOutput, id: u8) -> Self { + for _i in 0..4 { + output + .future_output + .push_back(FutureOutputType::Reserved(id)); + } + + Fpaq0EncoderParallel { + xl: 0, + xr: 0xffff_ffff, + id, + } + } + + /// writes a byte to the steam in its reserved location. If repush is true, it will + /// reserve a new location for the next byte. + fn flush_byte( + &mut self, + byte: u8, + output: &mut EncoderOutput, + repush: bool, + ) -> Result<()> { + for x in output.future_output.iter_mut() { + if *x == FutureOutputType::Reserved(self.id) { + *x = FutureOutputType::Commit(byte); + break; + } + } + if repush { + output + .future_output + .push_back(FutureOutputType::Reserved(self.id)); + } + + // empty out everything that is ready to be written + output.write_ready_bytes()?; + + Ok(()) + } + + fn flush_bits(&mut self, writer: &mut EncoderOutput) -> Result<()> { + while 0 == ((self.xl ^ self.xr) & 0xFF00_0000) { + let byte = (self.xr >> 24) as u8; + + self.flush_byte(byte, writer, true)?; + + self.xl <<= 8; + self.xr = (self.xr << 8) | 0x0000_00FF; + } + Ok(()) + } + + pub fn put( + &mut self, + bit: bool, + branch: &mut VP8Context, + writer: &mut EncoderOutput, + ) -> Result<()> { + let xm = self.xl + ((self.xr - self.xl) >> 8) * u32::from(branch.get_probability().get()); + + // left/lower part of the interval corresponds to zero + + if !bit { + self.xr = xm; + } else { + self.xl = xm + 1; + } + + branch.record_and_update_bit(bit); + + self.flush_bits(writer) + } + + pub fn finish(&mut self, writer: &mut EncoderOutput) -> Result<()> { + let mut byte = (self.xr >> 24) as u8; + + for _ in 0..4 { + self.flush_byte(byte, writer, false)?; + byte = 0; + } + + Ok(()) + } +} + +#[test] +fn bypass_byte() { + use byteorder::ReadBytesExt; + + let mut output = EncoderOutput { + future_output: VecDeque::new(), + output: Vec::new(), + }; + + { + let mut context = VP8Context::default(); + + let mut encoder = Fpaq0EncoderParallel::new(&mut output, 0); + for i in 0i32..1024 { + if i > 10 && i < 20 { + output.write_bypass_byte(i as u8).unwrap(); + } + encoder + .put((i % 47) != 0, &mut context, &mut output) + .unwrap(); + } + + encoder.finish(&mut output).unwrap(); + assert!(output.future_output.is_empty()); + } + + { + let mut context = VP8Context::default(); + let mut reader = std::io::Cursor::new(&output.output); + + let mut decoder = Fpaq0DecoderParallel::new(&mut reader).unwrap(); + for i in 0..1024 { + if i > 10 && i < 20 { + assert_eq!(reader.read_u8().unwrap(), i as u8); + } + assert_eq!( + decoder.get(&mut context, &mut reader).unwrap(), + (i % 47) != 0 + ); + } + } +} + +#[test] +fn bypass_dual() { + let mut output = EncoderOutput { + future_output: VecDeque::new(), + output: Vec::new(), + }; + { + let mut context1 = VP8Context::default(); + let mut context2 = VP8Context::default(); + let mut context3 = VP8Context::default(); + + let mut encoder1 = Fpaq0EncoderParallel::new(&mut output, 0); + let mut encoder2 = Fpaq0EncoderParallel::new(&mut output, 1); + let mut encoder3 = Fpaq0EncoderParallel::new(&mut output, 2); + for i in 0i32..1024 { + encoder1 + .put((i % 47) != 0, &mut context1, &mut output) + .unwrap(); + encoder2 + .put(i % 3 != 0, &mut context2, &mut output) + .unwrap(); + encoder3 + .put(i % 5 != 0, &mut context3, &mut output) + .unwrap(); + } + + encoder1.finish(&mut output).unwrap(); + encoder2.finish(&mut output).unwrap(); + encoder3.finish(&mut output).unwrap(); + } + + { + let mut context1 = VP8Context::default(); + let mut context2 = VP8Context::default(); + let mut context3 = VP8Context::default(); + + let mut reader = std::io::Cursor::new(&output.output); + + let mut decoder1 = Fpaq0DecoderParallel::new(&mut reader).unwrap(); + let mut decoder2 = Fpaq0DecoderParallel::new(&mut reader).unwrap(); + let mut decoder3 = Fpaq0DecoderParallel::new(&mut reader).unwrap(); + for i in 0..1024 { + assert_eq!( + decoder1.get(&mut context1, &mut reader).unwrap(), + (i % 47) != 0 + ); + assert_eq!( + decoder2.get(&mut context2, &mut reader).unwrap(), + (i % 3) != 0 + ); + assert_eq!( + decoder3.get(&mut context3, &mut reader).unwrap(), + (i % 5) != 0 + ); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 50b660b..50c4b2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ #![forbid(deprecated_in_future)] pub mod debug; +pub mod fpaq0; +pub mod fpaq0parallel; pub mod h265; pub mod perf; pub mod rans32; diff --git a/src/perf.rs b/src/perf.rs index 2bb88e5..a31218b 100644 --- a/src/perf.rs +++ b/src/perf.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use crate::{ + fpaq0::{Fpaq0Decoder, Fpaq0Encoder}, h265::{H265Reader, H265Writer}, rans32::{RansReader32, RansWriter32}, traits::{CabacReader, CabacWriter, GetInnerBuffer}, @@ -62,7 +63,7 @@ fn generic_get_pattern<'a, C: Default, CR: CabacReader, FR: FnOnce(&'a [u8]) fn generic_test_pattern(get: fn(&[bool], &[u8]) -> Box<[bool]>, put: fn(&[bool]) -> Vec) { let mut pattern = Vec::new(); rand::Rng::sample_iter(rand::thread_rng(), &rand::distributions::Standard) - .take(65535) + .take(200) .for_each(|x| pattern.push(x)); let encoded = put(&pattern); @@ -173,3 +174,22 @@ fn h264_test_pattern() { generic_test_pattern(h265_get_pattern, h265_put_pattern); generic_test_pattern(h265_get_pattern_bypass, h265_put_pattern_bypass); } + +#[inline(never)] +#[allow(dead_code)] +pub fn fpaq_put_pattern(pattern: &[bool]) -> Vec { + generic_put_pattern(false, pattern, || Fpaq0Encoder::new(Vec::new())) +} + +#[inline(never)] +#[allow(dead_code)] +pub fn fpaq_get_pattern(pattern: &[bool], source: &[u8]) -> Box<[bool]> { + generic_get_pattern(false, pattern, source, |vec| { + Fpaq0Decoder::new(Cursor::new(vec)).unwrap() + }) +} + +#[test] +fn fpaq_test_pattern() { + generic_test_pattern(fpaq_get_pattern, fpaq_put_pattern); +} diff --git a/tests/tests.rs b/tests/tests.rs index a5179be..0fc6859 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,5 +1,6 @@ use std::io::Cursor; +use cabac::fpaq0::{Fpaq0Decoder, Fpaq0Encoder}; use cabac::h265::{H265Reader, H265Writer}; use cabac::rans32::{RansReader32, RansWriter32}; use cabac::traits::{CabacReader, CabacWriter, GetInnerBuffer}; @@ -64,6 +65,8 @@ fn end_to_end< } } } + + println!("scheme: {} len: {}", scheme, writer.inner_buffer().len()); } fn test_seq_vp8(seq: &[Seq]) { @@ -97,6 +100,13 @@ fn test_all(seq: &[Seq]) { test_seq_vp8(seq); test_seq_h265(seq); test_seq_rans(seq); + + end_to_end( + seq, + || Fpaq0Encoder::new(Vec::new()), + |buf| Fpaq0Decoder::new(Cursor::new(buf)).unwrap(), + "Fpaq", + ); } #[test] @@ -149,10 +159,10 @@ fn test_random_sequences() { 0.001, 0.01, 0.1, 0.11, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.9, 0.91, 0.99, 0.999, 0.9999, 1.0, ]; - for _ in 1..1000 { + for _ in 1..10 { let mut seq = Vec::new(); - for _ in 0..1000 { + for _ in 0..100000 { let ctx = rng.gen_range(0..16); seq.push(match rng.gen_range(0..4) {