Skip to content

Cabal flag to control auto-labelling of threads #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 171 additions & 41 deletions Control/Concurrent/Async/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ import Data.IORef

import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc (ThreadId(..))
import GHC.Conc (ThreadId(..), labelThread)

#ifdef DEBUG_AUTO_LABEL
import GHC.Stack
#endif

-- -----------------------------------------------------------------------------
-- STM Async API
Expand Down Expand Up @@ -95,40 +99,65 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
-- (see module-level documentation for details).
--
-- __Use 'withAsync' style functions wherever you can instead!__
async :: IO a -> IO (Async a)
async ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: IO a -> IO (Async a)
asyncBound ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO (Async a)
asyncBound = asyncUsing forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally. The child
-- thread is passed a function that can be used to unmask asynchronous
-- exceptions.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask cpu actionWith =
asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

asyncUsing :: (IO () -> IO ThreadId)
-> IO a -> IO (Async a)
asyncUsing ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
var <- newEmptyTMVarIO
let action_plus = debugLabelMe >> action
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
-- slightly faster:
t <- mask $ \restore ->
doFork $ try (restore action) >>= atomically . putTMVar var
doFork $ try (restore action_plus) >>= atomically . putTMVar var
return (Async t (readTMVar var))


-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function. When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
Expand All @@ -144,41 +173,63 @@ asyncUsing doFork = \action -> do
-- to `withAsync` returns, so nesting many `withAsync` calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncWithUnmask
:: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask actionWith =
withAsyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions
withAsyncOnWithUnmask
:: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask cpu actionWith =
withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

withAsyncUsing :: (IO () -> IO ThreadId)
-> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow. We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
var <- newEmptyTMVarIO
mask $ \restore -> do
t <- doFork $ try (restore action) >>= atomically . putTMVar var
let action_plus = debugLabelMe >> action
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
let a = Async t (readTMVar var)
r <- restore (inner a) `catchAll` \e -> do
uninterruptibleCancel a
Expand Down Expand Up @@ -504,6 +555,7 @@ linkOnly
linkOnly shouldThrow a = do
me <- myThreadId
void $ forkRepeat $ do
myThreadId >>= flip labelThread ("linkOnly " ++ show (asyncThreadId a) ++ " -> " ++ show me)
r <- waitCatch a
case r of
Left e | shouldThrow e -> throwTo me (ExceptionInLinkedThread a e)
Expand Down Expand Up @@ -554,11 +606,19 @@ isCancel e
-- > withAsync right $ \b ->
-- > waitEither a b
--
race :: IO a -> IO b -> IO (Either a b)
race ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: IO a -> IO b -> IO ()
race_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO ()


-- | Run two @IO@ actions concurrently, and return both results. If
Expand All @@ -570,19 +630,31 @@ race_ :: IO a -> IO b -> IO ()
-- > withAsync left $ \a ->
-- > withAsync right $ \b ->
-- > waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO (a,b)


-- | Run two @IO@ actions concurrently. If both of them end with @Right@,
-- return both results. If one of then ends with @Left@, interrupt the other
-- action and return the @Left@.
--
concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
concurrentlyE ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))

-- | 'concurrently', but ignore the result values
--
-- @since 2.1.1
concurrently_ :: IO a -> IO b -> IO ()
concurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO ()

#define USE_ASYNC_VERSIONS 0

Expand Down Expand Up @@ -643,9 +715,13 @@ concurrentlyE left right = concurrently' left right (collect [])
Left ex -> throwIO ex
Right r -> collect (r:xs) m

concurrently' :: IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' left right collect = do
done <- newEmptyMVar
mask $ \restore -> do
Expand Down Expand Up @@ -684,6 +760,7 @@ concurrently' left right collect = do
-- putMVar.
when (count' > 0) $
void $ forkIO $ do
myThreadId >>= flip labelThread "concurrent stop"
throwTo rid AsyncCancelled
throwTo lid AsyncCancelled
-- ensure the children are really dead
Expand Down Expand Up @@ -721,37 +798,61 @@ concurrently_ left right = concurrently' left right (collect 0)
-- for each element of the @Traversable@, so running this on large
-- inputs without care may lead to resource exhaustion (of memory,
-- file descriptors, or other limited resources).
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently f = runConcurrently . traverse (Concurrently . f)

-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
--
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
--
-- @since 2.1.0
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
-- a concurrent equivalent of 'mapM_'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)

-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
-- a concurrent equivalent of 'forM_'.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_

-- | Perform the action in the given number of threads.
--
-- @since 2.1.1
replicateConcurrently :: Int -> IO a -> IO [a]
replicateConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO [a]
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
-- @since 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO ()
replicateConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO ()
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void

-- -----------------------------------------------------------------------------
Expand Down Expand Up @@ -845,14 +946,18 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where
-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action. The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO ThreadId
forkRepeat action =
mask $ \restore ->
let go = do r <- tryAll (restore action)
case r of
Left _ -> go
_ -> return ()
in forkIO go
in forkIO (debugLabelMe >> go)

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch
Expand All @@ -864,11 +969,36 @@ tryAll = try
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) = IO $ \ s ->
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkIO ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action


debugLabelMe ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO ()
debugLabelMe =
#ifdef DEBUG_AUTO_LABEL
myThreadId >>= flip labelThread (prettyCallStack callStack)
#else
pure ()
#endif
Loading