From c20311c5c1e063d8c9ec67a5d45e5bffd49b1b18 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Thu, 6 Feb 2025 18:10:21 +0100 Subject: [PATCH] ouroboros-network-framework: added ForkPolicy Adding `ForkPolicy` callback rather than extending `MiniProtocol` type (as it is done in `network-mux`'s `MiniProtocolInfo`). This way we'll be able to hide the policy inside `ouroboros-network` and not leak it to `ouroboros-consensus` where the list of `MiniProtocol`s is constructed. --- .../demo/connection-manager.hs | 1 + .../io-tests/Test/Ouroboros/Network/Socket.hs | 2 +- .../Test/Ouroboros/Network/Socket.hs | 2 +- .../Ouroboros/Network/ConnectionHandler.hs | 7 +- .../src/Ouroboros/Network/Mux.hs | 65 ++++++++++++++++--- .../src/Ouroboros/Network/Socket.hs | 5 +- .../Network/ConnectionManager/Experiments.hs | 2 + ouroboros-network/CHANGELOG.md | 1 + .../io-tests/Test/Ouroboros/Network/Pipe.hs | 4 +- .../Test/Ouroboros/Network/Diffusion/Node.hs | 2 + .../Test/Ouroboros/Network/Mux.hs | 4 +- .../src/Ouroboros/Network/Diffusion/P2P.hs | 7 ++ 12 files changed, 84 insertions(+), 18 deletions(-) diff --git a/ouroboros-network-framework/demo/connection-manager.hs b/ouroboros-network-framework/demo/connection-manager.hs index 7600d97cd4c..4c891308b4f 100644 --- a/ouroboros-network-framework/demo/connection-manager.hs +++ b/ouroboros-network-framework/demo/connection-manager.hs @@ -258,6 +258,7 @@ withBidirectionalConnectionManager snocket makeBearer socket (makeConnectionHandler muxTracer SingInitiatorResponderMode + noBindForkPolicy HandshakeArguments { -- TraceSendRecv haHandshakeTracer = ("handshake",) `contramap` debugTracer, diff --git a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs index 12ce4120fc6..dc3bad55e9e 100644 --- a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs +++ b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs @@ -354,7 +354,7 @@ prop_socket_recv_error f rerr = _ <- async $ do threadDelay 0.1 atomically $ putTMVar lock () - mux <- Mx.new (toMiniProtocolInfos app) + mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app) let respCtx = ResponderContext connectionId resOps <- sequence [ Mx.runMiniProtocol diff --git a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs index 260eaddc8f7..d467e3bcbc5 100644 --- a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs +++ b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs @@ -356,7 +356,7 @@ prop_socket_recv_error f rerr = _ <- async $ do threadDelay 0.1 atomically $ putTMVar lock () - mux <- Mx.new (toMiniProtocolInfos app) + mux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) app) let respCtx = ResponderContext connectionId resOps <- sequence [ Mx.runMiniProtocol diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionHandler.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionHandler.hs index 9eadd1ffbb0..d2720afb16b 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionHandler.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionHandler.hs @@ -68,6 +68,7 @@ import Ouroboros.Network.Protocol.Handshake import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake import Ouroboros.Network.RethrowPolicy + -- | We place an upper limit of `30s` on the time we wait on receiving an SDU. -- There is no upper bound on the time we wait when waiting for a new SDU. -- This makes it possible for mini-protocols to use timeouts that are larger @@ -223,6 +224,7 @@ makeConnectionHandler ) => Tracer m (Mx.WithBearer (ConnectionId peerAddr) Mx.Trace) -> SingMuxMode muxMode + -> ForkPolicy peerAddr -- ^ describe whether this is outbound or inbound connection, and bring -- evidence that we can use mux with it. -> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m @@ -233,6 +235,7 @@ makeConnectionHandler -- exception to that thread, when trying to terminate the process. -> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b makeConnectionHandler muxTracer singMuxMode + forkPolicy handshakeArguments versionedApplication (mainThreadId, rethrowPolicy) = @@ -323,7 +326,7 @@ makeConnectionHandler muxTracer singMuxMode <$> newTVarIO Continue <*> newTVarIO Continue <*> newTVarIO Continue - mux <- Mx.new (mkMiniProtocolInfos app) + mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app) let !handle = Handle { hMux = mux, hMuxBundle = app, @@ -392,7 +395,7 @@ makeConnectionHandler muxTracer singMuxMode <$> newTVarIO Continue <*> newTVarIO Continue <*> newTVarIO Continue - mux <- Mx.new (mkMiniProtocolInfos app) + mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app) let !handle = Handle { hMux = mux, diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs b/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs index 0803116a6cb..ceec14bddf9 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs @@ -37,6 +37,9 @@ module Ouroboros.Network.Mux , MiniProtocolWithMinimalCtx , MiniProtocolNum (..) , MiniProtocolLimits (..) + , ForkPolicy (..) + , noBindForkPolicy + , responderForkPolicy -- * MiniProtocol bundle , OuroborosBundle , OuroborosBundleWithExpandedCtx @@ -60,6 +63,7 @@ import Control.Monad.Class.MonadThrow import Control.Tracer (Tracer) import Data.Foldable (fold) +import Data.Hashable import Data.Kind (Type) import Data.Void (Void) @@ -72,6 +76,7 @@ import Network.TypedProtocol.Stateful.Peer qualified as Stateful import Network.Mux qualified as Mux import Network.Mux.Types (MiniProtocolInfo, MiniProtocolLimits, MiniProtocolNum (..)) +import Network.Mux.Types qualified as Mux import Ouroboros.Network.Channel import Ouroboros.Network.Context (ExpandedInitiatorContext, @@ -245,8 +250,10 @@ data MiniProtocol (mode :: Mux.Mode) initiatorCtx responderCtx bytes m a b = -- ^ mini-protocol callback(s) } -mkMiniProtocolInfo :: MiniProtocol mode initiatorCtx responderCtx bytes m a b -> [MiniProtocolInfo mode] -mkMiniProtocolInfo MiniProtocol { +mkMiniProtocolInfo :: ForkPolicyCb + -> MiniProtocol mode initiatorCtx responderCtx bytes m a b + -> [MiniProtocolInfo mode] +mkMiniProtocolInfo forkPolicy MiniProtocol { miniProtocolNum, miniProtocolLimits, miniProtocolRun @@ -256,7 +263,9 @@ mkMiniProtocolInfo MiniProtocol { Mux.miniProtocolNum, Mux.miniProtocolDir = dir, Mux.miniProtocolLimits, - Mux.miniProtocolCapability = Nothing + Mux.miniProtocolCapability = + forkPolicy miniProtocolNum + (Mux.protocolDirEnum dir) } | dir <- case miniProtocolRun of InitiatorProtocolOnly{} -> [ Mux.InitiatorDirectionOnly ] @@ -431,10 +440,48 @@ fromOuroborosBundle :: OuroborosBundle mode initiatorCtx responderCtx bytes fromOuroborosBundle = OuroborosApplication . fold -toMiniProtocolInfos :: OuroborosApplication mode initiatorCtx responderCtx bytes m a b +-- | `ForkPolicy` guards to which capability each mini-protocol is bound. +-- +type ForkPolicyCb = Mux.MiniProtocolNum + -> Mux.MiniProtocolDir + -> Maybe Int + +-- | Extension of a `ForkPolicyCb` used by `ouroboros-network-framework` outside +-- of this module. +-- +newtype ForkPolicy peerAddr = ForkPolicy { + runForkPolicy :: peerAddr -> ForkPolicyCb + } + + +-- | A `ForkPolicy` which does not bind mini-protocol threads to a given capability. +-- +noBindForkPolicy :: ForkPolicy peerAddr +noBindForkPolicy = ForkPolicy (\_ _ _ -> Nothing) + + +-- | A `ForkPolicy` which binds responders mini-protocols to lower capabilities. +-- +responderForkPolicy :: Hashable peerAddr + => Int -- ^ salt + -> Int -- ^ number of capabilities + -> ForkPolicy peerAddr +responderForkPolicy salt numCapabilities = ForkPolicy { + runForkPolicy = \peerAddr _ miniProtocolDir -> + case miniProtocolDir of + Mux.InitiatorDir -> Nothing + Mux.ResponderDir -> Just $ hashWithSalt salt peerAddr + `mod` max 1 (numCapabilities - 2) + + + } + + +toMiniProtocolInfos :: ForkPolicyCb + -> OuroborosApplication mode initiatorCtx responderCtx bytes m a b -> [MiniProtocolInfo mode] -toMiniProtocolInfos = - foldMap mkMiniProtocolInfo . getOuroborosApplication +toMiniProtocolInfos forkPolicy = + foldMap (mkMiniProtocolInfo forkPolicy) . getOuroborosApplication contramapInitiatorCtx :: (initiatorCtx' -> initiatorCtx) @@ -457,6 +504,8 @@ contramapInitiatorCtx f (OuroborosApplication ptcls) = OuroborosApplication -- | Make 'MiniProtocolBundle', which is used to create a mux interface with -- 'newMux'. -- -mkMiniProtocolInfos :: OuroborosBundle mode initiatorCtx responderCtx bytes m a b +mkMiniProtocolInfos :: ForkPolicyCb + -> OuroborosBundle mode initiatorCtx responderCtx bytes m a b -> [MiniProtocolInfo mode] -mkMiniProtocolInfos = foldMap (foldMap mkMiniProtocolInfo) +mkMiniProtocolInfos forkPolicy = foldMap (foldMap (mkMiniProtocolInfo forkPolicy)) + diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Socket.hs b/ouroboros-network-framework/src/Ouroboros/Network/Socket.hs index 247ed78b9e2..ac33c3b6c46 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Socket.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Socket.hs @@ -437,7 +437,7 @@ connectToNodeWithMux' Right (HandshakeNegotiationResult app versionNumber agreedOptions) -> do traceWith muxTracer $ Mx.TraceHandshakeClientEnd (diffTime ts_end ts_start) bearer <- Mx.getBearer makeBearer sduTimeout muxTracer sd - mux <- Mx.new (toMiniProtocolInfos app) + mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app) withAsync (Mx.run muxTracer mux bearer) $ \aid -> k connectionId versionNumber agreedOptions app mux aid @@ -611,7 +611,8 @@ beginConnection makeBearer muxTracer handshakeTracer handshakeCodec handshakeTim Right (HandshakeNegotiationResult (SomeResponderApplication app) versionNumber agreedOptions) -> do traceWith muxTracer' Mx.TraceHandshakeServerEnd bearer <- Mx.getBearer makeBearer sduTimeout muxTracer' sd - mux <- Mx.new (toMiniProtocolInfos app) + -- non-p2p: use `noBindForkPolicy` + mux <- Mx.new (toMiniProtocolInfos (runForkPolicy noBindForkPolicy remoteAddress) app) withAsync (Mx.run muxTracer' mux bearer) $ \aid -> void $ simpleMuxCallback connectionId versionNumber agreedOptions app mux aid diff --git a/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs b/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs index ac56d76f2cc..71b4904527c 100644 --- a/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs +++ b/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs @@ -305,6 +305,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer tracer stdGen snocket (makeConnectionHandler muxTracer SingInitiatorMode + noBindForkPolicy HandshakeArguments { -- TraceSendRecv haHandshakeTracer = (name,) `contramap` nullTracer, @@ -501,6 +502,7 @@ withBidirectionalConnectionManager name timeouts (makeConnectionHandler muxTracer SingInitiatorResponderMode + noBindForkPolicy HandshakeArguments { -- TraceSendRecv haHandshakeTracer = WithName name `contramap` nullTracer, diff --git a/ouroboros-network/CHANGELOG.md b/ouroboros-network/CHANGELOG.md index 7bdb0e0e5ae..9204adac7ac 100644 --- a/ouroboros-network/CHANGELOG.md +++ b/ouroboros-network/CHANGELOG.md @@ -179,6 +179,7 @@ (`PeerTrustable` was abstracted out, since this was a bootstrap peers Cardano specific type.). - `LocalRootPeers` now has `extraFlags` type parameter. +- `daForkPolicy` field added to `Ouroboros.Network.P2P.ArgumentsExtra`. ### Non-breaking changes diff --git a/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs b/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs index 48c512a97aa..0b92e3cd97f 100644 --- a/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs +++ b/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs @@ -198,7 +198,7 @@ demo chain0 updates = do serverBearer <- Mx.getBearer Mx.makePipeChannelBearer (-1) activeTracer chan2 _ <- async $ do - clientMux <- Mx.new (toMiniProtocolInfos consumerApp) + clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp) let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer") resOps <- sequence [ Mx.runMiniProtocol @@ -223,7 +223,7 @@ demo chain0 updates = do wait aid _ <- async $ do - serverMux <- Mx.new (toMiniProtocolInfos producerApp) + serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp) let respCtx = ResponderContext (ConnectionId "consumer" "producer") resOps <- sequence [ Mx.runMiniProtocol diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs index 6fe2316971d..2b3597cfb8c 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs @@ -66,6 +66,7 @@ import System.Random (StdGen, split) import Network.DNS (Domain, TTL) +import Ouroboros.Network.Mux (noBindForkPolicy) import Ouroboros.Network.Protocol.Handshake (HandshakeArguments (..)) import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec (..), noTimeLimitsHandshake, timeLimitsHandshake) @@ -485,6 +486,7 @@ run blockGeneratorArgs limits ni na , P2P.daPeerChurnGovernor = peerChurnGovernor , P2P.daPeerSelectionGovernorArgs = psArgs , P2P.daPeerSelectionStateToExtraCounters = psToExtraCounters + , P2P.daMuxForkPolicy = noBindForkPolicy } appArgs :: Node.AppArgs extraAPI BlockHeader Block m diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs index 1e061558bad..a48556b7085 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs @@ -165,7 +165,7 @@ demo chain0 updates delay = do } clientAsync <- async $ do - clientMux <- Mx.new (toMiniProtocolInfos consumerApp) + clientMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) consumerApp) let initCtx = MinimalInitiatorContext (ConnectionId "consumer" "producer") resOps <- sequence [ Mx.runMiniProtocol @@ -190,7 +190,7 @@ demo chain0 updates delay = do wait aid serverAsync <- async $ do - serverMux <- Mx.new (toMiniProtocolInfos producerApp) + serverMux <- Mx.new (toMiniProtocolInfos (\_ _ -> Nothing) producerApp) let respCtx = ResponderContext (ConnectionId "producer" "consumer") resOps <- sequence [ Mx.runMiniProtocol diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index fda448b03bc..76896a7a4ef 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -368,6 +368,10 @@ data ArgumentsExtra extraState extraDebugState extraFlags extraPeers -- | Provide extraChurnArgs to be passed to churn governor -- , daExtraChurnArgs :: extraChurnArgs + + -- | A fork policy for mini-protocol threads spawn by mux. + -- + , daMuxForkPolicy :: ForkPolicy peeraddr } @@ -742,6 +746,7 @@ runM Interfaces , daToExtraPeers , daRequestPublicRootPeers , daExtraChurnArgs + , daMuxForkPolicy } Applications { daApplicationInitiatorMode @@ -825,6 +830,7 @@ runM Interfaces makeConnectionHandler dtLocalMuxTracer SingResponderMode + noBindForkPolicy diNtcHandshakeArguments ( ( \ (OuroborosApplication apps) -> TemperatureBundle @@ -1001,6 +1007,7 @@ runM Interfaces makeConnectionHandler dtMuxTracer muxMode + daMuxForkPolicy diNtnHandshakeArguments versions (mainThreadId, rethrowPolicy <> daRethrowPolicy)