Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisdral committed Mar 29, 2024
1 parent c07e1b0 commit e267ac0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
39 changes: 20 additions & 19 deletions System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Data.Array.IO
import Data.Array.Unboxed
import Data.Coerce
import Data.Primitive.ByteArray
import qualified Data.Vector as V

import Control.Monad
import Control.Monad.Primitive
Expand Down Expand Up @@ -210,7 +211,7 @@ viewIOError (IOResult_ e)
-- the target depth, fill it up to double again. This way there is always
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult]
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (V.Vector IOResult)
submitIO IOCtx {
ioctxQSemN,
ioctxURing,
Expand All @@ -235,24 +236,24 @@ submitIO IOCtx {
Nothing -> throwIO closed
Just uring -> do
-- print ("submitIO", iobatchOpCount)
sequence_
[ --print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
| (ioop, ioopix) <- zip ioops [IOOpIx 0 ..]
, let !ioopid = packIOOpId iobatchIx ioopix ]
V.iforM_ ioops $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx (IOOpIx $ fromIntegral ioopix)
in
--print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring
-- print ("submitIO", "submitting done")
map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion
fmap (IOResult_ . coerce) . V.fromList . elems <$> takeMVar iobatchCompletion
where
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
guardPinned mba = do
Expand All @@ -266,7 +267,7 @@ data IOBatch = IOBatch {
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
iobatchKeepAlives :: [IOOp IO]
iobatchKeepAlives :: V.Vector (IOOp IO)
}

newtype IOBatchIx = IOBatchIx Word32
Expand Down Expand Up @@ -312,7 +313,7 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
collectCompletion :: IOUArray IOBatchIx Int
-> IOArray IOBatchIx (IOUArray IOOpIx Int32)
-> IOArray IOBatchIx (MVar (UArray IOOpIx Int32))
-> IOArray IOBatchIx [IOOp IO]
-> IOArray IOBatchIx (V.Vector (IOOp IO))
-> IO ()
collectCompletion counts results completions keepAlives = do
iocompletion <- URing.awaitIO uring
Expand Down
19 changes: 9 additions & 10 deletions benchmark/Bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Data.Time
import System.IO.BlockIO
import System.IO.BlockIO.URing hiding (submitIO)
import qualified System.IO.BlockIO.URing as URing
import qualified Data.Vector as V

main :: IO ()
main = do
Expand Down Expand Up @@ -100,18 +101,16 @@ main_highlevel filename = do
ioctxBatchSizeLimit = 64,
ioctxConcurrencyLimit = 64 * 4
}
blocks = zip [0..] (randomPermute rng [0..lastBlock])
blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock])
bracket (initIOCtx params) closeIOCtx $ \ioctx -> do
buf <- newPinnedByteArray (4096 * nbufs)

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx
[ IOOpRead fd blockoff buf bufOff 4096
| (i, block) <- batch
, let bufOff = (i `mod` nbufs) * 4096
blockoff = fromIntegral (block * 4096)
]
submitIO ioctx $ flip fmap batch $ \ (i, block) ->
let bufOff = (i `mod` nbufs) * 4096
blockoff = fromIntegral (block * 4096)
in IOOpRead fd blockoff buf bufOff 4096
after <- getCurrentTime
let total = lastBlock + 1
report before after total
Expand All @@ -127,9 +126,9 @@ report before after total = do
iops :: Int
iops = round (fromIntegral total / realToFrac elapsed :: Double)

groupsOfN :: Int -> [a] -> [[a]]
groupsOfN _ [] = []
groupsOfN n xs = take n xs : groupsOfN n (drop n xs)
groupsOfN :: Int -> V.Vector a -> [V.Vector a]
groupsOfN n xs | V.null xs = []
| otherwise = V.take n xs : groupsOfN n (V.drop n xs)

randomPermute :: Ord a => StdGen -> [a] -> [a]
randomPermute rng0 xs0 =
Expand Down
5 changes: 4 additions & 1 deletion blockio-uring.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ library

build-depends:
, array ^>=0.5
, base >=4.16 && <4.20
, base >=4.16 && <4.20
, primitive ^>=0.9
, unix ^>=2.8
, vector ^>=0.13

pkgconfig-depends: liburing >=2.0 && <2.7
default-language: Haskell2010
Expand All @@ -62,6 +63,7 @@ benchmark bench
, primitive
, random
, time
, vector
, unix

pkgconfig-depends: liburing
Expand All @@ -82,6 +84,7 @@ test-suite test
, base >=4.16 && <4.20
, primitive
, tasty
, vector
, tasty-hunit

pkgconfig-depends: liburing
Expand Down

0 comments on commit e267ac0

Please sign in to comment.