Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MutableByteArray as buffers, add manual keepAlive #8

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import Data.Array.IArray
import Data.Array.IO
import Data.Array.Unboxed
import Data.Coerce
import Data.Primitive.ByteArray

import Control.Monad
import Control.Monad.Primitive
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSemN
Expand Down Expand Up @@ -141,9 +143,8 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
URing.closeURing uring
putMVar ioctxURing Nothing

data IOOp = IOOpRead !Fd !FileOffset !(Ptr Word8) !ByteCount
| IOOpWrite !Fd !FileOffset !(Ptr Word8) !ByteCount
deriving Show
data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount
| IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount
jorisdral marked this conversation as resolved.
Show resolved Hide resolved

newtype IOResult = IOResult_ URing.IOResult

Expand Down Expand Up @@ -206,7 +207,7 @@ viewIOError (IOResult_ e)
-- the target depth, fill it up to double again. This way there is always
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> [IOOp] -> IO [IOResult]
submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult]
submitIO IOCtx {
ioctxQSemN,
ioctxURing,
Expand All @@ -219,11 +220,13 @@ submitIO IOCtx {
waitQSemN ioctxQSemN (fromIntegral iobatchOpCount)
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion
iobatchCompletion,
iobatchKeepAlives
}
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Expand All @@ -232,11 +235,14 @@ submitIO IOCtx {
sequence_
[ --print ioop >>
case ioop of
IOOpRead fd off buf cnt ->
URing.prepareRead uring fd off buf cnt ioopid

IOOpWrite fd off buf cnt ->
URing.prepareWrite uring fd off buf cnt ioopid
IOOpRead fd off buf bufOff cnt ->
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt ->
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
| (ioop, ioopix) <- zip ioops [IOOpIx 0 ..]
, let !ioopid = packIOOpId iobatchIx ioopix ]
URing.submitIO uring
Expand All @@ -247,7 +253,11 @@ submitIO IOCtx {
data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Word32,
iobatchCompletion :: MVar (UArray IOOpIx Int32)
iobatchCompletion :: MVar (UArray IOOpIx Int32),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
iobatchKeepAlives :: [IOOp IO]
}

newtype IOBatchIx = IOBatchIx Word32
Expand Down Expand Up @@ -286,14 +296,16 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
counts <- newArray iobatchixBounds (-1)
results <- newArray iobatchixBounds invalidEntry
completions <- newArray iobatchixBounds invalidEntry
collectCompletion counts results completions
keepAlives <- newArray iobatchixBounds invalidEntry
collectCompletion counts results completions keepAlives
`finally` putMVar done ()
where
collectCompletion :: IOUArray IOBatchIx Int
-> IOArray IOBatchIx (IOUArray IOOpIx Int32)
-> IOArray IOBatchIx (MVar (UArray IOOpIx Int32))
-> IOArray IOBatchIx [IOOp IO]
-> IO ()
collectCompletion counts results completions = do
collectCompletion counts results completions keepAlives = do
iocompletion <- URing.awaitIO uring
let (URing.IOCompletion ioopid iores) = iocompletion
unless (ioopid == URing.IOOpId maxBound) $ do
Expand All @@ -311,12 +323,13 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
writeArray counts iobatchix (-1)
writeArray results iobatchix invalidEntry
writeArray completions iobatchix invalidEntry
writeArray keepAlives iobatchix invalidEntry
result' <- freeze result
putMVar completion (result' :: UArray IOOpIx Int32)
writeChan chaniobatchix iobatchix
let !qrelease = rangeSize (bounds result')
signalQSemN qsem qrelease
collectCompletion counts results completions
collectCompletion counts results completions keepAlives

-- wait for single IO result
-- if the count is positive, decrement and update result array
Expand All @@ -330,14 +343,16 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
IOBatch{
iobatchIx,
iobatchOpCount,
iobatchCompletion
iobatchCompletion,
iobatchKeepAlives
} <- readChan chaniobatch
oldcount <- readArray counts iobatchIx
assert (oldcount == (-1)) (return ())
writeArray counts iobatchIx (fromIntegral iobatchOpCount)
result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1)
writeArray results iobatchIx result
writeArray completions iobatchIx iobatchCompletion
writeArray keepAlives iobatchIx iobatchKeepAlives
if iobatchIx == iobatchixNeeded
then return $! fromIntegral iobatchOpCount
else collectIOBatches iobatchixNeeded
Expand Down
31 changes: 16 additions & 15 deletions benchmark/Bench.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE BangPatterns #-}
{- HLINT ignore "Use camelCase" -}

module Main (main) where

import Data.Primitive
import qualified Data.Set as Set
import Control.Monad
import Control.Exception
Expand Down Expand Up @@ -100,20 +101,20 @@ main_highlevel filename = do
ioctxConcurrencyLimit = 64 * 4
}
blocks = zip [0..] (randomPermute rng [0..lastBlock])
bracket (initIOCtx params) closeIOCtx $ \ioctx ->
allocaBytes (4096 * nbufs) $ \bufptr -> do

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx
[ IOOpRead fd blockoff bufptr' 4096
| (i, block) <- batch
, let bufptr' = bufptr `plusPtr` ((i `mod` nbufs) * 4096)
blockoff = fromIntegral (block * 4096)
]
after <- getCurrentTime
let total = lastBlock + 1
report before after total
bracket (initIOCtx params) closeIOCtx $ \ioctx -> do
buf <- newPinnedByteArray (4096 * nbufs)

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx
[ IOOpRead fd blockoff buf bufOff 4096
| (i, block) <- batch
, let bufOff = (i `mod` nbufs) * 4096
blockoff = fromIntegral (block * 4096)
]
after <- getCurrentTime
let total = lastBlock + 1
report before after total

report :: UTCTime -> UTCTime -> Int -> IO ()
report before after total = do
Expand Down
8 changes: 5 additions & 3 deletions blockio-uring.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ library
System.IO.BlockIO.URingFFI

build-depends:
, array ^>=0.5
, base >=4.16 && <4.20
, unix ^>=2.8
, array ^>=0.5
, base >=4.16 && <4.20
, primitive ^>=0.9
, unix ^>=2.8

pkgconfig-depends: liburing
default-language: Haskell2010
Expand All @@ -58,6 +59,7 @@ benchmark bench
, async
, base
, containers
, primitive
, random
, time
, unix
Expand Down
Loading