From 76ef68aaade0bbfeed264173bcd13731a2a04280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mak=20Naze=C4=8Di=C4=87-Andrlon?= Date: Mon, 20 Jan 2025 13:38:09 +1100 Subject: [PATCH 1/2] Add into_inner() on Rust writer --- rust/src/io_utils.rs | 4 ++ rust/src/write.rs | 126 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 107 insertions(+), 23 deletions(-) diff --git a/rust/src/io_utils.rs b/rust/src/io_utils.rs index 38330df72..15c94505b 100644 --- a/rust/src/io_utils.rs +++ b/rust/src/io_utils.rs @@ -37,6 +37,10 @@ impl CountingCrcWriter { pub fn finalize(self) -> (W, Hasher) { (self.inner, self.hasher) } + + pub fn current_checksum(&self) -> u32 { + self.hasher.clone().finalize() + } } impl Write for CountingCrcWriter { diff --git a/rust/src/write.rs b/rust/src/write.rs index 2b2145635..2d3c1089e 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -311,6 +311,7 @@ struct SchemaContent<'a> { /// and check for errors when done; otherwise the result will be unwrapped on drop. pub struct Writer { writer: Option>, + is_finished: bool, chunk_mode: ChunkMode, options: WriteOptions, schemas: BiHashMap, u16>, @@ -363,6 +364,7 @@ impl Writer { Ok(Self { writer: Some(WriteMode::Raw(writer)), + is_finished: false, options: opts, chunk_mode, schemas: Default::default(), @@ -667,10 +669,8 @@ impl Writer { ) -> McapResult<()> { self.finish_chunk()?; - let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); - - let WriteMode::Raw(w) = prev_writer else { - panic!( + let WriteMode::Raw(w) = self.writer.take().expect(Self::WHERE_WRITER) else { + unreachable!( "since finish_chunk was called, write mode is guaranteed to be raw at this point" ); }; @@ -792,25 +792,23 @@ impl Writer { /// Starts a new chunk if we haven't done so already. fn start_chunk(&mut self) -> McapResult<&mut ChunkWriter> { + assert!(!self.is_finished, "{}", Self::WHERE_WRITER); + // It is not possible to start writing a chunk if we're still writing an attachment. Return // an error instead. if let Some(WriteMode::Attachment(..)) = self.writer { return Err(McapError::AttachmentNotInProgress); } - // Some Rust tricky: we can't move the writer out of self.writer, - // leave that empty for a bit, and then replace it with a ChunkWriter. - // (That would leave it in an unspecified state if we bailed here!) - // Instead briefly swap it out for a null writer while we set up the chunker - // The writer will only be None if finish() was called. assert!( self.options.use_chunks, "Trying to write to a chunk when chunking is disabled" ); - let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); - - self.writer = Some(match prev_writer { + // Rust forbids moving values out of a &mut reference. We made self.writer an Option so we + // can work around this by using take() to temporarily replace it with None while we + // construct the ChunkWriter. + self.writer = Some(match self.writer.take().expect(Self::WHERE_WRITER) { WriteMode::Raw(w) => { // It's chunkin time. WriteMode::Chunk(ChunkWriter::new( @@ -832,16 +830,15 @@ impl Writer { /// Finish the current chunk, if we have one. fn finish_chunk(&mut self) -> McapResult<&mut CountingCrcWriter> { + assert!(!self.is_finished, "{}", Self::WHERE_WRITER); // If we're currently writing an attachment then we're not writing a chunk. Return an // error instead. if let Some(WriteMode::Attachment(..)) = self.writer { return Err(McapError::AttachmentNotInProgress); } - // See above - let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); - - self.writer = Some(match prev_writer { + // See start_chunk() for why we use take() here. + self.writer = Some(match self.writer.take().expect(Self::WHERE_WRITER) { WriteMode::Chunk(c) => { let (w, mode, index) = c.finish()?; self.chunk_indexes.push(index); @@ -862,7 +859,7 @@ impl Writer { /// /// Subsequent calls to other methods will panic. pub fn finish(&mut self) -> McapResult<()> { - if self.writer.is_none() { + if self.is_finished { // We already called finish(). // Maybe we're dropping after the user called it? return Ok(()); @@ -870,20 +867,21 @@ impl Writer { // Finish any chunk we were working on and update stats, indexes, etc. self.finish_chunk()?; + self.is_finished = true; // Grab the writer - self.writer becoming None makes subsequent writes fail. - let writer = match self.writer.take() { + let writer = match &mut self.writer { // We called finish_chunk() above, so we're back to raw writes for // the summary section. Some(WriteMode::Raw(w)) => w, _ => unreachable!(), }; - let (mut writer, data_section_crc) = writer.finalize(); - let data_section_crc = data_section_crc.finalize(); + let data_section_crc = writer.current_checksum(); + let writer = writer.get_mut(); // We're done with the data secton! write_record( - &mut writer, + writer, &Record::DataEnd(records::DataEnd { data_section_crc }), )?; @@ -952,8 +950,8 @@ impl Writer { } // Write all schemas. - let schemas_start = summary_start; if self.options.repeat_schemas && !all_schemas.is_empty() { + let schemas_start: u64 = summary_start; for schema in all_schemas.iter() { write_record(&mut ccw, schema)?; } @@ -1053,7 +1051,7 @@ impl Writer { ccw.write_u64::(summary_start)?; ccw.write_u64::(summary_offset_start)?; - let (mut writer, summary_crc) = ccw.finalize(); + let (writer, summary_crc) = ccw.finalize(); writer.write_u32::(summary_crc.finalize())?; @@ -1061,6 +1059,30 @@ impl Writer { writer.flush()?; Ok(()) } + + /// Consumes this writer, returning the underlying stream. Unless [`Self::finish()`] was called + /// first, the underlying stream __will not contain a complete MCAP.__ + /// + /// Use this if you wish to handle any errors returned when the underlying stream is closed. In + /// particular, if using [`std::fs::File`], you may wish to call [`std::fs::File::sync_all()`] + /// to ensure all data was sent to the filesystem. + pub fn into_inner(mut self) -> W { + self.is_finished = true; + match self.writer.take().expect(Self::WHERE_WRITER) { + WriteMode::Raw(w) => w.finalize().0, + WriteMode::Attachment(w) => w.writer.finalize().0.finalize().0, + WriteMode::Chunk(w) => { + w.compressor + .finalize() + .0 + .finish() + .expect("compression error") + .finalize() + .0 + .inner + } + } + } } impl Drop for Writer { @@ -1510,4 +1532,62 @@ mod tests { }; assert!(matches!(too_many, McapError::TooManySchemas)); } + + #[test] + #[should_panic(expected = "Trying to write a record on a finished MCAP")] + fn panics_if_write_called_after_finish() { + let file = std::io::Cursor::new(Vec::new()); + let mut writer = Writer::new(file).expect("failed to construct writer"); + writer.finish().expect("failed to finish writer"); + + let custom_channel = std::sync::Arc::new(crate::Channel { + id: 1, + topic: "chat".into(), + message_encoding: "json".into(), + metadata: BTreeMap::new(), + schema: None, + }); + + writer + .write(&crate::Message { + channel: custom_channel.clone(), + sequence: 0, + log_time: 0, + publish_time: 0, + data: Cow::Owned(Vec::new()), + }) + .expect("could not write message"); + } + + #[test] + fn writes_message_and_checks_stream_length() { + let file = std::io::Cursor::new(Vec::new()); + let mut writer = Writer::new(file).expect("failed to construct writer"); + + let custom_channel = std::sync::Arc::new(crate::Channel { + id: 1, + topic: "chat".into(), + message_encoding: "json".into(), + metadata: BTreeMap::new(), + schema: None, + }); + + writer + .write(&crate::Message { + channel: custom_channel.clone(), + sequence: 0, + log_time: 0, + publish_time: 0, + data: Cow::Owned(Vec::new()), + }) + .expect("could not write message"); + + writer.finish().expect("failed to finish writer"); + + let output_len = writer + .into_inner() + .stream_position() + .expect("failed to get stream position"); + assert_eq!(output_len, 487); + } } From d2cfc83b892db71598fcf3a8ffd6343b7dc763e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mak=20Naze=C4=8Di=C4=87-Andrlon?= Date: Mon, 20 Jan 2025 15:03:27 +1100 Subject: [PATCH 2/2] Avoid unwrapping in into_inner() --- rust/src/write.rs | 64 +++++++++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/rust/src/write.rs b/rust/src/write.rs index 2d3c1089e..be562a5be 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -10,6 +10,8 @@ use std::{ use bimap::BiHashMap; use binrw::prelude::*; use byteorder::{WriteBytesExt, LE}; +#[cfg(feature = "zstd")] +use zstd::stream::{raw as zraw, zio}; use crate::{ chunk_sink::{ChunkMode, ChunkSink}, @@ -669,7 +671,7 @@ impl Writer { ) -> McapResult<()> { self.finish_chunk()?; - let WriteMode::Raw(w) = self.writer.take().expect(Self::WHERE_WRITER) else { + let WriteMode::Raw(w) = self.writer.take().expect(Self::WRITER_IS_NONE) else { unreachable!( "since finish_chunk was called, write mode is guaranteed to be raw at this point" ); @@ -786,13 +788,19 @@ impl Writer { Ok(()) } - /// `.expect()` message when we go to write and self.writer is `None`, - /// which should only happen when [`Writer::finish()`] was called. - const WHERE_WRITER: &'static str = "Trying to write a record on a finished MCAP"; + const WRITER_IS_NONE: &'static str = "unreachable: self.writer should never be None"; + + fn assert_not_finished(&self) { + assert!( + !self.is_finished, + "{}", + "Trying to write a record on a finished MCAP" + ); + } /// Starts a new chunk if we haven't done so already. fn start_chunk(&mut self) -> McapResult<&mut ChunkWriter> { - assert!(!self.is_finished, "{}", Self::WHERE_WRITER); + self.assert_not_finished(); // It is not possible to start writing a chunk if we're still writing an attachment. Return // an error instead. @@ -808,7 +816,7 @@ impl Writer { // Rust forbids moving values out of a &mut reference. We made self.writer an Option so we // can work around this by using take() to temporarily replace it with None while we // construct the ChunkWriter. - self.writer = Some(match self.writer.take().expect(Self::WHERE_WRITER) { + self.writer = Some(match self.writer.take().expect(Self::WRITER_IS_NONE) { WriteMode::Raw(w) => { // It's chunkin time. WriteMode::Chunk(ChunkWriter::new( @@ -830,7 +838,7 @@ impl Writer { /// Finish the current chunk, if we have one. fn finish_chunk(&mut self) -> McapResult<&mut CountingCrcWriter> { - assert!(!self.is_finished, "{}", Self::WHERE_WRITER); + self.assert_not_finished(); // If we're currently writing an attachment then we're not writing a chunk. Return an // error instead. if let Some(WriteMode::Attachment(..)) = self.writer { @@ -838,7 +846,7 @@ impl Writer { } // See start_chunk() for why we use take() here. - self.writer = Some(match self.writer.take().expect(Self::WHERE_WRITER) { + self.writer = Some(match self.writer.take().expect(Self::WRITER_IS_NONE) { WriteMode::Chunk(c) => { let (w, mode, index) = c.finish()?; self.chunk_indexes.push(index); @@ -1068,19 +1076,11 @@ impl Writer { /// to ensure all data was sent to the filesystem. pub fn into_inner(mut self) -> W { self.is_finished = true; - match self.writer.take().expect(Self::WHERE_WRITER) { + // Peel away all the layers of the writer to get the underlying stream. + match self.writer.take().expect(Self::WRITER_IS_NONE) { WriteMode::Raw(w) => w.finalize().0, WriteMode::Attachment(w) => w.writer.finalize().0.finalize().0, - WriteMode::Chunk(w) => { - w.compressor - .finalize() - .0 - .finish() - .expect("compression error") - .finalize() - .0 - .inner - } + WriteMode::Chunk(w) => w.compressor.finalize().0.into_inner().finalize().0.inner, } } } @@ -1093,8 +1093,10 @@ impl Drop for Writer { enum Compressor { Null(W), + // zstd's Encoder wrapper doesn't let us get the inner writer without calling finish(), so use + // zio::Writer directly instead. #[cfg(feature = "zstd")] - Zstd(zstd::Encoder<'static, W>), + Zstd(zio::Writer>), #[cfg(feature = "lz4")] Lz4(lz4::Encoder), } @@ -1104,7 +1106,10 @@ impl Compressor { Ok(match self { Compressor::Null(w) => w, #[cfg(feature = "zstd")] - Compressor::Zstd(w) => w.finish()?, + Compressor::Zstd(mut w) => { + w.finish()?; + w.into_inner().0 + } #[cfg(feature = "lz4")] Compressor::Lz4(w) => { let (output, result) = w.finish(); @@ -1113,6 +1118,16 @@ impl Compressor { } }) } + + fn into_inner(self) -> W { + match self { + Compressor::Null(w) => w, + #[cfg(feature = "zstd")] + Compressor::Zstd(w) => w.into_inner().0, + #[cfg(feature = "lz4")] + Compressor::Lz4(w) => w.finish().0, + } + } } impl Write for Compressor { @@ -1200,10 +1215,11 @@ impl ChunkWriter { #[cfg(feature = "zstd")] Some(Compression::Zstd) => { #[allow(unused_mut)] - let mut enc = zstd::Encoder::new(sink, 0)?; + let mut enc = zraw::Encoder::with_dictionary(0, &[])?; + // Enable multithreaded encoding on non-WASM targets. #[cfg(not(target_arch = "wasm32"))] - enc.multithread(num_cpus::get_physical() as u32)?; - Compressor::Zstd(enc) + enc.set_parameter(zraw::CParameter::NbWorkers(num_cpus::get_physical() as u32))?; + Compressor::Zstd(zio::Writer::new(sink, enc)) } #[cfg(feature = "lz4")] Some(Compression::Lz4) => Compressor::Lz4(