Skip to content

Commit

Permalink
Move reporting of result out of do_work
Browse files Browse the repository at this point in the history
Summary:
Now with us reporting the result at the outermost place, we can also
report the actual CancellationReason.

Reviewed By: JakobDegen

Differential Revision: D65371045

fbshipit-source-id: 4d77c9a41b63b706b122e349c98d0599e98b6697
  • Loading branch information
cjhopman authored and facebook-github-bot committed Dec 20, 2024
1 parent 5efb586 commit 23f8fbe
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
4 changes: 2 additions & 2 deletions dice/dice/src/impls/task/dice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl DiceTaskInternal {
}
}

pub(super) fn set_value(
pub(crate) fn set_value(
&self,
value: DiceComputedValue,
) -> CancellableResult<DiceComputedValue> {
Expand Down Expand Up @@ -332,7 +332,7 @@ impl DiceTaskInternal {

/// report the task as terminated. This should only be called once. No effect if called affect
/// task is already ready
pub(super) fn report_terminated(&self, reason: CancellationReason) {
pub(crate) fn report_terminated(&self, reason: CancellationReason) {
match self.state.sync() {
TaskState::Continue => {}
TaskState::Finished => {
Expand Down
4 changes: 4 additions & 0 deletions dice/dice/src/impls/task/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl<'a> DiceTaskHandle<'a> {
pub(crate) fn finished(&mut self, value: DiceComputedValue) {
self.result = Some(Ok(value));
}

pub(crate) fn cancelled(&mut self, reason: CancellationReason) {
self.result = Some(Err(reason));
}
}

unsafe impl<'a> Send for DiceTaskHandle<'a> {}
Expand Down
23 changes: 17 additions & 6 deletions dice/dice/src/impls/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,23 @@ impl DiceTaskWorker {
None => Either::Right(state.no_previous_task(handle).await),
};

match previous_result {
Either::Left(_) => {
let result = match previous_result {
Either::Left(previous_result) => {
// previous result actually finished
previous_result
}
Either::Right(state) => {
let _ignore = worker.do_work(handle, state_handle, state).await;
Either::Right(state) => worker.do_work(handle, state_handle, state).await,
};

match result {
Ok(DiceWorkerStateFinishedAndCached {
_prevent_cancellation,
value,
}) => {
handle.finished(value);
}
Err(reason) => {
handle.cancelled(reason);
}
}

Expand Down Expand Up @@ -208,7 +219,7 @@ impl DiceTaskWorker {
)
.await;

return response.map(|r| task_state.cached(handle, r, activation_info));
return response.map(|r| task_state.cached(r, activation_info));
}
CheckDependenciesResult::NoDeps => {
// TODO(cjhopman): Why do we treat nodeps as deps not matching? There seems to be some
Expand Down Expand Up @@ -265,7 +276,7 @@ impl DiceTaskWorker {
}
};

res.map(|res| state.cached(handle, res, activation_info))
res.map(|res| state.cached(res, activation_info))
}

async fn compute(
Expand Down
26 changes: 10 additions & 16 deletions dice/dice/src/impls/worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ impl<'a> DiceWorkerStateAwaitingPrevious<'a> {

pub(crate) fn previously_finished(
self,
internals: &mut DiceTaskHandle,
value: DiceComputedValue,
) -> CancellableResult<DiceWorkerStateFinishedAndCached> {
debug!(msg = "previously cancelled task actually finished");

let guard = self.prevent_cancellation.try_disable_cancellation();

finish_with_cached_value(internals, value, guard)
finish_with_cached_value(value, guard)
}

pub(crate) async fn previously_cancelled(
Expand Down Expand Up @@ -115,7 +113,7 @@ impl<'a> DiceWorkerStateAwaitingPrevious<'a> {
.expect("Terminated task must have finished value")
{
Ok(res) => {
return Either::Left(self.previously_finished(internals, res));
return Either::Left(self.previously_finished(res));
}
Err(_cancelled) => {
// actually was cancelled, so just continue re-evaluating
Expand All @@ -127,19 +125,15 @@ impl<'a> DiceWorkerStateAwaitingPrevious<'a> {
}

fn finish_with_cached_value(
internals: &mut DiceTaskHandle<'_>,
value: DiceComputedValue,
disable_cancellation: Option<DisableCancellationGuard>,
) -> CancellableResult<DiceWorkerStateFinishedAndCached> {
match disable_cancellation {
None => Err(CancellationReason::Cached),
Some(g) => {
internals.finished(value);

Ok(DiceWorkerStateFinishedAndCached {
_prevent_cancellation: g,
})
}
Some(g) => Ok(DiceWorkerStateFinishedAndCached {
value,
_prevent_cancellation: g,
}),
}
}

Expand Down Expand Up @@ -198,7 +192,7 @@ impl DiceWorkerStateLookupNode {
debug!(msg = "found existing entry with matching version in cache. reusing result.");

let guard = internals.cancellation_ctx().try_disable_cancellation();
finish_with_cached_value(internals, value, guard)
finish_with_cached_value(value, guard)
}
}

Expand Down Expand Up @@ -287,7 +281,6 @@ pub(crate) struct DiceWorkerStateFinished {
impl DiceWorkerStateFinished {
pub(crate) fn cached(
self,
internals: &mut DiceTaskHandle,
value: DiceComputedValue,
activation_info: Option<ActivationInfo>,
) -> DiceWorkerStateFinishedAndCached {
Expand All @@ -300,9 +293,9 @@ impl DiceWorkerStateFinished {
activation_info.activation_data,
)
}
internals.finished(value);

DiceWorkerStateFinishedAndCached {
value,
_prevent_cancellation: self._prevent_cancellation,
}
}
Expand Down Expand Up @@ -342,5 +335,6 @@ impl ActivationInfo {
/// When the spawned dice worker is done computing and saving the value to core state cache.
/// The final value is known.
pub(crate) struct DiceWorkerStateFinishedAndCached {
_prevent_cancellation: DisableCancellationGuard,
pub(crate) value: DiceComputedValue,
pub(crate) _prevent_cancellation: DisableCancellationGuard,
}

0 comments on commit 23f8fbe

Please sign in to comment.