-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce allocations #22
base: main
Are you sure you want to change the base?
Changes from all commits
d626dae
242b20f
6217dab
13bacbe
b7887e9
50a8a63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,35 @@ | ||
{-# LANGUAGE BangPatterns #-} | ||
{-# LANGUAGE CPP #-} | ||
{-# LANGUAGE TypeFamilies #-} -- Needed for GHC 9.2 and older only | ||
{- HLINT ignore "Use camelCase" -} | ||
|
||
module Main (main) where | ||
|
||
import Data.Primitive | ||
import qualified Data.Set as Set | ||
import Control.Monad | ||
import Control.Monad.Primitive (RealWorld) | ||
import Control.Monad.ST (ST) | ||
import Control.Exception | ||
import Control.Concurrent.Async | ||
import Control.Concurrent.Async as Async | ||
|
||
import Foreign | ||
import System.Posix.IO | ||
import System.Posix.Files | ||
import System.Posix.Types as Posix | ||
|
||
import System.Random | ||
import System.Random as Random | ||
import System.Environment | ||
import System.Exit | ||
import System.Mem (performMajorGC) | ||
import Data.Time | ||
import qualified GHC.Stats as RTS | ||
|
||
import System.IO.BlockIO | ||
import System.IO.BlockIO.URing hiding (submitIO) | ||
import qualified System.IO.BlockIO.URing as URing | ||
import qualified Data.Vector as V | ||
import qualified Data.Vector.Mutable as VM | ||
|
||
main :: IO () | ||
main = do | ||
|
@@ -81,17 +88,16 @@ main_lowlevel filename = do | |
|
||
rng <- initStdGen | ||
let blocks = zip [0..] (randomPermute rng [0..lastBlock]) | ||
total = lastBlock + 1 | ||
totalOps = lastBlock + 1 | ||
(leadIn, blocks') = splitAt 64 blocks | ||
|
||
before <- getCurrentTime | ||
submitBatch leadIn | ||
go blocks' | ||
collectBatch 64 | ||
after <- getCurrentTime | ||
report before after total | ||
withReport totalOps $ do | ||
submitBatch leadIn | ||
go blocks' | ||
collectBatch 64 | ||
|
||
|
||
{-# NOINLINE main_highlevel #-} | ||
main_highlevel :: FilePath -> IO () | ||
main_highlevel filename = do | ||
putStrLn "High-level API benchmark" | ||
|
@@ -105,39 +111,91 @@ main_highlevel filename = do | |
let size = fileSize status | ||
lastBlock :: Int | ||
lastBlock = fromIntegral (size `div` 4096 - 1) | ||
nbufs = 64 * 4 | ||
params = IOCtxParams { | ||
ioctxBatchSizeLimit = 64, | ||
ioctxConcurrencyLimit = 64 * 4 | ||
} | ||
blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock]) | ||
bracket (initIOCtx params) closeIOCtx $ \ioctx -> do | ||
buf <- newPinnedByteArray (4096 * nbufs) | ||
|
||
before <- getCurrentTime | ||
forConcurrently_ (groupsOfN 32 blocks) $ \batch -> | ||
submitIO ioctx $ flip fmap batch $ \ (i, block) -> | ||
let bufOff = (i `mod` nbufs) * 4096 | ||
blockoff = fromIntegral (block * 4096) | ||
in IOOpRead fd blockoff buf bufOff 4096 | ||
after <- getCurrentTime | ||
let total = lastBlock + 1 | ||
report before after total | ||
|
||
report :: UTCTime -> UTCTime -> Int -> IO () | ||
report before after total = do | ||
putStrLn $ "Total I/O ops: " ++ show total | ||
putStrLn $ "Elapsed time: " ++ show elapsed | ||
putStrLn $ "IOPS: " ++ show iops | ||
ntasks = 4 | ||
batchsz = 32 | ||
nbatches = lastBlock `div` (ntasks * batchsz) -- batches per task | ||
totalOps = nbatches * batchsz * ntasks | ||
Comment on lines
+120
to
+121
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, we might ignore a few pages based on whether the division rounds down? |
||
bracket (initIOCtx params) closeIOCtx $ \ioctx -> | ||
withReport totalOps $ do | ||
tasks <- | ||
forRngSplitM ntasks rng $ \ !rng_task -> | ||
Async.async $ do | ||
buf <- newPinnedByteArray (4096 * batchsz) | ||
forRngSplitM_ nbatches rng_task $ \ !rng_batch -> | ||
submitIO ioctx $ | ||
generateIOOpsBatch fd buf lastBlock batchsz rng_batch | ||
_ <- Async.waitAnyCancel tasks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that right? Shouldn't all async actions finish? |
||
return () | ||
|
||
-- Build one batch of @size@ read operations on @fd@ as a boxed vector. | ||
-- | ||
-- Operation @i@ (0-based) is an 'IOOpRead' of 4096 bytes that targets @buf@ | ||
-- at byte offset @i * 4096@ and reads from file offset @block * 4096@, where | ||
-- @block@ comes from @Random.uniformR (0, lastBlock)@. The generator is | ||
-- threaded through the loop starting from @rng0@, so the same @rng0@ yields | ||
-- the same batch. | ||
{-# NOINLINE generateIOOpsBatch #-} | ||
generateIOOpsBatch :: Posix.Fd | ||
-> MutableByteArray RealWorld | ||
-> Int | ||
-> Int | ||
-> Random.StdGen | ||
-> V.Vector (IOOp IO) | ||
generateIOOpsBatch !fd !buf !lastBlock !size !rng0 = | ||
V.create $ do | ||
v <- VM.new size | ||
go v rng0 0 | ||
return v | ||
where | ||
-- Fill slot @i@, then recurse with the updated generator and @i + 1@. | ||
go :: V.MVector s (IOOp IO) -> Random.StdGen -> Int -> ST s () | ||
-- All @size@ slots have been written: stop. | ||
go !_ !_ !i | i == size = return () | ||
go !v !rng !i = do | ||
let (!block, !rng') = Random.uniformR (0, lastBlock) rng | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to clarify, previously we read each block exactly once, now it's random, so some will be read multiple times, some never? That should be fine for this benchmark. |
||
!bufOff = i * 4096 | ||
!blockoff = fromIntegral (block * 4096) | ||
-- @i@ counts up from 0 and the guard above stops at @size@, so the | ||
-- index is always within the vector allocated by @VM.new size@. | ||
VM.unsafeWrite v i $! IOOpRead fd blockoff buf bufOff 4096 | ||
go v rng' (i+1) | ||
|
||
-- Run @action@ @n@ times, giving each call its own generator and discarding | ||
-- the results. Each step calls 'Random.split' on the current generator: the | ||
-- first half is passed to @action@, the second half is carried into the | ||
-- next step. | ||
-- NOTE(review): only a count of exactly 0 stops the loop, so a negative @n@ | ||
-- would not terminate; callers are presumably expected to pass @n >= 0@. | ||
{-# INLINE forRngSplitM_ #-} | ||
forRngSplitM_ :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m () | ||
forRngSplitM_ n rng0 action = go n rng0 | ||
where | ||
elapsed = after `diffUTCTime` before | ||
go 0 !_ = return () | ||
go !i !rng = let (!rng', !rng'') = Random.split rng | ||
in action rng' >> go (i-1) rng'' | ||
|
||
iops :: Int | ||
iops = round (fromIntegral total / realToFrac elapsed :: Double) | ||
-- Like 'forRngSplitM_', but collect the results. Runs @action@ @n@ times, | ||
-- each time with the first half of a 'Random.split' of the current | ||
-- generator, and returns the results in the order the calls were made. | ||
-- NOTE(review): only a count of exactly 0 stops the loop, so a negative @n@ | ||
-- would not terminate; callers are presumably expected to pass @n >= 0@. | ||
{-# INLINE forRngSplitM #-} | ||
forRngSplitM :: Monad m => Int -> Random.StdGen -> (Random.StdGen -> m a) -> m [a] | ||
forRngSplitM n rng0 action = go [] n rng0 | ||
where | ||
-- Results are consed onto @acc@ (newest first), hence the final reverse. | ||
go acc 0 !_ = return (reverse acc) | ||
go acc !i !rng = let (!rng', !rng'') = Random.split rng | ||
in action rng' >>= \x -> go (x:acc) (i-1) rng'' | ||
|
||
-- Run @action@ and then print a benchmark report for it via 'report'. | ||
-- | ||
-- @totalOps@ is the number of I/O operations @action@ is expected to | ||
-- perform; it is only passed on to 'report' and is not checked here. | ||
-- NOTE(review): 'RTS.getRTSStats' presumably needs RTS statistics to be | ||
-- enabled (e.g. @+RTS -T@) -- confirm against the GHC.Stats documentation. | ||
{-# INLINE withReport #-} | ||
withReport :: Int -> IO () -> IO () | ||
withReport totalOps action = do | ||
-- Major GC before taking the "before" RTS snapshot and starting the clock. | ||
performMajorGC | ||
beforeRTS <- RTS.getRTSStats | ||
beforeTime <- getCurrentTime | ||
action | ||
-- Stop the clock first, so the GC below is not part of the elapsed time. | ||
afterTime <- getCurrentTime | ||
performMajorGC | ||
afterRTS <- RTS.getRTSStats | ||
report beforeTime afterTime beforeRTS afterRTS totalOps | ||
|
||
-- Print the benchmark summary to stdout: the total number of I/O ops, the | ||
-- elapsed wall-clock time, I/O ops per second, total bytes allocated, and | ||
-- bytes allocated per I/O op. | ||
-- | ||
-- The time figures come from the two 'UTCTime' stamps; the allocation | ||
-- figures come from the difference in 'RTS.allocated_bytes' between the two | ||
-- RTS snapshots. | ||
{-# NOINLINE report #-} | ||
report :: UTCTime -> UTCTime -> RTS.RTSStats -> RTS.RTSStats -> Int -> IO () | ||
report beforeTime afterTime beforeRTS afterRTS totalOps = do | ||
putStrLn $ "Total I/O ops: " ++ show totalOps | ||
putStrLn $ "Elapsed time: " ++ show elapsed | ||
putStrLn $ "IOPS: " ++ show iops | ||
putStrLn $ "Allocated total: " ++ show allocated | ||
putStrLn $ "Allocated per: " ++ show apio | ||
where | ||
elapsed = afterTime `diffUTCTime` beforeTime | ||
allocated = RTS.allocated_bytes afterRTS - RTS.allocated_bytes beforeRTS | ||
|
||
groupsOfN :: Int -> V.Vector a -> [V.Vector a] | ||
groupsOfN n xs | V.null xs = [] | ||
| otherwise = V.take n xs : groupsOfN n (V.drop n xs) | ||
-- Both rates are computed in 'Double' and rounded to the nearest 'Int'. | ||
iops, apio :: Int | ||
iops = round (fromIntegral totalOps / realToFrac elapsed :: Double) | ||
apio = round (fromIntegral allocated / realToFrac totalOps :: Double) | ||
|
||
randomPermute :: Ord a => StdGen -> [a] -> [a] | ||
randomPermute rng0 xs0 = | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -34,7 +34,7 @@ import Control.Concurrent.MVar | |||||
import Control.Concurrent.QSemN | ||||||
import Control.Concurrent.Chan | ||||||
import Control.Exception (mask_, throw, ArrayException(UndefinedElement), | ||||||
finally, assert, throwIO) | ||||||
finally, assert, throwIO, onException) | ||||||
import System.IO.Error | ||||||
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument)) | ||||||
|
||||||
|
@@ -187,93 +187,109 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int ! | |||||
-- at least the target number in flight at once. | ||||||
-- | ||||||
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult) | ||||||
submitIO IOCtx { | ||||||
ioctxBatchSizeLimit', | ||||||
ioctxQSemN, | ||||||
ioctxURing, | ||||||
ioctxChanIOBatch, | ||||||
ioctxChanIOBatchIx | ||||||
} | ||||||
ioops | ||||||
| iobatchOpCount == 0 = return VU.empty | ||||||
|
||||||
| iobatchOpCount > ioctxBatchSizeLimit' = do | ||||||
-- create completion mvars for each sub-batch | ||||||
batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do | ||||||
submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops | ||||||
-- Typical small case. We can be more direct. | ||||||
| V.length ioops > 0 && V.length ioops <= ioctxBatchSizeLimit' | ||||||
= mask_ $ do | ||||||
iobatchCompletion <- newEmptyMVar | ||||||
prepAndSubmitIOBatch ioctx ioops iobatchCompletion | ||||||
takeMVar iobatchCompletion | ||||||
|
||||||
submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops0 = | ||||||
-- General case. Needs multiple batches and combining results. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or it needs no batch at all, if `ioops0` is empty. |
||||||
mask_ $ do | ||||||
iobatchCompletions <- prepAndSubmitIOBatches [] ioops0 | ||||||
awaitIOBatches iobatchCompletions | ||||||
where | ||||||
prepAndSubmitIOBatches acc !ioops | ||||||
| V.null ioops = return acc | ||||||
| otherwise = do | ||||||
let batch = V.take ioctxBatchSizeLimit' ioops | ||||||
iobatchCompletion <- newEmptyMVar | ||||||
return (b, iobatchCompletion) | ||||||
|
||||||
forM_ batches $ \(batch, iobatchCompletion) -> do | ||||||
let !iobatchOpCount' = V.length batch | ||||||
waitQSemN ioctxQSemN iobatchOpCount' | ||||||
iobatchIx <- readChan ioctxChanIOBatchIx | ||||||
let iobatchKeepAlives = batch | ||||||
writeChan ioctxChanIOBatch | ||||||
IOBatch { | ||||||
iobatchIx, | ||||||
iobatchOpCount = iobatchOpCount', | ||||||
iobatchCompletion, | ||||||
iobatchKeepAlives | ||||||
} | ||||||
|
||||||
submitBatch iobatchIx batch | ||||||
|
||||||
waitAndCombine batches | ||||||
|
||||||
| otherwise = do | ||||||
waitQSemN ioctxQSemN iobatchOpCount | ||||||
iobatchIx <- readChan ioctxChanIOBatchIx | ||||||
iobatchCompletion <- newEmptyMVar | ||||||
let iobatchKeepAlives = ioops | ||||||
prepAndSubmitIOBatch ioctx batch iobatchCompletion | ||||||
prepAndSubmitIOBatches (iobatchCompletion:acc) | ||||||
(V.drop ioctxBatchSizeLimit' ioops) | ||||||
|
||||||
awaitIOBatches iobatchCompletions = | ||||||
VU.concat <$> mapM takeMVar (reverse iobatchCompletions) | ||||||
Comment on lines
+213
to
+214
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was giving some thought to whether it is okay that
Comment on lines
+213
to
+214
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In terms of allocations/number of allocated objects, would reverse on the vector be better than reverse on the list? |
||||||
|
||||||
-- Must be called with async exceptions masked. See mask_ above in submitIO. | ||||||
prepAndSubmitIOBatch :: IOCtx | ||||||
-> V.Vector (IOOp IO) | ||||||
-> MVar (VU.Vector IOResult) | ||||||
-> IO () | ||||||
prepAndSubmitIOBatch IOCtx { | ||||||
ioctxQSemN, | ||||||
ioctxURing, | ||||||
ioctxChanIOBatch, | ||||||
ioctxChanIOBatchIx | ||||||
} | ||||||
!iobatch !iobatchCompletion = do | ||||||
let !iobatchOpCount = V.length iobatch | ||||||
-- We're called with async exceptions masked, but 'waitQSemN' can block and | ||||||
-- receive exceptions. That's ok. But once we acquire the semaphore | ||||||
Comment on lines
+229
to
+230
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because |
||||||
-- quantity we must eventually return it. There are two cases for returning: | ||||||
-- 1. we successfully submit the I/O and pass the information off to the | ||||||
-- completionThread which will signal the semaphore upon completion, or | ||||||
-- 2. we encounter an exception here in which case we need to undo the | ||||||
-- semaphore acquisition. | ||||||
-- For the latter case we use 'onException'. We also need to obtain a | ||||||
-- batch index. This should never block because we have as many tokens as | ||||||
-- QSemN initial quantity, and the batch ix is released before the QSemN | ||||||
-- is signaled in the completionThread. | ||||||
waitQSemN ioctxQSemN iobatchOpCount | ||||||
!iobatchIx <- readChan ioctxChanIOBatchIx | ||||||
-- Thus undoing the acquisition involves releasing the batch index and | ||||||
-- semaphore quantity (neither of which can block). | ||||||
let undoAcquisition = do writeChan ioctxChanIOBatchIx iobatchIx | ||||||
signalQSemN ioctxQSemN iobatchOpCount | ||||||
flip onException undoAcquisition $ do | ||||||
-- We can receive an async exception if takeMVar blocks. That's ok, we'll | ||||||
-- undo the acquisition. | ||||||
muring <- takeMVar ioctxURing | ||||||
-- From here on we cannot receive any async exceptions, because we do not | ||||||
-- do any more blocking operations. But we can encounter sync exceptions, | ||||||
-- so we may still need to release the mvar on exception. | ||||||
flip onException (putMVar ioctxURing muring) $ do | ||||||
uring <- maybe (throwIO closed) pure muring | ||||||
V.iforM_ iobatch $ \ioopix ioop -> case ioop of | ||||||
IOOpRead fd off buf bufOff cnt -> do | ||||||
guardPinned buf | ||||||
URing.prepareRead uring fd off | ||||||
(mutableByteArrayContents buf `plusPtr` bufOff) | ||||||
cnt (packIOOpId iobatchIx ioopix) | ||||||
IOOpWrite fd off buf bufOff cnt -> do | ||||||
guardPinned buf | ||||||
URing.prepareWrite uring fd off | ||||||
(mutableByteArrayContents buf `plusPtr` bufOff) | ||||||
cnt (packIOOpId iobatchIx ioopix) | ||||||
-- TODO: if submitIO or guardPinned throws an exception, we need to | ||||||
-- undo / clear the SQEs that we prepared. | ||||||
URing.submitIO uring | ||||||
|
||||||
-- More async exception safety: we want to inform the completionThread | ||||||
-- /if and only if/ we successfully submitted a batch of IO. So now that | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
-- we have submitted a batch we need to inform the completionThread | ||||||
-- without interruptions. We're still masked, but writeChan does not | ||||||
-- throw exceptions and never blocks (unbounded channel) so we should | ||||||
-- not get async or sync exceptions. | ||||||
writeChan ioctxChanIOBatch | ||||||
IOBatch { | ||||||
iobatchIx, | ||||||
iobatchOpCount, | ||||||
iobatchCompletion, | ||||||
iobatchKeepAlives | ||||||
iobatchKeepAlives = iobatch | ||||||
} | ||||||
submitBatch iobatchIx ioops | ||||||
takeMVar iobatchCompletion | ||||||
putMVar ioctxURing muring | ||||||
where | ||||||
!iobatchOpCount = V.length ioops | ||||||
|
||||||
guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned | ||||||
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing | ||||||
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing | ||||||
|
||||||
{-# INLINE submitBatch #-} | ||||||
submitBatch iobatchIx batch = | ||||||
withMVar ioctxURing $ \case | ||||||
Nothing -> throwIO closed | ||||||
Just uring -> do | ||||||
V.iforM_ batch $ \ioopix ioop -> | ||||||
let !ioopid = packIOOpId iobatchIx ioopix in | ||||||
case ioop of | ||||||
IOOpRead fd off buf bufOff cnt -> do | ||||||
guardPinned buf | ||||||
URing.prepareRead uring fd off | ||||||
(mutableByteArrayContents buf `plusPtr` bufOff) | ||||||
cnt ioopid | ||||||
IOOpWrite fd off buf bufOff cnt -> do | ||||||
guardPinned buf | ||||||
URing.prepareWrite uring fd off | ||||||
(mutableByteArrayContents buf `plusPtr` bufOff) | ||||||
cnt ioopid | ||||||
URing.submitIO uring | ||||||
|
||||||
waitAndCombine :: [(a, MVar (VU.Vector IOResult))] | ||||||
-> IO (VU.Vector IOResult) | ||||||
waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd) | ||||||
|
||||||
chunksOf :: Int -> V.Vector a -> [V.Vector a] | ||||||
chunksOf n xs | ||||||
| V.length xs == 0 = [] | ||||||
| otherwise = V.take n xs : chunksOf n (V.drop n xs) | ||||||
|
||||||
data IOBatch = IOBatch { | ||||||
iobatchIx :: !IOBatchIx, | ||||||
iobatchOpCount :: !Int, | ||||||
iobatchCompletion :: MVar (VU.Vector IOResult), | ||||||
iobatchCompletion :: !(MVar (VU.Vector IOResult)), | ||||||
-- | The list of I\/O operations is sent to the completion | ||||||
-- thread so that the buffers are kept alive while the kernel | ||||||
-- is using them. | ||||||
|
@@ -341,6 +357,9 @@ completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do | |||||
VM.write keepAlives iobatchix invalidEntry | ||||||
result' <- VU.unsafeFreeze result | ||||||
putMVar completion (result' :: VU.Vector IOResult) | ||||||
-- Important: release batch index _before_ we signal the QSemN. | ||||||
-- The other side needs the guarantee that the index is available | ||||||
-- once it acquires the QSemN. | ||||||
writeChan chaniobatchix iobatchix | ||||||
let !qrelease = VU.length result' | ||||||
signalQSemN qsem qrelease | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these new INLINE/NOINLINE pragmas in 6217dab change benchmark results in any way?