diff --git a/Cargo.toml b/Cargo.toml index 74e95354..d6d3616c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ rstest = "0.9" walkdir = "2.3" test-generator = "0.3" pretty-hex = "0.2" +memmap = "0.7.0" [profile.release] lto = true diff --git a/examples/read_all_values.rs b/examples/read_all_values.rs index 799f8d66..d20ef267 100644 --- a/examples/read_all_values.rs +++ b/examples/read_all_values.rs @@ -1,34 +1,57 @@ +use ion_rs::binary::non_blocking::raw_binary_reader::RawBinaryBufferReader; use ion_rs::raw_reader::RawStreamItem; use ion_rs::result::IonResult; -use ion_rs::StreamReader; -use ion_rs::{IonDataSource, IonType, RawBinaryReader}; +use ion_rs::RawReader; +use ion_rs::{IonType, RawBinaryReader}; +use memmap::MmapOptions; use std::fs::File; use std::process::exit; fn main() -> IonResult<()> { let args: Vec = std::env::args().collect(); - let path = args.get(1).unwrap_or_else(|| { + let mode = args.get(1).unwrap_or_else(|| { eprintln!("USAGE:\n\n {} [Binary Ion file]\n", args.get(0).unwrap()); - eprintln!("No input file was specified."); + eprintln!("No mode was specified."); exit(1); }); + let path = args.get(2).unwrap_or_else(|| { + eprintln!("USAGE:\n\n {} [Binary Ion file]\n", args.get(0).unwrap()); + eprintln!("No input file was specified."); + exit(2); + }); + let file = File::open(path).unwrap(); + + // This example uses `mmap` so we can use either the blocking reader (which reads from an + // io::BufRead) or the non-blocking reader (which reads from an AsRef<[u8]>). + let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; + + // Treat the mmap as a byte array. 
+ let ion_data: &[u8] = &mmap[..]; + + if mode == "blocking" { + let mut reader = RawBinaryReader::new(ion_data); + let number_of_values = read_all_values(&mut reader)?; + println!("Blocking: read {} values", number_of_values); + } else if mode == "nonblocking" { + let mut reader = RawBinaryBufferReader::new(ion_data); + let number_of_values = read_all_values(&mut reader)?; + println!("Non-blocking: read {} values", number_of_values); + } else { + eprintln!("Unsupported `mode`: {}.", mode); + exit(3); + } - let file = File::open(path)?; - let buf_reader = std::io::BufReader::new(file); - let mut cursor = RawBinaryReader::new(buf_reader); - let number_of_values = read_all_values(&mut cursor)?; - println!("Read {} values", number_of_values); Ok(()) } // Visits each value in the stream recursively, reading each scalar into a native Rust type. // Prints the total number of values read upon completion. -fn read_all_values(cursor: &mut RawBinaryReader) -> IonResult { +fn read_all_values(reader: &mut R) -> IonResult { use IonType::*; use RawStreamItem::{Nothing, Null as NullValue, Value, VersionMarker}; let mut count: usize = 0; loop { - match cursor.next()? { + match reader.next()? 
{ VersionMarker(_major, _minor) => {} NullValue(_ion_type) => { count += 1; @@ -37,39 +60,39 @@ fn read_all_values(cursor: &mut RawBinaryReader) -> IonResu Value(ion_type) => { count += 1; match ion_type { - Struct | List | SExpression => cursor.step_in()?, + Struct | List | SExpression => reader.step_in()?, String => { - let _text = cursor.map_string(|_s| ())?; + let _text = reader.map_string(|_s| ())?; } Symbol => { - let _symbol_id = cursor.read_symbol()?; + let _symbol_id = reader.read_symbol()?; } Integer => { - let _int = cursor.read_i64()?; + let _int = reader.read_i64()?; } Float => { - let _float = cursor.read_f64()?; + let _float = reader.read_f64()?; } Decimal => { - let _decimal = cursor.read_decimal()?; + let _decimal = reader.read_decimal()?; } Timestamp => { - let _timestamp = cursor.read_timestamp()?; + let _timestamp = reader.read_timestamp()?; } Boolean => { - let _boolean = cursor.read_bool()?; + let _boolean = reader.read_bool()?; } Blob => { - let _blob = cursor.map_blob(|_b| ())?; + let _blob = reader.map_blob(|_b| ())?; } Clob => { - let _clob = cursor.map_clob(|_c| ())?; + let _clob = reader.map_clob(|_c| ())?; } Null => {} } } - Nothing if cursor.depth() > 0 => { - cursor.step_out()?; + Nothing if reader.depth() > 0 => { + reader.step_out()?; } _ => break, } diff --git a/src/binary/int.rs b/src/binary/int.rs index ba3a01c2..200b0af2 100644 --- a/src/binary/int.rs +++ b/src/binary/int.rs @@ -30,6 +30,14 @@ pub struct DecodedInt { } impl DecodedInt { + pub(crate) fn new(value: Integer, is_negative: bool, size_in_bytes: usize) -> Self { + DecodedInt { + size_in_bytes, + value, + is_negative, + } + } + /// Reads an Int with `length` bytes from the provided data source. 
pub fn read(data_source: &mut R, length: usize) -> IonResult { if length == 0 { diff --git a/src/binary/mod.rs b/src/binary/mod.rs index 79d9ad0b..f865dc84 100644 --- a/src/binary/mod.rs +++ b/src/binary/mod.rs @@ -9,12 +9,13 @@ pub mod decimal; mod header; pub mod int; mod nibbles; +pub mod non_blocking; pub(crate) mod raw_binary_reader; pub mod raw_binary_writer; pub mod timestamp; mod type_code; pub mod uint; -mod var_int; -mod var_uint; +pub mod var_int; +pub mod var_uint; pub use type_code::IonTypeCode; diff --git a/src/binary/nibbles.rs b/src/binary/nibbles.rs index 2eace3e4..d2306bfe 100644 --- a/src/binary/nibbles.rs +++ b/src/binary/nibbles.rs @@ -5,7 +5,7 @@ const MAX_NIBBLE_VALUE: u8 = 15; const NIBBLE_SIZE_IN_BITS: u8 = 4; /// Given a byte, will return a tuple containing the values of its left and right nibbles. -pub(crate) fn nibbles_from_byte(byte: u8) -> (u8, u8) { +pub(crate) const fn nibbles_from_byte(byte: u8) -> (u8, u8) { let left = byte >> NIBBLE_SIZE_IN_BITS; let right = byte & 0b1111; (left, right) diff --git a/src/binary/non_blocking/binary_buffer.rs b/src/binary/non_blocking/binary_buffer.rs new file mode 100644 index 00000000..3ac7241b --- /dev/null +++ b/src/binary/non_blocking/binary_buffer.rs @@ -0,0 +1,816 @@ +use crate::binary::constants::v1_0::{length_codes, IVM}; +use crate::binary::int::DecodedInt; +use crate::binary::non_blocking::type_descriptor::{ + Header, TypeDescriptor, ION_1_0_TYPE_DESCRIPTORS, +}; +use crate::binary::uint::DecodedUInt; +use crate::binary::var_int::VarInt; +use crate::binary::var_uint::VarUInt; +use crate::result::{decoding_error, incomplete_data_error, incomplete_data_error_raw}; +use crate::types::integer::UInteger; +use crate::{Integer, IonResult, IonType}; +use num_bigint::{BigInt, BigUint, Sign}; +use std::io::Read; +use std::mem; + +// This limit is used for stack-allocating buffer space to encode/decode UInts. 
+const UINT_STACK_BUFFER_SIZE: usize = 16; +// This number was chosen somewhat arbitrarily and could be lifted if a use case demands it. +const MAX_UINT_SIZE_IN_BYTES: usize = 2048; + +// This limit is used for stack-allocating buffer space to encode/decode Ints. +const INT_STACK_BUFFER_SIZE: usize = 16; +// This number was chosen somewhat arbitrarily and could be lifted if a use case demands it. +const MAX_INT_SIZE_IN_BYTES: usize = 2048; + +/// A stack-allocated wrapper around an `AsRef<[u8]>` that provides methods to read Ion's +/// encoding primitives. +/// +/// When the wrapped type is a `Vec`, data can be appended to the buffer between read +/// operations. +#[derive(Debug, PartialEq)] +pub(crate) struct BinaryBuffer> { + data: A, + start: usize, + end: usize, + total_consumed: usize, +} + +impl> BinaryBuffer { + /// Constructs a new BinaryBuffer that wraps `data`. + #[inline] + pub fn new(data: A) -> BinaryBuffer { + let end = data.as_ref().len(); + BinaryBuffer { + data, + start: 0, + end, + total_consumed: 0, + } + } + + /// Creates an independent view of the `BinaryBuffer`'s data. The `BinaryBuffer` that is + /// returned tracks its own position and consumption without affecting the original. + pub fn slice(&self) -> BinaryBuffer<&A> { + BinaryBuffer { + data: &self.data, + start: self.start, + end: self.end, + total_consumed: self.total_consumed, + } + } + + /// Returns a slice containing all of the buffer's remaining bytes. + pub fn bytes(&self) -> &[u8] { + &self.data.as_ref()[self.start..self.end] + } + + /// Gets a slice from the buffer starting at `offset` and ending at `offset + length`. + /// The caller must check that the buffer contains `length + offset` bytes prior + /// to calling this method. 
+ pub fn bytes_range(&self, offset: usize, length: usize) -> &[u8] { + let from = self.start + offset; + let to = from + length; + &self.data.as_ref()[from..to] + } + + /// Returns the number of bytes that have been marked as read either via the [consume] method + /// or one of the `read_*` methods. + pub fn total_consumed(&self) -> usize { + self.total_consumed + } + + /// Returns the number of unread bytes left in the buffer. + pub fn remaining(&self) -> usize { + self.end - self.start + } + + /// Returns `true` if there are no bytes remaining in the buffer. Otherwise, returns `false`. + pub fn is_empty(&self) -> bool { + self.start == self.end + } + + /// If the buffer is not empty, returns `Some(_)` containing the next byte in the buffer. + /// Otherwise, returns `None`. + pub fn peek_next_byte(&self) -> Option { + self.bytes().get(0).copied() + } + + /// If there are at least `n` bytes left in the buffer, returns `Some(_)` containing a slice + /// with the first `n` bytes. Otherwise, returns `None`. + pub fn peek_n_bytes(&self, n: usize) -> Option<&[u8]> { + self.data.as_ref().get(self.start..self.start + n) + } + + /// Marks the first `num_bytes_to_consume` bytes in the buffer as having been read. + /// + /// After data has been inspected using the `peek` methods, those bytes can be marked as read + /// by calling the `consume` method. + /// + /// Note that the various `read_*` methods to parse Ion encoding primitives automatically + /// consume the bytes they read if they are successful. + #[inline] + pub fn consume(&mut self, num_bytes_to_consume: usize) { + // This assertion is always run during testing but is removed in the release build. + debug_assert!(num_bytes_to_consume <= self.remaining()); + self.start += num_bytes_to_consume; + self.total_consumed += num_bytes_to_consume; + } + + /// Reads (but does not consume) the first byte in the buffer and returns it as a + /// [TypeDescriptor]. 
+ pub fn peek_type_descriptor(&self) -> IonResult { + if self.is_empty() { + return incomplete_data_error("a type descriptor", self.total_consumed()); + } + let next_byte = self.data.as_ref()[self.start]; + Ok(ION_1_0_TYPE_DESCRIPTORS[next_byte as usize]) + } + + /// Reads the first four bytes in the buffer as an Ion version marker. If it is successful, + /// returns an `Ok(_)` containing a `(major, minor)` version tuple and consumes the + /// source bytes. + /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#value-streams + pub fn read_ivm(&mut self) -> IonResult<(u8, u8)> { + let bytes = self + .peek_n_bytes(IVM.len()) + .ok_or_else(|| incomplete_data_error_raw("an IVM", self.total_consumed()))?; + + match bytes { + [0xE0, major, minor, 0xEA] => { + let version = (*major, *minor); + self.consume(IVM.len()); + Ok(version) + } + invalid_ivm => decoding_error(format!("invalid IVM: {:?}", invalid_ivm)), + } + } + + /// Reads a `VarUInt` encoding primitive from the beginning of the buffer. If it is successful, + /// returns an `Ok(_)` containing its [VarUInt] representation and consumes the source bytes. + /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#varuint-and-varint-fields + pub fn read_var_uint(&mut self) -> IonResult { + const BITS_PER_ENCODED_BYTE: usize = 7; + const STORAGE_SIZE_IN_BITS: usize = mem::size_of::() * 8; + const MAX_ENCODED_SIZE_IN_BYTES: usize = STORAGE_SIZE_IN_BITS / BITS_PER_ENCODED_BYTE; + + const LOWER_7_BITMASK: u8 = 0b0111_1111; + const HIGHEST_BIT_VALUE: u8 = 0b1000_0000; + + let mut magnitude: usize = 0; + let mut encoded_size_in_bytes = 0; + + for byte in self.bytes().iter().copied() { + encoded_size_in_bytes += 1; + magnitude <<= 7; // Shifts 0 to 0 in the first iteration + let lower_seven = (LOWER_7_BITMASK & byte) as usize; + magnitude |= lower_seven; + if byte >= HIGHEST_BIT_VALUE { + // This is the final byte. 
+ // Make sure we haven't exceeded the configured maximum size + if encoded_size_in_bytes > MAX_ENCODED_SIZE_IN_BYTES { + return Self::value_too_large( + "a VarUInt", + encoded_size_in_bytes, + MAX_ENCODED_SIZE_IN_BYTES, + ); + } + self.consume(encoded_size_in_bytes); + return Ok(VarUInt::new(magnitude, encoded_size_in_bytes)); + } + } + + incomplete_data_error("a VarUInt", self.total_consumed() + encoded_size_in_bytes) + } + + /// Reads a `VarInt` encoding primitive from the beginning of the buffer. If it is successful, + /// returns an `Ok(_)` containing its [VarInt] representation and consumes the source bytes. + /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#varuint-and-varint-fields + pub fn read_var_int(&mut self) -> IonResult { + const BITS_PER_ENCODED_BYTE: usize = 7; + const STORAGE_SIZE_IN_BITS: usize = mem::size_of::() * 8; + const MAX_ENCODED_SIZE_IN_BYTES: usize = STORAGE_SIZE_IN_BITS / BITS_PER_ENCODED_BYTE; + + const LOWER_6_BITMASK: u8 = 0b0011_1111; + const LOWER_7_BITMASK: u8 = 0b0111_1111; + const HIGHEST_BIT_VALUE: u8 = 0b1000_0000; + + const BITS_PER_BYTE: usize = 8; + const BITS_PER_U64: usize = mem::size_of::() * BITS_PER_BYTE; + + // Unlike VarUInt's encoding, the first byte in a VarInt is a special case because + // bit #6 (0-indexed, from the right) indicates whether the value is positive (0) or + // negative (1). + + if self.is_empty() { + return incomplete_data_error("a VarInt", self.total_consumed()); + } + let first_byte: u8 = self.peek_next_byte().unwrap(); + let no_more_bytes: bool = first_byte >= 0b1000_0000; // If the first bit is 1, we're done. 
+ let is_negative: bool = (first_byte & 0b0100_0000) == 0b0100_0000; + let sign: i64 = if is_negative { -1 } else { 1 }; + let mut magnitude = (first_byte & 0b0011_1111) as i64; + + if no_more_bytes { + self.consume(1); + return Ok(VarInt::new(magnitude * sign, is_negative, 1)); + } + + let mut encoded_size_in_bytes = 1; + // Whether we found the terminating byte in this buffer. + let mut terminated = false; + + for byte in self.bytes()[1..].iter().copied() { + let lower_seven = (0b0111_1111 & byte) as i64; + magnitude <<= 7; + magnitude |= lower_seven; + encoded_size_in_bytes += 1; + if byte >= 0b1000_0000 { + terminated = true; + break; + } + } + + if !terminated { + return incomplete_data_error( + "a VarInt", + self.total_consumed() + encoded_size_in_bytes, + ); + } + + if encoded_size_in_bytes > MAX_ENCODED_SIZE_IN_BYTES { + return decoding_error(format!( + "Found a {}-byte VarInt. Max supported size is {} bytes.", + encoded_size_in_bytes, MAX_ENCODED_SIZE_IN_BYTES + )); + } + + self.consume(encoded_size_in_bytes); + Ok(VarInt::new( + magnitude * sign, + is_negative, + encoded_size_in_bytes, + )) + } + + /// Reads the first `length` bytes from the buffer as a `UInt` encoding primitive. If it is + /// successful, returns an `Ok(_)` containing its [DecodedUInt] representation and consumes the + /// source bytes. + /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#uint-and-int-fields + pub fn read_uint(&mut self, length: usize) -> IonResult { + if length <= mem::size_of::() { + return self.read_small_uint(length); + } + + // The UInt is too large to fit in a u64; read it as a BigUInt instead. + self.read_big_uint(length) + } + + /// Reads the first `length` bytes from the buffer as a `UInt`. The caller must confirm that + /// `length` is small enough to fit in a `usize`. 
+ #[inline] + fn read_small_uint(&mut self, length: usize) -> IonResult { + let uint_bytes = self + .peek_n_bytes(length) + .ok_or_else(|| incomplete_data_error_raw("a UInt", self.total_consumed()))?; + let mut magnitude: u64 = 0; + for &byte in uint_bytes { + let byte = u64::from(byte); + magnitude <<= 8; + magnitude |= byte; + } + self.consume(length); + Ok(DecodedUInt::new(UInteger::U64(magnitude), length)) + } + + /// Reads the first `length` bytes from the buffer as a `UInt`. If `length` is small enough + /// that the value can fit in a `usize`, it is strongly recommended that you use + /// `read_small_uint` instead as it will be much faster. + #[inline(never)] + // This method performs allocations and its generated assembly is rather large. Isolating its + // logic in a separate method that is never inlined keeps `read_uint` (its caller) small enough + // to inline. This is important as `read_uint` is on the hot path for most Ion streams. + fn read_big_uint(&mut self, length: usize) -> IonResult { + if length > MAX_UINT_SIZE_IN_BYTES { + return Self::value_too_large("a Uint", length, MAX_UINT_SIZE_IN_BYTES); + } + + let uint_bytes = self + .peek_n_bytes(length) + .ok_or_else(|| incomplete_data_error_raw("a UInt", self.total_consumed()))?; + + let magnitude = BigUint::from_bytes_be(uint_bytes); + self.consume(length); + Ok(DecodedUInt::new(UInteger::BigUInt(magnitude), length)) + } + + #[inline(never)] + // This method is inline(never) because it is rarely invoked and its allocations/formatting + // compile to a non-trivial number of instructions. + fn value_too_large(label: &str, length: usize, max_length: usize) -> IonResult { + decoding_error(format!( + "found {} that was too large; size = {}, max size = {}", + label, length, max_length + )) + } + + /// Reads the first `length` bytes from the buffer as an `Int` encoding primitive. If it is + /// successful, returns an `Ok(_)` containing its [DecodedInt] representation and consumes the + /// source bytes. 
+ /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#uint-and-int-fields + pub fn read_int(&mut self, length: usize) -> IonResult { + if length == 0 { + return Ok(DecodedInt::new(Integer::I64(0), false, 0)); + } else if length > MAX_INT_SIZE_IN_BYTES { + return decoding_error(format!( + "Found a {}-byte Int. Max supported size is {} bytes.", + length, MAX_INT_SIZE_IN_BYTES + )); + } + + let int_bytes = self.peek_n_bytes(length).ok_or_else(|| { + incomplete_data_error_raw("an Int encoding primitive", self.total_consumed()) + })?; + + let mut is_negative: bool = false; + + let value = if length <= mem::size_of::() { + // This Int will fit in an i64. + let first_byte: i64 = i64::from(int_bytes[0]); + let sign: i64 = if first_byte & 0b1000_0000 == 0 { + 1 + } else { + is_negative = true; + -1 + }; + let mut magnitude: i64 = first_byte & 0b0111_1111; + for &byte in &int_bytes[1..] { + let byte = i64::from(byte); + magnitude <<= 8; + magnitude |= byte; + } + Integer::I64(sign * magnitude) + } else { + // This Int is too big for an i64, we'll need to use a BigInt + let value = if int_bytes[0] & 0b1000_0000 == 0 { + BigInt::from_bytes_be(Sign::Plus, int_bytes) + } else { + is_negative = true; + // The leading sign bit is the only part of the input that can't be considered + // unsigned, big-endian integer bytes. We need to make our own copy of the input + // so we can flip that bit back to a zero before calling `from_bytes_be`. + let mut owned_int_bytes = Vec::from(int_bytes); + owned_int_bytes[0] &= 0b0111_1111; + BigInt::from_bytes_be(Sign::Minus, owned_int_bytes.as_slice()) + }; + + Integer::BigInt(value) + }; + self.consume(length); + Ok(DecodedInt::new(value, is_negative, length)) + } + + /// Reads a `NOP` encoding primitive from the buffer. If it is successful, returns an `Ok(_)` + /// containing the number of bytes that were consumed. 
+ /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#nop-pad + #[inline(never)] + // NOP padding is not widely used in Ion 1.0, in part because many writer implementations do not + // expose the ability to write them. As such, this method has been marked `inline(never)` to + // allow the hot path to be better optimized. + pub fn read_nop_pad(&mut self) -> IonResult { + let type_descriptor = self.peek_type_descriptor()?; + // Advance beyond the type descriptor + self.consume(1); + // If the type descriptor says we should skip more bytes, skip them. + let length = self.read_length(type_descriptor.length_code)?; + if self.remaining() < length.value() { + return incomplete_data_error("a NOP", self.total_consumed()); + } + self.consume(length.value()); + Ok(1 + length.size_in_bytes() + length.value()) + } + + /// Interprets the length code in the provided [Header]; if necessary, will read more bytes + /// from the buffer to interpret as the value's length. If it is successful, returns an `Ok(_)` + /// containing a [VarUInt] representation of the value's length and consumes any additional + /// bytes read. If no additional bytes were read, the returned `VarUInt`'s `size_in_bytes()` + /// method will return `0`. + pub fn read_value_length(&mut self, header: Header) -> IonResult { + use IonType::*; + // Some type-specific `length` field overrides + let length_code = match header.ion_type { + // Null (0x0F) and Boolean (0x10, 0x11) are the only types that don't have/use a `length` + // field; the header contains the complete value. + Null | Boolean => 0, + // If a struct has length = 1, its fields are ordered and the actual length follows. + // For the time being, this reader does not have any special handling for this case. + // Use `0xE` (14) as the length code instead so the call to `read_length` below + // consumes a VarUInt. + Struct if header.length_code == 1 => length_codes::VAR_UINT, + // For any other type, use the header's declared length code. 
+ _ => header.length_code, + }; + + // Read the length, potentially consuming a VarUInt in the process. + let length = self.read_length(length_code)?; + + // After we get the length, perform some type-specific validation. + match header.ion_type { + Float => match header.length_code { + 0 | 4 | 8 | 15 => {} + _ => return decoding_error("found a float with an illegal length code"), + }, + Timestamp if !header.is_null() && length.value() <= 1 => { + return decoding_error("found a timestamp with length <= 1") + } + Struct if header.length_code == 1 && length.value() == 0 => { + return decoding_error("found an empty ordered struct") + } + _ => {} + }; + + Ok(length) + } + + /// Interprets a type descriptor's `L` nibble (length) in the way used by most Ion types. + /// + /// If `L` is... + /// * `f`: the value is a typed `null` and its length is `0`. + /// * `e`: the length is encoded as a `VarUInt` that follows the type descriptor. + /// * anything else: the `L` represents the actual length. + /// + /// If successful, returns an `Ok(_)` that contains the [VarUInt] representation + /// of the value's length and consumes any additional bytes read. + pub fn read_length(&mut self, length_code: u8) -> IonResult { + let length = match length_code { + length_codes::NULL => VarUInt::new(0, 0), + length_codes::VAR_UINT => self.read_var_uint()?, + magnitude => VarUInt::new(magnitude as usize, 0), + }; + + Ok(length) + } +} + +/// These methods are only available to `BinaryBuffer`s that wrap a `Vec`. That is: buffers +/// that own a growable array into which more data can be appended. +// TODO: Instead of pinning this to Vec, we should define a trait that allows any owned/growable +// byte buffer type to be used. +impl BinaryBuffer> { + /// Moves any unread bytes to the front of the `Vec`, making room for more data at the tail. 
+ /// This method should only be called when the bytes remaining in the buffer represent an + /// incomplete value; as such, the required `memcpy` should typically be quite small. + fn restack(&mut self) { + let remaining = self.remaining(); + self.data.copy_within(self.start..self.end, 0); + self.start = 0; + self.end = remaining; + self.data.truncate(remaining); + } + + /// Copies the provided bytes to end of the input buffer. + pub fn append_bytes(&mut self, bytes: &[u8]) { + self.restack(); + self.data.extend_from_slice(bytes); + self.end += bytes.len(); + } + + /// Tries to read `length` bytes from `source`. Unlike [append_bytes], this method does not do + /// any copying. A slice of the reader's buffer is handed to `source` so it can be populated + /// directly. + /// + /// If successful, returns an `Ok(_)` containing the number of bytes that were actually read. + pub fn read_from(&mut self, mut source: R, length: usize) -> IonResult { + self.restack(); + // Make sure that there are `length` bytes in the `Vec` beyond `self.end`. + self.reserve_capacity(length); + // Get a mutable slice to the first `length` bytes starting at `self.end`. + let read_buffer = &mut self.data.as_mut_slice()[self.end..length]; + // Use that slice as our input buffer to read from the source. + let bytes_read = source.read(read_buffer)?; + // Update `self.end` to reflect that we have more data available to read. + self.end += bytes_read; + Ok(bytes_read) + } + + /// Pushes `0u8` onto the end of the `Vec` until there are `length` bytes available beyond + /// `self.end`. This block of zeroed out bytes can then be used as an input I/O buffer for calls + /// to `read_from`. Applications should only use `read_from` when the buffer has been depleted, + /// which means that calls to this method should usually be no-ops. + fn reserve_capacity(&mut self, length: usize) { + // TODO: More sophisticated logic to avoid potentially reallocating multiple times per call. 
+ // For now, it is unlikely that this would happen often. + let capacity = self.data.len() - self.end; + if capacity < length { + for _ in 0..(length - capacity) { + self.data.push(0); + } + } + } +} + +/// Constructs a [BinaryBuffer] from anything that can be viewed as a slice of bytes, including +/// `&[u8]`, `Vec`, `Buf`, etc. +impl> From for BinaryBuffer { + fn from(data: A) -> Self { + let end = data.as_ref().len(); + BinaryBuffer { + data, + start: 0, + end, + total_consumed: 0, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::IonError; + use num_traits::Num; + + fn input_test + Into>>(input: I) { + let mut input = input.into(); + // We can peek at the first byte... + assert_eq!(input.peek_next_byte(), Some(b'f')); + // ...without modifying the input. Looking at the next 3 bytes still includes 'f'. + assert_eq!(input.peek_n_bytes(3), Some("foo".as_bytes())); + // Advancing the cursor by 1... + input.consume(1); + // ...causes next_byte() to return 'o'. + assert_eq!(input.peek_next_byte(), Some(b'o')); + input.consume(1); + assert_eq!(input.peek_next_byte(), Some(b'o')); + input.consume(1); + assert_eq!(input.peek_n_bytes(2), Some(" b".as_bytes())); + assert_eq!(input.peek_n_bytes(6), Some(" bar b".as_bytes())); + } + + #[test] + fn string_test() { + input_test(String::from("foo bar baz")); + } + + #[test] + fn slice_test() { + input_test("foo bar baz".as_bytes()); + } + + #[test] + fn vec_test() { + input_test(Vec::from("foo bar baz".as_bytes())); + } + + #[test] + fn read_var_uint() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1001, 0b0000_1111, 0b1000_0001]); + let var_uint = buffer.read_var_uint()?; + assert_eq!(3, var_uint.size_in_bytes()); + assert_eq!(1_984_385, var_uint.value()); + Ok(()) + } + + #[test] + fn read_var_uint_zero() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1000_0000]); + let var_uint = buffer.read_var_uint()?; + assert_eq!(var_uint.size_in_bytes(), 1); + 
assert_eq!(var_uint.value(), 0); + Ok(()) + } + + #[test] + fn read_var_uint_two_bytes_max_value() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1111, 0b1111_1111]); + let var_uint = buffer.read_var_uint()?; + assert_eq!(var_uint.size_in_bytes(), 2); + assert_eq!(var_uint.value(), 16_383); + Ok(()) + } + + #[test] + fn read_incomplete_var_uint() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1001, 0b0000_1111]); + match buffer.read_var_uint() { + Err(IonError::Incomplete { .. }) => Ok(()), + other => panic!("expected IonError::Incomplete, but found: {:?}", other), + } + } + + #[test] + fn read_var_uint_overflow_detection() { + let mut buffer = BinaryBuffer::new(&[ + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b1111_1111, + ]); + buffer + .read_var_uint() + .expect_err("This should have failed due to overflow."); + } + + #[test] + fn read_var_int_zero() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1000_0000]); + let var_int = buffer.read_var_int()?; + assert_eq!(var_int.size_in_bytes(), 1); + assert_eq!(var_int.value(), 0); + Ok(()) + } + + #[test] + fn read_negative_var_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1001, 0b0000_1111, 0b1000_0001]); + let var_int = buffer.read_var_int()?; + assert_eq!(var_int.size_in_bytes(), 3); + assert_eq!(var_int.value(), -935_809); + Ok(()) + } + + #[test] + fn read_positive_var_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0011_1001, 0b0000_1111, 0b1000_0001]); + let var_int = buffer.read_var_int()?; + assert_eq!(var_int.size_in_bytes(), 3); + assert_eq!(var_int.value(), 935_809); + Ok(()) + } + + #[test] + fn read_var_int_two_byte_min() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1111, 0b1111_1111]); + let var_int = buffer.read_var_int()?; + assert_eq!(var_int.size_in_bytes(), 2); + assert_eq!(var_int.value(), -8_191); 
+ Ok(()) + } + + #[test] + fn read_var_int_two_byte_max() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0011_1111, 0b1111_1111]); + let var_int = buffer.read_var_int()?; + assert_eq!(var_int.size_in_bytes(), 2); + assert_eq!(var_int.value(), 8_191); + Ok(()) + } + + #[test] + fn read_var_int_overflow_detection() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[ + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b0111_1111, + 0b1111_1111, + ]); + buffer + .read_var_int() + .expect_err("This should have failed due to overflow."); + Ok(()) + } + + #[test] + fn read_one_byte_uint() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1000_0000]); + let var_int = buffer.read_uint(buffer.remaining())?; + assert_eq!(var_int.size_in_bytes(), 1); + assert_eq!(var_int.value(), &UInteger::U64(128)); + Ok(()) + } + + #[test] + fn read_two_byte_uint() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1111, 0b1111_1111]); + let var_int = buffer.read_uint(buffer.remaining())?; + assert_eq!(var_int.size_in_bytes(), 2); + assert_eq!(var_int.value(), &UInteger::U64(32_767)); + Ok(()) + } + + #[test] + fn read_three_byte_uint() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0011_1100, 0b1000_0111, 0b1000_0001]); + let var_int = buffer.read_uint(buffer.remaining())?; + assert_eq!(var_int.size_in_bytes(), 3); + assert_eq!(var_int.value(), &UInteger::U64(3_966_849)); + Ok(()) + } + + #[test] + fn test_read_ten_byte_uint() -> IonResult<()> { + let data = vec![0xFFu8; 10]; + let mut buffer = BinaryBuffer::new(data); + let uint = buffer.read_uint(buffer.remaining())?; + assert_eq!(uint.size_in_bytes(), 10); + assert_eq!( + uint.value(), + &UInteger::BigUInt(BigUint::from_str_radix("ffffffffffffffffffff", 16).unwrap()) + ); + Ok(()) + } + + #[test] + fn test_read_uint_too_large() { + let mut buffer = Vec::with_capacity(MAX_UINT_SIZE_IN_BYTES + 1); + for _ in 
0..(MAX_UINT_SIZE_IN_BYTES + 1) { + buffer.push(1); + } + let mut buffer = BinaryBuffer::new(buffer); + let _uint = buffer + .read_uint(buffer.remaining()) + .expect_err("This exceeded the configured max UInt size."); + } + + #[test] + fn read_int_negative_zero() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1000_0000]); // Negative zero + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 1); + assert_eq!(int.value(), &Integer::I64(0)); + assert!(int.is_negative_zero()); + Ok(()) + } + + #[test] + fn read_int_positive_zero() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0000_0000]); // Negative zero + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 1); + assert_eq!(int.value(), &Integer::I64(0)); + assert!(!int.is_negative_zero()); + Ok(()) + } + + #[test] + fn read_int_length_zero() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[]); // Negative zero + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 0); + assert_eq!(int.value(), &Integer::I64(0)); + assert!(!int.is_negative_zero()); + Ok(()) + } + + #[test] + fn read_two_byte_negative_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1111_1111, 0b1111_1111]); + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 2); + assert_eq!(int.value(), &Integer::I64(-32_767)); + Ok(()) + } + + #[test] + fn read_two_byte_positive_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0111_1111, 0b1111_1111]); + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 2); + assert_eq!(int.value(), &Integer::I64(32_767)); + Ok(()) + } + + #[test] + fn read_three_byte_negative_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b1011_1100, 0b1000_0111, 0b1000_0001]); + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 3); + assert_eq!(int.value(), 
&Integer::I64(-3_966_849)); + Ok(()) + } + + #[test] + fn read_three_byte_positive_int() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(&[0b0011_1100, 0b1000_0111, 0b1000_0001]); + let int = buffer.read_int(buffer.remaining())?; + assert_eq!(int.size_in_bytes(), 3); + assert_eq!(int.value(), &Integer::I64(3_966_849)); + Ok(()) + } + + #[test] + fn read_int_overflow() -> IonResult<()> { + let mut buffer = BinaryBuffer::new(vec![1; MAX_INT_SIZE_IN_BYTES + 1]); // Negative zero + buffer + .read_int(buffer.remaining()) + .expect_err("This exceeded the configured max Int size."); + Ok(()) + } +} diff --git a/src/binary/non_blocking/mod.rs b/src/binary/non_blocking/mod.rs new file mode 100644 index 00000000..58706cf4 --- /dev/null +++ b/src/binary/non_blocking/mod.rs @@ -0,0 +1,3 @@ +pub mod binary_buffer; +pub mod raw_binary_reader; +pub mod type_descriptor; diff --git a/src/binary/non_blocking/raw_binary_reader.rs b/src/binary/non_blocking/raw_binary_reader.rs new file mode 100644 index 00000000..3a4da06a --- /dev/null +++ b/src/binary/non_blocking/raw_binary_reader.rs @@ -0,0 +1,1796 @@ +use crate::binary::constants::v1_0::{length_codes, IVM}; +use crate::binary::int::DecodedInt; +use crate::binary::non_blocking::binary_buffer::BinaryBuffer; +use crate::binary::non_blocking::type_descriptor::{Header, TypeDescriptor}; +use crate::binary::uint::DecodedUInt; +use crate::binary::var_uint::VarUInt; +use crate::binary::IonTypeCode; +use crate::result::{ + decoding_error, decoding_error_raw, illegal_operation, illegal_operation_raw, + incomplete_data_error, +}; +use crate::types::integer::{IntAccess, UInteger}; +use crate::types::SymbolId; +use crate::{ + Decimal, Integer, IonResult, IonType, RawStreamItem, RawSymbolToken, StreamReader, Timestamp, +}; +use bytes::{BigEndian, Buf, ByteOrder}; +use num_bigint::BigUint; +use num_traits::Zero; +use std::io::Read; +use std::ops::Range; + +/// Type, offset, and length information about the serialized value over which the 
+/// NonBlockingRawBinaryReader is currently positioned. +#[derive(Clone, Copy, Debug, PartialEq)] +struct EncodedValue { + // If the compiler decides that a value is too large to be moved/copied with inline code, + // it will relocate the value using memcpy instead. This can be quite slow by comparison. + // + // Be cautious when adding new member fields or modifying the data types of existing member + // fields, as this may cause the in-memory size of `EncodedValue` instances to grow. + // + // See the Rust Performance Book section on measuring type sizes[1] for more information. + // [1] https://nnethercote.github.io/perf-book/type-sizes.html#measuring-type-sizes + + // The type descriptor byte that identified this value; includes the type code, length code, + // and IonType. + header: Header, + + // Each encoded value has up to five components, appearing in the following order: + // + // [ field_id? | annotations? | header (type descriptor) | header_length? | value ] + // + // Components shown with a `?` are optional. + // + // EncodedValue stores the offset of the type descriptor byte from the beginning of the + // data source (`header_offset`). The lengths of the other fields can be used to calculate + // their positions relative to the type descriptor byte. For example, to find the offset of the + // field ID (if present), we can do: + // header_offset - annotations_header_length - field_id_length + // + // This allows us to store a single `usize` for the header offset, while other lengths can be + // packed into a `u8`. Values are not permitted to have a field ID or annotations that take + // more than 255 bytes to represent. + // + // We store the offset for the header byte because it is guaranteed to be present for all values. + // Field IDs and annotations appear earlier in the stream but are optional. + + // The number of bytes used to encode the field ID (if present) preceding the Ion value. 
If + // `field_id` is undefined, `field_id_length` will be zero. + field_id_length: u8, + // If this value is inside a struct, `field_id` will contain the SymbolId that represents + // its field name. + field_id: Option, + // The number of bytes used to encode the annotations wrapper (if present) preceding the Ion + // value. If `annotations` is empty, `annotations_header_length` will be zero. + annotations_header_length: u8, + // The number of bytes used to encode the series of symbol IDs inside the annotations wrapper. + annotations_sequence_length: u8, + // Type descriptor byte location. + header_offset: usize, + // The number of bytes used to encode the header not including the type descriptor byte. + header_length: u8, + // The number of bytes used to encode the value itself, not including the header byte + // or length fields. + value_length: usize, + // The sum total of: + // field_id_length + annotations_header_length + header_length + value_length + // While this can be derived from the above fields, storing it for reuse offers a modest + // optimization. `total_length` is needed when stepping into a value, skipping a value, + // and reading a value's data. + total_length: usize, +} + +impl EncodedValue { + /// Returns the offset of the current value's type descriptor byte. + fn header_offset(&self) -> usize { + self.header_offset as usize + } + + /// Returns the length of this value's header, including the type descriptor byte and any + /// additional bytes used to encode the value's length. + fn header_length(&self) -> usize { + // The `header_length` field does not include the type descriptor byte, so add 1. + self.header_length as usize + 1 + } + + /// Returns an offset Range that contains this value's type descriptor byte and any additional + /// bytes used to encode the `length`. 
+ fn header_range(&self) -> Range { + let start = self.header_offset; + let end = start + self.header_length(); + start..end + } + + /// Returns the number of bytes used to encode this value's data. + /// If the value can fit in the type descriptor byte (e.g. `true`, `false`, `null`, `0`), + /// this function will return 0. + #[inline(always)] + fn value_length(&self) -> usize { + self.value_length + } + + /// The offset of the first byte following the header (including length bytes, if present). + /// If `value_length()` returns zero, this offset is actually the first byte of + /// the next encoded value and should not be read. + fn value_offset(&self) -> usize { + self.header_offset + self.header_length() + } + + /// Returns an offset Range containing any bytes following the header. + fn value_range(&self) -> Range { + let start = self.value_offset(); + let end = start + self.value_length; + start..end + } + + /// Returns the index of the first byte that is beyond the end of the current value's encoding. + fn value_end_exclusive(&self) -> usize { + self.value_offset() + self.value_length + } + + /// Returns the number of bytes used to encode this value's field ID, if present. + fn field_id_length(&self) -> Option { + self.field_id.as_ref()?; + Some(self.field_id_length as usize) + } + + /// Returns the offset of the first byte used to encode this value's field ID, if present. + fn field_id_offset(&self) -> Option { + self.field_id.as_ref()?; + Some( + self.header_offset + - self.annotations_header_length as usize + - self.field_id_length as usize, + ) + } + + /// Returns an offset Range that contains the bytes used to encode this value's field ID, + /// if present. + fn field_id_range(&self) -> Option> { + if let Some(start) = self.field_id_offset() { + let end = start + self.field_id_length as usize; + return Some(start..end); + } + None + } + + /// Returns the number of bytes used to encode this value's annotations, if any. 
+ /// While annotations envelope the value that they decorate, this function does not include + /// the length of the value itself. + fn annotations_header_length(&self) -> Option { + if self.annotations_header_length == 0 { + return None; + } + Some(self.annotations_header_length as usize) + } + + /// Returns the number of bytes used to encode the series of VarUInt annotation symbol IDs, if + /// any. + /// + /// See: https://amzn.github.io/ion-docs/docs/binary.html#annotations + fn annotations_sequence_length(&self) -> Option { + if self.annotations_header_length == 0 { + return None; + } + Some(self.annotations_sequence_length as usize) + } + + /// Returns the offset of the beginning of the annotations wrapper, if present. + fn annotations_offset(&self) -> Option { + if self.annotations_header_length == 0 { + return None; + } + Some(self.header_offset - self.annotations_header_length as usize) + } + + /// Returns an offset Range that includes the bytes used to encode this value's annotations, + /// if any. While annotations envelope the value that they modify, this function does not + /// include the bytes of the encoded value itself. + fn annotations_range(&self) -> Option> { + if let Some(start) = self.annotations_offset() { + let end = start + self.annotations_header_length as usize; + return Some(start..end); + } + None + } + + /// Returns the total number of bytes used to represent the current value, including the + /// field ID (if any), its annotations (if any), its header (type descriptor + length bytes), + /// and its value. + fn total_length(&self) -> usize { + self.total_length + } + + fn ion_type(&self) -> IonType { + self.header.ion_type + } +} + +/// Constructs an 'empty' EncodedValue that the reader can populate while parsing. 
+impl Default for EncodedValue { + fn default() -> EncodedValue { + EncodedValue { + header: Header { + ion_type: IonType::Null, + ion_type_code: IonTypeCode::NullOrNop, + length_code: length_codes::NULL, + }, + field_id: None, + field_id_length: 0, + annotations_header_length: 0, + annotations_sequence_length: 0, + header_offset: 0, + header_length: 0, + value_length: 0, + total_length: 0, + } + } +} + +/// Tracks whether the non-blocking binary reader is currently positioned... +#[derive(Debug, PartialEq, Clone)] +enum ReaderState { + /// ...on a type descriptor byte offset, ready to attempt parsing... + /// + /// The reader will be in this state + /// * before it begins reading from the stream + /// * after stepping into a container + /// * after stepping out of a container + /// * at the end of a stream + Ready, + /// ...on the first byte of an IVM... + OnIvm, + /// ...on the first byte of a value... + /// + /// Depending on the value, this byte will be the first of: + /// * the field ID (if present) + /// * the annotations wrapper (if present) + /// * the value's type descriptor byte + OnValue(EncodedValue), + /// ...or between stream items. The nested `usize` indicates how many bytes must be read + /// before we can attempt parsing again. + /// + /// If the reader's state is `Skipping(n)`, it means that the reader ran out of data before it + /// was able to read the next item in the stream; more data will have to be made available to + /// the reader before parsing can resume. + Skipping(usize), +} + +/// Represents the subset of [IonType] variants that are containers. +#[derive(Debug, PartialEq, Clone, Copy)] +enum ContainerType { + List, + SExpression, + Struct, +} + +impl ContainerType { + /// Returns the [IonType] that corresponds to this [ContainerType]. 
+ pub fn ion_type(&self) -> IonType { + match self { + ContainerType::List => IonType::List, + ContainerType::SExpression => IonType::SExpression, + ContainerType::Struct => IonType::Struct, + } + } +} + +/// Represents a container into which the reader has stepped. +#[derive(Debug, PartialEq, Clone, Copy)] +struct Container { + kind: ContainerType, + /// The offset of the first byte *after* the parent container. For example: if the container + /// starts at index 0 and is 4 bytes long, `exclusive_end` will be `4`. + exclusive_end: usize, +} + +impl Container { + /// Returns the [IonType] that corresponds to this [Container]. + pub fn ion_type(&self) -> IonType { + self.kind.ion_type() + } +} + +/// A raw binary reader that pulls input bytes from a fixed buffer. +/// +/// If any read operation fails due to the buffer containing incomplete data, that method will +/// return [IonError::Incomplete]. +/// +/// If the buffer (generic type `A`) is a [Vec], then data can be appended to it between read +/// operations. This can be useful when reading from a data source that is growing over time, such +/// as when tailing a growing file, reading over the network, or waiting for user input. +/// Applications can read from the buffer until they encounter an `Incomplete`. Then, when more +/// data is available, they can use [read_from] or [append_bytes] to add that data to the buffer. +/// Finally, they can retry the read operation that had previously failed. +/// +/// Note that if the buffer runs out of data between top level values, this will be interpreted +/// as the end of the stream. Applications can still add more data to the buffer and resume reading. +#[derive(Debug)] +pub struct RawBinaryBufferReader> { + ion_version: (u8, u8), + state: ReaderState, + buffer: BinaryBuffer, + parents: Vec, +} + +impl> RawBinaryBufferReader { + /// Constructs a RawBinaryBufferReader from a value that can be viewed as a byte slice. 
+ pub fn new(source: A) -> RawBinaryBufferReader { + RawBinaryBufferReader { + ion_version: (1, 0), + state: ReaderState::Ready, + buffer: BinaryBuffer::new(source), + parents: Vec::new(), // Does not allocate yet + } + } + + /// Constructs a disposable view of the buffer's current contents that can be used to find the + /// next item in the stream. If the TxReader encounters a problem like invalid or incomplete + /// data, it can be discarded without affecting the RawBinaryBufferReader that created it. + fn transaction_reader(&mut self) -> TxReader { + // Temporarily break apart `self` to get simultaneous mutable references to the buffer, + // the reader state, and the parents. + let RawBinaryBufferReader { + state, + buffer, + parents, + .. + } = self; + + // Create a slice of the main buffer that has its own records of how many bytes have been + // read and how many remain. + let tx_buffer = buffer.slice(); + TxReader { + state, + parent: parents.last(), + tx_buffer, + encoded_value: Default::default(), + nop_bytes_count: 0, + } + } + + /// Moves the reader to the first byte of the next item in the stream. + /// If it succeeds, the reader's state will be [ReaderState::Ready]. + /// If there is not enough data in the buffer to reach the next item, the reader's state will + /// be [ReaderState::Skipping], indicating that it is mid-stream and awaiting more data. 
+ fn advance_to_next_item(&mut self) -> IonResult<()> { + use ReaderState::*; + let bytes_to_skip = match self.state { + Ready => return Ok(()), + Skipping(bytes_to_skip) => bytes_to_skip, + OnIvm => IVM.len(), + OnValue(encoded_value) => encoded_value.total_length(), + }; + + let bytes_available = self.buffer.remaining(); + if bytes_available >= bytes_to_skip { + self.buffer.consume(bytes_to_skip); + self.state = Ready; + Ok(()) + } else { + self.buffer.consume(bytes_available); + self.state = Skipping(bytes_to_skip - bytes_available); + incomplete_data_error("ahead to next item", self.buffer.total_consumed()) + } + } + + /// Creates an iterator that lazily reads the VarUInt symbol IDs in this value's annotations + /// wrapper. If the reader is not on a value or the current value does not have annotations, + /// the iterator will be empty. + pub fn annotations_iter(&self) -> impl Iterator> + '_ { + // If the reader is currently on a value... + if let ReaderState::OnValue(encoded_value) = &self.state { + // ...and that value has one or more annotations... + if let Some(length) = encoded_value.annotations_sequence_length() { + // ...then we'll create an iterator over those annotations. + // Find the relative offset of the value's header byte within the buffer. + let header_relative_offset = + encoded_value.header_offset - self.buffer.total_consumed(); + // The annotations sequence immediately precedes the header byte in the buffer. + // Subtract its length to find the beginning of the sequence. + let start = header_relative_offset - length; + // Get the slice of the buffer that contains the VarUInt annotations sequence. + let annotations_bytes = &self.buffer.bytes()[start..header_relative_offset]; + // Construct an annotations iterator over that slice. + return AnnotationsIterator::new(annotations_bytes); + } + } + // If the reader is either not on a value or the current value has no annotations.else + // Return an iterator over an arbitrary empty slice. 
+ AnnotationsIterator::new(&self.buffer.bytes()[0..0]) + } + + /// If the reader is currently positioned on a value, returns `Some(&value)`. + /// Otherwise, returns `None`. + #[inline] + fn encoded_value(&self) -> Option<&EncodedValue> { + match &self.state { + ReaderState::OnValue(encoded_value) => Some(encoded_value), + _ => None, + } + } + + /// Returns an IonError::IllegalOperation describing why the current operation could not be + /// performed in the reader's current state. + #[inline(never)] + // This method performs allocations/formatting that compile to non-trivial instructions. + // It will only be called as a result of user error; making it `inline(never)` keeps the + // compiler from bloating functions on the hot path with its (rarely used) expansion. + fn expected(&self, expected: IonType) -> IonResult { + illegal_operation(format!( + "type mismatch: expected a(n) {} but positioned over a(n) {}", + expected, + self.current() + )) + } + + /// Verifies that the reader is currently positioned over an Ion value of the expected type. + /// If it is, returns a reference to the corresponding `&EncodedValue` and the slice of input + /// bytes that represents the body of the value. + /// If it is not, returns [IonError::IllegalOperation]. + #[inline] + fn value_and_bytes(&self, expected_ion_type: IonType) -> IonResult<(&EncodedValue, &[u8])> { + // Get a reference to the EncodedValue. This is only possible if the reader is parked + // on a value. + let encoded_value = if let Some(encoded_value) = self.encoded_value() { + // If the value we're parked on is not of the type we're expecting to read, return an error. 
+ if encoded_value.ion_type() != expected_ion_type { + return self.expected(expected_ion_type); + } + encoded_value + } else { + return self.expected(expected_ion_type); + }; + + let value_total_length = encoded_value.total_length(); + if self.buffer.remaining() < value_total_length { + return incomplete_data_error( + "only part of the requested value is available in the buffer", + self.buffer.total_consumed(), + ); + } + + // Get the slice of buffer bytes that represent the value. This slice may be empty. + let value_offset = value_total_length - encoded_value.value_length(); + let bytes = self + .buffer + .bytes_range(value_offset, encoded_value.value_length()); + + Ok((encoded_value, bytes)) + } + + /// Like [value_and_bytes], but wraps the byte slice in a `BinaryBuffer` to make it easy + /// to read a series of encoding primitives from the slice. + #[inline] + fn value_and_buffer( + &mut self, + expected_ion_type: IonType, + ) -> IonResult<(&EncodedValue, BinaryBuffer<&[u8]>)> { + let (encoded_value, bytes) = self.value_and_bytes(expected_ion_type)?; + // Wrap the &[u8] representing the body of the value in a stack-allocated BinaryBuffer. + Ok((encoded_value, BinaryBuffer::new(bytes))) + } + + /// If the reader is currently positioned on a symbol value, parses that value into a `SymbolId`. + pub fn read_symbol_id(&mut self) -> IonResult { + let (encoded_value, mut buffer) = self.value_and_buffer(IonType::Symbol)?; + match buffer.read_uint(encoded_value.value_length())?.value() { + UInteger::U64(symbol_id) => { + // This will always succeed on 64-bit platforms where u64 and usize are the same. + if let Ok(sid) = usize::try_from(*symbol_id) { + Ok(sid) + } else { + decoding_error("found a u64 symbol ID that was too large to fit in a usize") + } + } + UInteger::BigUInt(symbol_id) => Self::try_symbol_id_from_big_uint(symbol_id), + } + } + + /// Tries to downgrade the provided BigUint to a SymbolId (usize). 
+ #[inline(never)] + // This method performs allocations/computation that compile to non-trivial instructions. + // It will only be called if the input stream contains unreadable data; making it `inline(never)` + // keeps the compiler from bloating functions on the hot path with its (rarely used) expansion. + fn try_symbol_id_from_big_uint(big_uint: &BigUint) -> IonResult { + // This will only succeed if the value in the big_uint was small enough to have been + // in a `usize`. This can happen if (e.g.) the encoding was padded with extra zeros. + if let Ok(sid) = big_uint.try_into() { + Ok(sid) + } else { + decoding_error("found a big_uint symbol ID that was too large to fit in a usize") + } + } + + /// If the reader is currently positioned on a string, returns the slice of bytes that represents + /// that string's *UNVALIDATED* utf-8 bytes. This method is available for performance optimization + /// in scenarios where utf-8 validation may be unnecessary and/or a bottleneck. It is strongly + /// recommended that you use [read_str] unless absolutely necessary. + pub fn read_str_bytes(&mut self) -> IonResult<&[u8]> { + let (_encoded_value, bytes) = self.value_and_bytes(IonType::String)?; + Ok(bytes) + } + + /// If the reader is currently positioned on a string, returns a [&str] containing its text. + pub fn read_str(&mut self) -> IonResult<&str> { + self.read_str_bytes().and_then(|bytes| { + std::str::from_utf8(bytes) + .map_err(|_| decoding_error_raw("encountered a string with invalid utf-8 data")) + }) + } + + /// If the reader is currently positioned on a blob, returns a slice containing its bytes. + pub fn read_blob_bytes(&mut self) -> IonResult<&[u8]> { + let (_encoded_value, bytes) = self.value_and_bytes(IonType::Blob)?; + Ok(bytes) + } + + /// If the reader is currently positioned on a clob, returns a slice containing its bytes. 
+ pub fn read_clob_bytes(&mut self) -> IonResult<&[u8]> { + let (_encoded_value, bytes) = self.value_and_bytes(IonType::Clob)?; + Ok(bytes) + } +} + +/// If the RawBinaryBufferReader's data source is a Vec, it gains the ability to add data to +/// the buffer between read operations. This is useful when reading from a streaming data source +/// like a file or TCP socket; the reader can read the contents of its buffer, add more bytes as +/// they arrive, and then continue reading. +impl RawBinaryBufferReader> { + /// Copies the provided bytes to end of the reader's input buffer. + fn append_bytes(&mut self, bytes: &[u8]) { + self.buffer.append_bytes(bytes); + } + + /// Tries to read `length` bytes from `source`. Unlike [append_bytes], this method does not do + /// any copying. A slice of the reader's buffer is handed to `source` so it can be populated + /// directly. + fn read_from(&mut self, source: R, length: usize) -> IonResult { + self.buffer.read_from(source, length) + } +} + +impl> StreamReader for RawBinaryBufferReader { + type Item = RawStreamItem; + type Symbol = RawSymbolToken; + + fn ion_version(&self) -> (u8, u8) { + self.ion_version + } + + #[inline] + fn next(&mut self) -> IonResult { + // `advance_to_next_item` is the only method that can modify `self.buffer`. It causes the + // bytes representing the current stream item to be consumed. + // + // If the buffer contains enough data, the reader's new position will be the first byte of + // the next type descriptor byte (which may represent a field_id, annotation wrapper, value + // header, or NOP bytes) and its state will be set to `Ready`. + // + // If there is not enough data, `self.state` will be set to `Skipping(n)` to keep track of + // how many more bytes we would need to add to the buffer before we could reach the next + // type descriptor. If `self.state` is `Skipping(n)`, the only way to advance is to add + // more data to the buffer. 
+ self.advance_to_next_item()?; + + if let Some(parent) = self.parents.last() { + // We're inside a container. If we've reached its end, return `Nothing`. + if self.buffer.total_consumed() >= parent.exclusive_end { + return Ok(RawStreamItem::Nothing); + } + } else { + // We're at the top level. If we're out of data (`buffer.is_empty()`) and aren't waiting + // on more data (`Skipping(n)`), we return `Nothing` to indicate that we're at EOF. + if self.buffer.is_empty() && self.state == ReaderState::Ready { + return Ok(RawStreamItem::Nothing); + } + } + + // Make a 'transaction' reader. This is a disposable view of the reader's main input buffer; + // it's reading the same bytes, but keeps its own records of how many bytes have been + // consumed. If reading fails at some point due to incomplete data or another error, the + // `tx_reader` can be discarded without affecting `self.buffer`. The next attempt at + // parsing will create a fresh transaction reader starting from the last good state. + let mut tx_reader = self.transaction_reader(); + + let item_result = tx_reader.read_next_item(); + + // If we encountered any leading NOP bytes during this transaction, consume them. + // This guarantees that the first byte in the buffer is the first byte of the current item. 
+ let nop_bytes_count = tx_reader.nop_bytes_count as usize; + self.buffer.consume(nop_bytes_count); + item_result + } + + fn current(&self) -> Self::Item { + use ReaderState::*; + match self.state { + OnIvm => RawStreamItem::VersionMarker(self.ion_version.0, self.ion_version.1), + OnValue(ref encoded_value) => { + let ion_type = encoded_value.header.ion_type; + if encoded_value.header.is_null() { + RawStreamItem::Null(ion_type) + } else { + RawStreamItem::Value(ion_type) + } + } + Ready | Skipping(_) => RawStreamItem::Nothing, + } + } + + fn ion_type(&self) -> Option { + self.encoded_value().map(|ev| ev.ion_type()) + } + + fn annotations<'a>(&'a self) -> Box> + 'a> { + Box::new(self.annotations_iter()) + } + + fn field_name(&self) -> IonResult { + // If the reader is parked on a value... + self.encoded_value() + .ok_or_else(|| illegal_operation_raw("the reader is not positioned on a value")) + // and that value has a field ID (because it's inside a struct)... + .and_then(|ev| + // ...then convert that field ID into a RawSymbolToken. + ev.field_id + .map(RawSymbolToken::SymbolId) + .ok_or_else(|| illegal_operation_raw("the current value is not inside a struct"))) + } + + fn is_null(&self) -> bool { + self.encoded_value() + .map(|ev| ev.header.is_null()) + .unwrap_or(false) + } + + fn read_null(&mut self) -> IonResult { + if let Some(value) = self.encoded_value() { + // If the reader is on a value, the IonType is present. 
+ let ion_type = value.header.ion_type; + return if value.header.is_null() { + Ok(ion_type) + } else { + illegal_operation(format!( + "cannot read null; reader is currently positioned on a non-null {}", + ion_type + )) + }; + } + Err(illegal_operation_raw( + "the reader is not currently positioned on a value", + )) + } + + fn read_bool(&mut self) -> IonResult { + let (encoded_value, _) = self.value_and_bytes(IonType::Boolean)?; + + let representation = encoded_value.header.length_code; + match representation { + 0 => Ok(false), + 1 => Ok(true), + _ => decoding_error( + "found a boolean value with an illegal representation (must be 0 or 1): {}", + ), + } + } + + fn read_i64(&mut self) -> IonResult { + self.read_integer().and_then(|i| { + i.as_i64() + .ok_or_else(|| decoding_error_raw("integer was too large to fit in an i64")) + }) + } + + fn read_integer(&mut self) -> IonResult { + let (encoded_value, mut buffer) = self.value_and_buffer(IonType::Integer)?; + let uint: DecodedUInt = buffer.read_uint(encoded_value.value_length())?; + let value: Integer = uint.into(); + + use self::IonTypeCode::*; + let value = match (encoded_value.header.ion_type_code, value) { + (PositiveInteger, integer) => integer, + (NegativeInteger, integer) if integer.is_zero() => { + return decoding_error("found a negative integer (typecode=3) with a value of 0"); + } + (NegativeInteger, integer) => -integer, + _itc => return decoding_error("unexpected ion type code"), + }; + + Ok(value) + } + + fn read_f32(&mut self) -> IonResult { + self.read_f64().map(|f| f as f32) + } + + fn read_f64(&mut self) -> IonResult { + let (encoded_value, bytes) = self.value_and_bytes(IonType::Float)?; + let number_of_bytes = encoded_value.value_length(); + let value = match number_of_bytes { + 0 => 0f64, + 4 => f64::from(BigEndian::read_f32(bytes)), + 8 => BigEndian::read_f64(bytes), + _ => return decoding_error("encountered a float with an illegal length"), + }; + Ok(value) + } + + fn read_decimal(&mut self) -> 
IonResult { + let (encoded_value, mut buffer) = self.value_and_buffer(IonType::Decimal)?; + + if encoded_value.value_length() == 0 { + return Ok(Decimal::new(0i32, 0i64)); + } + + let exponent_var_int = buffer.read_var_int()?; + let coefficient_size_in_bytes = + encoded_value.value_length() - exponent_var_int.size_in_bytes(); + + let exponent = exponent_var_int.value() as i64; + let coefficient = buffer.read_int(coefficient_size_in_bytes)?; + + if coefficient.is_negative_zero() { + return Ok(Decimal::negative_zero_with_exponent(exponent)); + } + + Ok(Decimal::new(coefficient, exponent)) + } + + fn read_string(&mut self) -> IonResult { + self.read_str().map(|s| s.to_string()) + } + + fn map_string(&mut self, f: F) -> IonResult + where + Self: Sized, + F: FnOnce(&str) -> U, + { + self.read_str().map(f) + } + + fn map_string_bytes(&mut self, f: F) -> IonResult + where + Self: Sized, + F: FnOnce(&[u8]) -> U, + { + self.read_str_bytes().map(f) + } + + fn read_symbol(&mut self) -> IonResult { + self.read_symbol_id().map(RawSymbolToken::SymbolId) + } + + fn read_blob(&mut self) -> IonResult> { + self.read_blob_bytes().map(Vec::from) + } + + fn map_blob(&mut self, f: F) -> IonResult + where + Self: Sized, + F: FnOnce(&[u8]) -> U, + { + self.read_blob_bytes().map(f) + } + + fn read_clob(&mut self) -> IonResult> { + self.read_clob_bytes().map(Vec::from) + } + + fn map_clob(&mut self, f: F) -> IonResult + where + Self: Sized, + F: FnOnce(&[u8]) -> U, + { + self.read_clob_bytes().map(f) + } + + fn read_timestamp(&mut self) -> IonResult { + let (encoded_value, mut buffer) = self.value_and_buffer(IonType::Timestamp)?; + + let offset = buffer.read_var_int()?; + let is_known_offset = !offset.is_negative_zero(); + let offset_minutes = offset.value() as i32; + let year = buffer.read_var_uint()?.value() as u32; + + // Year precision + + let builder = Timestamp::with_year(year); + if buffer.is_empty() { + let timestamp = builder.build()?; + return Ok(timestamp); + } + + // Month 
precision + + let month = buffer.read_var_uint()?.value() as u32; + let builder = builder.with_month(month); + if buffer.is_empty() { + let timestamp = builder.build()?; + return Ok(timestamp); + } + + // Day precision + + let day = buffer.read_var_uint()?.value() as u32; + let builder = builder.with_day(day); + if buffer.is_empty() { + let timestamp = builder.build()?; + return Ok(timestamp); + } + + // Hour-and-minute precision + + let hour = buffer.read_var_uint()?.value() as u32; + if buffer.is_empty() { + return decoding_error("timestamps with an hour must also specify a minute"); + } + let minute = buffer.read_var_uint()?.value() as u32; + let builder = builder.with_hour_and_minute(hour, minute); + if buffer.is_empty() { + let timestamp = if is_known_offset { + builder.build_utc_fields_at_offset(offset_minutes) + } else { + builder.build_at_unknown_offset() + }?; + return Ok(timestamp); + } + + // Second precision + + let second = buffer.read_var_uint()?.value() as u32; + let builder = builder.with_second(second); + if buffer.is_empty() { + let timestamp = if is_known_offset { + builder.build_utc_fields_at_offset(offset_minutes) + } else { + builder.build_at_unknown_offset() + }?; + return Ok(timestamp); + } + + // Fractional second precision + + let subsecond_exponent = buffer.read_var_int()?.value(); + // The remaining bytes represent the coefficient. + let coefficient_size_in_bytes = encoded_value.value_length() - buffer.total_consumed(); + let subsecond_coefficient = if coefficient_size_in_bytes == 0 { + DecodedInt::zero() + } else { + buffer.read_int(coefficient_size_in_bytes)? 
+ }; + + let builder = builder + .with_fractional_seconds(Decimal::new(subsecond_coefficient, subsecond_exponent)); + let timestamp = if is_known_offset { + builder.build_utc_fields_at_offset(offset_minutes) + } else { + builder.build_at_unknown_offset() + }?; + + Ok(timestamp) + } + + fn step_in(&mut self) -> IonResult<()> { + let value = self.encoded_value().ok_or_else(|| { + illegal_operation_raw("cannot step in; the reader is not positioned over a container") + })?; + + let container_type = match value.header.ion_type { + IonType::List => ContainerType::List, + IonType::SExpression => ContainerType::SExpression, + IonType::Struct => ContainerType::Struct, + _other => { + return illegal_operation( + "cannot step in; the reader is not positioned over a container", + ) + } + }; + + let total_length = value.total_length(); + + let container = Container { + kind: container_type, + exclusive_end: self.buffer.total_consumed() + total_length, + }; + + // Move the reader to the first byte within the container's value. + // Here, `bytes_to_skip` is the sum of the container's number of field ID bytes, annotation + // wrapper bytes, and header bytes. + let bytes_to_skip = total_length - value.value_length(); + // The buffer will always contain enough bytes to perform this skip; it had to read all of + // those bytes in order to be parked on this container in the first place. + self.buffer.consume(bytes_to_skip); + // The reader is no longer positioned over a value + self.state = ReaderState::Ready; + // Add the container to the `parents` stack. + self.parents.push(container); + + Ok(()) + } + + fn step_out(&mut self) -> IonResult<()> { + let parent = match self.parents.pop() { + Some(parent) => parent, + None => return illegal_operation("reader cannot step out at the top level (depth=0)"), + }; + + // We need to advance the reader to the first byte beyond the end of the parent container. 
+ // We'll skip as many bytes as we can from the current buffer, which may or may not be enough. + let bytes_to_skip = parent.exclusive_end - self.buffer.total_consumed(); + let bytes_available = self.buffer.remaining(); + // Calculate the number of bytes we'll consume based on what's available in the buffer. + let bytes_to_consume = if bytes_to_skip < bytes_available { + // All of the bytes we need to skip are in the buffer. + self.state = ReaderState::Ready; + bytes_to_skip + } else { + // Only some of the bytes we need to skip are in the buffer. + let bytes_left_to_skip = bytes_to_skip - bytes_available; + self.state = ReaderState::Skipping(bytes_left_to_skip); + // Skip what we can; the next call to next() will return `Incomplete` unless more + // data is added in the interim. + bytes_available + }; + self.buffer.consume(bytes_to_consume); + Ok(()) + } + + fn parent_type(&self) -> Option { + self.parents.last().map(|c| c.kind.ion_type()) + } + + fn depth(&self) -> usize { + self.parents.len() + } +} + +/// Iterates over a slice of bytes, lazily reading them as a sequence of VarUInt symbol IDs. +struct AnnotationsIterator<'a> { + data: std::io::Cursor<&'a [u8]>, +} + +impl<'a> AnnotationsIterator<'a> { + pub(crate) fn new(bytes: &[u8]) -> AnnotationsIterator { + AnnotationsIterator { + data: std::io::Cursor::new(bytes), + } + } +} + +impl<'a> Iterator for AnnotationsIterator<'a> { + type Item = IonResult; + + fn next(&mut self) -> Option { + let remaining = self.data.remaining(); + if remaining == 0 { + return None; + } + // This iterator cannot be created unless the reader is currently parked on a value. + // If the reader is parked on a value, the complete annotations sequence is in the buffer. + // Therefore, reading symbol IDs from this byte slice cannot fail. This allows us to safely + // unwrap the result of this `read` call. 
+ let var_uint = VarUInt::read(&mut self.data).unwrap(); + // If this var_uint was longer than the declared annotations wrapper length, return an error. + if var_uint.size_in_bytes() > remaining { + Some(decoding_error( + "found an annotation that exceeded the wrapper's declared length", + )) + } else { + Some(Ok(RawSymbolToken::SymbolId(var_uint.value()))) + } + } +} + +/// A disposable view of the RawBinaryBufferReader's position. +/// +/// The TxReader holds a borrowed (immutable) reference to the RawBinaryBufferReader's buffer +/// and a mutable reference to its state. +/// +/// By making a slice (view) of the buffer, it is able to read ahead in the buffer without affecting +/// the RawBinaryBufferReader. If it is able to find the next item in the stream, it can then update +/// the RawBinaryBufferReader's state. +/// +/// In this way, the RawBinaryBufferReader will never be in a bad state. It only updates when the +/// TxReader has already found the next item. +struct TxReader<'a, A: AsRef<[u8]>> { + state: &'a mut ReaderState, + parent: Option<&'a Container>, + tx_buffer: BinaryBuffer<&'a A>, + encoded_value: EncodedValue, + nop_bytes_count: u32, +} + +impl<'a, A: AsRef<[u8]>> TxReader<'a, A> { + /// Begins reading ahead to find the next item. + #[inline] + pub(crate) fn read_next_item(&mut self) -> IonResult { + let type_descriptor = self.tx_buffer.peek_type_descriptor()?; + + match self.parent.map(|p| p.ion_type()) { + // We're at the top level; check to see if this is an 0xE0 + None if type_descriptor.is_ivm_start() => self.read_ivm(), + // We're inside a struct; the next item must be a (fieldID, value_header) pair. + Some(IonType::Struct) => self.read_struct_field_header(), + // We're... + // * At the top level (but not at an IVM) + // * Inside a list + // * Inside an s-expression + // The next item must be a (potentially annotated) value. 
+ _ => self.read_sequence_item(type_descriptor), + } + } + + /// Looks for zero or more NOP pads followed by either: + /// * an annotated value + /// * a value + #[inline] + fn read_sequence_item( + &mut self, + mut type_descriptor: TypeDescriptor, + ) -> IonResult { + if type_descriptor.is_nop() { + if let Some(item) = self.consume_nop_padding(&mut type_descriptor)? { + // We may encounter the end of the file or container while reading NOP padding, + // in which case `item` will be RawStreamItem::Nothing. + return Ok(item); + } + // Note that if `consume_nop_padding` reads NOP bytes but doesn't hit EOF, it will + // have updated `type_descriptor` by the time we continue on below. + } + + if type_descriptor.is_annotation_wrapper() { + self.read_annotated_value_header(type_descriptor) + } else { + self.read_unannotated_value_header(type_descriptor, None) + } + } + + /// Looks for zero or more (fieldId, NOP pad) pairs followed by a (fieldId, fieldValue) pair. + fn read_struct_field_header(&mut self) -> IonResult { + let mut field_id: VarUInt; + // NOP padding makes this slightly convoluted. We always read the field ID, but if the value + // is a NOP then we discard the field ID, read past the NOP, and then start the process again. + let mut type_descriptor; + loop { + // If we've reached the end of the parent struct, return `Nothing`. Note that a struct + // can be empty (no values) but still contain NOP pads. + if self.is_at_end_of_container() { + return Ok(RawStreamItem::Nothing); + } + // If there are any bytes in this container (even NOP bytes), there must be a field ID. + field_id = self.tx_buffer.read_var_uint()?; + // If there was a field ID, there must be at least one more byte for the NOP or value. 
+ type_descriptor = self.tx_buffer.peek_type_descriptor()?; + if type_descriptor.is_nop() { + let bytes_skipped = self.tx_buffer.read_nop_pad()?; + self.nop_bytes_count += (field_id.size_in_bytes() + bytes_skipped) as u32; + } else { + // We've moved beyond any NOP pads. The last field ID we read was a real one. + // Record its length and offset information. + self.encoded_value.field_id_length = match u8::try_from(field_id.size_in_bytes()) { + Ok(length) => length, + Err(_e) => return decoding_error("found a field ID with more than 255 bytes"), + }; + self.encoded_value.field_id = Some(field_id.value()); + return if type_descriptor.is_annotation_wrapper() { + self.read_annotated_value_header(type_descriptor) + } else { + self.read_unannotated_value_header(type_descriptor, None) + }; + } + } + } + + /// Reads an annotation wrapper followed by a mandatory unannotated value. + fn read_annotated_value_header( + &mut self, + mut type_descriptor: TypeDescriptor, + ) -> IonResult { + // Read the annotations envelope from tx_buffer + let expected_value_length = self.read_annotations_wrapper(type_descriptor)?; + // If there's no type descriptor after the annotations envelope, return Incomplete. + type_descriptor = self.tx_buffer.peek_type_descriptor()?; + // Read the value's header from tx_buffer + self.read_unannotated_value_header(type_descriptor, Some(expected_value_length)) + } + + /// Reads the unannotated header byte (and any length bytes) for the next value. + fn read_unannotated_value_header( + &mut self, + type_descriptor: TypeDescriptor, + expected_length: Option, + ) -> IonResult { + // Resolve the TypeDescriptor to a value Header. A Header holds the same information but, + // because we know it's a value (not a NOP, IVM, or annotation wrapper), it holds an + // `IonType` instead of an `Option`. 
+ let header: Header = type_descriptor + .to_header() + .ok_or_else(|| decoding_error_raw("found a non-value in value position"))?; + + // Add the header to the encoded value we're constructing + self.encoded_value.header = header; + // Record the *absolute* offset of the type descriptor -- its offset from the beginning of + // the stream. + self.encoded_value.header_offset = self.tx_buffer.total_consumed(); + // Advance beyond the type descriptor + self.tx_buffer.consume(1); + + // Record the header's offset/length information. + let length: VarUInt = self.tx_buffer.read_value_length(header)?; + self.encoded_value.header_length = u8::try_from(length.size_in_bytes()).map_err(|_e| { + decoding_error_raw("found a value with a header length field over 255 bytes long") + })?; + self.encoded_value.value_length = length.value(); + self.encoded_value.total_length = self.encoded_value.field_id_length as usize + + self.encoded_value.annotations_header_length as usize + + self.encoded_value.header_length() + + self.encoded_value.value_length(); + + // If this value was annotated, make sure that the length declared in the header matches + // the one that was declared in the preceding annotations wrapper. + if let Some(expected_length) = expected_length { + if expected_length + != self.encoded_value.header_length() + self.encoded_value.value_length() + { + return decoding_error( + "annotations wrapper length did not align with value length", + ); + } + } + + // Now that we've successfully read the field ID (if present), annotations wrapper (if + // present), and value header, update the reader's state to hold the EncodedValue we created. + *self.state = ReaderState::OnValue(self.encoded_value); + + if type_descriptor.is_null() { + Ok(RawStreamItem::Null(header.ion_type)) + } else { + Ok(RawStreamItem::Value(header.ion_type)) + } + } + + #[inline(never)] + // NOP padding is not widely used in Ion 1.0. 
This method is annotated with `inline(never)` + // to avoid the compiler bloating other methods on the hot path with its rarely used + // instructions. + fn consume_nop_padding( + &mut self, + type_descriptor: &mut TypeDescriptor, + ) -> IonResult> { + // Skip over any number of NOP regions + while type_descriptor.is_nop() { + // We're not on a value, but we are at a place where the reader can safely resume + // reading if necessary. + let bytes_skipped = self.tx_buffer.read_nop_pad()?; + self.nop_bytes_count += bytes_skipped as u32; + // If we don't reach a value header by the end of this method, make a note to discard + // these NOP bytes before we do our next attempt. We don't want the reader to have to + // hold NOP bytes in the buffer if we've already processed them. + if self.is_eof() || self.is_at_end_of_container() { + return Ok(Some(RawStreamItem::Nothing)); + } + *type_descriptor = self.tx_buffer.peek_type_descriptor()?; + } + Ok(None) + } + + /// Populates the annotations-related offsets in the `EncodedValue` based on the information + /// read from the annotations envelope. This method does not read the annotations themselves. + /// Returns the expected length of the annotated value nested inside the envelope. + fn read_annotations_wrapper(&mut self, type_descriptor: TypeDescriptor) -> IonResult { + let initial_consumed = self.tx_buffer.total_consumed(); + // Consume the first byte; its contents are already in the `type_descriptor` parameter. + self.tx_buffer.consume(1); + + // Read the combined length of the annotations sequence and the value that follows it + let annotations_and_value_length = match type_descriptor.length_code { + length_codes::NULL => 0, + length_codes::VAR_UINT => self.tx_buffer.read_var_uint()?.value(), + length => length as usize, + }; + + // Read the length of the annotations sequence + let annotations_length = self.tx_buffer.read_var_uint()?; + + // Validate that neither the annotations sequence is not empty. 
+ if annotations_length.value() == 0 { + return decoding_error("found an annotations wrapper with no annotations"); + } + + // Validate that the annotated value is not missing. + let expected_value_length = annotations_and_value_length + - annotations_length.size_in_bytes() + - annotations_length.value(); + self.tx_buffer.total_consumed(); + if expected_value_length == 0 { + return decoding_error("found an annotation wrapper with no value"); + } + + // Skip over the annotations sequence itself; the reader will return to it if/when the + // reader asks to iterate over those symbol IDs. + self.tx_buffer.consume(annotations_length.value()); + + // Record the important offsets/lengths so we can revisit the annotations sequence later. + self.encoded_value.annotations_header_length = + u8::try_from(self.tx_buffer.total_consumed() - initial_consumed).map_err(|_e| { + decoding_error_raw("found an annotations header greater than 255 bytes long") + })?; + self.encoded_value.annotations_sequence_length = u8::try_from(annotations_length.value()) + .map_err(|_e| { + decoding_error_raw("found an annotations sequence greater than 255 bytes long") + })?; + + Ok(expected_value_length) + } + + /// Reads a four-byte Ion v1.0 version marker. + #[inline(never)] + fn read_ivm(&mut self) -> IonResult { + if let Some(container) = self.parent { + return decoding_error(format!( + "found an Ion version marker inside a {:?}", + container + )); + }; + let (major, minor) = self.tx_buffer.read_ivm()?; + if !matches!((major, minor), (1, 0)) { + return decoding_error(format!("unsupported Ion version {:X}.{:X}", major, minor)); + } + *self.state = ReaderState::OnIvm; + Ok(RawStreamItem::VersionMarker(major, minor)) + } + + /// Returns `true` if the reader is currently positioned inside a struct. Otherwise, returns false. 
+ fn is_in_struct(&self) -> bool { + self.parent + .map(|p| p.kind == ContainerType::Struct) + .unwrap_or(false) + } + + /// Returns `true` if the reader is inside a container and has consumed enough bytes to have + /// reached the end. + fn is_at_end_of_container(&self) -> bool { + if let Some(parent) = self.parent { + let position = self.tx_buffer.total_consumed(); + if position >= parent.exclusive_end { + return true; + } + } + false + } + + /// Returns `true` if, at this point in the read transaction, the reader is: + /// * At the top level + /// * Not inside an annotations wrapper (where a value would be expected) + /// * Out of tx_buffer bytes + fn is_eof(&self) -> bool { + // We're at the top level + self.parent == None + && self.encoded_value.annotations_header_length == 0 + && self.tx_buffer.is_empty() + } +} + +#[cfg(test)] +mod tests { + use crate::binary::non_blocking::raw_binary_reader::RawBinaryBufferReader; + use crate::text::text_value::IntoAnnotations; + use crate::{IonError, IonResult}; + use std::fmt::Debug; + + use super::*; + + fn expect_incomplete(result: IonResult) { + if let Err(IonError::Incomplete { .. 
}) = result { + // do nothing + } else { + panic!("expected incomplete, found: {:?}", result) + } + } + + fn expect_eof(result: IonResult) { + if let Ok(RawStreamItem::Nothing) = result { + // do nothing + } else { + panic!("expected RawStreamItem::Nothing, found: {:?}", result) + } + } + + fn expect_value(result: IonResult, ion_type: IonType) { + if let Ok(RawStreamItem::Value(result_ion_type)) = result { + assert_eq!(result_ion_type, ion_type); + } else { + panic!("expected a value, but got: {:?}", result); + } + } + + fn expect_annotations, I: IntoAnnotations>( + reader: &RawBinaryBufferReader, + annotations: I, + ) { + let expected = annotations.into_annotations(); + let actual = reader + .annotations_iter() + .collect::>>() + .unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn read_complete_ivm() -> IonResult<()> { + let data = &[0xE0, 1, 0, 0xEA]; + let mut reader = RawBinaryBufferReader::new(data); + assert_eq!(RawStreamItem::VersionMarker(1, 0), reader.next()?); + Ok(()) + } + + #[test] + fn read_incomplete_ivm() -> IonResult<()> { + let data = vec![0xE0]; + let mut reader = RawBinaryBufferReader::new(data); + // The buffer doesn't contain an entire item + expect_incomplete(reader.next()); + // We can call .next() again safely any number of times; the result will be the same + // as the underlying buffer data hasn't changed. + expect_incomplete(reader.next()); + expect_incomplete(reader.next()); + // We can append data as it becomes available even if it doesn't produce a complete item. + reader.append_bytes(&[1, 0]); + expect_incomplete(reader.next()); + // Finally, when we have enough data to produce an item, a call to next() works as expected. 
+ reader.append_bytes(&[0xEA]); + assert_eq!(RawStreamItem::VersionMarker(1, 0), reader.next().unwrap()); + Ok(()) + } + + #[test] + fn read_int_header() -> IonResult<()> { + let data = vec![0x21, 0x03]; + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::Integer); + expect_eof(reader.next()); + Ok(()) + } + + #[test] + fn read_incomplete_int() -> IonResult<()> { + let data = vec![0x21]; + let mut reader = RawBinaryBufferReader::new(data); + // We can read the *header* of the int just fine + expect_value(reader.next(), IonType::Integer); + // Trying to advance beyond it is a problem. + expect_incomplete(reader.next()); + // This byte completes the int, but we still don't have another value to move to. + reader.append_bytes(&[0x03]); + expect_eof(reader.next()); + // Now there's an empty string after the int + reader.append_bytes(&[0x80]); + expect_value(reader.next(), IonType::String); + Ok(()) + } + + #[test] + fn read_many_ints() -> IonResult<()> { + let data = vec![ + 0x21, 0x01, // 1 + 0x21, 0x02, // 2 + 0x21, 0x03, // 3 + ]; + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::Integer); + assert_eq!(reader.read_integer()?, Integer::I64(1)); + expect_value(reader.next(), IonType::Integer); + assert_eq!(reader.read_integer()?, Integer::I64(2)); + expect_value(reader.next(), IonType::Integer); + assert_eq!(reader.read_integer()?, Integer::I64(3)); + // Nothing else in the buffer + expect_eof(reader.next()); + Ok(()) + } + + #[test] + fn read_many_floats() -> IonResult<()> { + let data = vec![ + 0x48, 0x40, 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 5.5e0 + 0x48, 0x40, 0x92, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, // 1.2e3 + 0x48, 0xc0, 0x20, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // -8.125e0 + ]; + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::Float); + assert_eq!(reader.read_f64()?, 5.5f64); + expect_value(reader.next(), IonType::Float); + 
    /// Reads seven timestamps of increasing precision (year through fractional seconds) and
    /// then expects the end of the stream.
    #[test]
    fn read_many_timestamps() -> IonResult<()> {
        // Each comment below describes the encoded timestamp on the line(s) that follow it.
        let data = vec![
            // 2022T
            0x63, 0xc0, 0x0f, 0xe6,
            // 2022-06T
            0x64, 0xc0, 0x0f, 0xe6, 0x86,
            // 2022-06-18
            0x65, 0xc0, 0x0f, 0xe6, 0x86, 0x92,
            // 2022-06-18T11:30+00:00
            // NOTE(review): the offset byte here is 0xc0, the same as in the date-only values
            // above. If that decodes as a negative-zero VarInt, `read_timestamp` treats the offset
            // as unknown (-00:00) rather than +00:00 -- confirm the intended encoding.
            0x67, 0xc0, 0x0f, 0xe6, 0x86, 0x92, 0x8b, 0x9e,
            // 2022-06-18T11:30:51.115-05:00
            0x6b, 0x42, 0xac, 0x0f, 0xe6, 0x86, 0x92, 0x90,
            0x9e, 0xb3, 0xc3, 0x73,
            // 2022-06-09T23:00:59.045+00:00
            0x6a, 0x80, 0x0f, 0xe6, 0x86, 0x89, 0x97,
            0x80,
            0xbb, 0xc3, 0x2d,
            // 2022-06-09T22:59:59.000+00:00
            0x69, 0x80, 0x0f, 0xe6, 0x86, 0x89, 0x96,
            0xbb,
            0xbb, 0xc3,
        ];
        let mut reader = RawBinaryBufferReader::new(data);
        // 2022T
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_year(2022).build()?
        );
        // 2022-06T
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_year(2022).with_month(6).build()?
        );
        // 2022-06-18
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2022, 6, 18).build()?
        );
        // 2022-06-18T11:30+00:00
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2022, 6, 18)
                .with_hour_and_minute(11, 30)
                .build_at_offset(0)?
        );
        // 2022-06-18T11:30:51.115-05:00
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2022, 6, 18)
                .with_hms(11, 30, 51)
                .with_milliseconds(115)
                .build_at_offset(-5 * 60)?
        );
        // 2022-06-09T23:00:59.045+00:00
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2022, 6, 9)
                .with_hms(23, 0, 59)
                .with_milliseconds(45)
                .build_at_offset(0)?
        );
        // 2022-06-09T22:59:59.000+00:00
        expect_value(reader.next(), IonType::Timestamp);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2022, 6, 9)
                .with_hms(22, 59, 59)
                .with_milliseconds(0)
                .build_at_offset(0)?
        );
        // Nothing else in the buffer
        expect_eof(reader.next());
        Ok(())
    }
IonType::Clob); + assert_eq!(reader.read_clob_bytes()?, b"baz"); + // Nothing else in the buffer + expect_eof(reader.next()); + Ok(()) + } + + #[test] + fn read_many_blobs() -> IonResult<()> { + let data = vec![ + 0xA0, // empty + 0xA3, 0x66, 0x6f, 0x6f, // b"foo" + 0xA3, 0x62, 0x61, 0x72, // b"bar" + 0xA3, 0x62, 0x61, 0x7a, // b"baz" + ]; + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::Blob); + assert_eq!(reader.read_blob_bytes()?, b""); + expect_value(reader.next(), IonType::Blob); + assert_eq!(reader.read_blob_bytes()?, b"foo"); + expect_value(reader.next(), IonType::Blob); + assert_eq!(reader.read_blob_bytes()?, b"bar"); + expect_value(reader.next(), IonType::Blob); + assert_eq!(reader.read_blob_bytes()?, b"baz"); + // Nothing else in the buffer + expect_eof(reader.next()); + Ok(()) + } + + #[test] + fn read_many_annotated_ints() -> IonResult<()> { + let data = vec![ + 0xE4, 0x81, 0x84, 0x21, 0x01, // $4::1 + 0xE4, 0x81, 0x85, 0x21, 0x02, // $5::2 + 0xE6, 0x83, 0x86, 0x87, 0x88, 0x21, 0x03, // $6::$7::$8::3 + ]; + let mut reader = RawBinaryBufferReader::new(data); + + expect_value(reader.next(), IonType::Integer); + expect_annotations(&reader, &[4]); + + expect_value(reader.next(), IonType::Integer); + expect_annotations(&reader, &[5]); + + expect_value(reader.next(), IonType::Integer); + expect_annotations(&reader, &[6, 7, 8]); + // Nothing else in the buffer + expect_eof(reader.next()); + Ok(()) + } + + #[test] + fn step_into_list() -> IonResult<()> { + let data = &[ + 0xb4, // [ + 0x21, 0x01, // 1, + 0x21, 0x02, // 2 ] + 0x80, // "" /*empty string*/ + ]; + + // === Skip over list === + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::List); + expect_value(reader.next(), IonType::String); + // Nothing else in the buffer + expect_eof(reader.next()); + + // === Early step out === + let mut reader = RawBinaryBufferReader::new(data); + expect_value(reader.next(), IonType::List); 
    /// Steps into an empty struct, confirms it yields no fields, then steps out and reads the
    /// value that follows it.
    #[test]
    fn step_into_empty_struct() -> IonResult<()> {
        let data = &[0xD0, 0x80]; // Empty struct, empty string
        let mut reader = RawBinaryBufferReader::new(data);
        expect_value(reader.next(), IonType::Struct);
        reader.step_in()?;
        // Empty struct, calling next() returns Nothing
        assert_eq!(reader.next().unwrap(), RawStreamItem::Nothing);
        reader.step_out()?;
        // The empty string that follows the struct
        expect_value(reader.next(), IonType::String);
        expect_eof(reader.next());
        Ok(())
    }
    /// Verifies that a run of consecutive NOP pads of different sizes is skipped in a single
    /// call to `next()`, which then returns the value that follows them.
    #[test]
    fn various_nop_sizes() -> IonResult<()> {
        // Four back-to-back NOP pads (1, 2, 3, and 4 bytes long; the 0xff bytes are padding
        // payload), followed by 0x0f, which the assertion below expects to read as null.
        let data = &[
            0x00, 0x01, 0xff, 0x02, 0xff, 0xff, 0x03, 0xff, 0xff, 0xff, 0x0f,
        ];
        let mut reader = RawBinaryBufferReader::new(data);
        // The first item surfaced is the null; the padding is never reported as an item.
        let item = reader.next()?;
        assert_eq!(item, RawStreamItem::Null(IonType::Null));
        Ok(())
    }
+#[derive(Copy, Clone, Debug, PartialEq)] +pub(crate) struct TypeDescriptor { + pub ion_type_code: IonTypeCode, + pub ion_type: Option, + pub length_code: u8, +} + +/// A statically defined array of TypeDescriptor that allows a binary reader to map a given +/// byte (`u8`) to a `TypeDescriptor` without having to perform any masking or bitshift operations. +pub(crate) const ION_1_0_TYPE_DESCRIPTORS: &[TypeDescriptor; 256] = &init_type_descriptor_cache(); + +const DEFAULT_HEADER: TypeDescriptor = TypeDescriptor { + ion_type_code: IonTypeCode::NullOrNop, + ion_type: None, + length_code: 0, +}; + +pub(crate) const fn init_type_descriptor_cache() -> [TypeDescriptor; 256] { + let mut jump_table = [DEFAULT_HEADER; 256]; + let mut index: usize = 0; + while index < 256 { + let byte = index as u8; + jump_table[index] = TypeDescriptor::from_byte(byte); + index += 1; + } + jump_table +} + +impl TypeDescriptor { + /// Attempts to parse the provided byte. If the type code is unrecognized or the + /// type code + length code combination is illegal, an error will be returned. 
+ pub const fn from_byte(byte: u8) -> TypeDescriptor { + let (type_code, length_code) = nibbles_from_byte(byte); + use IonTypeCode::*; + let ion_type_code = match type_code { + 0 => NullOrNop, + 1 => Boolean, + 2 => PositiveInteger, + 3 => NegativeInteger, + 4 => Float, + 5 => Decimal, + 6 => Timestamp, + 7 => Symbol, + 8 => String, + 9 => Clob, + 10 => Blob, + 11 => List, + 12 => SExpression, + 13 => Struct, + 14 => AnnotationOrIvm, + 15 => Reserved, + _ => panic!("type code was larger than a nibble"), + }; + let ion_type = match ion_type_code { + NullOrNop if length_code == length_codes::NULL => Some(IonType::Null), + NullOrNop => None, + Boolean => Some(IonType::Boolean), + PositiveInteger => Some(IonType::Integer), + NegativeInteger => Some(IonType::Integer), + Float => Some(IonType::Float), + Decimal => Some(IonType::Decimal), + Timestamp => Some(IonType::Timestamp), + Symbol => Some(IonType::Symbol), + String => Some(IonType::String), + Clob => Some(IonType::Clob), + Blob => Some(IonType::Blob), + List => Some(IonType::List), + SExpression => Some(IonType::SExpression), + Struct => Some(IonType::Struct), + AnnotationOrIvm => None, + Reserved => None, + }; + TypeDescriptor { + ion_type, + ion_type_code, + length_code, + } + } + + pub fn is_null(&self) -> bool { + self.ion_type.is_some() && self.length_code == length_codes::NULL + } + + pub fn is_nop(&self) -> bool { + self.ion_type_code == IonTypeCode::NullOrNop && self.length_code != length_codes::NULL + } + + pub fn is_ivm_start(&self) -> bool { + self.ion_type_code == IonTypeCode::AnnotationOrIvm && self.length_code == 0 + } + + pub fn is_annotation_wrapper(&self) -> bool { + self.ion_type_code == IonTypeCode::AnnotationOrIvm && self.length_code > 0 + } + + #[inline] + pub fn to_header(self) -> Option
{ + let ion_type = self.ion_type?; + let header = Header { + ion_type, + ion_type_code: self.ion_type_code, + length_code: self.length_code, + }; + Some(header) + } +} + +/// Represents a [TypeDescriptor] that appears before an Ion value (and not a NOP, IVM, +/// or annotations wrapper). +/// +/// Notably, it stores an `IonType` instead of an `Option`, allowing functions that expect +/// a value header to avoid matching/unwrapping. +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct Header { + pub ion_type: IonType, + // The only time the `ion_type_code` is required is to distinguish between positive + // and negative integers. + pub ion_type_code: IonTypeCode, + pub length_code: u8, +} + +impl Header { + pub fn is_null(&self) -> bool { + self.length_code == length_codes::NULL + } +} diff --git a/src/binary/raw_binary_reader.rs b/src/binary/raw_binary_reader.rs index c249fa87..58289af0 100644 --- a/src/binary/raw_binary_reader.rs +++ b/src/binary/raw_binary_reader.rs @@ -347,7 +347,7 @@ impl StreamReader for RawBinaryReader { self.clear_annotations(); let mut expected_annotated_value_length = None; - if header.ion_type_code == IonTypeCode::Annotation { + if header.ion_type_code == IonTypeCode::AnnotationOrIvm { if header.length_code == 0 { // This is actually the first byte in an Ion Version Marker let ivm = self.read_ivm()?; @@ -996,7 +996,7 @@ where } Float => self.read_float_length()?, Struct => self.read_struct_length()?, - Annotation => return decoding_error("found an annotation wrapping an annotation"), + AnnotationOrIvm => return decoding_error("found an annotation wrapping an annotation"), Reserved => return decoding_error("found an Ion Value with a Reserved type code"), }; diff --git a/src/binary/type_code.rs b/src/binary/type_code.rs index ab1b3403..9495b881 100644 --- a/src/binary/type_code.rs +++ b/src/binary/type_code.rs @@ -30,7 +30,7 @@ pub enum IonTypeCode { List, // 11 SExpression, // 12 Struct, // 13 - Annotation, // 14 + AnnotationOrIvm, // 14 
Reserved, // 15 } @@ -87,7 +87,7 @@ impl TryFrom for IonTypeCode { 11 => List, 12 => SExpression, 13 => Struct, - 14 => Annotation, + 14 => AnnotationOrIvm, 15 => Reserved, _ => { return decoding_error(format!("{:?} is not a valid header type code.", type_code)); @@ -116,7 +116,7 @@ impl IonTypeCode { List => 11, SExpression => 12, Struct => 13, - Annotation => 14, + AnnotationOrIvm => 14, Reserved => 15, } } diff --git a/src/binary/uint.rs b/src/binary/uint.rs index 668d978e..8a99281e 100644 --- a/src/binary/uint.rs +++ b/src/binary/uint.rs @@ -21,6 +21,13 @@ pub struct DecodedUInt { } impl DecodedUInt { + pub(crate) fn new(value: UInteger, size_in_bytes: usize) -> Self { + DecodedUInt { + size_in_bytes, + value, + } + } + /// Reads a UInt with `length` bytes from the provided data source. pub fn read(data_source: &mut R, length: usize) -> IonResult { if length > MAX_UINT_SIZE_IN_BYTES { diff --git a/src/binary/var_int.rs b/src/binary/var_int.rs index 3982e85b..945c9e73 100644 --- a/src/binary/var_int.rs +++ b/src/binary/var_int.rs @@ -38,6 +38,14 @@ const MAGNITUDE_BITS_IN_FINAL_BYTE: usize = 6; /// [VarUInt and VarInt Fields](amzn.github.io/ion-docs/docs/binary.html#varuint-and-varint-fields) /// section of the binary Ion spec for more details. impl VarInt { + pub(crate) fn new(value: i64, is_negative: bool, size_in_bytes: usize) -> Self { + VarInt { + size_in_bytes, + value, + is_negative, + } + } + /// Reads a VarInt from the provided data source. pub fn read(data_source: &mut R) -> IonResult { // Unlike VarUInt's encoding, the first byte in a VarInt is a special case because diff --git a/src/binary/var_uint.rs b/src/binary/var_uint.rs index 2899e5ca..d51d1a8e 100644 --- a/src/binary/var_uint.rs +++ b/src/binary/var_uint.rs @@ -7,11 +7,9 @@ use std::mem; // These type aliases will simplify the process of changing the data types used to represent each // VarUInt's magnitude and byte length in the future. 
// See: https://github.com/amzn/ion-rust/issues/7 -pub type VarUIntStorage = usize; -pub type VarUIntSizeStorage = usize; const BITS_PER_ENCODED_BYTE: usize = 7; -const STORAGE_SIZE_IN_BITS: usize = mem::size_of::() * 8; +const STORAGE_SIZE_IN_BITS: usize = mem::size_of::() * 8; const MAX_ENCODED_SIZE_IN_BYTES: usize = STORAGE_SIZE_IN_BITS / BITS_PER_ENCODED_BYTE; const LOWER_7_BITMASK: u8 = 0b0111_1111; @@ -22,17 +20,24 @@ const HIGHEST_BIT_VALUE: u8 = 0b1000_0000; /// section of the binary Ion spec for more details. #[derive(Debug)] pub struct VarUInt { - value: VarUIntStorage, - size_in_bytes: VarUIntSizeStorage, + value: usize, + size_in_bytes: usize, } impl VarUInt { + pub(crate) fn new(value: usize, size_in_bytes: usize) -> Self { + VarUInt { + value, + size_in_bytes, + } + } + /// Reads a VarUInt from the provided data source. pub fn read(data_source: &mut R) -> IonResult { - let mut magnitude: VarUIntStorage = 0; + let mut magnitude: usize = 0; let mut byte_processor = |byte: u8| { - let lower_seven = (LOWER_7_BITMASK & byte) as VarUIntStorage; + let lower_seven = (LOWER_7_BITMASK & byte) as usize; magnitude <<= 7; // Shifts 0 to 0 in the first iteration magnitude |= lower_seven; byte < HIGHEST_BIT_VALUE // If the highest bit is zero, continue reading @@ -104,15 +109,15 @@ impl VarUInt { /// Returns the magnitude of the unsigned integer #[inline(always)] - pub fn value(&self) -> VarUIntStorage { + pub fn value(&self) -> usize { self.value } /// Returns the number of bytes that were read from the data source to construct this /// unsigned integer #[inline(always)] - pub fn size_in_bytes(&self) -> VarUIntSizeStorage { - self.size_in_bytes + pub fn size_in_bytes(&self) -> usize { + self.size_in_bytes as usize } } diff --git a/src/data_source.rs b/src/data_source.rs index a36b459f..6bf08c6a 100644 --- a/src/data_source.rs +++ b/src/data_source.rs @@ -1,4 +1,6 @@ -use std::io::BufRead; +use std::fs::File; +use std::io; +use std::io::{BufRead, BufReader, Read, 
/// Types that implement this trait can be converted into an implementation of [io::BufRead],
/// allowing users to build a reader from a variety of types that might not define I/O operations
/// on their own.
pub trait ToIonDataSource {
    /// The buffered source produced by the conversion.
    type DataSource: BufRead;

    /// Consumes `self` and returns a buffered source over its contents.
    fn to_ion_data_source(self) -> Self::DataSource;
}

// In-memory inputs are wrapped in an `io::Cursor`, which supplies `BufRead` over anything that
// is `AsRef<[u8]>`.

impl ToIonDataSource for String {
    type DataSource = io::Cursor<Self>;

    fn to_ion_data_source(self) -> Self::DataSource {
        io::Cursor::new(self)
    }
}

impl<'a> ToIonDataSource for &'a str {
    type DataSource = io::Cursor<Self>;

    fn to_ion_data_source(self) -> Self::DataSource {
        io::Cursor::new(self)
    }
}

impl<'a> ToIonDataSource for &'a [u8] {
    type DataSource = io::Cursor<Self>;

    fn to_ion_data_source(self) -> Self::DataSource {
        io::Cursor::new(self)
    }
}

impl ToIonDataSource for Vec<u8> {
    type DataSource = io::Cursor<Self>;

    fn to_ion_data_source(self) -> Self::DataSource {
        io::Cursor::new(self)
    }
}

// Inputs that are already `BufRead` are passed through untouched.

impl<T: BufRead, U: BufRead> ToIonDataSource for io::Chain<T, U> {
    type DataSource = Self;

    fn to_ion_data_source(self) -> Self::DataSource {
        self
    }
}

impl<T: Read> ToIonDataSource for BufReader<T> {
    type DataSource = Self;

    fn to_ion_data_source(self) -> Self::DataSource {
        self
    }
}

// A `File` is only `Read`, so it is wrapped in a `BufReader` to obtain `BufRead`.
impl ToIonDataSource for File {
    type DataSource = BufReader<Self>;

    fn to_ion_data_source(self) -> Self::DataSource {
        BufReader::new(self)
    }
}

// Allows Readers to consume Ion directly from STDIN
impl<'a> ToIonDataSource for StdinLock<'a> {
    type DataSource = Self;

    fn to_ion_data_source(self) -> Self::DataSource {
        self
    }
}
crate::data_source::ToIonDataSource; use crate::raw_reader::{RawReader, RawStreamItem}; use crate::raw_symbol_token::RawSymbolToken; use crate::result::{decoding_error, decoding_error_raw, IonResult}; use crate::stream_reader::StreamReader; use crate::symbol::Symbol; use crate::symbol_table::SymbolTable; -use crate::text::ion_data_source::ToIonDataSource; use crate::types::decimal::Decimal; use crate::types::integer::Integer; use crate::types::timestamp::Timestamp; @@ -121,6 +121,15 @@ pub struct UserReader { symbol_table: SymbolTable, } +impl UserReader { + pub(crate) fn new(raw_reader: R) -> UserReader { + UserReader { + raw_reader, + symbol_table: SymbolTable::new(), + } + } +} + /// Stream components that an application-level [Reader] implementation may encounter. #[derive(Eq, PartialEq, Debug)] pub enum StreamItem { diff --git a/src/result.rs b/src/result.rs index a01bf49e..a595670b 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,5 +1,5 @@ use std::convert::From; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Display, Error, Formatter}; use std::{fmt, io}; use thiserror::Error; @@ -29,9 +29,6 @@ impl std::error::Error for ErrorStub {} /// Represents the different types of high-level failures that might occur when reading Ion data. #[derive(Debug, Error)] pub enum IonError { - // TODO: Add an `IncompleteData` error variant that provides position information, - // what was being read, the number of bytes needed, etc. - // See: https://github.com/amzn/ion-rust/issues/299 /// Indicates that an IO error was encountered while reading or writing. #[error("{source:?}")] IoError { @@ -39,11 +36,15 @@ pub enum IonError { source: io::Error, }, - #[error("{source:?}")] - FmtError { - #[from] - source: fmt::Error, - }, + /// Indicates that the input buffer did not contain enough data to perform the requested read + /// operation. If the input source contains more data, the reader can append it to the buffer + /// and try again. 
+ #[error("ran out of input while reading {label} at offset {offset}")] + Incomplete { label: &'static str, offset: usize }, + + /// Indicates that the writer encountered a problem while serializing a given piece of data. + #[error("{description}")] + EncodingError { description: String }, /// Indicates that the data stream being read contained illegal or otherwise unreadable data. #[error("{description}")] @@ -64,6 +65,14 @@ pub enum IonError { }, } +impl From for IonError { + fn from(error: Error) -> Self { + IonError::EncodingError { + description: error.to_string(), + } + } +} + // io::Error does not implement Clone, which precludes us from simply deriving an implementation. // IonError needs a Clone implementation because we use a jump table of cached IonResult values when // parsing type descriptor bytes. The only error type that will be cloned by virtue of using the jump @@ -76,7 +85,13 @@ impl Clone for IonError { // io::Error implements From, and ErrorKind is cloneable. source: io::Error::from(source.kind()), }, - FmtError { source } => FmtError { source: *source }, + Incomplete { label, offset } => Incomplete { + label, + offset: *offset, + }, + EncodingError { description } => EncodingError { + description: description.clone(), + }, DecodingError { description } => DecodingError { description: description.clone(), }, @@ -97,7 +112,17 @@ impl PartialEq for IonError { match (self, other) { // We can compare the io::Errors' ErrorKinds, offering a weak definition of equality. 
(IoError { source: s1 }, IoError { source: s2 }) => s1.kind() == s2.kind(), - (FmtError { source: s1 }, FmtError { source: s2 }) => s1 == s2, + ( + Incomplete { + label: l1, + offset: o1, + }, + Incomplete { + label: l2, + offset: o2, + }, + ) => l1 == l2 && o1 == o2, + (EncodingError { description: s1 }, EncodingError { description: s2 }) => s1 == s2, (DecodingError { description: s1 }, DecodingError { description: s2 }) => s1 == s2, (IllegalOperation { operation: s1 }, IllegalOperation { operation: s2 }) => s1 == s2, (IonCError { source: s1 }, IonCError { source: s2 }) => s1 == s2, @@ -106,6 +131,14 @@ impl PartialEq for IonError { } } +pub fn incomplete_data_error(label: &'static str, offset: usize) -> IonResult { + Err(incomplete_data_error_raw(label, offset)) +} + +pub fn incomplete_data_error_raw(label: &'static str, offset: usize) -> IonError { + IonError::Incomplete { label, offset } +} + /// A convenience method for creating an IonResult containing an IonError::DecodingError with the /// provided description text. pub fn decoding_error>(description: S) -> IonResult { @@ -114,6 +147,7 @@ pub fn decoding_error>(description: S) -> IonResult { /// A convenience method for creating an IonError::DecodingError with the provided operation /// text. Useful for calling Option#ok_or_else. +#[inline(never)] pub fn decoding_error_raw>(description: S) -> IonError { IonError::DecodingError { description: description.as_ref().to_string(), @@ -128,6 +162,7 @@ pub fn illegal_operation>(operation: S) -> IonResult { /// A convenience method for creating an IonError::IllegalOperation with the provided operation /// text. Useful for calling Option#ok_or_else. 
+#[inline(never)] pub fn illegal_operation_raw>(operation: S) -> IonError { IonError::IllegalOperation { operation: operation.as_ref().to_string(), diff --git a/src/text/ion_data_source.rs b/src/text/ion_data_source.rs deleted file mode 100644 index 3c5cf81e..00000000 --- a/src/text/ion_data_source.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::fs::File; -use std::io; -use std::io::{BufRead, BufReader, Read}; - -/// Types that implement this trait can be converted into an implementation of [io::BufRead], -/// allowing users to build a [Reader] from a variety of types that might not define I/O operations -/// on their own. -pub trait ToIonDataSource { - type DataSource: BufRead; - fn to_ion_data_source(self) -> Self::DataSource; -} - -impl ToIonDataSource for String { - type DataSource = io::Cursor; - - fn to_ion_data_source(self) -> Self::DataSource { - io::Cursor::new(self) - } -} - -impl<'a> ToIonDataSource for &'a str { - type DataSource = io::Cursor; - - fn to_ion_data_source(self) -> Self::DataSource { - io::Cursor::new(self) - } -} - -impl<'a> ToIonDataSource for &'a [u8] { - type DataSource = io::Cursor; - - fn to_ion_data_source(self) -> Self::DataSource { - io::Cursor::new(self) - } -} - -impl<'a> ToIonDataSource for Vec { - type DataSource = io::Cursor; - - fn to_ion_data_source(self) -> Self::DataSource { - io::Cursor::new(self) - } -} - -impl ToIonDataSource for io::Chain { - type DataSource = Self; - - fn to_ion_data_source(self) -> Self::DataSource { - self - } -} - -impl ToIonDataSource for BufReader { - type DataSource = Self; - - fn to_ion_data_source(self) -> Self::DataSource { - self - } -} - -impl ToIonDataSource for File { - type DataSource = BufReader; - - fn to_ion_data_source(self) -> Self::DataSource { - BufReader::new(self) - } -} diff --git a/src/text/mod.rs b/src/text/mod.rs index 475c515a..bdfc5753 100644 --- a/src/text/mod.rs +++ b/src/text/mod.rs @@ -1,4 +1,3 @@ -pub mod ion_data_source; mod parent_container; pub(crate) mod parse_result; 
pub(in crate::text) mod parsers; @@ -6,5 +5,5 @@ pub mod raw_text_reader; pub mod raw_text_writer; mod text_buffer; pub mod text_formatter; -mod text_value; +pub(crate) mod text_value; pub(crate) mod text_writer; diff --git a/src/text/raw_text_reader.rs b/src/text/raw_text_reader.rs index 3852a836..019e0569 100644 --- a/src/text/raw_text_reader.rs +++ b/src/text/raw_text_reader.rs @@ -2,13 +2,13 @@ use std::fmt::Display; use nom::Err::{Error, Failure, Incomplete}; +use crate::data_source::ToIonDataSource; use crate::raw_reader::RawStreamItem; use crate::raw_symbol_token::RawSymbolToken; use crate::result::{ decoding_error, illegal_operation, illegal_operation_raw, IonError, IonResult, }; use crate::stream_reader::StreamReader; -use crate::text::ion_data_source::ToIonDataSource; use crate::text::parent_container::ParentContainer; use crate::text::parse_result::IonParseResult; use crate::text::parsers::containers::{ diff --git a/src/text/text_buffer.rs b/src/text/text_buffer.rs index c4495858..c69b8b51 100644 --- a/src/text/text_buffer.rs +++ b/src/text/text_buffer.rs @@ -117,9 +117,12 @@ impl TextBuffer { } let remaining_bytes = self.remaining_text().len(); unsafe { - // The `copy_within()` method below is unsafe. - // https://doc.rust-lang.org/std/primitive.slice.html#method.copy_within - // The invariants enforced by the `consume` method guarantee that this will behave safely. + // The `as_bytes_mut()` method below is unsafe. + // https://doc.rust-lang.org/std/primitive.str.html#method.as_bytes_mut + // A [str] is a byte array that is guaranteed to contain valid utf-8. Getting a mutable + // reference to the contents of the array means that a user could modify it in such a + // way that it was no longer valid utf-8. In this case, the invariants enforced by the + // `consume` method guarantee that this will behave safely. // Copy the remaining text from the end of the String to the beginning of the String. 
self.line.as_bytes_mut().copy_within(self.line_offset.., 0); // Now that the remaining bytes are back at the beginning of the buffer, there's diff --git a/src/types/integer.rs b/src/types/integer.rs index e0288803..d7a812ac 100644 --- a/src/types/integer.rs +++ b/src/types/integer.rs @@ -128,14 +128,24 @@ impl From for Integer { } else { // The u64 was slightly too big to be represented as an i64; it required the // 64th bit to store the magnitude. Up-convert it to a BigInt. - Integer::BigInt(BigInt::from(uint)) + big_integer_from_u64(uint) } } - UInteger::BigUInt(big_uint) => Integer::BigInt(BigInt::from(big_uint)), + UInteger::BigUInt(big_uint) => big_integer_from_big_uint(big_uint), } } } +#[inline(never)] +fn big_integer_from_u64(value: u64) -> Integer { + Integer::BigInt(BigInt::from(value)) +} + +#[inline(never)] +fn big_integer_from_big_uint(value: BigUint) -> Integer { + Integer::BigInt(BigInt::from(value)) +} + impl TryFrom<&UInteger> for i64 { type Error = IonError; diff --git a/src/value/native_reader.rs b/src/value/native_reader.rs index 7397be19..6afe8fda 100644 --- a/src/value/native_reader.rs +++ b/src/value/native_reader.rs @@ -1,3 +1,5 @@ +use crate::binary::constants::v1_0::IVM; +use crate::binary::non_blocking::raw_binary_reader::RawBinaryBufferReader; use crate::raw_reader::RawReader; use crate::reader::ReaderBuilder; use crate::result::IonResult; @@ -6,9 +8,6 @@ use crate::value::owned::{OwnedElement, OwnedSequence, OwnedStruct, OwnedValue}; use crate::value::reader::ElementReader; use crate::{IonType, StreamItem, StreamReader, UserReader}; -/// Provides an implementation of [ElementReader] that is backed by a native Rust [Reader]. 
-pub struct NativeElementReader; - struct NativeElementIterator { reader: UserReader, } @@ -21,17 +20,6 @@ impl Iterator for NativeElementIterator { } } -impl ElementReader for NativeElementReader { - fn iterate_over<'a, 'b>( - &'a self, - data: &'b [u8], - ) -> IonResult> + 'b>> { - let reader = ReaderBuilder::new().build(data)?; - let iterator = NativeElementIterator { reader }; - Ok(Box::new(iterator)) - } -} - impl NativeElementIterator { /// Advances the reader to the next value in the stream and uses [Self::materialize_current] /// to materialize it. @@ -122,3 +110,40 @@ impl NativeElementIterator { Ok(OwnedStruct::from_iter(child_elements.into_iter())) } } + +/// Provides an implementation of [ElementReader] that is backed by a native Rust [Reader]. +pub struct NativeElementReader; + +impl ElementReader for NativeElementReader { + fn iterate_over<'a, 'b>( + &'a self, + data: &'b [u8], + ) -> IonResult> + 'b>> { + let reader = ReaderBuilder::new().build(data)?; + let iterator = NativeElementIterator { reader }; + Ok(Box::new(iterator)) + } +} + +pub struct NonBlockingNativeElementReader; + +impl ElementReader for NonBlockingNativeElementReader { + fn iterate_over<'a, 'b>( + &'a self, + data: &'b [u8], + ) -> IonResult> + 'b>> { + // If the data is binary, create a non-blocking binary reader. + if data.starts_with(&IVM) { + let raw_reader = RawBinaryBufferReader::new(data); + let reader = UserReader::new(raw_reader); + let iterator = NativeElementIterator { reader }; + return Ok(Box::new(iterator)); + } + + // TODO: There is not yet a non-blocking text reader. When one is available, this code + // path should be modified to use it. 
+ let reader = ReaderBuilder::new().build(data)?; + let iterator = NativeElementIterator { reader }; + Ok(Box::new(iterator)) + } +} diff --git a/tests/element_test_vectors.rs b/tests/element_test_vectors.rs index 944a9219..4fbf9130 100644 --- a/tests/element_test_vectors.rs +++ b/tests/element_test_vectors.rs @@ -625,7 +625,6 @@ mod ion_c_element_api_tests { mod native_element_tests { use super::*; use ion_rs::value::native_reader::NativeElementReader; - use ion_rs::value::reader::native_element_reader; struct NativeElementApi; @@ -703,7 +702,7 @@ mod native_element_tests { } fn make_reader() -> Self::ReaderApi { - native_element_reader() + NativeElementReader } } @@ -763,3 +762,144 @@ mod native_element_tests { non_equivs(NativeElementApi, file_name) } } + +mod non_blocking_native_element_tests { + use super::*; + use ion_rs::value::native_reader::NonBlockingNativeElementReader; + + struct NonBlockingNativeElementApi; + + impl ElementApi for NonBlockingNativeElementApi { + type ReaderApi = NonBlockingNativeElementReader; + type RoundTripper = NativeElementWriterApi; + + fn global_skip_list() -> SkipList { + &[ + // The binary reader does not check whether nested values are longer than their + // parent container. + "ion-tests/iontestdata/bad/listWithValueLargerThanSize.10n", + // ROUND TRIP + // These tests have shared symbol table imports in them, which the Reader does not + // yet support. 
+ "ion-tests/iontestdata/good/subfieldInt.ion", + "ion-tests/iontestdata/good/subfieldUInt.ion", + "ion-tests/iontestdata/good/subfieldVarInt.ion", + "ion-tests/iontestdata/good/subfieldVarUInt.ion", + "ion-tests/iontestdata/good/subfieldVarUInt15bit.ion", + "ion-tests/iontestdata/good/subfieldVarUInt16bit.ion", + "ion-tests/iontestdata/good/subfieldVarUInt32bit.ion", + // --- + // Requires importing shared symbol tables + "ion-tests/iontestdata/good/item1.10n", + "ion-tests/iontestdata/good/localSymbolTableImportZeroMaxId.ion", + // Requires importing shared symbol tables + "ion-tests/iontestdata/good/testfile35.ion", + // These files are encoded in utf16 and utf32; the reader currently assumes utf8. + "ion-tests/iontestdata/good/utf16.ion", + "ion-tests/iontestdata/good/utf32.ion", + // NON-EQUIVS + "ion-tests/iontestdata/good/non-equivs/localSymbolTableWithAnnotations.ion", + "ion-tests/iontestdata/good/non-equivs/symbolTablesUnknownText.ion", + ] + } + + fn read_one_equivs_skip_list() -> SkipList { + &[] + } + + fn round_trip_skip_list() -> SkipList { + &[ + "ion-tests/iontestdata/good/item1.10n", + "ion-tests/iontestdata/good/localSymbolTableImportZeroMaxId.ion", + "ion-tests/iontestdata/good/notVersionMarkers.ion", + "ion-tests/iontestdata/good/subfieldInt.ion", + "ion-tests/iontestdata/good/subfieldUInt.ion", + "ion-tests/iontestdata/good/subfieldVarInt.ion", + "ion-tests/iontestdata/good/subfieldVarUInt.ion", + "ion-tests/iontestdata/good/subfieldVarUInt15bit.ion", + "ion-tests/iontestdata/good/subfieldVarUInt16bit.ion", + "ion-tests/iontestdata/good/subfieldVarUInt32bit.ion", + "ion-tests/iontestdata/good/utf16.ion", + "ion-tests/iontestdata/good/utf32.ion", + // These tests have symbols with unknown text. While the raw and system readers + // could process these, the user-level `Reader` simply raises an `IonError`. + // This is in keeping with the Ion spec, but causes these tests to fail. 
+ "ion-tests/iontestdata/good/symbolExplicitZero.10n", + "ion-tests/iontestdata/good/symbolImplicitZero.10n", + "ion-tests/iontestdata/good/symbolZero.ion", + ] + } + + fn equivs_skip_list() -> SkipList { + &[ + "ion-tests/iontestdata/good/equivs/localSymbolTableAppend.ion", + "ion-tests/iontestdata/good/equivs/localSymbolTableNullSlots.ion", + "ion-tests/iontestdata/good/equivs/nonIVMNoOps.ion", + ] + } + + fn non_equivs_skip_list() -> SkipList { + &[] + } + + fn make_reader() -> Self::ReaderApi { + NonBlockingNativeElementReader + } + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_text_binary(file_name: &str) { + good_roundtrip_text_binary(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_binary_text(file_name: &str) { + good_roundtrip_binary_text(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_text_pretty(file_name: &str) { + good_roundtrip_text_pretty(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_pretty_text(file_name: &str) { + good_roundtrip_pretty_text(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_pretty_binary(file_name: &str) { + good_roundtrip_pretty_binary(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/**/*.10n")] + fn native_good_roundtrip_binary_pretty(file_name: &str) { + 
good_roundtrip_binary_pretty(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/bad/**/*.ion")] + #[test_resources("ion-tests/iontestdata/bad/**/*.10n")] + fn native_bad(file_name: &str) { + bad(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/equivs/**/*.ion")] + #[test_resources("ion-tests/iontestdata/good/equivs/**/*.10n")] + fn native_equivs(file_name: &str) { + equivs(NonBlockingNativeElementApi, file_name) + } + + #[test_resources("ion-tests/iontestdata/good/non-equivs/**/*.ion")] + // no binary files exist and the macro doesn't like empty globs... + // see frehberg/test-generator#12 + //#[test_resources("ion-tests/iontestdata/good/non-equivs/**/*.10n")] + fn native_non_equivs(file_name: &str) { + non_equivs(NonBlockingNativeElementApi, file_name) + } +}