From bbeb81130ec3eafd8ced81564cc8bd46d24aff08 Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Fri, 15 Mar 2024 14:03:14 +0100 Subject: [PATCH 1/2] Use MutableByteArray as buffers, add manual keepAlive --- System/IO/BlockIO.hs | 45 +++++++++++++++++++++++++++++--------------- benchmark/Bench.hs | 31 +++++++++++++++--------------- blockio-uring.cabal | 8 +++++--- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/System/IO/BlockIO.hs b/System/IO/BlockIO.hs index 342a9fe..827a097 100644 --- a/System/IO/BlockIO.hs +++ b/System/IO/BlockIO.hs @@ -34,8 +34,10 @@ import Data.Array.IArray import Data.Array.IO import Data.Array.Unboxed import Data.Coerce +import Data.Primitive.ByteArray import Control.Monad +import Control.Monad.Primitive import Control.Concurrent (forkIO) import Control.Concurrent.MVar import Control.Concurrent.QSemN @@ -141,9 +143,8 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do URing.closeURing uring putMVar ioctxURing Nothing -data IOOp = IOOpRead !Fd !FileOffset !(Ptr Word8) !ByteCount - | IOOpWrite !Fd !FileOffset !(Ptr Word8) !ByteCount - deriving Show +data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount + | IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount newtype IOResult = IOResult_ URing.IOResult @@ -206,7 +207,7 @@ viewIOError (IOResult_ e) -- the target depth, fill it up to double again. This way there is always -- at least the target number in flight at once. -- -submitIO :: IOCtx -> [IOOp] -> IO [IOResult] +submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult] submitIO IOCtx { ioctxQSemN, ioctxURing, @@ -219,11 +220,13 @@ submitIO IOCtx { waitQSemN ioctxQSemN (fromIntegral iobatchOpCount) iobatchIx <- readChan ioctxChanIOBatchIx iobatchCompletion <- newEmptyMVar + let iobatchKeepAlives = ioops writeChan ioctxChanIOBatch IOBatch { iobatchIx, iobatchOpCount, - iobatchCompletion + iobatchCompletion, + iobatchKeepAlives } withMVar ioctxURing $ \case Nothing -> throwIO closed @@ -232,11 +235,14 @@ submitIO IOCtx { sequence_ [ --print ioop >> case ioop of - IOOpRead fd off buf cnt -> - URing.prepareRead uring fd off buf cnt ioopid - - IOOpWrite fd off buf cnt -> - URing.prepareWrite uring fd off buf cnt ioopid + IOOpRead fd off buf bufOff cnt -> + URing.prepareRead uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt ioopid + IOOpWrite fd off buf bufOff cnt -> + URing.prepareWrite uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt ioopid | (ioop, ioopix) <- zip ioops [IOOpIx 0 ..] , let !ioopid = packIOOpId iobatchIx ioopix ] URing.submitIO uring @@ -247,7 +253,11 @@ submitIO IOCtx { data IOBatch = IOBatch { iobatchIx :: !IOBatchIx, iobatchOpCount :: !Word32, - iobatchCompletion :: MVar (UArray IOOpIx Int32) + iobatchCompletion :: MVar (UArray IOOpIx Int32), + -- | The list of I\/O operations is sent to the completion + -- thread so that the buffers are kept alive while the kernel + -- is using them. + iobatchKeepAlives :: [IOOp IO] } newtype IOBatchIx = IOBatchIx Word32 @@ -286,14 +296,16 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do counts <- newArray iobatchixBounds (-1) results <- newArray iobatchixBounds invalidEntry completions <- newArray iobatchixBounds invalidEntry - collectCompletion counts results completions + keepAlives <- newArray iobatchixBounds invalidEntry + collectCompletion counts results completions keepAlives `finally` putMVar done () where collectCompletion :: IOUArray IOBatchIx Int -> IOArray IOBatchIx (IOUArray IOOpIx Int32) -> IOArray IOBatchIx (MVar (UArray IOOpIx Int32)) + -> IOArray IOBatchIx [IOOp IO] -> IO () - collectCompletion counts results completions = do + collectCompletion counts results completions keepAlives = do iocompletion <- URing.awaitIO uring let (URing.IOCompletion ioopid iores) = iocompletion unless (ioopid == URing.IOOpId maxBound) $ do @@ -311,12 +323,13 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do writeArray counts iobatchix (-1) writeArray results iobatchix invalidEntry writeArray completions iobatchix invalidEntry + writeArray keepAlives iobatchix invalidEntry result' <- freeze result putMVar completion (result' :: UArray IOOpIx Int32) writeChan chaniobatchix iobatchix let !qrelease = rangeSize (bounds result') signalQSemN qsem qrelease - collectCompletion counts results completions + collectCompletion counts results completions keepAlives -- wait for single IO result -- if the count is positive, decrement and update result array @@ -330,7 +343,8 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do IOBatch{ iobatchIx, iobatchOpCount, - iobatchCompletion + iobatchCompletion, + iobatchKeepAlives } <- readChan chaniobatch oldcount <- readArray counts iobatchIx assert (oldcount == (-1)) (return ()) @@ -338,6 +352,7 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1) writeArray results iobatchIx result writeArray completions iobatchIx iobatchCompletion + writeArray keepAlives iobatchIx iobatchKeepAlives if iobatchIx == iobatchixNeeded then return $! fromIntegral iobatchOpCount else collectIOBatches iobatchixNeeded diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index 0df3290..db05dbf 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -1,8 +1,9 @@ -{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE BangPatterns #-} +{- HLINT ignore "Use camelCase" -} module Main (main) where +import Data.Primitive import qualified Data.Set as Set import Control.Monad import Control.Exception @@ -100,20 +101,20 @@ main_highlevel filename = do ioctxConcurrencyLimit = 64 * 4 } blocks = zip [0..] (randomPermute rng [0..lastBlock]) - bracket (initIOCtx params) closeIOCtx $ \ioctx -> - allocaBytes (4096 * nbufs) $ \bufptr -> do - - before <- getCurrentTime - forConcurrently_ (groupsOfN 32 blocks) $ \batch -> - submitIO ioctx - [ IOOpRead fd blockoff bufptr' 4096 - | (i, block) <- batch - , let bufptr' = bufptr `plusPtr` ((i `mod` nbufs) * 4096) - blockoff = fromIntegral (block * 4096) - ] - after <- getCurrentTime - let total = lastBlock + 1 - report before after total + bracket (initIOCtx params) closeIOCtx $ \ioctx -> do + buf <- newPinnedByteArray (4096 * nbufs) + + before <- getCurrentTime + forConcurrently_ (groupsOfN 32 blocks) $ \batch -> + submitIO ioctx + [ IOOpRead fd blockoff buf bufOff 4096 + | (i, block) <- batch + , let bufOff = (i `mod` nbufs) * 4096 + blockoff = fromIntegral (block * 4096) + ] + after <- getCurrentTime + let total = lastBlock + 1 + report before after total report :: UTCTime -> UTCTime -> Int -> IO () report before after total = do diff --git a/blockio-uring.cabal b/blockio-uring.cabal index e4235cd..0f3150e 100644 --- a/blockio-uring.cabal +++ b/blockio-uring.cabal @@ -40,9 +40,10 @@ library System.IO.BlockIO.URingFFI build-depends: - , array ^>=0.5 - , base >=4.16 && <4.20 - , unix ^>=2.8 + , array ^>=0.5 + , base >=4.16 && <4.20 + , primitive ^>=0.9 + , unix ^>=2.8 pkgconfig-depends: liburing default-language: Haskell2010 @@ -58,6 +59,7 @@ benchmark bench , async , base , containers + , primitive , random , time , unix From bb4b65c5f403ffc0cb4dc8f385115bc547a73d6d Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Fri, 15 Mar 2024 16:15:51 +0100 Subject: [PATCH 2/2] Guard that MutableByteArrays are pinned --- System/IO/BlockIO.hs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/System/IO/BlockIO.hs b/System/IO/BlockIO.hs index 827a097..60630b6 100644 --- a/System/IO/BlockIO.hs +++ b/System/IO/BlockIO.hs @@ -45,7 +45,7 @@ import Control.Concurrent.Chan import Control.Exception (mask_, throw, ArrayException(UndefinedElement), finally, assert, throwIO) import System.IO.Error -import GHC.IO.Exception (IOErrorType(ResourceVanished)) +import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument)) import Foreign.Ptr import Foreign.C.Error (Errno(..)) @@ -143,6 +143,9 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do URing.closeURing uring putMVar ioctxURing Nothing +-- | The 'MutableByteArray' buffers within __must__ be pinned. Addresses into +-- these buffers are passed to @io_uring@, and the buffers must therefore not be +-- moved around. data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount | IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount @@ -235,11 +238,13 @@ submitIO IOCtx { sequence_ [ --print ioop >> case ioop of - IOOpRead fd off buf bufOff cnt -> + IOOpRead fd off buf bufOff cnt -> do + guardPinned buf URing.prepareRead uring fd off (mutableByteArrayContents buf `plusPtr` bufOff) cnt ioopid - IOOpWrite fd off buf bufOff cnt -> + IOOpWrite fd off buf bufOff cnt -> do + guardPinned buf URing.prepareWrite uring fd off (mutableByteArrayContents buf `plusPtr` bufOff) cnt ioopid @@ -248,7 +253,11 @@ submitIO IOCtx { URing.submitIO uring -- print ("submitIO", "submitting done") map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion - where closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing + where + closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing + guardPinned mba = do + unless (isMutableByteArrayPinned mba) $ throwIO notPinned + notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing data IOBatch = IOBatch { iobatchIx :: !IOBatchIx,