Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 117 additions & 40 deletions Control/Concurrent/Async/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc (ThreadId(..))

#ifdef DEBUG_AUTO_LABEL
import qualified GHC.Stack
#endif

#ifdef DEBUG_AUTO_LABEL
#define CALLSTACK GHC.Stack.HasCallStack =>
#else
#define CALLSTACK
#endif

-- -----------------------------------------------------------------------------
-- STM Async API

Expand Down Expand Up @@ -95,40 +105,53 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
-- (see module-level documentation for details).
--
-- __Use 'withAsync' style functions wherever you can instead!__
async :: IO a -> IO (Async a)
async ::
CALLSTACK
IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: IO a -> IO (Async a)
asyncBound ::
CALLSTACK
IO a -> IO (Async a)
asyncBound = asyncUsing forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn ::
CALLSTACK
Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally. The child
-- thread is passed a function that can be used to unmask asynchronous
-- exceptions.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask ::
CALLSTACK
((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask ::
CALLSTACK
Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask cpu actionWith =
asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

asyncUsing :: (IO () -> IO ThreadId)
-> IO a -> IO (Async a)
asyncUsing ::
CALLSTACK
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
var <- newEmptyTMVarIO
let action_plus = debugLabelMe >> action
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
-- slightly faster:
t <- mask $ \restore ->
doFork $ try (restore action) >>= atomically . putTMVar var
doFork $ try (restore action_plus) >>= atomically . putTMVar var
return (Async t (readTMVar var))


-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function. When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
Expand All @@ -144,41 +167,51 @@ asyncUsing doFork = \action -> do
-- to `withAsync` returns, so nesting many `withAsync` calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync ::
CALLSTACK
IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound ::
CALLSTACK
IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn ::
CALLSTACK
Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncWithUnmask
:: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask ::
CALLSTACK
((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask actionWith =
withAsyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions
withAsyncOnWithUnmask
:: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask ::
CALLSTACK
Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask cpu actionWith =
withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

withAsyncUsing :: (IO () -> IO ThreadId)
-> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing ::
CALLSTACK
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow. We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
var <- newEmptyTMVarIO
mask $ \restore -> do
t <- doFork $ try (restore action) >>= atomically . putTMVar var
let action_plus = debugLabelMe >> action
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
let a = Async t (readTMVar var)
r <- restore (inner a) `catchAll` \e -> do
uninterruptibleCancel a
Expand Down Expand Up @@ -554,11 +587,15 @@ isCancel e
-- > withAsync right $ \b ->
-- > waitEither a b
--
race :: IO a -> IO b -> IO (Either a b)
race ::
CALLSTACK
IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: IO a -> IO b -> IO ()
race_ ::
CALLSTACK
IO a -> IO b -> IO ()


-- | Run two @IO@ actions concurrently, and return both results. If
Expand All @@ -570,19 +607,25 @@ race_ :: IO a -> IO b -> IO ()
-- > withAsync left $ \a ->
-- > withAsync right $ \b ->
-- > waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently ::
CALLSTACK
IO a -> IO b -> IO (a,b)


-- | Run two @IO@ actions concurrently. If both of them end with @Right@,
-- return both results. If one of then ends with @Left@, interrupt the other
-- action and return the @Left@.
--
concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
concurrentlyE ::
CALLSTACK
IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))

-- | 'concurrently', but ignore the result values
--
-- @since 2.1.1
concurrently_ :: IO a -> IO b -> IO ()
concurrently_ ::
CALLSTACK
IO a -> IO b -> IO ()

#define USE_ASYNC_VERSIONS 0

Expand Down Expand Up @@ -643,9 +686,11 @@ concurrentlyE left right = concurrently' left right (collect [])
Left ex -> throwIO ex
Right r -> collect (r:xs) m

concurrently' :: IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' ::
CALLSTACK
IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' left right collect = do
done <- newEmptyMVar
mask $ \restore -> do
Expand Down Expand Up @@ -721,37 +766,49 @@ concurrently_ left right = concurrently' left right (collect 0)
-- for each element of the @Traversable@, so running this on large
-- inputs without care may lead to resource exhaustion (of memory,
-- file descriptors, or other limited resources).
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently ::
CALLSTACK
Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently f = runConcurrently . traverse (Concurrently . f)

-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
--
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
--
-- @since 2.1.0
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently ::
CALLSTACK
Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
-- a concurrent equivalent of 'mapM_'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ ::
CALLSTACK
F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)

-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
-- a concurrent equivalent of 'forM_'.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ ::
CALLSTACK
F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_

-- | Perform the action in the given number of threads.
--
-- @since 2.1.1
replicateConcurrently :: Int -> IO a -> IO [a]
replicateConcurrently ::
CALLSTACK
Int -> IO a -> IO [a]
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
-- @since 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO ()
replicateConcurrently_ ::
CALLSTACK
Int -> IO a -> IO ()
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void

-- -----------------------------------------------------------------------------
Expand Down Expand Up @@ -845,14 +902,16 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where
-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action. The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat ::
CALLSTACK
IO a -> IO ThreadId
forkRepeat action =
mask $ \restore ->
let go = do r <- tryAll (restore action)
case r of
Left _ -> go
_ -> return ()
in forkIO go
in forkIO (debugLabelMe >> go)

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch
Expand All @@ -864,11 +923,29 @@ tryAll = try
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) = IO $ \ s ->
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkIO ::
CALLSTACK
IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkOn ::
CALLSTACK
Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

debugLabelMe ::
CALLSTACK
IO ()
debugLabelMe =
#ifdef DEBUG_AUTO_LABEL
myThreadId >>= flip labelThread (GHC.Stack.prettyCallStack callStack)
#else
pure ()
#endif
12 changes: 12 additions & 0 deletions async.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ source-repository head
type: git
location: https://github.com/simonmar/async.git

flag debug-auto-label
description:
Strictly for debugging as it might have a non-negligible overhead.

Enabling this flag will auto-label the threads spawned by @async@. Use it to
find where are unlabelled threads spawned in your program (be it your code or
dependency code).
default: False
manual: True

library
default-language: Haskell2010
other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples
Expand All @@ -74,6 +84,8 @@ library
build-depends: base >= 4.3 && < 4.22,
hashable >= 1.1.2.0 && < 1.6,
stm >= 2.2 && < 2.6
if flag(debug-auto-label)
cpp-options: -DDEBUG_AUTO_LABEL

test-suite test-async
default-language: Haskell2010
Expand Down