Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations #22

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 93 additions & 35 deletions benchmark/Bench.hs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these new INLINE/NOINLINE pragmas in 6217dab change benchmark results in any way?

Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-} -- Needed for GHC 9.2 and older only
{- HLINT ignore "Use camelCase" -}

module Main (main) where

import Data.Primitive
import qualified Data.Set as Set
import Control.Monad
import Control.Monad.Primitive (RealWorld)
import Control.Monad.ST (ST)
import Control.Exception
import Control.Concurrent.Async
import Control.Concurrent.Async as Async

import Foreign
import System.Posix.IO
import System.Posix.Files
import System.Posix.Types as Posix

import System.Random
import System.Random as Random
import System.Environment
import System.Exit
import System.Mem (performMajorGC)
import Data.Time
import qualified GHC.Stats as RTS

import System.IO.BlockIO
import System.IO.BlockIO.URing hiding (submitIO)
import qualified System.IO.BlockIO.URing as URing
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM

main :: IO ()
main = do
Expand Down Expand Up @@ -81,17 +88,16 @@ main_lowlevel filename = do

rng <- initStdGen
let blocks = zip [0..] (randomPermute rng [0..lastBlock])
total = lastBlock + 1
totalOps = lastBlock + 1
(leadIn, blocks') = splitAt 64 blocks

before <- getCurrentTime
submitBatch leadIn
go blocks'
collectBatch 64
after <- getCurrentTime
report before after total
withReport totalOps $ do
submitBatch leadIn
go blocks'
collectBatch 64


{-# NOINLINE main_highlevel #-}
main_highlevel :: FilePath -> IO ()
main_highlevel filename = do
putStrLn "High-level API benchmark"
Expand All @@ -105,39 +111,91 @@ main_highlevel filename = do
let size = fileSize status
lastBlock :: Int
lastBlock = fromIntegral (size `div` 4096 - 1)
nbufs = 64 * 4
params = IOCtxParams {
ioctxBatchSizeLimit = 64,
ioctxConcurrencyLimit = 64 * 4
}
blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock])
bracket (initIOCtx params) closeIOCtx $ \ioctx -> do
buf <- newPinnedByteArray (4096 * nbufs)

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx $ flip fmap batch $ \ (i, block) ->
let bufOff = (i `mod` nbufs) * 4096
blockoff = fromIntegral (block * 4096)
in IOOpRead fd blockoff buf bufOff 4096
after <- getCurrentTime
let total = lastBlock + 1
report before after total

report :: UTCTime -> UTCTime -> Int -> IO ()
report before after total = do
putStrLn $ "Total I/O ops: " ++ show total
putStrLn $ "Elapsed time: " ++ show elapsed
putStrLn $ "IOPS: " ++ show iops
ntasks = 4
batchsz = 32
nbatches = lastBlock `div` (ntasks * batchsz) -- batches per task
totalOps = nbatches * batchsz * ntasks
Comment on lines +120 to +121
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we might ignore a few pages based on whether the division rounds down?

bracket (initIOCtx params) closeIOCtx $ \ioctx ->
withReport totalOps $ do
tasks <-
forRngSplitM ntasks rng $ \ !rng_task ->
Async.async $ do
buf <- newPinnedByteArray (4096 * batchsz)
forRngSplitM_ nbatches rng_task $ \ !rng_batch ->
submitIO ioctx $
generateIOOpsBatch fd buf lastBlock batchsz rng_batch
_ <- Async.waitAnyCancel tasks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that right? Shouldn't all async actions finish?

return ()

{-# NOINLINE generateIOOpsBatch #-}
generateIOOpsBatch :: Posix.Fd
-> MutableByteArray RealWorld
-> Int
-> Int
-> Random.StdGen
-> V.Vector (IOOp IO)
generateIOOpsBatch !fd !buf !lastBlock !size !rng0 =
V.create $ do
v <- VM.new size
go v rng0 0
return v
where
go :: V.MVector s (IOOp IO) -> Random.StdGen -> Int -> ST s ()
go !_ !_ !i | i == size = return ()
go !v !rng !i = do
let (!block, !rng') = Random.uniformR (0, lastBlock) rng
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, previously we read each block exactly once, now it's random, so some will be read multiple times, some never? That should be fine for this benchmark.

!bufOff = i * 4096
!blockoff = fromIntegral (block * 4096)
VM.unsafeWrite v i $! IOOpRead fd blockoff buf bufOff 4096
go v rng' (i+1)

{-# INLINE forRngSplitM_ #-}
forRngSplitM_ :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m ()
forRngSplitM_ n rng0 action = go n rng0
where
elapsed = after `diffUTCTime` before
go 0 !_ = return ()
go !i !rng = let (!rng', !rng'') = Random.split rng
in action rng' >> go (i-1) rng''

iops :: Int
iops = round (fromIntegral total / realToFrac elapsed :: Double)
{-# INLINE forRngSplitM #-}
forRngSplitM :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m [a]
forRngSplitM n rng0 action = go [] n rng0
where
go acc 0 !_ = return (reverse acc)
go acc !i !rng = let (!rng', !rng'') = Random.split rng
in action rng' >>= \x -> go (x:acc) (i-1) rng''

{-# INLINE withReport #-}
withReport :: Int -> IO () -> IO ()
withReport totalOps action = do
performMajorGC
beforeRTS <- RTS.getRTSStats
beforeTime <- getCurrentTime
action
afterTime <- getCurrentTime
performMajorGC
afterRTS <- RTS.getRTSStats
report beforeTime afterTime beforeRTS afterRTS totalOps

{-# NOINLINE report #-}
report :: UTCTime -> UTCTime -> RTS.RTSStats -> RTS.RTSStats -> Int -> IO ()
report beforeTime afterTime beforeRTS afterRTS totalOps = do
putStrLn $ "Total I/O ops: " ++ show totalOps
putStrLn $ "Elapsed time: " ++ show elapsed
putStrLn $ "IOPS: " ++ show iops
putStrLn $ "Allocated total: " ++ show allocated
putStrLn $ "Allocated per: " ++ show apio
where
elapsed = afterTime `diffUTCTime` beforeTime
allocated = RTS.allocated_bytes afterRTS - RTS.allocated_bytes beforeRTS

groupsOfN :: Int -> V.Vector a -> [V.Vector a]
groupsOfN n xs | V.null xs = []
| otherwise = V.take n xs : groupsOfN n (V.drop n xs)
iops, apio :: Int
iops = round (fromIntegral totalOps / realToFrac elapsed :: Double)
apio = round (fromIntegral allocated / realToFrac totalOps :: Double)

randomPermute :: Ord a => StdGen -> [a] -> [a]
randomPermute rng0 xs0 =
Expand Down
2 changes: 1 addition & 1 deletion blockio-uring.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ benchmark bench
System.IO.BlockIO.URing
System.IO.BlockIO.URingFFI

ghc-options: -Wall -threaded
ghc-options: -Wall -threaded -with-rtsopts=-T

test-suite test
default-language: Haskell2010
Expand Down
165 changes: 92 additions & 73 deletions src/System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Concurrent.Chan
import Control.Exception (mask_, throw, ArrayException(UndefinedElement),
finally, assert, throwIO)
finally, assert, throwIO, onException)
import System.IO.Error
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument))

Expand Down Expand Up @@ -187,93 +187,109 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
submitIO IOCtx {
ioctxBatchSizeLimit',
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
ioops
| iobatchOpCount == 0 = return VU.empty

| iobatchOpCount > ioctxBatchSizeLimit' = do
-- create completion mvars for each sub-batch
batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do
submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops
-- Typical small case. We can be more direct.
| V.length ioops > 0 && V.length ioops <= ioctxBatchSizeLimit'
= mask_ $ do
iobatchCompletion <- newEmptyMVar
prepAndSubmitIOBatch ioctx ioops iobatchCompletion
takeMVar iobatchCompletion

submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops0 =
-- General case. Needs multiple batches and combining results.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or it needs no batch at all, if V.null ioops. The code handles this case correctly, but the comment only mentions the V.length ioops > ioctxBatchSizeLimit' case

mask_ $ do
iobatchCompletions <- prepAndSubmitIOBatches [] ioops0
awaitIOBatches iobatchCompletions
where
prepAndSubmitIOBatches acc !ioops
| V.null ioops = return acc
| otherwise = do
let batch = V.take ioctxBatchSizeLimit' ioops
iobatchCompletion <- newEmptyMVar
return (b, iobatchCompletion)

forM_ batches $ \(batch, iobatchCompletion) -> do
let !iobatchOpCount' = V.length batch
waitQSemN ioctxQSemN iobatchOpCount'
iobatchIx <- readChan ioctxChanIOBatchIx
let iobatchKeepAlives = batch
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount = iobatchOpCount',
iobatchCompletion,
iobatchKeepAlives
}

submitBatch iobatchIx batch

waitAndCombine batches

| otherwise = do
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
prepAndSubmitIOBatch ioctx batch iobatchCompletion
prepAndSubmitIOBatches (iobatchCompletion:acc)
(V.drop ioctxBatchSizeLimit' ioops)

awaitIOBatches iobatchCompletions =
VU.concat <$> mapM takeMVar (reverse iobatchCompletions)
Comment on lines +213 to +214
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was giving some thought to whether it is okay that takeMVar is interruptible here. I suppose if the submitting thread gets interrupted and doesn't take the MVars, that should be fine because the MVars can be GC'ed. No explicit cleanup required

Comment on lines +213 to +214
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Int terms of allocations/number of allocated objects, would reverse on the vector be better than reverse on the list?


-- Must be called with async exceptions masked. See mask_ above in submitIO.
prepAndSubmitIOBatch :: IOCtx
-> V.Vector (IOOp IO)
-> MVar (VU.Vector IOResult)
-> IO ()
prepAndSubmitIOBatch IOCtx {
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
!iobatch !iobatchCompletion = do
let !iobatchOpCount = V.length iobatch
-- We're called with async exceptions masked, but 'waitQSemN' can block and
-- receive exceptions. That's ok. But once we acquire the semaphore
Comment on lines +229 to +230
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because waitQSemN is interruptible, right?

-- quantitiy we must eventully return it. There's two cases for returning:
-- 1. we successfully submit the I/O and pass the information off to the
-- completionThread which will signal the semaphore upon completion, or
-- 2. we encounter an exception here in which case we need to undo the
-- semaphore acquisition.
-- For the latter case we use 'onException'. We also need to obtain a
-- batch index. This should never block because we have as many tokens as
-- QSemN initial quantitiy, and the batch ix is released before the QSemN
-- is signaled in the completionThread.
waitQSemN ioctxQSemN iobatchOpCount
!iobatchIx <- readChan ioctxChanIOBatchIx
-- Thus undoing the acquisition involves releasing the batch index and
-- semaphore quantitiy (which themselves cannot blocks).
let undoAcquisition = do writeChan ioctxChanIOBatchIx iobatchIx
signalQSemN ioctxQSemN iobatchOpCount
flip onException undoAcquisition $ do
-- We can receive an async exception if takeMVar blocks. That's ok, we'll
-- undo the acquisition.
muring <- takeMVar ioctxURing
-- From here on we cannot receive any async exceptions, because we do not
-- do any more blocking operations. But we can encounter sync exceptions,
-- so we may still need to release the mvar on exception.
flip onException (putMVar ioctxURing muring) $ do
uring <- maybe (throwIO closed) pure muring
V.iforM_ iobatch $ \ioopix ioop -> case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt (packIOOpId iobatchIx ioopix)
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt (packIOOpId iobatchIx ioopix)
-- TODO: if submitIO or guardPinned throws an exception, we need to
-- undo / clear the SQEs that we prepared.
URing.submitIO uring

-- More async exception safety: we want to inform the completionThread
-- /if and only if/ we successfully submitted a bathc of IO. So now that
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- /if and only if/ we successfully submitted a bathc of IO. So now that
-- /if and only if/ we successfully submitted a batch of IO. So now that

-- we have submitted a batch we need to inform the completionThread
-- without interruptions. We're still masked, but writeChan does not
-- throw exceptions and never blocks (unbounded channel) so we should
-- not get async or sync exceptions.
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
iobatchKeepAlives = iobatch
}
submitBatch iobatchIx ioops
takeMVar iobatchCompletion
putMVar ioctxURing muring
where
!iobatchOpCount = V.length ioops

guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing

{-# INLINE submitBatch #-}
submitBatch iobatchIx batch =
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Just uring -> do
V.iforM_ batch $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix in
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring

waitAndCombine :: [(a, MVar (VU.Vector IOResult))]
-> IO (VU.Vector IOResult)
waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd)

chunksOf :: Int -> V.Vector a -> [V.Vector a]
chunksOf n xs
| V.length xs == 0 = []
| otherwise = V.take n xs : chunksOf n (V.drop n xs)

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Int,
iobatchCompletion :: MVar (VU.Vector IOResult),
iobatchCompletion :: !(MVar (VU.Vector IOResult)),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
Expand Down Expand Up @@ -341,6 +357,9 @@ completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do
VM.write keepAlives iobatchix invalidEntry
result' <- VU.unsafeFreeze result
putMVar completion (result' :: VU.Vector IOResult)
-- Important: release batch index _before_ we signal the QSemN.
-- The other side needs the guarantee that the index is available
-- once it acquires the QSemN.
writeChan chaniobatchix iobatchix
let !qrelease = VU.length result'
signalQSemN qsem qrelease
Expand Down
Loading
Loading