diff --git a/container/src/lib.rs b/container/src/lib.rs index e9aec213a..25f7321d3 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -125,6 +125,12 @@ pub trait ContainerBuilder: Default + 'static { } container.clear(); } + + /// Indicates a good moment to release resources. By default, does nothing. Callers should + /// not rely on this method releasing any internal state though, i.e., the caller first + /// needs to drain the contents using [`Self::finish`]. + #[inline] + fn relax(&mut self) { } } /// A wrapper trait indicating that the container building will preserve the number of records. diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index b59ddcae6..5f658822f 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -300,6 +300,7 @@ mod builder { } impl Default for ColumnBuilder { + #[inline] fn default() -> Self { ColumnBuilder { current: Default::default(), @@ -331,6 +332,11 @@ mod builder { self.empty = self.pending.pop_front(); self.empty.as_mut() } + + #[inline] + fn relax(&mut self) { + *self = Self::default(); + } } impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone { } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index f42453741..6a7badff6 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -81,6 +81,7 @@ impl>> Buffer H: FnMut(&::Item<'a>) -> u64 { pushers: Vec

, - buffers: Vec, + builders: Vec, current: Option, hash_func: H, } @@ -27,20 +27,20 @@ where { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { - let mut buffers = vec![]; + let mut builders = vec![]; for _ in 0..pushers.len() { - buffers.push(Default::default()); + builders.push(Default::default()); } Exchange { pushers, hash_func: key, - buffers, + builders, current: None, } } #[inline] fn flush(&mut self, index: usize) { - while let Some(container) = self.buffers[index].finish() { + while let Some(container) = self.builders[index].finish() { if let Some(ref time) = self.current { Message::push_at(container, time.clone(), &mut self.pushers[index]); } @@ -79,14 +79,14 @@ where // if the number of pushers is a power of two, use a mask if self.pushers.len().is_power_of_two() { let mask = (self.pushers.len() - 1) as u64; - CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) & mask) as usize); + CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize); } // as a last resort, use mod (%) else { let num_pushers = self.pushers.len() as u64; - CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) % num_pushers) as usize); + CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize); } - for (buffer, pusher) in self.buffers.iter_mut().zip(self.pushers.iter_mut()) { + for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) { while let Some(container) = buffer.extract() { Message::push_at(container, time.clone(), pusher); } @@ -96,6 +96,7 @@ where // flush for index in 0..self.pushers.len() { self.flush(index); + self.builders[index].relax(); self.pushers[index].push(&mut None); } }