Skip to content

Makes .ok() use in forked scopes illegal #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions core/src/main/scala/ox/either.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,31 @@ import scala.util.boundary.{Label, break}
import scala.util.control.NonFatal

object either:

private type NotNested = NotGiven[Label[Either[Nothing, Nothing]]]

/** Within an [[either]] block, allows unwrapping [[Either]] and [[Option]] values using [[ok()]]. The result is the right-value of an
/** This technique allows to prevent implicit search from finding the givens thanks to ambiguity. In the scope that returns `A` it will be
* impossible to find a given [[Supervised]] and [[Forked]].
*/
private type WithoutScopeMarkers[A] = (Supervised, Supervised) ?=> (Forked, Forked) ?=> A

private inline def availableInScope[A]: Boolean =
summonFrom {
case _: NotGiven[A] => false
case _: A => true
}

/** Within an [[either]] block, allows unwrapping [[Either]] and [[Option]] values using [[#ok()]]. The result is the right-value of an
* `Either`, or the defined-value of the `Option`. In case a failure is encountered (a left-value of an `Either`, or a `None`), the
* computation is short-circuited and the failure becomes the result. Failures can also be reported using [[fail()]].
* computation is short-circuited and the failure becomes the result. Failures can also be reported using [[#fail()]].
*
* Uses the [[boundary]]-break mechanism.
*
* Uses ambiguity-based given removal technique (given't) to enable usage of [[#ok()]] combinator in [[either]] blocks nested inside
* [[ox.fork]] blocks.
*
* @param body
* The code block, within which [[Either]]s and [[Option]]s can be unwrapped using [[ok()]]. Failures can be reported using [[fail()]].
* Both [[ok()]] and [[fail()]] are extension methods.
* The code block, within which [[Either]]s and [[Option]]s can be unwrapped using [[#ok()]]. Failures can be reported using
* [[#fail()]]. Both [[#ok()]] and [[#fail()]] are extension methods.
* @tparam E
* The error type.
* @tparam A
Expand All @@ -36,39 +49,56 @@ object either:
* v1.ok() ++ v2.ok()
* }}}
*/
inline def apply[E, A](inline body: Label[Either[E, A]] ?=> A)(using
inline def apply[E, A](inline body: WithoutScopeMarkers[Label[Either[E, A]] ?=> A])(using
@implicitNotFound(
"Nesting of either blocks is not allowed as it's error prone, due to type inference. Consider extracting the nested either block to a separate function."
) nn: NotNested
): Either[E, A] = boundary(Right(body))
): Either[E, A] =
given Forked = ForkedEvidence // just to satisfy the context function
given Supervised = SupervisedEvidence
boundary(Right(body))

extension [E, A](inline t: Either[E, A])
/** Unwrap the value of the `Either`, short-circuiting the computation to the enclosing [[either]], in case this is a left-value. */
/** Unwrap the value of the `Either`, short-circuiting the computation to the enclosing [[either]], in case this is a left-value. Can't
* be used in forked blocks without an either block in fork to prevent escaped Breaks that crash forked threads.
*/
transparent inline def ok(): A =
summonFrom {
case given boundary.Label[Either[E, Nothing]] =>
t match
case Left(e) => break(Left(e))
case Right(a) => a
case given boundary.Label[Either[Nothing, Nothing]] =>
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
}
inline if availableInScope[Forked] && !availableInScope[Supervised] then
error(
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
)
else
summonFrom {
case given boundary.Label[Either[E, Nothing]] =>
t match
case Left(e) => break(Left(e))
case Right(a) => a
case given boundary.Label[Either[Nothing, Nothing]] =>
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
}

extension [A](inline t: Option[A])
/** Unwrap the value of the `Option`, short-circuiting the computation to the enclosing [[either]], in case this is a `None`. */
/** Unwrap the value of the `Option`, short-circuiting the computation to the enclosing [[either]], in case this is a `None`. Can't be
* used in forked blocks without an either block in fork to prevent escaped Breaks that crash forked threads.
*/
transparent inline def ok(): A =
summonFrom {
case given boundary.Label[Either[Unit, Nothing]] =>
t match
case None => break(Left(()))
case Some(a) => a
case given boundary.Label[Either[Nothing, Nothing]] =>
error(
"The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?\nNote that for options, the error type must contain a `Unit`."
)
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
}
inline if availableInScope[Forked] && !availableInScope[Supervised] then
error(
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
)
else
summonFrom {
case given boundary.Label[Either[Unit, Nothing]] =>
t match
case None => break(Left(()))
case Some(a) => a
case given boundary.Label[Either[Nothing, Nothing]] =>
error(
"The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?\nNote that for options, the error type must contain a `Unit`."
)
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
}

extension [E, A](inline f: Fork[Either[E, A]])
/** Join the fork and unwrap the value of its `Either` result, short-circuiting the computation to the enclosing [[either]], in case
Expand All @@ -80,12 +110,20 @@ object either:
transparent inline def ok(): A = f.join().ok()

extension [E](e: E)
/** Fail the computation by short-circuiting the enclosing [[either]] block with en error of type `E`. Can't be used in forked blocks
* without an either block in fork to prevent escaped Breaks that crash forked threads.
*/
transparent inline def fail(): Nothing =
summonFrom {
case given boundary.Label[Either[E, Nothing]] => break(Left(e))
case given boundary.Label[Either[Nothing, Nothing]] =>
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
}
inline if availableInScope[Forked] && !availableInScope[Supervised] then
error(
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
)
else
summonFrom {
case given boundary.Label[Either[E, Nothing]] => break(Left(e))
case given boundary.Label[Either[Nothing, Nothing]] =>
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
}

/** Catches non-fatal exceptions that occur when evaluating `t` and returns them as the left side of the returned `Either`. */
inline def catching[T](inline t: => T): Either[Throwable, T] =
Expand Down
27 changes: 19 additions & 8 deletions core/src/main/scala/ox/fork.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import java.util.concurrent.{CompletableFuture, Semaphore}
import scala.concurrent.ExecutionException
import scala.util.control.NonFatal

/** Implicit evidence that given block of code will be evaluated in a forked scope on a separate thread and therefore that capture of
* `scala.util.boundary.Label` instances is unsafe.
*/
private[ox] sealed abstract class Forked
private[ox] case object ForkedEvidence extends Forked

/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervised]] or [[supervisedError]]
* block completes.
*
Expand All @@ -18,7 +24,7 @@ import scala.util.control.NonFatal
* For alternate behaviors regarding ending the scope, see [[forkUser]], [[forkError]], [[forkUserError]], [[forkCancellable]] and
* [[forkUnsupervised]].
*/
def fork[T](f: => T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMode)(f)
def fork[T](f: Forked ?=> T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMode)(f)

/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervisedError]] block completes.
*
Expand All @@ -30,13 +36,14 @@ def fork[T](f: => T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMo
* enclosing scope. If the [[ErrorMode]] provided when creating the scope using [[supervisedError]] classifies a fork return value as an
* error, the scope ends (cancelling all other running forks).
*/
def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
def forkError[E, F[_], T](using OxError[E, F])(f: Forked ?=> F[T]): Fork[T] =
val oxError = summon[OxError[E, F]]
// the separate result future is needed to wait for the result, as there's no .join on individual tasks (only whole scopes can be joined)
val result = new CompletableFuture[T]()
oxError.scope.fork { () =>
val supervisor = oxError.supervisor
try
given Forked = ForkedEvidence
val resultOrError = f
val errorMode = oxError.errorMode
if errorMode.isError(resultOrError) then
Expand All @@ -63,7 +70,7 @@ def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
*
* For alternate behaviors, see [[fork]], [[forkError]], [[forkUserError]], [[forkCancellable]] and [[forkUnsupervised]].
*/
def forkUser[T](f: => T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asNoErrorMode)(f)
def forkUser[T](f: Forked ?=> T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asNoErrorMode)(f)

/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervisedError]] block completes.
*
Expand All @@ -75,13 +82,14 @@ def forkUser[T](f: => T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asN
* enclosing scope. If the [[ErrorMode]] provided when creating the scope using [[supervisedError]] classifies a fork return value as an
* error, the scope ends (cancelling all other running forks).
*/
def forkUserError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
def forkUserError[E, F[_], T](using OxError[E, F])(f: Forked ?=> F[T]): Fork[T] =
val oxError = summon[OxError[E, F]]
val result = new CompletableFuture[T]()
oxError.supervisor.forkStarts()
oxError.scope.fork { () =>
val supervisor = oxError.supervisor.asInstanceOf[DefaultSupervisor[E]]
try
given Forked = ForkedEvidence
val resultOrError = f
val errorMode = oxError.errorMode
if errorMode.isError(resultOrError) then
Expand All @@ -105,9 +113,10 @@ def forkUserError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
*
* For alternate behaviors, see [[fork]], [[forkUser]] and [[forkCancellable]].
*/
def forkUnsupervised[T](f: => T)(using OxUnsupervised): Fork[T] =
def forkUnsupervised[T](f: Forked ?=> T)(using OxUnsupervised): Fork[T] =
val result = new CompletableFuture[T]()
summon[OxUnsupervised].scope.fork { () =>
given Forked = ForkedEvidence
try result.complete(f)
catch case e: Throwable => result.completeExceptionally(e)
}
Expand All @@ -118,7 +127,7 @@ def forkUnsupervised[T](f: => T)(using OxUnsupervised): Fork[T] =
*
* If ran in a [[supervised]] scope, all forks behave as daemon threads (see [[fork]] for details).
*/
def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
def forkAll[T](fs: Seq[Forked ?=> () => T])(using Ox): Fork[Seq[T]] =
val forks = fs.map(f => fork(f()))
new Fork[Seq[T]]:
override def join(): Seq[T] = forks.map(_.join())
Expand All @@ -136,7 +145,7 @@ def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
* Implementation note: a cancellable fork is created by starting a nested scope in a fork, and then starting a fork there. Hence, it is
* more expensive than [[fork]], as two virtual threads are started.
*/
def forkCancellable[T](f: => T)(using OxUnsupervised): CancellableFork[T] =
def forkCancellable[T](f: Forked ?=> T)(using OxUnsupervised): CancellableFork[T] =
val result = new CompletableFuture[T]()
// forks can be never run, if they are cancelled immediately - we need to detect this, not to await on result.get()
val started = new AtomicBoolean(false)
Expand All @@ -149,7 +158,9 @@ def forkCancellable[T](f: => T)(using OxUnsupervised): CancellableFork[T] =
nestedOx.scope.fork { () =>
// "else" means that the fork is already cancelled, so doing nothing in that case
if !started.getAndSet(true) then
try result.complete(f).discard
try
given Forked = ForkedEvidence
result.complete(f).discard
catch case e: Throwable => result.completeExceptionally(e).discard

done.release() // the nested scope can now finish
Expand Down
29 changes: 26 additions & 3 deletions core/src/main/scala/ox/supervised.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ package ox
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import scala.reflect.ClassTag
import scala.util.NotGiven
import scala.util.boundary.Label

sealed abstract class Supervised
object SupervisedEvidence extends Supervised

private inline def availableInScope[A]: Boolean =
compiletime.summonFrom {
case _: NotGiven[A] => false
case _: A => true
}

/** Starts a new concurrency scope, which allows starting forks in the given code block `f`. Forks can be started using [[fork]],
* [[forkUser]], [[forkCancellable]] and [[forkUnsupervised]]. All forks are guaranteed to complete before this scope completes.
Expand All @@ -24,7 +35,19 @@ import scala.reflect.ClassTag
* @see
* [[supervisedError]] Starts a scope in supervised mode, with the additional ability to report application errors
*/
def supervised[T](f: Ox ?=> T): T = supervisedError(NoErrorMode)(f)
inline def supervised[T](f: Supervised ?=> Ox ?=> T): T =
inline if availableInScope[Label[Either[Nothing, Nothing]]] && availableInScope[Forked] then
compiletime.error(
"Nesting supervised scopes along with fork and either blocks is disallowed to prevent unsafe .ok() combinator usage on forks."
)
else supervisedErrorInternal(NoErrorMode)(f)

inline def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: Supervised ?=> OxError[E, F] ?=> F[T]): F[T] =
inline if availableInScope[Label[Either[Nothing, Nothing]]] && availableInScope[Forked] then
compiletime.error(
"Nesting supervised scopes along with fork and either blocks is disallowed to prevent unsafe .ok() combinator usage on forks."
)
else supervisedErrorInternal(em)(f)

/** Starts a new concurrency scope, which allows starting forks in the given code block `f`. Forks can be started using [[fork]],
* [[forkError]], [[forkUser]], [[forkUserError]], [[forkCancellable]] and [[forkUnsupervised]]. All forks are guaranteed to complete
Expand All @@ -36,12 +59,12 @@ def supervised[T](f: Ox ?=> T): T = supervisedError(NoErrorMode)(f)
* @see
* [[forkError]] On details how to use application errors.
*/
def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: OxError[E, F] ?=> F[T]): F[T] =
def supervisedErrorInternal[E, F[_], T](em: ErrorMode[E, F])(f: Supervised ?=> OxError[E, F] ?=> F[T]): F[T] =
val s = DefaultSupervisor[E]
val capability = OxError(s, em)
try
val scopeResult = scopedWithCapability(capability) {
val mainBodyFork = forkUserError(using capability)(f(using capability))
val mainBodyFork = forkUserError(using capability)(f(using SupervisedEvidence)(using capability))
val supervisorResult = s.join() // might throw if any supervised fork threw
if supervisorResult == ErrorModeSupervisorResult.Success then
// if no exceptions, the main f-fork must be done by now
Expand Down
Loading
Loading