Skip to content

Commit 6035549

Browse files
committed
Cabal flag to control auto-labelling of threads
1 parent 8a8789c commit 6035549

File tree

2 files changed

+173
-37
lines changed

2 files changed

+173
-37
lines changed

Control/Concurrent/Async/Internal.hs

Lines changed: 161 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ import GHC.Exts
5757
import GHC.IO hiding (finally, onException)
5858
import GHC.Conc (ThreadId(..), labelThread)
5959

60+
#ifdef DEBUG_AUTO_LABEL
61+
import GHC.Stack
62+
#endif
63+
6064
-- -----------------------------------------------------------------------------
6165
-- STM Async API
6266

@@ -95,40 +99,65 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
9599
-- (see module-level documentation for details).
96100
--
97101
-- __Use 'withAsync' style functions wherever you can instead!__
98-
async :: IO a -> IO (Async a)
102+
async ::
103+
#ifdef DEBUG_AUTO_LABEL
104+
HasCallStack =>
105+
#endif
106+
IO a -> IO (Async a)
99107
async = inline asyncUsing rawForkIO
100108

101109
-- | Like 'async' but using 'forkOS' internally.
102-
asyncBound :: IO a -> IO (Async a)
110+
asyncBound ::
111+
#ifdef DEBUG_AUTO_LABEL
112+
HasCallStack =>
113+
#endif
114+
IO a -> IO (Async a)
103115
asyncBound = asyncUsing forkOS
104116

105117
-- | Like 'async' but using 'forkOn' internally.
106-
asyncOn :: Int -> IO a -> IO (Async a)
118+
asyncOn ::
119+
#ifdef DEBUG_AUTO_LABEL
120+
HasCallStack =>
121+
#endif
122+
Int -> IO a -> IO (Async a)
107123
asyncOn = asyncUsing . rawForkOn
108124

109125
-- | Like 'async' but using 'forkIOWithUnmask' internally. The child
110126
-- thread is passed a function that can be used to unmask asynchronous
111127
-- exceptions.
112-
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
128+
asyncWithUnmask ::
129+
#ifdef DEBUG_AUTO_LABEL
130+
HasCallStack =>
131+
#endif
132+
((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
113133
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)
114134

115135
-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The
116136
-- child thread is passed a function that can be used to unmask
117137
-- asynchronous exceptions.
118-
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
138+
asyncOnWithUnmask ::
139+
#ifdef DEBUG_AUTO_LABEL
140+
HasCallStack =>
141+
#endif
142+
Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
119143
asyncOnWithUnmask cpu actionWith =
120144
asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
121145

122-
asyncUsing :: (IO () -> IO ThreadId)
123-
-> IO a -> IO (Async a)
146+
asyncUsing ::
147+
#ifdef DEBUG_AUTO_LABEL
148+
HasCallStack =>
149+
#endif
150+
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
124151
asyncUsing doFork = \action -> do
125152
var <- newEmptyTMVarIO
153+
let action_plus = debugLabelMe >> action
126154
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
127155
-- slightly faster:
128156
t <- mask $ \restore ->
129-
doFork $ try (restore action) >>= atomically . putTMVar var
157+
doFork $ try (restore action_plus) >>= atomically . putTMVar var
130158
return (Async t (readTMVar var))
131159

160+
132161
-- | Spawn an asynchronous action in a separate thread, and pass its
133162
-- @Async@ handle to the supplied function. When the function returns
134163
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
@@ -144,41 +173,63 @@ asyncUsing doFork = \action -> do
144173
-- to `withAsync` returns, so nesting many `withAsync` calls requires
145174
-- linear memory.
146175
--
147-
withAsync :: IO a -> (Async a -> IO b) -> IO b
176+
withAsync ::
177+
#ifdef DEBUG_AUTO_LABEL
178+
HasCallStack =>
179+
#endif
180+
IO a -> (Async a -> IO b) -> IO b
148181
withAsync = inline withAsyncUsing rawForkIO
149182

150183
-- | Like 'withAsync' but uses 'forkOS' internally.
151-
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
184+
withAsyncBound ::
185+
#ifdef DEBUG_AUTO_LABEL
186+
HasCallStack =>
187+
#endif
188+
IO a -> (Async a -> IO b) -> IO b
152189
withAsyncBound = withAsyncUsing forkOS
153190

154191
-- | Like 'withAsync' but uses 'forkOn' internally.
155-
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
192+
withAsyncOn ::
193+
#ifdef DEBUG_AUTO_LABEL
194+
HasCallStack =>
195+
#endif
196+
Int -> IO a -> (Async a -> IO b) -> IO b
156197
withAsyncOn = withAsyncUsing . rawForkOn
157198

158199
-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The
159200
-- child thread is passed a function that can be used to unmask
160201
-- asynchronous exceptions.
161-
withAsyncWithUnmask
162-
:: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
202+
withAsyncWithUnmask ::
203+
#ifdef DEBUG_AUTO_LABEL
204+
HasCallStack =>
205+
#endif
206+
((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
163207
withAsyncWithUnmask actionWith =
164208
withAsyncUsing rawForkIO (actionWith unsafeUnmask)
165209

166210
-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The
167211
-- child thread is passed a function that can be used to unmask
168212
-- asynchronous exceptions
169-
withAsyncOnWithUnmask
170-
:: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
213+
withAsyncOnWithUnmask ::
214+
#ifdef DEBUG_AUTO_LABEL
215+
HasCallStack =>
216+
#endif
217+
Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
171218
withAsyncOnWithUnmask cpu actionWith =
172219
withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
173220

174-
withAsyncUsing :: (IO () -> IO ThreadId)
175-
-> IO a -> (Async a -> IO b) -> IO b
221+
withAsyncUsing ::
222+
#ifdef DEBUG_AUTO_LABEL
223+
HasCallStack =>
224+
#endif
225+
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
176226
-- The bracket version works, but is slow. We can do better by
177227
-- hand-coding it:
178228
withAsyncUsing doFork = \action inner -> do
179229
var <- newEmptyTMVarIO
180230
mask $ \restore -> do
181-
t <- doFork $ try (restore action) >>= atomically . putTMVar var
231+
let action_plus = debugLabelMe >> action
232+
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
182233
let a = Async t (readTMVar var)
183234
r <- restore (inner a) `catchAll` \e -> do
184235
uninterruptibleCancel a
@@ -555,11 +606,19 @@ isCancel e
555606
-- > withAsync right $ \b ->
556607
-- > waitEither a b
557608
--
558-
race :: IO a -> IO b -> IO (Either a b)
609+
race ::
610+
#ifdef DEBUG_AUTO_LABEL
611+
HasCallStack =>
612+
#endif
613+
IO a -> IO b -> IO (Either a b)
559614

560615
-- | Like 'race', but the result is ignored.
561616
--
562-
race_ :: IO a -> IO b -> IO ()
617+
race_ ::
618+
#ifdef DEBUG_AUTO_LABEL
619+
HasCallStack =>
620+
#endif
621+
IO a -> IO b -> IO ()
563622

564623

565624
-- | Run two @IO@ actions concurrently, and return both results. If
@@ -571,19 +630,31 @@ race_ :: IO a -> IO b -> IO ()
571630
-- > withAsync left $ \a ->
572631
-- > withAsync right $ \b ->
573632
-- > waitBoth a b
574-
concurrently :: IO a -> IO b -> IO (a,b)
633+
concurrently ::
634+
#ifdef DEBUG_AUTO_LABEL
635+
HasCallStack =>
636+
#endif
637+
IO a -> IO b -> IO (a,b)
575638

576639

577640
-- | Run two @IO@ actions concurrently. If both of them end with @Right@,
578641
-- return both results. If one of then ends with @Left@, interrupt the other
579642
-- action and return the @Left@.
580643
--
581-
concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
644+
concurrentlyE ::
645+
#ifdef DEBUG_AUTO_LABEL
646+
HasCallStack =>
647+
#endif
648+
IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
582649

583650
-- | 'concurrently', but ignore the result values
584651
--
585652
-- @since 2.1.1
586-
concurrently_ :: IO a -> IO b -> IO ()
653+
concurrently_ ::
654+
#ifdef DEBUG_AUTO_LABEL
655+
HasCallStack =>
656+
#endif
657+
IO a -> IO b -> IO ()
587658

588659
#define USE_ASYNC_VERSIONS 0
589660

@@ -722,37 +793,61 @@ concurrently_ left right = concurrently' left right (collect 0)
722793
-- for each element of the @Traversable@, so running this on large
723794
-- inputs without care may lead to resource exhaustion (of memory,
724795
-- file descriptors, or other limited resources).
725-
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
796+
mapConcurrently ::
797+
#ifdef DEBUG_AUTO_LABEL
798+
HasCallStack =>
799+
#endif
800+
Traversable t => (a -> IO b) -> t a -> IO (t b)
726801
mapConcurrently f = runConcurrently . traverse (Concurrently . f)
727802

728803
-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
729804
--
730805
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
731806
--
732807
-- @since 2.1.0
733-
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
808+
forConcurrently ::
809+
#ifdef DEBUG_AUTO_LABEL
810+
HasCallStack =>
811+
#endif
812+
Traversable t => t a -> (a -> IO b) -> IO (t b)
734813
forConcurrently = flip mapConcurrently
735814

736815
-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
737816
-- a concurrent equivalent of 'mapM_'.
738-
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
817+
mapConcurrently_ ::
818+
#ifdef DEBUG_AUTO_LABEL
819+
HasCallStack =>
820+
#endif
821+
F.Foldable f => (a -> IO b) -> f a -> IO ()
739822
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)
740823

741824
-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
742825
-- a concurrent equivalent of 'forM_'.
743-
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
826+
forConcurrently_ ::
827+
#ifdef DEBUG_AUTO_LABEL
828+
HasCallStack =>
829+
#endif
830+
F.Foldable f => f a -> (a -> IO b) -> IO ()
744831
forConcurrently_ = flip mapConcurrently_
745832

746833
-- | Perform the action in the given number of threads.
747834
--
748835
-- @since 2.1.1
749-
replicateConcurrently :: Int -> IO a -> IO [a]
836+
replicateConcurrently ::
837+
#ifdef DEBUG_AUTO_LABEL
838+
HasCallStack =>
839+
#endif
840+
Int -> IO a -> IO [a]
750841
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently
751842

752843
-- | Same as 'replicateConcurrently', but ignore the results.
753844
--
754845
-- @since 2.1.1
755-
replicateConcurrently_ :: Int -> IO a -> IO ()
846+
replicateConcurrently_ ::
847+
#ifdef DEBUG_AUTO_LABEL
848+
HasCallStack =>
849+
#endif
850+
Int -> IO a -> IO ()
756851
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void
757852

758853
-- -----------------------------------------------------------------------------
@@ -846,14 +941,18 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where
846941
-- | Fork a thread that runs the supplied action, and if it raises an
847942
-- exception, re-runs the action. The thread terminates only when the
848943
-- action runs to completion without raising an exception.
849-
forkRepeat :: IO a -> IO ThreadId
944+
forkRepeat ::
945+
#ifdef DEBUG_AUTO_LABEL
946+
HasCallStack =>
947+
#endif
948+
IO a -> IO ThreadId
850949
forkRepeat action =
851950
mask $ \restore ->
852951
let go = do r <- tryAll (restore action)
853952
case r of
854953
Left _ -> go
855954
_ -> return ()
856-
in forkIO go
955+
in forkIO (debugLabelMe >> go)
857956

858957
catchAll :: IO a -> (SomeException -> IO a) -> IO a
859958
catchAll = catch
@@ -865,11 +964,36 @@ tryAll = try
865964
-- handler: saves a bit of time when we will be installing our own
866965
-- exception handler.
867966
{-# INLINE rawForkIO #-}
868-
rawForkIO :: IO () -> IO ThreadId
869-
rawForkIO (IO action) = IO $ \ s ->
870-
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
967+
rawForkIO ::
968+
#ifdef DEBUG_AUTO_LABEL
969+
HasCallStack =>
970+
#endif
971+
IO () -> IO ThreadId
972+
rawForkIO action = IO $ \ s ->
973+
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
974+
where
975+
(IO action_plus) = debugLabelMe >> action
871976

872977
{-# INLINE rawForkOn #-}
873-
rawForkOn :: Int -> IO () -> IO ThreadId
874-
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
875-
case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
978+
rawForkOn ::
979+
#ifdef DEBUG_AUTO_LABEL
980+
HasCallStack =>
981+
#endif
982+
Int -> IO () -> IO ThreadId
983+
rawForkOn (I# cpu) action = IO $ \ s ->
984+
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
985+
where
986+
(IO action_plus) = debugLabelMe >> action
987+
988+
989+
debugLabelMe ::
990+
#ifdef DEBUG_AUTO_LABEL
991+
HasCallStack =>
992+
#endif
993+
IO ()
994+
debugLabelMe =
995+
#ifdef DEBUG_AUTO_LABEL
996+
myThreadId >>= flip labelThread (prettyCallStack callStack)
997+
#else
998+
pure ()
999+
#endif

async.cabal

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ source-repository head
6363
type: git
6464
location: https://github.com/simonmar/async.git
6565

66+
flag debug-auto-label
67+
description:
68+
Strictly for debugging as it might have a non-negligible overhead.
69+
70+
Enabling this flag will auto-label the threads spawned by @async@. Use it to
71+
find where are unlabelled threads spawned in your program (be it your code or
72+
dependency code).
73+
default: False
74+
manual: True
75+
6676
library
6777
default-language: Haskell2010
6878
other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples
@@ -73,6 +83,8 @@ library
7383
build-depends: base >= 4.3 && < 4.22,
7484
hashable >= 1.1.2.0 && < 1.6,
7585
stm >= 2.2 && < 2.6
86+
if flag(debug-auto-label)
87+
cpp-options: -DDEBUG_AUTO_LABEL
7688

7789
test-suite test-async
7890
default-language: Haskell2010

0 commit comments

Comments
 (0)