Skip to content

Commit

Permalink
network-mux: provide a way to run bind a mini-protocol to a capability
Browse files Browse the repository at this point in the history
Co-authored-by: Karl Knutsson (@karknu)
Co-authored-by: Marcin Szamotulski (@coot)
  • Loading branch information
coot committed Feb 19, 2025
1 parent 043280f commit 1cba25b
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 72 deletions.
14 changes: 8 additions & 6 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ serverWorker bearer = do
where
ptcls :: [MiniProtocolInfo ResponderMode]
ptcls = [ MiniProtocolInfo {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = ResponderDirectionOnly,
miniProtocolLimits = defaultProtocolLimits
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = ResponderDirectionOnly,
miniProtocolLimits = defaultProtocolLimits,
miniProtocolCapability = Nothing
}
]

Expand Down Expand Up @@ -196,9 +197,10 @@ clientWorker bearer n msg = do
where
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
ptcls = [ MiniProtocolInfo {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = InitiatorDirectionOnly,
miniProtocolLimits = defaultProtocolLimits
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = InitiatorDirectionOnly,
miniProtocolLimits = defaultProtocolLimits,
miniProtocolCapability = Nothing
}
]

Expand Down
50 changes: 41 additions & 9 deletions network-mux/src/Control/Concurrent/JobPool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Control.Concurrent.JobPool
, Job (..)
, withJobPool
, forkJob
, forkJobOn
, readSize
, readGroupSize
, waitForJob
Expand All @@ -29,6 +30,7 @@ import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadThread (..))
import Control.Monad.Class.MonadThrow


-- | JobPool allows to submit asynchronous jobs, wait for their completion or
-- cancel. Jobs are grouped, each group can be cancelled separately.
--
Expand Down Expand Up @@ -69,16 +71,19 @@ withJobPool =
jobs <- readTVarIO jobsVar
mapM_ uninterruptibleCancel jobs

forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =

forkJob' :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> (m () -> m (Async m ()))
-- ^ how to fork a thread, e.g. `async`, `asyncOn`.
-> JobPool group m a
-> Job group m a
-> m ()
forkJob' doFork JobPool{jobsVar, completionQueue} (Job action handler group label) =
mask $ \restore -> do
jobAsync <- async $ do
jobAsync <- doFork $ do
tid <- myThreadId
io tid restore
`onException`
Expand All @@ -104,6 +109,33 @@ forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =
restore action
atomically $ writeTQueue completionQueue res



-- | Fork a `Job` using `async`.
--
forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob = forkJob' async


-- | Fork a `Job` using `asyncOn`.
--
forkJobOn :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> Int
-> JobPool group m a
-> Job group m a
-> m ()
forkJobOn cap = forkJob' (asyncOn cap)


readSize :: MonadSTM m => JobPool group m a -> STM m Int
readSize JobPool{jobsVar} = Map.size <$> readTVar jobsVar

Expand Down
61 changes: 43 additions & 18 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ data Group = MuxJob
-- * at any given time each @TranslocationServiceRequest@ contains a non-empty
-- 'Wanton'
--
run :: forall m mode.
run :: forall m (mode :: Mode).
( MonadAsync m
, MonadFork m
, MonadLabelledSTM m
Expand All @@ -214,7 +214,12 @@ run :: forall m mode.
-> Mux mode m
-> Bearer m
-> m ()
run tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer {name} = do
run tracer
Mux { muxMiniProtocols,
muxControlCmdQueue,
muxStatus
}
bearer@Bearer{name} = do
egressQueue <- atomically $ newTBQueue 100

-- label shared variables
Expand Down Expand Up @@ -459,18 +464,28 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
miniProtocolDir,
miniProtocolCapability
}
}
ptclAction) -> do
traceWith tracer (TraceStartEagerly miniProtocolNum
(protocolDirEnum miniProtocolDir))
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
(protocolDirEnum miniProtocolDir))
case miniProtocolCapability of
Nothing ->
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
Just cap ->
JobPool.forkJobOn cap jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
go monitorCtx

EventControlCmd (CmdStartProtocolThread
Expand Down Expand Up @@ -554,20 +569,30 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
doStartOnDemand ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
miniProtocolDir,
miniProtocolCapability
},
miniProtocolStatusVar
}
ptclAction = do
traceWith tracer (TraceStartedOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
(protocolDirEnum miniProtocolDir))
atomically $ modifyTVar miniProtocolStatusVar (\a -> assert (a /= StatusRunning) StatusRunning)
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
case miniProtocolCapability of
Nothing ->
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
Just cap ->
JobPool.forkJobOn cap jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction

checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue q = do
Expand Down
8 changes: 5 additions & 3 deletions network-mux/src/Network/Mux/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,14 @@ type family HasResponder (mode :: Mode) :: Bool where
--
data MiniProtocolInfo (mode :: Mode) =
MiniProtocolInfo {
miniProtocolNum :: !MiniProtocolNum,
miniProtocolNum :: !MiniProtocolNum,
-- ^ Unique mini-protocol number.
miniProtocolDir :: !(MiniProtocolDirection mode),
miniProtocolDir :: !(MiniProtocolDirection mode),
-- ^ Mini-protocol direction.
miniProtocolLimits :: !MiniProtocolLimits
miniProtocolLimits :: !MiniProtocolLimits,
-- ^ ingress queue limits for the protocol
miniProtocolCapability :: !(Maybe Int)
-- ^ capability on which the mini-protocol should run
}

data MiniProtocolDirection (mode :: Mode) where
Expand Down
Loading

0 comments on commit 1cba25b

Please sign in to comment.