Skip to content

Commit

Permalink
move ThreadOptions into its own module
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellwrosen committed Nov 28, 2023
1 parent 0b1be3d commit bb0115d
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 75 deletions.
1 change: 1 addition & 0 deletions ki/ki.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ library
Ki.Internal.Scope
Ki.Internal.Thread
Ki.Internal.ThreadAffinity
Ki.Internal.ThreadOptions

test-suite tests
import: component
Expand Down
3 changes: 2 additions & 1 deletion ki/src/Ki.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ import Ki.Internal.Scope
fork_,
scoped,
)
import Ki.Internal.Thread (Thread, ThreadOptions (..), await, defaultThreadOptions)
import Ki.Internal.Thread (Thread, await)
import Ki.Internal.ThreadAffinity (ThreadAffinity (..))
import Ki.Internal.ThreadOptions (ThreadOptions (..), defaultThreadOptions)

-- $introduction
--
Expand Down
25 changes: 11 additions & 14 deletions ki/src/Ki/Internal/Scope.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ import Ki.Internal.IO
)
import Ki.Internal.NonblockingSTM
import Ki.Internal.Propagating (Tid, peelOffPropagating, propagate, pattern PropagatingFrom)
import Ki.Internal.Thread (Thread, ThreadOptions (..), defaultThreadOptions, forkWithAffinity, makeThread)
import Ki.Internal.Thread (Thread, forkWithAffinity, makeThread)
import Ki.Internal.ThreadOptions (ThreadOptions (..), defaultThreadOptions)

-- | A scope.
--
Expand Down Expand Up @@ -159,8 +160,8 @@ scoped action = do
atomically do
-- Block until we haven't committed to starting any threads. Without this, we may create a thread concurrently
-- with closing its scope, and not grab its thread id to throw an exception to.
n <- readTVar statusVar
assert (n >= 0) (guard (n == 0))
starting <- readTVar statusVar
assert (starting >= 0) (guard (starting == 0))
-- Indicate that this scope is closing, so attempts to create a new thread within it will throw ScopeClosing
-- (as if the calling thread was a parent of this scope, which it should be, and we threw it a ScopeClosing
-- ourselves).
Expand All @@ -185,7 +186,8 @@ scoped action = do
-- Block until all children have terminated; this relies on children respecting the async exception, which they
-- must, for correctness. Otherwise, a thread could indeed outlive the scope in which it's created, which is
-- definitely not structured concurrency!
blockUntilEmpty childrenVar
children <- readTVar childrenVar
guard (IntMap.Lazy.null children)
-- Record the scope as closed (from closing), so subsequent attempts to use it will throw a runtime exception
writeTVar statusVar Closed

Expand Down Expand Up @@ -292,19 +294,14 @@ unrecordChild childrenVar childId = do
-- | Wait until all threads created within a scope terminate.
awaitAll :: Scope -> STM ()
awaitAll Scope {childrenVar, statusVar} = do
blockUntilEmpty childrenVar
n <- readTVar statusVar
case n of
Open -> guard (n == 0)
children <- readTVar childrenVar
guard (IntMap.Lazy.null children)
status <- readTVar statusVar
case status of
Open -> guard (status == 0)
Closing -> retry -- block until closed
Closed -> pure ()

-- Block until an IntMap becomes empty.
blockUntilEmpty :: TVar (IntMap a) -> STM ()
blockUntilEmpty var = do
x <- readTVar var
guard (IntMap.Lazy.null x)

-- | Create a child thread to execute an action within a scope.
--
-- /Note/: The child thread does not mask asynchronous exceptions, regardless of the parent thread's masking state. To
Expand Down
61 changes: 1 addition & 60 deletions ki/src/Ki/Internal/Thread.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ module Ki.Internal.Thread
makeThread,
await,
forkWithAffinity,
ThreadOptions (..),
defaultThreadOptions,
)
where

import Control.Concurrent (ThreadId, forkOS)
import Control.Exception (BlockedIndefinitelyOnSTM (..), MaskingState (..))
import Control.Exception (BlockedIndefinitelyOnSTM (..))
import GHC.Conc (STM)
import Ki.Internal.ByteCount (ByteCount)
import Ki.Internal.IO (forkIO, forkOn, tryEitherSTM)
import Ki.Internal.ThreadAffinity (ThreadAffinity (..))

Expand Down Expand Up @@ -58,62 +55,6 @@ forkWithAffinity = \case
Capability n -> forkOn n
OsThread -> Control.Concurrent.forkOS

-- |
--
-- [@affinity@]:
--
-- The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS
-- thread.
--
-- Default: 'Unbound'
--
-- [@allocationLimit@]:
--
-- The maximum number of bytes a thread may allocate before it is delivered an
-- 'Control.Exception.AllocationLimitExceeded' exception. If caught, the thread is allowed to allocate an additional
-- 100kb (tunable with @+RTS -xq@) to perform any necessary cleanup actions; if exceeded, the thread is delivered
-- another.
--
-- Default: @Nothing@ (no limit)
--
-- [@label@]:
--
-- The label of a thread, visible in the [event log](https://downloads.haskell.org/ghc/latest/docs/html/users_guide/runtime_control.html#rts-eventlog) (@+RTS -l@).
--
-- Default: @""@ (no label)
--
-- [@maskingState@]:
--
-- The masking state a thread is created in. To unmask, use 'GHC.IO.unsafeUnmask'.
--
-- Default: @Unmasked@
data ThreadOptions = ThreadOptions
{ affinity :: ThreadAffinity,
allocationLimit :: Maybe ByteCount,
label :: String,
maskingState :: MaskingState
}
deriving stock (Eq, Show)

-- | Default thread options.
--
-- @
-- 'Ki.ThreadOptions'
-- { 'Ki.affinity' = 'Ki.Unbound'
-- , 'Ki.allocationLimit' = Nothing
-- , 'Ki.label' = ""
-- , 'Ki.maskingState' = 'Unmasked'
-- }
-- @
defaultThreadOptions :: ThreadOptions
defaultThreadOptions =
ThreadOptions
{ affinity = Unbound,
allocationLimit = Nothing,
label = "",
maskingState = Unmasked
}

-- | Wait for a thread to terminate.
await :: Thread a -> STM a
await =
Expand Down
65 changes: 65 additions & 0 deletions ki/src/Ki/Internal/ThreadOptions.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module Ki.Internal.ThreadOptions
( ThreadOptions (..),
defaultThreadOptions,
)
where

import Control.Exception (MaskingState (..))
import Ki.Internal.ByteCount (ByteCount)
import Ki.Internal.ThreadAffinity (ThreadAffinity (..))

-- |
--
-- [@affinity@]:
--
-- The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS
-- thread.
--
-- Default: 'Unbound'
--
-- [@allocationLimit@]:
--
-- The maximum number of bytes a thread may allocate before it is delivered an
-- 'Control.Exception.AllocationLimitExceeded' exception. If caught, the thread is allowed to allocate an additional
-- 100kb (tunable with @+RTS -xq@) to perform any necessary cleanup actions; if exceeded, the thread is delivered
-- another.
--
-- Default: @Nothing@ (no limit)
--
-- [@label@]:
--
-- The label of a thread, visible in the [event log](https://downloads.haskell.org/ghc/latest/docs/html/users_guide/runtime_control.html#rts-eventlog) (@+RTS -l@).
--
-- Default: @""@ (no label)
--
-- [@maskingState@]:
--
-- The masking state a thread is created in. To unmask, use 'GHC.IO.unsafeUnmask'.
--
-- Default: @Unmasked@
data ThreadOptions = ThreadOptions
{ affinity :: ThreadAffinity,
allocationLimit :: Maybe ByteCount,
label :: String,
maskingState :: MaskingState
}
deriving stock (Eq, Show)

-- | Default thread options.
--
-- @
-- 'Ki.ThreadOptions'
-- { 'Ki.affinity' = 'Ki.Unbound'
-- , 'Ki.allocationLimit' = Nothing
-- , 'Ki.label' = ""
-- , 'Ki.maskingState' = 'Unmasked'
-- }
-- @
defaultThreadOptions :: ThreadOptions
defaultThreadOptions =
ThreadOptions
{ affinity = Unbound,
allocationLimit = Nothing,
label = "",
maskingState = Unmasked
}

0 comments on commit bb0115d

Please sign in to comment.