Skip to content

Commit

Permalink
ouroboros-network-framework: added ForkPolicy
Browse files Browse the repository at this point in the history
Adding `ForkPolicy` callback rather than extending `MiniProtocol` type
(as it is done in `network-mux`'s `MiniProtocolInfo`).  This way we'll
be able to hide the policy inside `ouroboros-network` and not leak it to
`ouroboros-consensus` where the list of `MiniProtocol`s is constructed.
  • Loading branch information
coot committed Feb 20, 2025
1 parent b66c270 commit c20311c
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 18 deletions.
1 change: 1 addition & 0 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
(makeConnectionHandler
muxTracer
SingInitiatorResponderMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = ("handshake",) `contramap` debugTracer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ prop_socket_recv_error f rerr =
_ <- async $ do
threadDelay 0.1
atomically $ putTMVar lock ()
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app)
let respCtx = ResponderContext connectionId
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ prop_socket_recv_error f rerr =
_ <- async $ do
threadDelay 0.1
atomically $ putTMVar lock ()
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app)
let respCtx = ResponderContext connectionId
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake
import Ouroboros.Network.RethrowPolicy


-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for mini-protocols to use timeouts that are larger
Expand Down Expand Up @@ -223,6 +224,7 @@ makeConnectionHandler
)
=> Tracer m (Mx.WithBearer (ConnectionId peerAddr) Mx.Trace)
-> SingMuxMode muxMode
-> ForkPolicy peerAddr
-- ^ describe whether this is outbound or inbound connection, and bring
-- evidence that we can use mux with it.
-> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m
Expand All @@ -233,6 +235,7 @@ makeConnectionHandler
-- exception to that thread, when trying to terminate the process.
-> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b
makeConnectionHandler muxTracer singMuxMode
forkPolicy
handshakeArguments
versionedApplication
(mainThreadId, rethrowPolicy) =
Expand Down Expand Up @@ -323,7 +326,7 @@ makeConnectionHandler muxTracer singMuxMode
<$> newTVarIO Continue
<*> newTVarIO Continue
<*> newTVarIO Continue
mux <- Mx.new (mkMiniProtocolInfos app)
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)
let !handle = Handle {
hMux = mux,
hMuxBundle = app,
Expand Down Expand Up @@ -392,7 +395,7 @@ makeConnectionHandler muxTracer singMuxMode
<$> newTVarIO Continue
<*> newTVarIO Continue
<*> newTVarIO Continue
mux <- Mx.new (mkMiniProtocolInfos app)
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)

let !handle = Handle {
hMux = mux,
Expand Down
65 changes: 57 additions & 8 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ module Ouroboros.Network.Mux
, MiniProtocolWithMinimalCtx
, MiniProtocolNum (..)
, MiniProtocolLimits (..)
, ForkPolicy (..)
, noBindForkPolicy
, responderForkPolicy
-- * MiniProtocol bundle
, OuroborosBundle
, OuroborosBundleWithExpandedCtx
Expand All @@ -60,6 +63,7 @@ import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer)

import Data.Foldable (fold)
import Data.Hashable
import Data.Kind (Type)
import Data.Void (Void)

Expand All @@ -72,6 +76,7 @@ import Network.TypedProtocol.Stateful.Peer qualified as Stateful
import Network.Mux qualified as Mux
import Network.Mux.Types (MiniProtocolInfo, MiniProtocolLimits,
MiniProtocolNum (..))
import Network.Mux.Types qualified as Mux

import Ouroboros.Network.Channel
import Ouroboros.Network.Context (ExpandedInitiatorContext,
Expand Down Expand Up @@ -245,8 +250,10 @@ data MiniProtocol (mode :: Mux.Mode) initiatorCtx responderCtx bytes m a b =
-- ^ mini-protocol callback(s)
}

mkMiniProtocolInfo :: MiniProtocol mode initiatorCtx responderCtx bytes m a b -> [MiniProtocolInfo mode]
mkMiniProtocolInfo MiniProtocol {
mkMiniProtocolInfo :: ForkPolicyCb
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
mkMiniProtocolInfo forkPolicy MiniProtocol {
miniProtocolNum,
miniProtocolLimits,
miniProtocolRun
Expand All @@ -256,7 +263,9 @@ mkMiniProtocolInfo MiniProtocol {
Mux.miniProtocolNum,
Mux.miniProtocolDir = dir,
Mux.miniProtocolLimits,
Mux.miniProtocolCapability = Nothing
Mux.miniProtocolCapability =
forkPolicy miniProtocolNum
(Mux.protocolDirEnum dir)
}
| dir <- case miniProtocolRun of
InitiatorProtocolOnly{} -> [ Mux.InitiatorDirectionOnly ]
Expand Down Expand Up @@ -431,10 +440,48 @@ fromOuroborosBundle :: OuroborosBundle mode initiatorCtx responderCtx bytes
fromOuroborosBundle = OuroborosApplication . fold


toMiniProtocolInfos :: OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-- | `ForkPolicy` guards to which capability each mini-protocol is bound.
--
type ForkPolicyCb = Mux.MiniProtocolNum
-> Mux.MiniProtocolDir
-> Maybe Int

-- | Extension of a `ForkPolicyCb` used by `ouroboros-network-framework` outside
-- of this module.
--
newtype ForkPolicy peerAddr = ForkPolicy {
runForkPolicy :: peerAddr -> ForkPolicyCb
}


-- | A `ForkPolicy` which does not bind mini-protocol threads to a given capability.
--
noBindForkPolicy :: ForkPolicy peerAddr
noBindForkPolicy = ForkPolicy (\_ _ _ -> Nothing)


-- | A `ForkPolicy` which binds responders mini-protocols to lower capabilities.
--
responderForkPolicy :: Hashable peerAddr
=> Int -- ^ salt
-> Int -- ^ number of capabilities
-> ForkPolicy peerAddr
responderForkPolicy salt numCapabilities = ForkPolicy {
runForkPolicy = \peerAddr _ miniProtocolDir ->
case miniProtocolDir of
Mux.InitiatorDir -> Nothing
Mux.ResponderDir -> Just $ hashWithSalt salt peerAddr
`mod` max 1 (numCapabilities - 2)


}


toMiniProtocolInfos :: ForkPolicyCb
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
toMiniProtocolInfos =
foldMap mkMiniProtocolInfo . getOuroborosApplication
toMiniProtocolInfos forkPolicy =
foldMap (mkMiniProtocolInfo forkPolicy) . getOuroborosApplication


contramapInitiatorCtx :: (initiatorCtx' -> initiatorCtx)
Expand All @@ -457,6 +504,8 @@ contramapInitiatorCtx f (OuroborosApplication ptcls) = OuroborosApplication
-- | Make 'MiniProtocolBundle', which is used to create a mux interface with
-- 'newMux'.
--
mkMiniProtocolInfos :: OuroborosBundle mode initiatorCtx responderCtx bytes m a b
mkMiniProtocolInfos :: ForkPolicyCb
-> OuroborosBundle mode initiatorCtx responderCtx bytes m a b
-> [MiniProtocolInfo mode]
mkMiniProtocolInfos = foldMap (foldMap mkMiniProtocolInfo)
mkMiniProtocolInfos forkPolicy = foldMap (foldMap (mkMiniProtocolInfo forkPolicy))

5 changes: 3 additions & 2 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ connectToNodeWithMux'
Right (HandshakeNegotiationResult app versionNumber agreedOptions) -> do
traceWith muxTracer $ Mx.TraceHandshakeClientEnd (diffTime ts_end ts_start)
bearer <- Mx.getBearer makeBearer sduTimeout muxTracer sd
mux <- Mx.new (toMiniProtocolInfos app)
mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app)
withAsync (Mx.run muxTracer mux bearer) $ \aid ->
k connectionId versionNumber agreedOptions app mux aid

Expand Down Expand Up @@ -611,7 +611,8 @@ beginConnection makeBearer muxTracer handshakeTracer handshakeCodec handshakeTim
Right (HandshakeNegotiationResult (SomeResponderApplication app) versionNumber agreedOptions) -> do
traceWith muxTracer' Mx.TraceHandshakeServerEnd
bearer <- Mx.getBearer makeBearer sduTimeout muxTracer' sd
mux <- Mx.new (toMiniProtocolInfos app)
-- non-p2p: use `noBindForkPolicy`
mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app)
withAsync (Mx.run muxTracer' mux bearer) $ \aid ->
void $ simpleMuxCallback connectionId versionNumber agreedOptions app mux aid

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer tracer stdGen snocket
(makeConnectionHandler
muxTracer
SingInitiatorMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = (name,) `contramap` nullTracer,
Expand Down Expand Up @@ -501,6 +502,7 @@ withBidirectionalConnectionManager name timeouts
(makeConnectionHandler
muxTracer
SingInitiatorResponderMode
noBindForkPolicy
HandshakeArguments {
-- TraceSendRecv
haHandshakeTracer = WithName name `contramap` nullTracer,
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
(`PeerTrustable` was abstracted out, since this was a bootstrap peers
Cardano specific type.).
- `LocalRootPeers` now has `extraFlags` type parameter.
- `daForkPolicy` field added to `Ouroboros.Network.P2P.ArgumentsExtra`.

### Non-breaking changes

Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ demo chain0 updates = do
serverBearer <- Mx.getBearer Mx.makePipeChannelBearer (-1) activeTracer chan2

_ <- async $ do
clientMux <- Mx.new (toMiniProtocolInfos consumerApp)
clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp)
let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand All @@ -223,7 +223,7 @@ demo chain0 updates = do
wait aid

_ <- async $ do
serverMux <- Mx.new (toMiniProtocolInfos producerApp)
serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp)
let respCtx = ResponderContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import System.Random (StdGen, split)

import Network.DNS (Domain, TTL)

import Ouroboros.Network.Mux (noBindForkPolicy)
import Ouroboros.Network.Protocol.Handshake (HandshakeArguments (..))
import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec (..),
noTimeLimitsHandshake, timeLimitsHandshake)
Expand Down Expand Up @@ -485,6 +486,7 @@ run blockGeneratorArgs limits ni na
, P2P.daPeerChurnGovernor = peerChurnGovernor
, P2P.daPeerSelectionGovernorArgs = psArgs
, P2P.daPeerSelectionStateToExtraCounters = psToExtraCounters
, P2P.daMuxForkPolicy = noBindForkPolicy
}

appArgs :: Node.AppArgs extraAPI BlockHeader Block m
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ demo chain0 updates delay = do
}

clientAsync <- async $ do
clientMux <- Mx.new (toMiniProtocolInfos consumerApp)
clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp)
let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand All @@ -190,7 +190,7 @@ demo chain0 updates delay = do
wait aid

serverAsync <- async $ do
serverMux <- Mx.new (toMiniProtocolInfos producerApp)
serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp)
let respCtx = ResponderContext (ConnectionId "producer" "consumer")
resOps <- sequence
[ Mx.runMiniProtocol
Expand Down
7 changes: 7 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ data ArgumentsExtra extraState extraDebugState extraFlags extraPeers
-- | Provide extraChurnArgs to be passed to churn governor
--
, daExtraChurnArgs :: extraChurnArgs

-- | A fork policy for mini-protocol threads spawn by mux.
--
, daMuxForkPolicy :: ForkPolicy peeraddr
}


Expand Down Expand Up @@ -742,6 +746,7 @@ runM Interfaces
, daToExtraPeers
, daRequestPublicRootPeers
, daExtraChurnArgs
, daMuxForkPolicy
}
Applications
{ daApplicationInitiatorMode
Expand Down Expand Up @@ -825,6 +830,7 @@ runM Interfaces
makeConnectionHandler
dtLocalMuxTracer
SingResponderMode
noBindForkPolicy
diNtcHandshakeArguments
( ( \ (OuroborosApplication apps)
-> TemperatureBundle
Expand Down Expand Up @@ -1001,6 +1007,7 @@ runM Interfaces
makeConnectionHandler
dtMuxTracer
muxMode
daMuxForkPolicy
diNtnHandshakeArguments
versions
(mainThreadId, rethrowPolicy <> daRethrowPolicy)
Expand Down

0 comments on commit c20311c

Please sign in to comment.