diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..51dd5039d 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::Timestamp; +use crate::order::PartialOrder; use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::dataflow::channels::pushers::CounterCore as PushCounter; use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; @@ -94,7 +95,7 @@ impl Probe for StreamCore { let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); - let shared_frontier = handle.frontier.clone(); + let mut handle = handle.clone(); let mut started = false; let mut vector = Default::default(); @@ -103,8 +104,7 @@ impl Probe for StreamCore { move |progress| { // surface all frontier changes to the shared frontier. - let mut borrow = shared_frontier.borrow_mut(); - borrow.update_iter(progress.frontiers[0].drain()); + handle.update_iter(progress.frontiers[0].drain()); if !started { // discard initial capability. @@ -139,7 +139,10 @@ impl Probe for StreamCore { /// Reports information about progress at the probe. #[derive(Debug)] pub struct Handle { - frontier: Rc>> + /// The overall shared frontier managed by all the handles + frontier: Rc>>, + /// The private frontier containing the changes produced by this handle only + handle_frontier: MutableAntichain, } impl Handle { @@ -150,7 +153,21 @@ impl Handle { /// returns true iff the frontier is empty. #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() } /// Allocates a new handle. - #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } } + #[inline] pub fn new() -> Self { + Handle { + frontier: Rc::new(RefCell::new(MutableAntichain::new())), + handle_frontier: MutableAntichain::new() + } + } + + #[inline] + fn update_iter(&mut self, updates: I) + where + T: Clone + PartialOrder + Ord, + I: IntoIterator, + { + self.frontier.borrow_mut().update_iter(self.handle_frontier.update_iter(updates)); + } /// Invokes a method on the frontier, returning its result. /// @@ -171,10 +188,20 @@ impl Handle { } } +impl Drop for Handle { + fn drop(&mut self) { + // This handle is being dropped so remove it from the overall calculation + self.frontier.borrow_mut().update_iter( + self.handle_frontier.frontier().iter().map(|t| (t.clone(), -1)) + ); + } +} + impl Clone for Handle { fn clone(&self) -> Self { Handle { - frontier: self.frontier.clone() + frontier: self.frontier.clone(), + handle_frontier: MutableAntichain::new(), } } }