From ed8161c73c4ad9eb4d263c9225a235e54d864af2 Mon Sep 17 00:00:00 2001 From: Karl Knutsson Date: Tue, 11 Feb 2025 08:37:44 +0100 Subject: [PATCH] add a writeMany option for batch sending of SDUs Permit sending of multiple SDUs through a single call to sendMany for Socket bearers Bearers without vector IO support emulate it through multiple calls to write. --- network-mux/bench/socket_read_write/Main.hs | 115 ++++++++++++++---- network-mux/src/Network/Mux/Bearer.hs | 7 +- .../Network/Mux/Bearer/AttenuatedChannel.hs | 12 +- .../src/Network/Mux/Bearer/NamedPipe.hs | 13 +- network-mux/src/Network/Mux/Bearer/Pipe.hs | 14 ++- network-mux/src/Network/Mux/Bearer/Queues.hs | 11 +- network-mux/src/Network/Mux/Bearer/Socket.hs | 43 ++++++- network-mux/src/Network/Mux/Egress.hs | 46 +++++-- network-mux/src/Network/Mux/Types.hs | 4 + .../Ouroboros/Network/ConnectionManager.hs | 4 +- 10 files changed, 215 insertions(+), 54 deletions(-) diff --git a/network-mux/bench/socket_read_write/Main.hs b/network-mux/bench/socket_read_write/Main.hs index 9c81cdec8e7..32ab0964f4e 100644 --- a/network-mux/bench/socket_read_write/Main.hs +++ b/network-mux/bench/socket_read_write/Main.hs @@ -4,14 +4,15 @@ import Control.Exception (bracket) import Control.Concurrent.Class.MonadSTM.Strict +import Data.Functor (void) import Control.Monad (forever, replicateM_) import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadTimer.SI import Control.Tracer import Data.Int -import qualified Network.Socket as Socket +import Network.Socket qualified as Socket import Network.Socket (Socket) -import qualified Data.ByteString.Lazy as BL +import Data.ByteString.Lazy qualified as BL import Test.Tasty.Bench import Network.Mux.Bearer @@ -19,6 +20,8 @@ import Network.Mux import Network.Mux.Types import Network.Mux.Channel +import Network.Mux.Timeout (withTimeoutSerial) + sduTimeout :: DiffTime sduTimeout = 10 @@ -28,6 +31,9 @@ numberOfPackets = 100000 totalPayloadLen :: Int64 -> Int64 totalPayloadLen sndSize = sndSize * numberOfPackets +-- | Run a client that connects to the specified addr. +-- Signals the message sndSize to the server by writing it +-- in the provided TMVar. readBenchmark :: StrictTMVar IO Int64 -> Int64 -> Socket.SockAddr -> IO () readBenchmark sndSizeV sndSize addr = do bracket @@ -51,48 +57,103 @@ readBenchmark sndSizeV sndSize addr = do doRead maxData chan (cnt + BL.length msg) Nothing -> error "doRead: nullread" - --- Start the server in a separate thread +-- | Run a server that accept connections on `ad`. startServer :: StrictTMVar IO Int64 -> Socket -> IO () startServer sndSizeV ad = forever $ do - (sd, _) <- Socket.accept ad bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd sndSize <- atomically $ takeTMVar sndSizeV let chan = muxBearerAsChannel bearer (MiniProtocolNum 42) ResponderDir payload = BL.replicate sndSize 0xa5 - -- maxData = totalPayloadLen bearer maxData = totalPayloadLen sndSize numberOfSdus = fromIntegral $ maxData `div` sndSize replicateM_ numberOfSdus $ do send chan payload +-- | Like startServer but it uses the `writeMany` function +-- for vector IO. +startServerMany :: StrictTMVar IO Int64 -> Socket -> IO () +startServerMany sndSizeV ad = forever $ do + (sd, _) <- Socket.accept ad + bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd + sndSize <- atomically $ takeTMVar sndSizeV + + let maxData = totalPayloadLen sndSize + numberOfSdus = fromIntegral $ maxData `div` sndSize + numberOfCalls = numberOfSdus `div` 10 + runtSdus = numberOfSdus `mod` 10 + + withTimeoutSerial $ \timeoutFn -> do + replicateM_ numberOfCalls $ do + let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5 + void $ writeMany bearer timeoutFn sdus + if runtSdus > 0 + then do + let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5 + void $ writeMany bearer timeoutFn sdus + else return () + + where + -- wrap a 'ByteString' as 'MuxSDU' + wrap :: BL.ByteString -> MuxSDU + wrap blob = MuxSDU { + -- it will be filled when the 'MuxSDU' is send by the 'bearer' + msHeader = MuxSDUHeader { + mhTimestamp = RemoteClockModel 0, + mhNum = MiniProtocolNum 42, + mhDir = ResponderDir, + mhLength = fromIntegral $ BL.length blob + }, + msBlob = blob + } + +setupServer :: Socket -> IO Socket.SockAddr +setupServer ad = do + muxAddress:_ <- Socket.getAddrInfo Nothing (Just "127.0.0.1") (Just "0") + Socket.setSocketOption ad Socket.ReuseAddr 1 + Socket.bind ad (Socket.addrAddress muxAddress) + addr <- Socket.getSocketName ad + Socket.listen ad 3 + + return addr + -- Main function to run the benchmarks main :: IO () main = do - -- Start the server in a separate thread - bracket - (Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol) - Socket.close - (\ad -> do - sndSizeV <- newEmptyTMVarIO - muxAddress:_ <- Socket.getAddrInfo Nothing (Just "127.0.0.1") (Just "0") - Socket.setSocketOption ad Socket.ReuseAddr 1 - Socket.bind ad (Socket.addrAddress muxAddress) - addr <- Socket.getSocketName ad - Socket.listen ad 3 - - withAsync (startServer sndSizeV ad) $ \said -> do + (do + ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol + ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol - defaultMain [ - -- Suggested Max SDU size for Socket bearer - bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr - -- Payload size for ChainSync's RequestNext - , bench "Read/Write Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeV 914 addr + return (ad1, ad2) + ) + (\(ad1, ad2) -> do + Socket.close ad1 + Socket.close ad2 + ) + (\(ad1, ad2) -> do + sndSizeV <- newEmptyTMVarIO + sndSizeMV <- newEmptyTMVarIO + addr <- setupServer ad1 + addrM <- setupServer ad2 + + withAsync (startServer sndSizeV ad1) $ \said -> do + withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do + + defaultMain [ + -- Suggested Max SDU size for Socket bearer + bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr + -- Payload size for ChainSync's RequestNext + , bench "Read/Write Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeV 914 addr -- Payload size for ChainSync's RequestNext - , bench "Read/Write Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeV 10 addr - ] - cancel said + , bench "Read/Write Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeV 10 addr + + -- Send batches of SDUs at the same time + , bench "Read/Write-Many Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 12288 addrM + , bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM + , bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM + ] + cancel said + cancel saidM ) diff --git a/network-mux/src/Network/Mux/Bearer.hs b/network-mux/src/Network/Mux/Bearer.hs index e1979cd89de..61a3a9e1c5e 100644 --- a/network-mux/src/Network/Mux/Bearer.hs +++ b/network-mux/src/Network/Mux/Bearer.hs @@ -23,7 +23,7 @@ import Control.Monad.Class.MonadTime.SI import Control.Tracer (Tracer) import Data.ByteString.Lazy qualified as BL -import Network.Socket (Socket) +import Network.Socket (getSocketOption, SocketOption (..), Socket) #if defined(mingw32_HOST_OS) import System.Win32 (HANDLE) #endif @@ -58,10 +58,11 @@ pureBearer f = \sduTimeout tr fd -> pure (f sduTimeout tr fd) makeSocketBearer :: MakeBearer IO Socket makeSocketBearer = MakeBearer $ (\sduTimeout tr fd -> do readBuffer <- newTVarIO BL.empty - return $ socketAsMuxBearer size readBuffer bufSize sduTimeout tr fd) + batch <- getSocketOption fd SendBuffer + return $ socketAsMuxBearer size batch readBuffer bufSize sduTimeout tr fd) where size = SDUSize 12_288 - bufSize = 16*1024 + bufSize = 16_384 makePipeChannelBearer :: MakeBearer IO PipeChannel makePipeChannelBearer = MakeBearer $ pureBearer (\_ -> pipeAsMuxBearer size) diff --git a/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs b/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs index b060c28aad9..b6d6f714304 100644 --- a/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs +++ b/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs @@ -260,8 +260,10 @@ attenuationChannelAsMuxBearer :: forall m. -> MuxBearer m attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan = MuxBearer { - read = readMux, - write = writeMux, + read = readMux, + write = writeMux, + writeMany = writeMuxMany, + batchSize = fromIntegral $ getSDUSize sduSize, sduSize } where @@ -297,6 +299,12 @@ attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan = traceWith muxTracer MuxTraceSendEnd return ts + writeMuxMany :: TimeoutFn m -> [MuxSDU] -> m Time + writeMuxMany timeoutFn sdus = do + ts <- getMonotonicTime + mapM_ (writeMux timeoutFn) sdus + return ts + -- -- Trace -- diff --git a/network-mux/src/Network/Mux/Bearer/NamedPipe.hs b/network-mux/src/Network/Mux/Bearer/NamedPipe.hs index f4fba4ebd8b..b6e3f169766 100644 --- a/network-mux/src/Network/Mux/Bearer/NamedPipe.hs +++ b/network-mux/src/Network/Mux/Bearer/NamedPipe.hs @@ -35,9 +35,10 @@ namedPipeAsBearer :: Mx.SDUSize -> MuxBearer IO namedPipeAsBearer sduSize tracer h = Mx.MuxBearer { - Mx.read = readNamedPipe, - Mx.write = writeNamedPipe, - Mx.sduSize = sduSize + Mx.read = readNamedPipe, + Mx.write = writeNamedPipe, + Mx.writeMany = writeNamedPipeMany, + Mx.sduSize = sduSize } where readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time) @@ -81,3 +82,9 @@ namedPipeAsBearer sduSize tracer h = `catch` Mx.handleIOException "writeHandle errored" traceWith tracer Mx.MuxTraceSendEnd return ts + + writeNamedPipeMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time + writeNamedPipeMany timeoutFn sdus = do + ts <- getMonotonicTime + mapM_ (writeNamedPipe timeoutFn) sdus + return ts diff --git a/network-mux/src/Network/Mux/Bearer/Pipe.hs b/network-mux/src/Network/Mux/Bearer/Pipe.hs index 9fe601257e6..988798c4bb2 100644 --- a/network-mux/src/Network/Mux/Bearer/Pipe.hs +++ b/network-mux/src/Network/Mux/Bearer/Pipe.hs @@ -75,9 +75,11 @@ pipeAsMuxBearer -> MuxBearer IO pipeAsMuxBearer sduSize tracer channel = Mx.MuxBearer { - Mx.read = readPipe, - Mx.write = writePipe, - Mx.sduSize = sduSize + Mx.read = readPipe, + Mx.write = writePipe, + Mx.writeMany = writePipeMany, + Mx.sduSize = sduSize, + Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize } where readPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time) @@ -117,3 +119,9 @@ pipeAsMuxBearer sduSize tracer channel = traceWith tracer Mx.MuxTraceSendEnd return ts + writePipeMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time + writePipeMany timeoutFn sdus = do + ts <- getMonotonicTime + mapM_ (writePipe timeoutFn) sdus + return ts + diff --git a/network-mux/src/Network/Mux/Bearer/Queues.hs b/network-mux/src/Network/Mux/Bearer/Queues.hs index 81156268103..c82f7d1859b 100644 --- a/network-mux/src/Network/Mux/Bearer/Queues.hs +++ b/network-mux/src/Network/Mux/Bearer/Queues.hs @@ -42,7 +42,10 @@ queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } = Mx.MuxBearer { Mx.read = readMux, Mx.write = writeMux, - Mx.sduSize = sduSize + Mx.writeMany = writeMuxMany, + Mx.sduSize = sduSize, + -- KK XXXMx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize) + Mx.batchSize = (fromIntegral $ Mx.getSDUSize sduSize) } where readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time) @@ -69,3 +72,9 @@ queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } = traceWith tracer Mx.MuxTraceSendEnd return ts + writeMuxMany :: Mx.TimeoutFn m -> [Mx.MuxSDU] -> m Time + writeMuxMany timeoutFn sdus = do + ts <- getMonotonicTime + mapM_ (writeMux timeoutFn) sdus + return ts + diff --git a/network-mux/src/Network/Mux/Bearer/Socket.hs b/network-mux/src/Network/Mux/Bearer/Socket.hs index 67014ba0610..523f53e061f 100644 --- a/network-mux/src/Network/Mux/Bearer/Socket.hs +++ b/network-mux/src/Network/Mux/Bearer/Socket.hs @@ -19,6 +19,7 @@ import Control.Monad.Class.MonadTimer.SI hiding (timeout) import Network.Socket qualified as Socket #if !defined(mingw32_HOST_OS) import Network.Socket.ByteString.Lazy qualified as Socket (recv, sendAll) +import Network.Socket.ByteString qualified as Socket (sendMany) #else import System.Win32.Async.Socket.ByteString.Lazy qualified as Win32.Async #endif @@ -46,17 +47,20 @@ import Network.Mux.TCPInfo (SocketOption (TCPInfoSocketOption)) -- socketAsMuxBearer :: Mx.SDUSize + -> Int -> StrictTVar IO BL.ByteString -> Int64 -> DiffTime -> Tracer IO Mx.MuxTrace -> Socket.Socket -> MuxBearer IO -socketAsMuxBearer sduSize readBuffer readBufferSize sduTimeout tracer sd = +socketAsMuxBearer sduSize batchSize readBuffer readBufferSize sduTimeout tracer sd = Mx.MuxBearer { - Mx.read = readSocket, - Mx.write = writeSocket, - Mx.sduSize = sduSize + Mx.read = readSocket, + Mx.write = writeSocket, + Mx.writeMany = writeSocketMany, + Mx.sduSize = sduSize, + Mx.batchSize = batchSize } where hdrLenght = 8 @@ -166,3 +170,34 @@ socketAsMuxBearer sduSize readBuffer readBufferSize sduTimeout tracer sd = #endif return ts + writeSocketMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time +#if defined(mingw32_HOST_OS) + writeSocketMany timeout sdus = do + ts <- getMonotonicTime + mapM_ (writeSocket timeout) sdus + return ts +#else + writeSocketMany timeout sdus = do + ts <- getMonotonicTime + let ts32 = Mx.timestampMicrosecondsLow32Bits ts + buf = map (Mx.encodeMuxSDU . + (\sdu -> Mx.setTimestamp sdu (Mx.RemoteClockModel ts32))) sdus + r <- timeout ((fromIntegral $ length sdus) * sduTimeout) $ + Socket.sendMany sd (concatMap BL.toChunks buf) + `catch` Mx.handleIOException "sendAll errored" + case r of + Nothing -> do + traceWith tracer Mx.MuxTraceSDUWriteTimeoutException + throwIO $ Mx.MuxError Mx.MuxSDUWriteTimeout "Mux SDU Timeout" + Just _ -> do + traceWith tracer Mx.MuxTraceSendEnd +#if defined(linux_HOST_OS) && defined(MUX_TRACE_TCPINFO) + -- If it was possible to detect if the TraceTCPInfo was + -- enable we wouldn't have to hide the getSockOpt + -- syscall in this ifdef. Instead we would only call it if + -- we knew that the information would be traced. + tcpi <- Socket.getSockOpt sd TCPInfoSocketOption + traceWith tracer $ Mx.TraceTCPInfo tcpi (sum $ map (Mx.mhLength . Mx.msHeader) sdus) +#endif + return ts +#endif diff --git a/network-mux/src/Network/Mux/Egress.hs b/network-mux/src/Network/Mux/Egress.hs index 3e477867bf1..f50a8645d04 100644 --- a/network-mux/src/Network/Mux/Egress.hs +++ b/network-mux/src/Network/Mux/Egress.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} @@ -131,7 +132,8 @@ newtype Wanton m = Wanton { want :: StrictTVar m BL.ByteString } -- that each active demand gets a `maxSDU`s work of data processed -- each time it gets to the front of the queue muxer - :: ( MonadAsync m + :: forall m void. + ( MonadAsync m , MonadFork m , MonadMask m , MonadThrow (STM m) @@ -140,11 +142,36 @@ muxer => EgressQueue m -> MuxBearer m -> m void -muxer egressQueue bearer = +muxer egressQueue MuxBearer { writeMany, sduSize, batchSize } = withTimeoutSerial $ \timeout -> forever $ do TLSRDemand mpc md d <- atomically $ readTBQueue egressQueue - processSingleWanton egressQueue bearer timeout mpc md d + sdu <- processSingleWanton egressQueue sduSize mpc md d + sdus <- buildBatch [sdu] (sduLength sdu) + writeMany timeout (reverse sdus) + + where + maxSDUsPerBatch :: Int + maxSDUsPerBatch = 2 + + sduLength :: MuxSDU -> Int + sduLength sdu = 8 + fromIntegral (msLength sdu) + + -- Build a batch of SDUs to submit in one go to the bearer. + -- The egress queue is still processed one SDU at the time + -- to ensure that we don't cause starvation. + -- The batch size is either limited by the bearer + -- (e.g the SO_SNDBUF for Socket) or number of SPUs. + -- + buildBatch sdus _ | length sdus >= maxSDUsPerBatch = return sdus + buildBatch sdus sdusLength | sdusLength >= batchSize = return sdus + buildBatch sdus !sdusLength = do + demand_m <- atomically $ tryReadTBQueue egressQueue + case demand_m of + Just (TLSRDemand mpc md d) -> do + sdu <- processSingleWanton egressQueue sduSize mpc md d + buildBatch (sdu:sdus) (sdusLength + sduLength sdu) + Nothing -> return sdus -- | Pull a `maxSDU`s worth of data out out the `Wanton` - if there is -- data remaining requeue the `TranslocationServiceRequest` (this @@ -152,18 +179,17 @@ muxer egressQueue bearer = -- first. processSingleWanton :: MonadSTM m => EgressQueue m - -> MuxBearer m - -> TimeoutFn m + -> SDUSize -> MiniProtocolNum -> MiniProtocolDir -> Wanton m - -> m () -processSingleWanton egressQueue MuxBearer { write, sduSize } - timeout mpc md wanton = do + -> m MuxSDU +processSingleWanton egressQueue (SDUSize sduSize) + mpc md wanton = do blob <- atomically $ do -- extract next SDU d <- readTVar (want wanton) - let (frag, rest) = BL.splitAt (fromIntegral (getSDUSize sduSize)) d + let (frag, rest) = BL.splitAt (fromIntegral sduSize) d -- if more to process then enqueue remaining work if BL.null rest then writeTVar (want wanton) BL.empty @@ -184,5 +210,5 @@ processSingleWanton egressQueue MuxBearer { write, sduSize } }, msBlob = blob } - void $ write timeout sdu + return sdu --paceTransmission tNow diff --git a/network-mux/src/Network/Mux/Types.hs b/network-mux/src/Network/Mux/Types.hs index 4a2b93cf300..70767f0d39a 100644 --- a/network-mux/src/Network/Mux/Types.hs +++ b/network-mux/src/Network/Mux/Types.hs @@ -213,10 +213,14 @@ msLength = mhLength . msHeader data MuxBearer m = MuxBearer { -- | Timestamp and send MuxSDU. write :: TimeoutFn m -> MuxSDU -> m Time + -- | Timestamp and send many MuxSDU. + , writeMany :: TimeoutFn m -> [MuxSDU] -> m Time -- | Read a MuxSDU , read :: TimeoutFn m -> m (MuxSDU, Time) -- | Return a suitable MuxSDU payload size. , sduSize :: SDUSize + -- | Retirn a suitable batch size + , batchSize :: Int } newtype SDUSize = SDUSize { getSDUSize :: Word16 } diff --git a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs index baa673a57f9..61e2c989d27 100644 --- a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs +++ b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs @@ -345,7 +345,9 @@ makeFDBearer = MakeBearer $ \_ _ _ -> return MuxBearer { write = \_ _ -> getMonotonicTime, read = \_ -> forever (threadDelay 3600), - sduSize = SDUSize 1500 + writeMany = \_ _ -> getMonotonicTime, + sduSize = SDUSize 1500, + batchSize = 1500 } -- | We only keep exceptions here which should not be handled by the test