From d626dae9e7dd73c7c94f09f0f933779ab2887243 Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Wed, 18 Sep 2024 15:55:58 +0100 Subject: [PATCH 1/6] Extend benchmark to report allocations This is as a prelude to trying to reduce allocations. Inital result for high level benchmark Allocated total: 348802552 Allocated per: 1331 That is, 1331 bytes per IO operation. This includes allocations from the benchmark itself. Indeed profiling indicates that most of the allocations are from the benchmark itself rather than the library. --- benchmark/Bench.hs | 65 +++++++++++++++++++++++++++------------------ blockio-uring.cabal | 2 +- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index 4ee967b..45f6289 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -14,10 +14,12 @@ import Foreign import System.Posix.IO import System.Posix.Files -import System.Random +import System.Random as Random import System.Environment import System.Exit +import System.Mem (performMajorGC) import Data.Time +import qualified GHC.Stats as RTS import System.IO.BlockIO import System.IO.BlockIO.URing hiding (submitIO) @@ -81,15 +83,13 @@ main_lowlevel filename = do rng <- initStdGen let blocks = zip [0..] (randomPermute rng [0..lastBlock]) - total = lastBlock + 1 + totalOps = lastBlock + 1 (leadIn, blocks') = splitAt 64 blocks - before <- getCurrentTime - submitBatch leadIn - go blocks' - collectBatch 64 - after <- getCurrentTime - report before after total + withReport totalOps $ do + submitBatch leadIn + go blocks' + collectBatch 64 main_highlevel :: FilePath -> IO () @@ -111,29 +111,42 @@ main_highlevel filename = do ioctxConcurrencyLimit = 64 * 4 } blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock]) + totalOps = lastBlock + 1 bracket (initIOCtx params) closeIOCtx $ \ioctx -> do buf <- newPinnedByteArray (4096 * nbufs) - before <- getCurrentTime - forConcurrently_ (groupsOfN 32 blocks) $ \batch -> - submitIO ioctx $ flip fmap batch $ \ (i, block) -> - let bufOff = (i `mod` nbufs) * 4096 - blockoff = fromIntegral (block * 4096) - in IOOpRead fd blockoff buf bufOff 4096 - after <- getCurrentTime - let total = lastBlock + 1 - report before after total - -report :: UTCTime -> UTCTime -> Int -> IO () -report before after total = do - putStrLn $ "Total I/O ops: " ++ show total - putStrLn $ "Elapsed time: " ++ show elapsed - putStrLn $ "IOPS: " ++ show iops + withReport totalOps $ + forConcurrently_ (groupsOfN 32 blocks) $ \batch -> + submitIO ioctx $ flip fmap batch $ \ (i, block) -> + let bufOff = (i `mod` nbufs) * 4096 + blockoff = fromIntegral (block * 4096) + in IOOpRead fd blockoff buf bufOff 4096 + +withReport :: Int -> IO () -> IO () +withReport totalOps action = do + performMajorGC + beforeRTS <- RTS.getRTSStats + beforeTime <- getCurrentTime + action + afterTime <- getCurrentTime + performMajorGC + afterRTS <- RTS.getRTSStats + report beforeTime afterTime beforeRTS afterRTS totalOps + +report :: UTCTime -> UTCTime -> RTS.RTSStats -> RTS.RTSStats -> Int -> IO () +report beforeTime afterTime beforeRTS afterRTS totalOps = do + putStrLn $ "Total I/O ops: " ++ show totalOps + putStrLn $ "Elapsed time: " ++ show elapsed + putStrLn $ "IOPS: " ++ show iops + putStrLn $ "Allocated total: " ++ show allocated + putStrLn $ "Allocated per: " ++ show apio where - elapsed = after `diffUTCTime` before + elapsed = afterTime `diffUTCTime` beforeTime + allocated = RTS.allocated_bytes afterRTS - RTS.allocated_bytes beforeRTS - iops :: Int - iops = round (fromIntegral total / realToFrac elapsed :: Double) + iops, apio :: Int + iops = round (fromIntegral totalOps / realToFrac elapsed :: Double) + apio = round (fromIntegral allocated / realToFrac totalOps :: Double) groupsOfN :: Int -> V.Vector a -> [V.Vector a] groupsOfN n xs | V.null xs = [] diff --git a/blockio-uring.cabal b/blockio-uring.cabal index 6901077..39714d1 100644 --- a/blockio-uring.cabal +++ b/blockio-uring.cabal @@ -71,7 +71,7 @@ benchmark bench System.IO.BlockIO.URing System.IO.BlockIO.URingFFI - ghc-options: -Wall -threaded + ghc-options: -Wall -threaded -with-rtsopts=-T test-suite test default-language: Haskell2010 From 242b20ff288f50ba58db46186534f38eab2809a3 Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Wed, 18 Sep 2024 18:05:15 +0100 Subject: [PATCH 2/6] Adjust high level benchmark approach to reduce allocations Previously we generated a large list of a random permutation of all the 4k disk blocks in the input file, and then broke that up into groups of 32 blocks. We used forConcurrently_ over the groups which starts one thread per group. This approach did excessive work and allocated a lot. For starters the random permutation is expensive in time and space. It needed to track a set of remaining choices. And it had to allocate the whole list because the forConcurrently_ forces the list (of groups). And a thread per 32 blocks is a lot of threads once one uses large files. For our example target file of 262144 blocks, this amounts to 8192 threads. Each one has a stack, so it's a lot of allocations in total. The new approach is: 1. Use random block indexes rather than a permutation. This leads to duplicates but is much cheaper because it needs little state. The best solution to the duplicates will be to use O_DIRECT to avoid caching effects. 2. Only use a small number of threads and have each one submit a series of batches of work (still of size 32). This uses much less state. Then in addition, we use a lower level approach to generatng the random block indexes to reduce heap allocations. The new allocation results for the high level benchmark are: Allocated total: 57904976 Allocated per: 221 That is, 221 bytes per IO op. This is without any changes yet to the actual library, just the benchmark. --- benchmark/Bench.hs | 70 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index 45f6289..c551167 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -7,12 +7,14 @@ module Main (main) where import Data.Primitive import qualified Data.Set as Set import Control.Monad +import Control.Monad.Primitive (RealWorld) import Control.Exception -import Control.Concurrent.Async +import Control.Concurrent.Async as Async import Foreign import System.Posix.IO import System.Posix.Files +import System.Posix.Types as Posix import System.Random as Random import System.Environment @@ -25,6 +27,7 @@ import System.IO.BlockIO import System.IO.BlockIO.URing hiding (submitIO) import qualified System.IO.BlockIO.URing as URing import qualified Data.Vector as V +import qualified Data.Vector.Mutable as VM main :: IO () main = do @@ -105,22 +108,59 @@ main_highlevel filename = do let size = fileSize status lastBlock :: Int lastBlock = fromIntegral (size `div` 4096 - 1) - nbufs = 64 * 4 params = IOCtxParams { ioctxBatchSizeLimit = 64, ioctxConcurrencyLimit = 64 * 4 } - blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock]) - totalOps = lastBlock + 1 - bracket (initIOCtx params) closeIOCtx $ \ioctx -> do - buf <- newPinnedByteArray (4096 * nbufs) - - withReport totalOps $ - forConcurrently_ (groupsOfN 32 blocks) $ \batch -> - submitIO ioctx $ flip fmap batch $ \ (i, block) -> - let bufOff = (i `mod` nbufs) * 4096 - blockoff = fromIntegral (block * 4096) - in IOOpRead fd blockoff buf bufOff 4096 + ntasks = 4 + batchsz = 32 + nbatches = lastBlock `div` (ntasks * batchsz) -- batches per task + totalOps = nbatches * batchsz * ntasks + bracket (initIOCtx params) closeIOCtx $ \ioctx -> + withReport totalOps $ do + tasks <- + forRngSplitM ntasks rng $ \ !rng_task -> + Async.async $ do + buf <- newPinnedByteArray (4096 * batchsz) + forRngSplitM_ nbatches rng_task $ \ !rng_batch -> + submitIO ioctx $ + generateIOOpsBatch fd buf lastBlock batchsz rng_batch + _ <- Async.waitAnyCancel tasks + return () + +generateIOOpsBatch :: Posix.Fd + -> MutableByteArray RealWorld + -> Int + -> Int + -> Random.StdGen + -> V.Vector (IOOp IO) +generateIOOpsBatch !fd !buf !lastBlock !size !rng0 = + V.create $ do + v <- VM.new size + go v rng0 0 + return v + where + go !_ !_ !i | i == size = return () + go !v !rng !i = do + let (!block, !rng') = Random.uniformR (0, lastBlock) rng + !bufOff = i * 4096 + !blockoff = fromIntegral (block * 4096) + VM.unsafeWrite v i $! IOOpRead fd blockoff buf bufOff 4096 + go v rng' (i+1) + +forRngSplitM_ :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m () +forRngSplitM_ n rng0 action = go n rng0 + where + go 0 !_ = return () + go !i !rng = let (!rng', !rng'') = Random.split rng + in action rng' >> go (i-1) rng'' + +forRngSplitM :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m [a] +forRngSplitM n rng0 action = go [] n rng0 + where + go acc 0 !_ = return (reverse acc) + go acc !i !rng = let (!rng', !rng'') = Random.split rng + in action rng' >>= \x -> go (x:acc) (i-1) rng'' withReport :: Int -> IO () -> IO () withReport totalOps action = do @@ -148,10 +188,6 @@ report beforeTime afterTime beforeRTS afterRTS totalOps = do iops = round (fromIntegral totalOps / realToFrac elapsed :: Double) apio = round (fromIntegral allocated / realToFrac totalOps :: Double) -groupsOfN :: Int -> V.Vector a -> [V.Vector a] -groupsOfN n xs | V.null xs = [] - | otherwise = V.take n xs : groupsOfN n (V.drop n xs) - randomPermute :: Ord a => StdGen -> [a] -> [a] randomPermute rng0 xs0 = go (Set.fromList xs0) rng0 From 6217dab45dbb4b1a0b89a1875504bea5dc62e317 Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Wed, 18 Sep 2024 18:16:04 +0100 Subject: [PATCH 3/6] Add (NO)INLINE pragmas to the benchmark to improve profile clarity In the time and allocation profiling (using late cost centers) it helps to have things inlined or not inlined to achieve a readable structure to the profile results. This helps identifying the source of the remaining allocations. --- benchmark/Bench.hs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index c551167..6ed5844 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -95,6 +95,7 @@ main_lowlevel filename = do collectBatch 64 +{-# NOINLINE main_highlevel #-} main_highlevel :: FilePath -> IO () main_highlevel filename = do putStrLn "High-level API benchmark" @@ -128,6 +129,7 @@ main_highlevel filename = do _ <- Async.waitAnyCancel tasks return () +{-# NOINLINE generateIOOpsBatch #-} generateIOOpsBatch :: Posix.Fd -> MutableByteArray RealWorld -> Int @@ -148,6 +150,7 @@ generateIOOpsBatch !fd !buf !lastBlock !size !rng0 = VM.unsafeWrite v i $! IOOpRead fd blockoff buf bufOff 4096 go v rng' (i+1) +{-# INLINE forRngSplitM_ #-} forRngSplitM_ :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m () forRngSplitM_ n rng0 action = go n rng0 where @@ -155,6 +158,7 @@ forRngSplitM_ n rng0 action = go n rng0 go !i !rng = let (!rng', !rng'') = Random.split rng in action rng' >> go (i-1) rng'' +{-# INLINE forRngSplitM #-} forRngSplitM :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m [a] forRngSplitM n rng0 action = go [] n rng0 where @@ -162,6 +166,7 @@ forRngSplitM n rng0 action = go [] n rng0 go acc !i !rng = let (!rng', !rng'') = Random.split rng in action rng' >>= \x -> go (x:acc) (i-1) rng'' +{-# INLINE withReport #-} withReport :: Int -> IO () -> IO () withReport totalOps action = do performMajorGC @@ -173,6 +178,7 @@ withReport totalOps action = do afterRTS <- RTS.getRTSStats report beforeTime afterTime beforeRTS afterRTS totalOps +{-# NOINLINE report #-} report :: UTCTime -> UTCTime -> RTS.RTSStats -> RTS.RTSStats -> Int -> IO () report beforeTime afterTime beforeRTS afterRTS totalOps = do putStrLn $ "Total I/O ops: " ++ show totalOps From 13bacbea5e72fb48873c8e279e6d9b9ad210d08e Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Thu, 19 Sep 2024 14:36:44 +0100 Subject: [PATCH 4/6] Rewrite submitIO yet again. Try to do two things: 1. reduce allocations and improve performance slightly 2. improve the sync and async exception safety of submitIO Trying to achieve exception safety is non-trivial and needs masking and multiple levels of exception handlers. The one remaining difficulty is that if we get a sync exception from the lower level submit operation then we do not yet reset the ring to clear the SQEs that we filled out. --- src/System/IO/BlockIO.hs | 165 ++++++++++++++++++++++----------------- 1 file changed, 92 insertions(+), 73 deletions(-) diff --git a/src/System/IO/BlockIO.hs b/src/System/IO/BlockIO.hs index 11c5592..fce65c0 100644 --- a/src/System/IO/BlockIO.hs +++ b/src/System/IO/BlockIO.hs @@ -34,7 +34,7 @@ import Control.Concurrent.MVar import Control.Concurrent.QSemN import Control.Concurrent.Chan import Control.Exception (mask_, throw, ArrayException(UndefinedElement), - finally, assert, throwIO) + finally, assert, throwIO, onException) import System.IO.Error import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument)) @@ -187,93 +187,109 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int ! -- at least the target number in flight at once. -- submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult) -submitIO IOCtx { - ioctxBatchSizeLimit', - ioctxQSemN, - ioctxURing, - ioctxChanIOBatch, - ioctxChanIOBatchIx - } - ioops - | iobatchOpCount == 0 = return VU.empty - - | iobatchOpCount > ioctxBatchSizeLimit' = do - -- create completion mvars for each sub-batch - batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do +submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops + -- Typical small case. We can be more direct. + | V.length ioops > 0 && V.length ioops <= ioctxBatchSizeLimit' + = mask_ $ do + iobatchCompletion <- newEmptyMVar + prepAndSubmitIOBatch ioctx ioops iobatchCompletion + takeMVar iobatchCompletion + +submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops0 = + -- General case. Needs multiple batches and combining results. + mask_ $ do + iobatchCompletions <- prepAndSubmitIOBatches [] ioops0 + awaitIOBatches iobatchCompletions + where + prepAndSubmitIOBatches acc !ioops + | V.null ioops = return acc + | otherwise = do + let batch = V.take ioctxBatchSizeLimit' ioops iobatchCompletion <- newEmptyMVar - return (b, iobatchCompletion) - - forM_ batches $ \(batch, iobatchCompletion) -> do - let !iobatchOpCount' = V.length batch - waitQSemN ioctxQSemN iobatchOpCount' - iobatchIx <- readChan ioctxChanIOBatchIx - let iobatchKeepAlives = batch - writeChan ioctxChanIOBatch - IOBatch { - iobatchIx, - iobatchOpCount = iobatchOpCount', - iobatchCompletion, - iobatchKeepAlives - } - - submitBatch iobatchIx batch - - waitAndCombine batches - - | otherwise = do - waitQSemN ioctxQSemN iobatchOpCount - iobatchIx <- readChan ioctxChanIOBatchIx - iobatchCompletion <- newEmptyMVar - let iobatchKeepAlives = ioops + prepAndSubmitIOBatch ioctx batch iobatchCompletion + prepAndSubmitIOBatches (iobatchCompletion:acc) + (V.drop ioctxBatchSizeLimit' ioops) + + awaitIOBatches iobatchCompletions = + VU.concat <$> mapM takeMVar (reverse iobatchCompletions) + +-- Must be called with async exceptions masked. See mask_ above in submitIO. +prepAndSubmitIOBatch :: IOCtx + -> V.Vector (IOOp IO) + -> MVar (VU.Vector IOResult) + -> IO () +prepAndSubmitIOBatch IOCtx { + ioctxQSemN, + ioctxURing, + ioctxChanIOBatch, + ioctxChanIOBatchIx + } + !iobatch !iobatchCompletion = do + let !iobatchOpCount = V.length iobatch + -- We're called with async exceptions masked, but 'waitQSemN' can block and + -- receive exceptions. That's ok. But once we acquire the semaphore + -- quantitiy we must eventully return it. There's two cases for returning: + -- 1. we successfully submit the I/O and pass the information off to the + -- completionThread which will signal the semaphore upon completion, or + -- 2. we encounter an exception here in which case we need to undo the + -- semaphore acquisition. + -- For the latter case we use 'onException'. We also need to obtain a + -- batch index. This should never block because we have as many tokens as + -- QSemN initial quantitiy, and the batch ix is released before the QSemN + -- is signaled in the completionThread. + waitQSemN ioctxQSemN iobatchOpCount + !iobatchIx <- readChan ioctxChanIOBatchIx + -- Thus undoing the acquisition involves releasing the batch index and + -- semaphore quantitiy (which themselves cannot blocks). + let undoAcquisition = do writeChan ioctxChanIOBatchIx iobatchIx + signalQSemN ioctxQSemN iobatchOpCount + flip onException undoAcquisition $ do + -- We can receive an async exception if takeMVar blocks. That's ok, we'll + -- undo the acquisition. + muring <- takeMVar ioctxURing + -- From here on we cannot receive any async exceptions, because we do not + -- do any more blocking operations. But we can encounter sync exceptions, + -- so we may still need to release the mvar on exception. + flip onException (putMVar ioctxURing muring) $ do + uring <- maybe (throwIO closed) pure muring + V.iforM_ iobatch $ \ioopix ioop -> case ioop of + IOOpRead fd off buf bufOff cnt -> do + guardPinned buf + URing.prepareRead uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt (packIOOpId iobatchIx ioopix) + IOOpWrite fd off buf bufOff cnt -> do + guardPinned buf + URing.prepareWrite uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt (packIOOpId iobatchIx ioopix) + -- TODO: if submitIO or guardPinned throws an exception, we need to + -- undo / clear the SQEs that we prepared. + URing.submitIO uring + + -- More async exception safety: we want to inform the completionThread + -- /if and only if/ we successfully submitted a bathc of IO. So now that + -- we have submitted a batch we need to inform the completionThread + -- without interruptions. We're still masked, but writeChan does not + -- throw exceptions and never blocks (unbounded channel) so we should + -- not get async or sync exceptions. writeChan ioctxChanIOBatch IOBatch { iobatchIx, iobatchOpCount, iobatchCompletion, - iobatchKeepAlives + iobatchKeepAlives = iobatch } - submitBatch iobatchIx ioops - takeMVar iobatchCompletion + putMVar ioctxURing muring where - !iobatchOpCount = V.length ioops - guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing - {-# INLINE submitBatch #-} - submitBatch iobatchIx batch = - withMVar ioctxURing $ \case - Nothing -> throwIO closed - Just uring -> do - V.iforM_ batch $ \ioopix ioop -> - let !ioopid = packIOOpId iobatchIx ioopix in - case ioop of - IOOpRead fd off buf bufOff cnt -> do - guardPinned buf - URing.prepareRead uring fd off - (mutableByteArrayContents buf `plusPtr` bufOff) - cnt ioopid - IOOpWrite fd off buf bufOff cnt -> do - guardPinned buf - URing.prepareWrite uring fd off - (mutableByteArrayContents buf `plusPtr` bufOff) - cnt ioopid - URing.submitIO uring - - waitAndCombine :: [(a, MVar (VU.Vector IOResult))] - -> IO (VU.Vector IOResult) - waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd) - -chunksOf :: Int -> V.Vector a -> [V.Vector a] -chunksOf n xs - | V.length xs == 0 = [] - | otherwise = V.take n xs : chunksOf n (V.drop n xs) - data IOBatch = IOBatch { iobatchIx :: !IOBatchIx, iobatchOpCount :: !Int, - iobatchCompletion :: MVar (VU.Vector IOResult), + iobatchCompletion :: !(MVar (VU.Vector IOResult)), -- | The list of I\/O operations is sent to the completion -- thread so that the buffers are kept alive while the kernel -- is using them. @@ -341,6 +357,9 @@ completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do VM.write keepAlives iobatchix invalidEntry result' <- VU.unsafeFreeze result putMVar completion (result' :: VU.Vector IOResult) + -- Important: release batch index _before_ we signal the QSemN. + -- The other side needs the guarantee that the index is available + -- once it acquires the QSemN. writeChan chaniobatchix iobatchix let !qrelease = VU.length result' signalQSemN qsem qrelease From b7887e9c67c0ad3539d6b7e5f8324b682c597e6c Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Thu, 19 Sep 2024 22:37:50 +0100 Subject: [PATCH 5/6] Reduce allocations in awaitIO High level benchmark allocations down to: Allocated total: 28566920 Allocated per: 109 which is just under 14 words per operation, which isn't too bad. At least, when using 32 operations per batch (many of the allocations are per-batch). --- src/System/IO/BlockIO/URing.hs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/System/IO/BlockIO/URing.hs b/src/System/IO/BlockIO/URing.hs index 64948ab..2731e4e 100644 --- a/src/System/IO/BlockIO/URing.hs +++ b/src/System/IO/BlockIO/URing.hs @@ -33,6 +33,7 @@ import qualified Data.Vector.Unboxed.Base import Foreign import Foreign.C +import Foreign.ForeignPtr.Unsafe import System.IO.Error import System.Posix.Types @@ -46,23 +47,30 @@ import qualified System.IO.BlockIO.URingFFI as FFI -- Init -- -newtype URing = URing (Ptr FFI.URing) +data URing = URing { + -- | The uring itself. + uringptr :: !(Ptr FFI.URing), + + -- | A pre-allocated buffer to help with FFI marshalling. + cqeptrfptr :: {-# UNPACK #-} !(ForeignPtr (Ptr FFI.URingCQE)) + } newtype URingParams = URingParams { uringSize :: Int } setupURing :: URingParams -> IO URing setupURing URingParams { uringSize } = do uringptr <- malloc + cqeptrfptr <- mallocForeignPtr throwErrnoResIfNegRetry_ "uringInit" $ FFI.io_uring_queue_init (fromIntegral uringSize) uringptr flags - return (URing uringptr) + return URing { uringptr, cqeptrfptr } where flags = 0 closeURing :: URing -> IO () -closeURing (URing uringptr) = do +closeURing URing {uringptr} = do FFI.io_uring_queue_exit uringptr free uringptr @@ -79,7 +87,7 @@ newtype IOOpId = IOOpId Word64 deriving (Eq, Ord, Bounded, Show) prepareRead :: URing -> Fd -> FileOffset -> Ptr Word8 -> ByteCount -> IOOpId -> IO () -prepareRead (URing uringptr) fd off buf len (IOOpId ioopid) = do +prepareRead URing {uringptr} fd off buf len (IOOpId ioopid) = do sqeptr <- throwErrResIfNull "prepareRead" fullErrorType "URing I/O queue full" $ FFI.io_uring_get_sqe uringptr @@ -87,7 +95,7 @@ prepareRead (URing uringptr) fd off buf len (IOOpId ioopid) = do FFI.io_uring_sqe_set_data sqeptr (fromIntegral ioopid) prepareWrite :: URing -> Fd -> FileOffset -> Ptr Word8 -> ByteCount -> IOOpId -> IO () -prepareWrite (URing uringptr) fd off buf len (IOOpId ioopid) = do +prepareWrite URing {uringptr} fd off buf len (IOOpId ioopid) = do sqeptr <- throwErrResIfNull "prepareWrite" fullErrorType "URing I/O queue full" $ FFI.io_uring_get_sqe uringptr @@ -95,7 +103,7 @@ prepareWrite (URing uringptr) fd off buf len (IOOpId ioopid) = do FFI.io_uring_sqe_set_data sqeptr (fromIntegral ioopid) prepareNop :: URing -> IOOpId -> IO () -prepareNop (URing uringptr) (IOOpId ioopid) = do +prepareNop URing {uringptr} (IOOpId ioopid) = do sqeptr <- throwErrResIfNull "prepareNop" fullErrorType "URing I/O queue full" $ FFI.io_uring_get_sqe uringptr @@ -103,7 +111,7 @@ prepareNop (URing uringptr) (IOOpId ioopid) = do FFI.io_uring_sqe_set_data sqeptr (fromIntegral ioopid) submitIO :: URing -> IO () -submitIO (URing uringptr) = +submitIO URing {uringptr} = throwErrnoResIfNegRetry_ "submitIO" $ FFI.io_uring_submit uringptr @@ -157,9 +165,15 @@ instance VU.Unbox IOResult -- Completing I/O -- +-- | Must only be called from one thread at once. awaitIO :: URing -> IO IOCompletion -awaitIO (URing uringptr) = - alloca $ \cqeptrptr -> do +awaitIO !URing {uringptr, cqeptrfptr} = do + -- We use unsafeForeignPtrToPtr and touchForeignPtr here rather than + -- withForeignPtr because using withForeignPtr defeats GHCs CPR analysis + -- which causes the 'IOCompletion' result to be allocated on the heap + -- rather than returned in registers. + + let !cqeptrptr = unsafeForeignPtrToPtr cqeptrfptr -- Try non-blocking first (unsafe FFI call) peekres <- FFI.io_uring_peek_cqe uringptr cqeptrptr -- But if nothing is available, use a blocking call (safe FFI call) @@ -175,6 +189,7 @@ awaitIO (URing uringptr) = cqeptr <- peek cqeptrptr FFI.URingCQE { FFI.cqe_data, FFI.cqe_res } <- peek cqeptr FFI.io_uring_cqe_seen uringptr cqeptr + touchForeignPtr cqeptrfptr let opid = IOOpId (fromIntegral cqe_data) res = IOResult_ (fromIntegral cqe_res) return $! IOCompletion opid res From 50a8a63af5b84edee3a17597c70525ec38066ba3 Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Fri, 20 Sep 2024 12:58:28 +0100 Subject: [PATCH 6/6] Fix build for ghc 9.2 and older They need GADTs or TypeFamilies enabled in a module to solve constraints arising from other modules. --- benchmark/Bench.hs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index 6ed5844..b008357 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} +{-# LANGUAGE TypeFamilies #-} -- Needed for GHC 9.2 and older only {- HLINT ignore "Use camelCase" -} module Main (main) where @@ -8,6 +9,7 @@ import Data.Primitive import qualified Data.Set as Set import Control.Monad import Control.Monad.Primitive (RealWorld) +import Control.Monad.ST (ST) import Control.Exception import Control.Concurrent.Async as Async @@ -142,6 +144,7 @@ generateIOOpsBatch !fd !buf !lastBlock !size !rng0 = go v rng0 0 return v where + go :: V.MVector s (IOOp IO) -> Random.StdGen -> Int -> ST s () go !_ !_ !i | i == size = return () go !v !rng !i = do let (!block, !rng') = Random.uniformR (0, lastBlock) rng