diff --git a/ki/ki.cabal b/ki/ki.cabal index 2629c34..d2eb3d1 100644 --- a/ki/ki.cabal +++ b/ki/ki.cabal @@ -95,6 +95,7 @@ library Ki.Internal.Scope Ki.Internal.Thread Ki.Internal.ThreadAffinity + Ki.Internal.ThreadOptions test-suite tests import: component diff --git a/ki/src/Ki.hs b/ki/src/Ki.hs index 1a27f37..8be7098 100644 --- a/ki/src/Ki.hs +++ b/ki/src/Ki.hs @@ -48,8 +48,9 @@ import Ki.Internal.Scope fork_, scoped, ) -import Ki.Internal.Thread (Thread, ThreadOptions (..), await, defaultThreadOptions) +import Ki.Internal.Thread (Thread, await) import Ki.Internal.ThreadAffinity (ThreadAffinity (..)) +import Ki.Internal.ThreadOptions (ThreadOptions (..), defaultThreadOptions) -- $introduction -- diff --git a/ki/src/Ki/Internal/Scope.hs b/ki/src/Ki/Internal/Scope.hs index e06d4d0..9cedca3 100644 --- a/ki/src/Ki/Internal/Scope.hs +++ b/ki/src/Ki/Internal/Scope.hs @@ -61,7 +61,8 @@ import Ki.Internal.IO ) import Ki.Internal.NonblockingSTM import Ki.Internal.Propagating (Tid, peelOffPropagating, propagate, pattern PropagatingFrom) -import Ki.Internal.Thread (Thread, ThreadOptions (..), defaultThreadOptions, forkWithAffinity, makeThread) +import Ki.Internal.Thread (Thread, forkWithAffinity, makeThread) +import Ki.Internal.ThreadOptions (ThreadOptions (..), defaultThreadOptions) -- | A scope. -- @@ -159,8 +160,8 @@ scoped action = do atomically do -- Block until we haven't committed to starting any threads. Without this, we may create a thread concurrently -- with closing its scope, and not grab its thread id to throw an exception to. - n <- readTVar statusVar - assert (n >= 0) (guard (n == 0)) + starting <- readTVar statusVar + assert (starting >= 0) (guard (starting == 0)) -- Indicate that this scope is closing, so attempts to create a new thread within it will throw ScopeClosing -- (as if the calling thread was a parent of this scope, which it should be, and we threw it a ScopeClosing -- ourselves). @@ -185,7 +186,8 @@ scoped action = do -- Block until all children have terminated; this relies on children respecting the async exception, which they -- must, for correctness. Otherwise, a thread could indeed outlive the scope in which it's created, which is -- definitely not structured concurrency! - blockUntilEmpty childrenVar + children <- readTVar childrenVar + guard (IntMap.Lazy.null children) -- Record the scope as closed (from closing), so subsequent attempts to use it will throw a runtime exception writeTVar statusVar Closed @@ -292,19 +294,14 @@ unrecordChild childrenVar childId = do -- | Wait until all threads created within a scope terminate. awaitAll :: Scope -> STM () awaitAll Scope {childrenVar, statusVar} = do - blockUntilEmpty childrenVar - n <- readTVar statusVar - case n of - Open -> guard (n == 0) + children <- readTVar childrenVar + guard (IntMap.Lazy.null children) + status <- readTVar statusVar + case status of + Open -> guard (status == 0) Closing -> retry -- block until closed Closed -> pure () --- Block until an IntMap becomes empty. -blockUntilEmpty :: TVar (IntMap a) -> STM () -blockUntilEmpty var = do - x <- readTVar var - guard (IntMap.Lazy.null x) - -- | Create a child thread to execute an action within a scope. -- -- /Note/: The child thread does not mask asynchronous exceptions, regardless of the parent thread's masking state. To diff --git a/ki/src/Ki/Internal/Thread.hs b/ki/src/Ki/Internal/Thread.hs index 7c369ba..fde4b2d 100644 --- a/ki/src/Ki/Internal/Thread.hs +++ b/ki/src/Ki/Internal/Thread.hs @@ -3,15 +3,12 @@ module Ki.Internal.Thread makeThread, await, forkWithAffinity, - ThreadOptions (..), - defaultThreadOptions, ) where import Control.Concurrent (ThreadId, forkOS) -import Control.Exception (BlockedIndefinitelyOnSTM (..), MaskingState (..)) +import Control.Exception (BlockedIndefinitelyOnSTM (..)) import GHC.Conc (STM) -import Ki.Internal.ByteCount (ByteCount) import Ki.Internal.IO (forkIO, forkOn, tryEitherSTM) import Ki.Internal.ThreadAffinity (ThreadAffinity (..)) @@ -58,62 +55,6 @@ forkWithAffinity = \case Capability n -> forkOn n OsThread -> Control.Concurrent.forkOS --- | --- --- [@affinity@]: --- --- The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS --- thread. --- --- Default: 'Unbound' --- --- [@allocationLimit@]: --- --- The maximum number of bytes a thread may allocate before it is delivered an --- 'Control.Exception.AllocationLimitExceeded' exception. If caught, the thread is allowed to allocate an additional --- 100kb (tunable with @+RTS -xq@) to perform any necessary cleanup actions; if exceeded, the thread is delivered --- another. --- --- Default: @Nothing@ (no limit) --- --- [@label@]: --- --- The label of a thread, visible in the [event log](https://downloads.haskell.org/ghc/latest/docs/html/users_guide/runtime_control.html#rts-eventlog) (@+RTS -l@). --- --- Default: @""@ (no label) --- --- [@maskingState@]: --- --- The masking state a thread is created in. To unmask, use 'GHC.IO.unsafeUnmask'. --- --- Default: @Unmasked@ -data ThreadOptions = ThreadOptions - { affinity :: ThreadAffinity, - allocationLimit :: Maybe ByteCount, - label :: String, - maskingState :: MaskingState - } - deriving stock (Eq, Show) - --- | Default thread options. --- --- @ --- 'Ki.ThreadOptions' --- { 'Ki.affinity' = 'Ki.Unbound' --- , 'Ki.allocationLimit' = Nothing --- , 'Ki.label' = "" --- , 'Ki.maskingState' = 'Unmasked' --- } --- @ -defaultThreadOptions :: ThreadOptions -defaultThreadOptions = - ThreadOptions - { affinity = Unbound, - allocationLimit = Nothing, - label = "", - maskingState = Unmasked - } - -- | Wait for a thread to terminate. await :: Thread a -> STM a await = diff --git a/ki/src/Ki/Internal/ThreadOptions.hs b/ki/src/Ki/Internal/ThreadOptions.hs new file mode 100644 index 0000000..63543b5 --- /dev/null +++ b/ki/src/Ki/Internal/ThreadOptions.hs @@ -0,0 +1,65 @@ +module Ki.Internal.ThreadOptions + ( ThreadOptions (..), + defaultThreadOptions, + ) +where + +import Control.Exception (MaskingState (..)) +import Ki.Internal.ByteCount (ByteCount) +import Ki.Internal.ThreadAffinity (ThreadAffinity (..)) + +-- | +-- +-- [@affinity@]: +-- +-- The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS +-- thread. +-- +-- Default: 'Unbound' +-- +-- [@allocationLimit@]: +-- +-- The maximum number of bytes a thread may allocate before it is delivered an +-- 'Control.Exception.AllocationLimitExceeded' exception. If caught, the thread is allowed to allocate an additional +-- 100kb (tunable with @+RTS -xq@) to perform any necessary cleanup actions; if exceeded, the thread is delivered +-- another. +-- +-- Default: @Nothing@ (no limit) +-- +-- [@label@]: +-- +-- The label of a thread, visible in the [event log](https://downloads.haskell.org/ghc/latest/docs/html/users_guide/runtime_control.html#rts-eventlog) (@+RTS -l@). +-- +-- Default: @""@ (no label) +-- +-- [@maskingState@]: +-- +-- The masking state a thread is created in. To unmask, use 'GHC.IO.unsafeUnmask'. +-- +-- Default: @Unmasked@ +data ThreadOptions = ThreadOptions + { affinity :: ThreadAffinity, + allocationLimit :: Maybe ByteCount, + label :: String, + maskingState :: MaskingState + } + deriving stock (Eq, Show) + +-- | Default thread options. +-- +-- @ +-- 'Ki.ThreadOptions' +-- { 'Ki.affinity' = 'Ki.Unbound' +-- , 'Ki.allocationLimit' = Nothing +-- , 'Ki.label' = "" +-- , 'Ki.maskingState' = 'Unmasked' +-- } +-- @ +defaultThreadOptions :: ThreadOptions +defaultThreadOptions = + ThreadOptions + { affinity = Unbound, + allocationLimit = Nothing, + label = "", + maskingState = Unmasked + }