From d77af8bbc8d78523d45a5dea2291479feaca6256 Mon Sep 17 00:00:00 2001 From: timvisee Date: Thu, 12 Nov 2020 11:51:23 +0100 Subject: [PATCH 1/3] Add functions to take/release just producer or consumer See https://github.com/jamesmunns/bbqueue/issues/40 See https://github.com/jamesmunns/bbqueue/issues/67 --- core/src/bbbuffer.rs | 248 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 236 insertions(+), 12 deletions(-) diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 78610b5..d54bbb6 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -12,10 +12,17 @@ use core::{ result::Result as CoreResult, slice::from_raw_parts_mut, sync::atomic::{ - AtomicBool, AtomicUsize, + AtomicBool, AtomicU8, AtomicUsize, Ordering::{AcqRel, Acquire, Release}, }, }; + +/// Bit to define producer taken. +const BIT_PRODUCER: u8 = 1 << 0; + +/// Bit to define consumer taken. +const BIT_CONSUMER: u8 = 1 << 1; + #[derive(Debug)] /// A backing structure for a BBQueue. Can be used to create either /// a BBQueue or a split Producer/Consumer pair @@ -47,8 +54,10 @@ pub struct BBBuffer { /// Is there an active write grant? write_in_progress: AtomicBool, - /// Have we already split? - already_split: AtomicBool, + /// Whether we have split the producer and/or consumer parts off. + /// + /// See the `BIT_PRODUCER` and `BIT_CONSUMER` bits which define what parts have been split off. + split_prod_cons: AtomicU8, } unsafe impl Sync for BBBuffer {} @@ -63,7 +72,7 @@ impl<'a, const N: usize> BBBuffer { /// is placed at `static` scope within the `.bss` region, the explicit initialization /// will be elided (as it is already performed as part of memory initialization) /// - /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section /// while splitting. /// /// ```rust @@ -86,7 +95,12 @@ impl<'a, const N: usize> BBBuffer { /// # } /// ``` pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> { - if atomic::swap(&self.already_split, true, AcqRel) { + // Set producer/consumer taken bit, error and reset if one was already set + let prev = self + .split_prod_cons + .fetch_or(BIT_PRODUCER | BIT_CONSUMER, AcqRel); + if prev > 0 { + self.split_prod_cons.store(prev, Release); return Err(Error::AlreadySplit); } @@ -123,14 +137,82 @@ impl<'a, const N: usize> BBBuffer { /// is placed at `static` scope within the `.bss` region, the explicit initialization /// will be elided (as it is already performed as part of memory initialization) /// - /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical /// section while splitting. pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> { let (producer, consumer) = self.try_split()?; Ok((FrameProducer { producer }, FrameConsumer { consumer })) } - /// Attempt to release the Producer and Consumer + /// Attempt to take a `Producer` from the `BBBuffer` to gain access to the + /// buffer. If a producer has already been taken, an error will be returned. + /// + /// NOTE: When splitting, the underlying buffer will be explicitly initialized + /// to zero. This may take a measurable amount of time, depending on the size + /// of the buffer. This is necessary to prevent undefined behavior. If the buffer + /// is placed at `static` scope within the `.bss` region, the explicit initialization + /// will be elided (as it is already performed as part of memory initialization) + /// + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section + /// while splitting. + pub fn try_take_producer(&'a self) -> Result> { + // Set producer taken bit, error if already set + if self.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 { + return Err(Error::AlreadySplit); + } + + unsafe { + // TODO: do we need to zero buffer here, like try_split? + // // Explicitly zero the data to avoid undefined behavior. + // // This is required, because we hand out references to the buffers, + // // which mean that creating them as references is technically UB for now + // let mu_ptr = self.buf.get(); + // (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1); + + let nn1 = NonNull::new_unchecked(self as *const _ as *mut _); + + Ok(Producer { + bbq: nn1, + pd: PhantomData, + }) + } + } + + /// Attempt to take a `Consumer` from the `BBBuffer` to gain access to the + /// buffer. If a consumer has already been taken, an error will be returned. + /// + /// NOTE: When splitting, the underlying buffer will be explicitly initialized + /// to zero. This may take a measurable amount of time, depending on the size + /// of the buffer. This is necessary to prevent undefined behavior. If the buffer + /// is placed at `static` scope within the `.bss` region, the explicit initialization + /// will be elided (as it is already performed as part of memory initialization) + /// + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section + /// while splitting. + pub fn try_take_consumer(&'a self) -> Result> { + // Set producer taken bit, error if already set + if self.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 { + return Err(Error::AlreadySplit); + } + + unsafe { + // TODO: do we need to zero buffer here, like try_split? + // // Explicitly zero the data to avoid undefined behavior. + // // This is required, because we hand out references to the buffers, + // // which mean that creating them as references is technically UB for now + // let mu_ptr = self.buf.get(); + // (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1); + + let nn1 = NonNull::new_unchecked(self as *const _ as *mut _); + + Ok(Consumer { + bbq: nn1, + pd: PhantomData, + }) + } + } + + /// Attempt to release the `Producer` and `Consumer` /// /// This re-initializes the buffer so it may be split in a different mode at a later /// time. There must be no read or write grants active, or an error will be returned. @@ -204,7 +286,7 @@ impl<'a, const N: usize> BBBuffer { self.last.store(0, Release); // Mark the buffer as ready to be split - self.already_split.store(false, Release); + self.split_prod_cons.store(0, Release); Ok(()) } @@ -227,6 +309,148 @@ impl<'a, const N: usize> BBBuffer { (FrameProducer { producer }, FrameConsumer { consumer }) }) } + + /// Attempt to release the `Producer`. + /// + /// This re-initializes the buffer so it may be split in a different mode at a later + /// time. There must be no read or write grants active, or an error will be returned. + /// + /// The `Producer` ust be from THIS `BBBuffer`, or an error will be returned. + /// + /// ```rust + /// # // bbqueue test shim! + /// # fn bbqtest() { + /// use bbqueue::BBBuffer; + /// + /// // Create and split a new buffer + /// let buffer: BBBuffer<6> = BBBuffer::new(); + /// let (prod, cons) = buffer.try_split().unwrap(); + /// + /// // Not possible to split twice + /// assert!(buffer.try_split().is_err()); + /// + /// // Release the producer and consumer + /// assert!(buffer.try_release(prod, cons).is_ok()); + /// + /// // Split the buffer in framed mode + /// let (fprod, fcons) = buffer.try_split_framed().unwrap(); + /// # // bbqueue test shim! + /// # } + /// # + /// # fn main() { + /// # #[cfg(not(feature = "thumbv6"))] + /// # bbqtest(); + /// # } + /// ``` + pub fn try_release_producer( + &'a self, + prod: Producer<'a, N>, + ) -> CoreResult<(), Producer<'a, N>> { + // Note: Re-entrancy is not possible because we require ownership + // of the producer, which are not cloneable. We also + // can assume the buffer has been split, because + + // Is this our producer? + let our_prod = prod.bbq.as_ptr() as *const Self == self; + + if !(our_prod) { + // Can't release, not our producer + return Err(prod); + } + + let wr_in_progress = self.write_in_progress.load(Acquire); + let rd_in_progress = self.read_in_progress.load(Acquire); + + if wr_in_progress || rd_in_progress { + // Can't release, active grant(s) in progress + return Err(prod); + } + + // Drop the producer + drop(prod); + + // Re-initialize the buffer (not totally needed, but nice to do) + self.write.store(0, Release); + self.read.store(0, Release); + self.reserve.store(0, Release); + self.last.store(0, Release); + + // Mark the buffer as ready to retake producer + self.split_prod_cons.fetch_and(!BIT_PRODUCER, Release); + + Ok(()) + } + + /// Attempt to release the `Consumer`. + /// + /// This re-initializes the buffer so it may be split in a different mode at a later + /// time. There must be no read or write grants active, or an error will be returned. + /// + /// The `Consumer` must be from THIS `BBBuffer`, or an error will be returned. + /// + /// ```rust + /// # // bbqueue test shim! + /// # fn bbqtest() { + /// use bbqueue::BBBuffer; + /// + /// // Create and split a new buffer + /// let buffer: BBBuffer<6> = BBBuffer::new(); + /// let (prod, cons) = buffer.try_split().unwrap(); + /// + /// // Not possible to split twice + /// assert!(buffer.try_split().is_err()); + /// + /// // Release the producer and consumer + /// assert!(buffer.try_release(prod, cons).is_ok()); + /// + /// // Split the buffer in framed mode + /// let (fprod, fcons) = buffer.try_split_framed().unwrap(); + /// # // bbqueue test shim! + /// # } + /// # + /// # fn main() { + /// # #[cfg(not(feature = "thumbv6"))] + /// # bbqtest(); + /// # } + /// ``` + pub fn try_release_consumer( + &'a self, + cons: Consumer<'a, N>, + ) -> CoreResult<(), Consumer<'a, N>> { + // Note: Re-entrancy is not possible because we require ownership + // of the consumer, which are not cloneable. We also + // can assume the buffer has been split, because + + // Is this our consumer? + let our_cons = cons.bbq.as_ptr() as *const Self == self; + + if !(our_cons) { + // Can't release, not our consumer + return Err(cons); + } + + let wr_in_progress = self.write_in_progress.load(Acquire); + let rd_in_progress = self.read_in_progress.load(Acquire); + + if wr_in_progress || rd_in_progress { + // Can't release, active grant(s) in progress + return Err(cons); + } + + // Drop the consumer + drop(cons); + + // Re-initialize the buffer (not totally needed, but nice to do) + self.write.store(0, Release); + self.read.store(0, Release); + self.reserve.store(0, Release); + self.last.store(0, Release); + + // Mark the buffer as ready to retake consumer + self.split_prod_cons.fetch_and(!BIT_CONSUMER, Release); + + Ok(()) + } } impl BBBuffer { @@ -280,7 +504,7 @@ impl BBBuffer { write_in_progress: AtomicBool::new(false), // We haven't split at the start - already_split: AtomicBool::new(false), + split_prod_cons: AtomicU8::new(0), } } } @@ -744,7 +968,7 @@ impl<'a, const N: usize> GrantW<'a, N> { /// If `used` is larger than the given grant, the maximum amount will /// be commited /// - /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical /// section while committing. pub fn commit(mut self, used: usize) { self.commit_inner(used); @@ -861,7 +1085,7 @@ impl<'a, const N: usize> GrantR<'a, N> { /// If `used` is larger than the given grant, the full grant will /// be released. /// - /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical /// section while releasing. pub fn release(mut self, used: usize) { // Saturate the grant release @@ -969,7 +1193,7 @@ impl<'a, const N: usize> SplitGrantR<'a, N> { /// If `used` is larger than the given grant, the full grant will /// be released. /// - /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical + /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical /// section while releasing. pub fn release(mut self, used: usize) { // Saturate the grant release From 324806bdb2a57bc6c4e16b11781c741ec5f23f26 Mon Sep 17 00:00:00 2001 From: timvisee Date: Thu, 12 Nov 2020 12:30:54 +0100 Subject: [PATCH 2/3] Implement atomic fetch_or and fetch_and for thumbv6 --- core/src/bbbuffer.rs | 45 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index d54bbb6..1fcedfa 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -96,9 +96,8 @@ impl<'a, const N: usize> BBBuffer { /// ``` pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> { // Set producer/consumer taken bit, error and reset if one was already set - let prev = self - .split_prod_cons - .fetch_or(BIT_PRODUCER | BIT_CONSUMER, AcqRel); + let prev = atomic::fetch_or(&self.split_prod_cons, BIT_PRODUCER | BIT_CONSUMER, AcqRel); + if prev > 0 { self.split_prod_cons.store(prev, Release); return Err(Error::AlreadySplit); @@ -157,7 +156,7 @@ impl<'a, const N: usize> BBBuffer { /// while splitting. pub fn try_take_producer(&'a self) -> Result> { // Set producer taken bit, error if already set - if self.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 { + if atomic::fetch_or(&self.split_prod_cons, BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 { return Err(Error::AlreadySplit); } @@ -191,7 +190,7 @@ impl<'a, const N: usize> BBBuffer { /// while splitting. pub fn try_take_consumer(&'a self) -> Result> { // Set producer taken bit, error if already set - if self.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 { + if atomic::fetch_or(&self.split_prod_cons, BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 { return Err(Error::AlreadySplit); } @@ -376,7 +375,7 @@ impl<'a, const N: usize> BBBuffer { self.last.store(0, Release); // Mark the buffer as ready to retake producer - self.split_prod_cons.fetch_and(!BIT_PRODUCER, Release); + atomic::fetch_and(&self.split_prod_cons, !BIT_PRODUCER, Release); Ok(()) } @@ -447,7 +446,7 @@ impl<'a, const N: usize> BBBuffer { self.last.store(0, Release); // Mark the buffer as ready to retake consumer - self.split_prod_cons.fetch_and(!BIT_CONSUMER, Release); + atomic::fetch_and(&self.split_prod_cons, !BIT_CONSUMER, Release); Ok(()) } @@ -1329,7 +1328,7 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> { #[cfg(feature = "thumbv6")] mod atomic { use core::sync::atomic::{ - AtomicBool, AtomicUsize, + AtomicBool, AtomicU8, AtomicUsize, Ordering::{self, Acquire, Release}, }; use cortex_m::interrupt::free; @@ -1352,6 +1351,24 @@ mod atomic { }) } + #[inline(always)] + pub fn fetch_and(atomic: &AtomicU8, val: u8, _order: Ordering) -> u8 { + free(|_| { + let prev = atomic.load(Acquire); + atomic.store(prev & val, Release); + prev + }) + } + + #[inline(always)] + pub fn fetch_or(atomic: &AtomicU8, val: u8, _order: Ordering) -> u8 { + free(|_| { + let prev = atomic.load(Acquire); + atomic.store(prev | val, Release); + prev + }) + } + #[inline(always)] pub fn swap(atomic: &AtomicBool, val: bool, _order: Ordering) -> bool { free(|_| { @@ -1364,7 +1381,7 @@ mod atomic { #[cfg(not(feature = "thumbv6"))] mod atomic { - use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; #[inline(always)] pub fn fetch_add(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize { @@ -1376,6 +1393,16 @@ mod atomic { atomic.fetch_sub(val, order) } + #[inline(always)] + pub fn fetch_and(atomic: &AtomicU8, val: u8, order: Ordering) -> u8 { + atomic.fetch_and(val, order) + } + + #[inline(always)] + pub fn fetch_or(atomic: &AtomicU8, val: u8, order: Ordering) -> u8 { + atomic.fetch_or(val, order) + } + #[inline(always)] pub fn swap(atomic: &AtomicBool, val: bool, order: Ordering) -> bool { atomic.swap(val, order) From 9a0a2cf5193afb04e80459c3af4bb1285c10afcf Mon Sep 17 00:00:00 2001 From: timvisee Date: Thu, 12 Nov 2020 15:57:08 +0100 Subject: [PATCH 3/3] Explicitly zero producer on take, reinitialise when both released See: - https://github.com/jamesmunns/bbqueue/pull/78#issuecomment-726126583 - https://github.com/jamesmunns/bbqueue/pull/78#issuecomment-734766802 --- core/src/bbbuffer.rs | 60 +++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 1fcedfa..b658944 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -146,7 +146,7 @@ impl<'a, const N: usize> BBBuffer { /// Attempt to take a `Producer` from the `BBBuffer` to gain access to the /// buffer. If a producer has already been taken, an error will be returned. /// - /// NOTE: When splitting, the underlying buffer will be explicitly initialized + /// NOTE: When taking the producer, the underlying buffer will be explicitly initialized /// to zero. This may take a measurable amount of time, depending on the size /// of the buffer. This is necessary to prevent undefined behavior. If the buffer /// is placed at `static` scope within the `.bss` region, the explicit initialization @@ -161,12 +161,11 @@ impl<'a, const N: usize> BBBuffer { } unsafe { - // TODO: do we need to zero buffer here, like try_split? - // // Explicitly zero the data to avoid undefined behavior. - // // This is required, because we hand out references to the buffers, - // // which mean that creating them as references is technically UB for now - // let mu_ptr = self.buf.get(); - // (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1); + // Explicitly zero the data to avoid undefined behavior. + // This is required, because we hand out references to the buffers, + // which mean that creating them as references is technically UB for now + let mu_ptr = self.buf.get(); + (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1); let nn1 = NonNull::new_unchecked(self as *const _ as *mut _); @@ -180,12 +179,6 @@ impl<'a, const N: usize> BBBuffer { /// Attempt to take a `Consumer` from the `BBBuffer` to gain access to the /// buffer. If a consumer has already been taken, an error will be returned. /// - /// NOTE: When splitting, the underlying buffer will be explicitly initialized - /// to zero. This may take a measurable amount of time, depending on the size - /// of the buffer. This is necessary to prevent undefined behavior. If the buffer - /// is placed at `static` scope within the `.bss` region, the explicit initialization - /// will be elided (as it is already performed as part of memory initialization) - /// /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section /// while splitting. pub fn try_take_consumer(&'a self) -> Result> { @@ -195,13 +188,6 @@ impl<'a, const N: usize> BBBuffer { } unsafe { - // TODO: do we need to zero buffer here, like try_split? - // // Explicitly zero the data to avoid undefined behavior. - // // This is required, because we hand out references to the buffers, - // // which mean that creating them as references is technically UB for now - // let mu_ptr = self.buf.get(); - // (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1); - let nn1 = NonNull::new_unchecked(self as *const _ as *mut _); Ok(Consumer { @@ -311,8 +297,9 @@ impl<'a, const N: usize> BBBuffer { /// Attempt to release the `Producer`. /// - /// This re-initializes the buffer so it may be split in a different mode at a later - /// time. There must be no read or write grants active, or an error will be returned. + /// This re-initializes the buffer if the consumer was already released so it may be + /// split in a different mode at a later time. There must be no read or write grants + /// active, or an error will be returned. /// /// The `Producer` ust be from THIS `BBBuffer`, or an error will be returned. /// @@ -368,11 +355,13 @@ impl<'a, const N: usize> BBBuffer { // Drop the producer drop(prod); - // Re-initialize the buffer (not totally needed, but nice to do) - self.write.store(0, Release); - self.read.store(0, Release); - self.reserve.store(0, Release); - self.last.store(0, Release); + // Re-initialize the buffer if consumer is already released (not totally needed, but nice to do) + if self.split_prod_cons.load(Acquire) & BIT_CONSUMER == 0 { + self.write.store(0, Release); + self.read.store(0, Release); + self.reserve.store(0, Release); + self.last.store(0, Release); + } // Mark the buffer as ready to retake producer atomic::fetch_and(&self.split_prod_cons, !BIT_PRODUCER, Release); @@ -382,8 +371,9 @@ impl<'a, const N: usize> BBBuffer { /// Attempt to release the `Consumer`. /// - /// This re-initializes the buffer so it may be split in a different mode at a later - /// time. There must be no read or write grants active, or an error will be returned. + /// This re-initializes the buffer if the producer was already released so it may be + /// split in a different mode at a later time. There must be no read or write grants + /// active, or an error will be returned. /// /// The `Consumer` must be from THIS `BBBuffer`, or an error will be returned. /// @@ -439,11 +429,13 @@ impl<'a, const N: usize> BBBuffer { // Drop the consumer drop(cons); - // Re-initialize the buffer (not totally needed, but nice to do) - self.write.store(0, Release); - self.read.store(0, Release); - self.reserve.store(0, Release); - self.last.store(0, Release); + // Re-initialize the buffer if producer is already released (not totally needed, but nice to do) + if self.split_prod_cons.load(Acquire) & BIT_PRODUCER == 0 { + self.write.store(0, Release); + self.read.store(0, Release); + self.reserve.store(0, Release); + self.last.store(0, Release); + } // Mark the buffer as ready to retake consumer atomic::fetch_and(&self.split_prod_cons, !BIT_CONSUMER, Release);