Skip to content

Commit

Permalink
ouroboros-network-framework: added ForkPolicy
Browse files Browse the repository at this point in the history
Adding `ForkPolicy` callback rather than extending `MiniProtocol` type
(as it is done in `network-mux`'s `MiniProtocolInfo`).  This way we'll
be able to hide the policy inside `ouroboros-network` and not leak it to
`ouroboros-consensus` where the list of `MiniProtocol`s is constructed.
  • Loading branch information
coot committed Feb 19, 2025
1 parent 1cba25b commit b54fbc0
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 22 deletions.
1 change: 1 addition & 0 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
(makeConnectionHandler
muxTracer
SingInitiatorResponderMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = ("handshake",) `contramap` debugTracer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ prop_socket_recv_error f rerr =
_ <- async $ do
threadDelay 0.1
atomically $ putTMVar lock ()
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app)
let respCtx = ResponderContext connectionId
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ prop_socket_recv_error f rerr =
_ <- async $ do
threadDelay 0.1
atomically $ putTMVar lock ()
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app)
let respCtx = ResponderContext connectionId
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake
import Ouroboros.Network.RethrowPolicy


-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for mini-protocols to use timeouts that are larger
Expand Down Expand Up @@ -223,6 +224,7 @@ makeConnectionHandler
)
=> Tracer m (Mx.WithBearer (ConnectionId peerAddr) Mx.Trace)
-> SingMuxMode muxMode
-> ForkPolicy peerAddr
-- ^ describe whether this is outbound or inbound connection, and bring
-- evidence that we can use mux with it.
-> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m
Expand All @@ -233,6 +235,7 @@ makeConnectionHandler
-- exception to that thread, when trying to terminate the process.
-> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b
makeConnectionHandler muxTracer singMuxMode
forkPolicy
handshakeArguments
versionedApplication
(mainThreadId, rethrowPolicy) =
Expand Down Expand Up @@ -323,7 +326,7 @@ makeConnectionHandler muxTracer singMuxMode
<$> newTVarIO Continue
<*> newTVarIO Continue
<*> newTVarIO Continue
mux <- Mx.new (mkMiniProtocolInfos app)
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)
let !handle = Handle {
hMux = mux,
hMuxBundle = app,
Expand Down Expand Up @@ -393,7 +396,7 @@ makeConnectionHandler muxTracer singMuxMode
<$> newTVarIO Continue
<*> newTVarIO Continue
<*> newTVarIO Continue
mux <- Mx.new (mkMiniProtocolInfos app)
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)

let !handle = Handle {
hMux = mux,
Expand Down
65 changes: 57 additions & 8 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ module Ouroboros.Network.Mux
, MiniProtocolWithMinimalCtx
, MiniProtocolNum (..)
, MiniProtocolLimits (..)
, ForkPolicy (..)
, noBindForkPolicy
, responderForkPolicy
-- * MiniProtocol bundle
, OuroborosBundle
, OuroborosBundleWithExpandedCtx
Expand All @@ -60,6 +63,7 @@ import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer)

import Data.Foldable (fold)
import Data.Hashable
import Data.Kind (Type)
import Data.Void (Void)

Expand All @@ -72,6 +76,7 @@ import Network.TypedProtocol.Stateful.Peer qualified as Stateful
import Network.Mux qualified as Mux
import Network.Mux.Types (MiniProtocolInfo, MiniProtocolLimits,
MiniProtocolNum (..))
import Network.Mux.Types qualified as Mux

import Ouroboros.Network.Channel
import Ouroboros.Network.Context (ExpandedInitiatorContext,
Expand Down Expand Up @@ -245,8 +250,10 @@ data MiniProtocol (mode :: Mux.Mode) initiatorCtx responderCtx bytes m a b =
-- ^ mini-protocol callback(s)
}

mkMiniProtocolInfo :: MiniProtocol mode initiatorCtx responderCtx bytes m a b -> [MiniProtocolInfo mode]
mkMiniProtocolInfo MiniProtocol {
mkMiniProtocolInfo :: ForkPolicyCb
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
mkMiniProtocolInfo forkPolicy MiniProtocol {
miniProtocolNum,
miniProtocolLimits,
miniProtocolRun
Expand All @@ -256,7 +263,9 @@ mkMiniProtocolInfo MiniProtocol {
Mux.miniProtocolNum,
Mux.miniProtocolDir = dir,
Mux.miniProtocolLimits,
Mux.miniProtocolCapability = Nothing
Mux.miniProtocolCapability =
forkPolicy miniProtocolNum
(Mux.protocolDirEnum dir)
}
| dir <- case miniProtocolRun of
InitiatorProtocolOnly{} -> [ Mux.InitiatorDirectionOnly ]
Expand Down Expand Up @@ -431,10 +440,48 @@ fromOuroborosBundle :: OuroborosBundle mode initiatorCtx responderCtx bytes
fromOuroborosBundle = OuroborosApplication . fold


toMiniProtocolInfos :: OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-- | `ForkPolicy` guards to which capability each mini-protocol is bound.
--
type ForkPolicyCb = Mux.MiniProtocolNum
-> Mux.MiniProtocolDir
-> Maybe Int

-- | Extension of a `ForkPolicyCb` used by `ouroboros-network-framework` outside
-- of this module.
--
newtype ForkPolicy peerAddr = ForkPolicy {
runForkPolicy :: peerAddr -> ForkPolicyCb
}


-- | A `ForkPolicy` which does not bind mini-protocol threads to a given capability.
--
noBindForkPolicy :: ForkPolicy peerAddr
noBindForkPolicy = ForkPolicy (\_ _ _ -> Nothing)


-- | A `ForkPolicy` which binds responders mini-protocols to lower capabilities.
--
responderForkPolicy :: Hashable peerAddr
=> Int -- ^ salt
-> Int -- ^ number of capabilities
-> ForkPolicy peerAddr
responderForkPolicy salt numCapabilities = ForkPolicy {
runForkPolicy = \peerAddr _ miniProtocolDir ->
case miniProtocolDir of
Mux.InitiatorDir -> Nothing
Mux.ResponderDir -> Just $ hashWithSalt salt peerAddr
`mod` max 1 (numCapabilities - 2)


}


toMiniProtocolInfos :: ForkPolicyCb
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
toMiniProtocolInfos =
foldMap mkMiniProtocolInfo . getOuroborosApplication
toMiniProtocolInfos forkPolicy =
foldMap (mkMiniProtocolInfo forkPolicy) . getOuroborosApplication


contramapInitiatorCtx :: (initiatorCtx' -> initiatorCtx)
Expand All @@ -457,6 +504,8 @@ contramapInitiatorCtx f (OuroborosApplication ptcls) = OuroborosApplication
-- | Make 'MiniProtocolBundle', which is used to create a mux interface with
-- 'newMux'.
--
mkMiniProtocolInfos :: OuroborosBundle mode initiatorCtx responderCtx bytes m a b
mkMiniProtocolInfos :: ForkPolicyCb
-> OuroborosBundle mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
mkMiniProtocolInfos = foldMap (foldMap mkMiniProtocolInfo)
mkMiniProtocolInfos forkPolicy = foldMap (foldMap (mkMiniProtocolInfo forkPolicy))

5 changes: 3 additions & 2 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ connectToNodeWithMux'
Right (HandshakeNegotiationResult app versionNumber agreedOptions) -> do
traceWith muxTracer $ Mx.TraceHandshakeClientEnd (diffTime ts_end ts_start)
bearer <- Mx.getBearer makeBearer sduTimeout muxTracer sd
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app)
withAsync (Mx.run muxTracer mux bearer) $ \aid ->
k connectionId versionNumber agreedOptions app mux aid

Expand Down Expand Up @@ -611,7 +611,8 @@ beginConnection makeBearer muxTracer handshakeTracer handshakeCodec handshakeTim
Right (HandshakeNegotiationResult (SomeResponderApplication app) versionNumber agreedOptions) -> do
traceWith muxTracer' Mx.TraceHandshakeServerEnd
bearer <- Mx.getBearer makeBearer sduTimeout muxTracer' sd
mux <- Mx.new (toMiniProtocolInfos app)
-- non-p2p: use `noBindForkPolicy`
mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app)
withAsync (Mx.run muxTracer' mux bearer) $ \aid ->
void $ simpleMuxCallback connectionId versionNumber agreedOptions app mux aid

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer tracer stdGen snocket
(makeConnectionHandler
muxTracer
SingInitiatorMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = (name,) `contramap` nullTracer,
Expand Down Expand Up @@ -501,6 +502,7 @@ withBidirectionalConnectionManager name timeouts
(makeConnectionHandler
muxTracer
SingInitiatorResponderMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = WithName name `contramap` nullTracer,
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ demo chain0 updates = do
serverBearer <- Mx.getBearer Mx.makePipeChannelBearer (-1) activeTracer chan2

_ <- async $ do
clientMux <- Mx.new (toMiniProtocolInfos consumerApp)
clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp)
let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand All @@ -223,7 +223,7 @@ demo chain0 updates = do
wait aid

_ <- async $ do
serverMux <- Mx.new (toMiniProtocolInfos producerApp)
serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp)
let respCtx = ResponderContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ run blockGeneratorArgs limits ni na
, P2P.diUpdateVersionData = \versionData diffusionMode ->
versionData { ntnDiffusionMode = diffusionMode }
, P2P.diConnStateIdSupply = iConnStateIdSupply ni
, P2P.diGetNumCapabilities = return 1
}

appsExtra :: P2P.ApplicationsExtra NtNAddr m ()
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ demo chain0 updates delay = do
}

clientAsync <- async $ do
clientMux <- Mx.new (toMiniProtocolInfos consumerApp)
clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp)
let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand All @@ -190,7 +190,7 @@ demo chain0 updates delay = do
wait aid

serverAsync <- async $ do
serverMux <- Mx.new (toMiniProtocolInfos producerApp)
serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp)
let respCtx = ResponderContext (ConnectionId "producer" "consumer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
20 changes: 16 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ module Ouroboros.Network.Diffusion.P2P


import Control.Applicative (Alternative)
import Control.Concurrent qualified as IO
import Control.Concurrent.Class.MonadMVar (MonadMVar)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (IOException)
Expand All @@ -65,7 +66,7 @@ import Data.Void (Void)
import Network.Socket (Socket)
import GHC.IO.Exception (IOErrorType (..), IOException (..))
import System.Exit (ExitCode)
import System.Random (StdGen, newStdGen, split)
import System.Random (StdGen, newStdGen, split, random)

import Network.Socket qualified as Socket
import Network.Mux qualified as Mx
Expand Down Expand Up @@ -598,7 +599,12 @@ data Interfaces ntnFd ntnAddr ntnVersion ntnVersionData
-- `ConnStateIdSupply`.
--
diConnStateIdSupply
:: ConnStateIdSupply m
:: ConnStateIdSupply m,

-- | Get the number of capabilities used by the RTS.
--
diGetNumCapabilities
:: m Int
}


Expand Down Expand Up @@ -689,6 +695,7 @@ runM Interfaces
, diDnsActions
, diUpdateVersionData
, diConnStateIdSupply
, diGetNumCapabilities
}
Tracers
{ dtMuxTracer
Expand Down Expand Up @@ -774,7 +781,8 @@ runM Interfaces
(churnRng, rng3) = split rng2
(fuzzRng, rng4) = split rng3
(cmLocalStdGen, rng5) = split rng4
(cmStdGen1, cmStdGen2) = split rng5
(cmStdGen1, rng6) = split rng5
(cmStdGen2, forkRng) = split rng6


mkInboundPeersMap :: IG.PublicState ntnAddr ntnVersionData
Expand Down Expand Up @@ -825,6 +833,7 @@ runM Interfaces
makeConnectionHandler
dtLocalMuxTracer
SingResponderMode
noBindForkPolicy
diNtcHandshakeArguments
( ( \ (OuroborosApplication apps)
-> TemperatureBundle
Expand Down Expand Up @@ -989,6 +998,7 @@ runM Interfaces
simplePeerSelectionPolicy
policyRngVar daPeerMetrics (epErrorDelay exitPolicy)

numCapabilities <- diGetNumCapabilities
let makeConnectionHandler'
:: forall muxMode socket initiatorCtx responderCtx b c.
SingMuxMode muxMode
Expand All @@ -1001,6 +1011,7 @@ runM Interfaces
makeConnectionHandler
dtMuxTracer
muxMode
(responderForkPolicy (fst $ random forkRng) numCapabilities)
diNtnHandshakeArguments
versions
(mainThreadId, rethrowPolicy <> daRethrowPolicy)
Expand Down Expand Up @@ -1433,7 +1444,8 @@ run sigUSR1Signal tracers tracersExtra args argsExtra apps appsExtra = do
diNtcHandshakeArguments,
diRng,
diUpdateVersionData = \versionData diffusionMode -> versionData { diffusionMode },
diConnStateIdSupply
diConnStateIdSupply,
diGetNumCapabilities = IO.getNumCapabilities
}
tracers tracersExtra args argsExtra apps appsExtra

Expand Down

0 comments on commit b54fbc0

Please sign in to comment.