Skip to content

Commit

Permalink
Merge pull request #11 from well-typed/jdral/vector-submitio
Browse files Browse the repository at this point in the history
Use vectors in `submitIO`
  • Loading branch information
dcoutts authored Apr 4, 2024
2 parents c07e1b0 + a19a48b commit fdb1a51
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 131 deletions.
182 changes: 74 additions & 108 deletions System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeSynonymInstances #-}

module System.IO.BlockIO (

Expand All @@ -26,15 +20,12 @@ module System.IO.BlockIO (

) where

import Data.Word
import Data.Int
import Data.Bits
import Data.Ix
import Data.Array.IArray
import Data.Array.IO
import Data.Array.Unboxed
import Data.Coerce
import Data.Primitive.ByteArray
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM

import Control.Monad
import Control.Monad.Primitive
Expand All @@ -47,13 +38,15 @@ import Control.Exception (mask_, throw, ArrayException(UndefinedElement),
import System.IO.Error
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument))

import Foreign.Ptr
import Foreign.Ptr (plusPtr)
import Foreign.C.Error (Errno(..))
import Foreign.C.Types (CInt(..), CSize)
import System.Posix.Types (Fd, FileOffset, ByteCount)
#if MIN_VERSION_base(4,16,0)
import System.Posix.Internals (hostIsThreaded)
#endif

import qualified System.IO.BlockIO.URing as URing
import System.IO.BlockIO.URing (IOResult(..))


-- | IO context: a handle used by threads submitting IO batches.
Expand Down Expand Up @@ -96,7 +89,9 @@ defaultIOCtxParams =

initIOCtx :: IOCtxParams -> IO IOCtx
initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
#if MIN_VERSION_base(4,16,0)
unless hostIsThreaded $ throwIO rtrsNotThreaded
#endif
mask_ $ do
ioctxQSemN <- newQSemN ioctxConcurrencyLimit
uring <- URing.setupURing (URing.URingParams ioctxBatchSizeLimit)
Expand All @@ -113,8 +108,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
ioctxChanIOBatch
ioctxChanIOBatchIx
let initialBatchIxs :: [IOBatchIx]
initialBatchIxs =
[IOBatchIx 0 .. IOBatchIx (fromIntegral (ioctxConcurrencyLimit-1))]
initialBatchIxs = [0 .. ioctxConcurrencyLimit-1]
writeList2Chan ioctxChanIOBatchIx initialBatchIxs
return IOCtx {
ioctxQSemN,
Expand All @@ -123,13 +117,15 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
ioctxChanIOBatchIx,
ioctxCloseSync
}
#if MIN_VERSION_base(4,16,0)
where
rtrsNotThreaded =
mkIOError
illegalOperationErrorType
"The run-time system should be threaded, make sure you are passing the -threaded flag"
Nothing
Nothing
#endif

closeIOCtx :: IOCtx -> IO ()
closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
Expand All @@ -149,30 +145,6 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount
| IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount

newtype IOResult = IOResult_ URing.IOResult

{-# COMPLETE IOResult, IOError #-}

pattern IOResult :: ByteCount -> IOResult
pattern IOResult c <- (viewIOResult -> Just c)
where
IOResult count = IOResult_ ((fromIntegral :: CSize -> CInt) count)

pattern IOError :: Errno -> IOResult
pattern IOError e <- (viewIOError -> Just e)
where
IOError (Errno e) = IOResult_ (-e)

viewIOResult :: IOResult -> Maybe ByteCount
viewIOResult (IOResult_ c)
| c >= 0 = Just ((fromIntegral :: CInt -> CSize) c)
| otherwise = Nothing

viewIOError :: IOResult -> Maybe Errno
viewIOError (IOResult_ e)
| e < 0 = Just (Errno e)
| otherwise = Nothing


-- | Submit a batch of I\/O operations, and wait for them all to complete.
-- The sequence of results matches up with the sequence of operations.
Expand Down Expand Up @@ -210,17 +182,16 @@ viewIOError (IOResult_ e)
-- the target depth, fill it up to double again. This way there is always
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult]
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
submitIO IOCtx {
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
ioops = do
let iobatchOpCount :: Word32
!iobatchOpCount = fromIntegral (length ioops)
waitQSemN ioctxQSemN (fromIntegral iobatchOpCount)
let !iobatchOpCount = V.length ioops
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
Expand All @@ -235,24 +206,24 @@ submitIO IOCtx {
Nothing -> throwIO closed
Just uring -> do
-- print ("submitIO", iobatchOpCount)
sequence_
[ --print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
| (ioop, ioopix) <- zip ioops [IOOpIx 0 ..]
, let !ioopid = packIOOpId iobatchIx ioopix ]
V.iforM_ ioops $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix
in
--print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring
-- print ("submitIO", "submitting done")
map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion
takeMVar iobatchCompletion
where
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
guardPinned mba = do
Expand All @@ -261,36 +232,33 @@ submitIO IOCtx {

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Word32,
iobatchCompletion :: MVar (UArray IOOpIx Int32),
iobatchOpCount :: !Int,
iobatchCompletion :: MVar (VU.Vector IOResult),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
iobatchKeepAlives :: [IOOp IO]
iobatchKeepAlives :: V.Vector (IOOp IO)
}

newtype IOBatchIx = IOBatchIx Word32
deriving (Eq, Ord, Ix, Enum, Show)

newtype IOOpIx = IOOpIx Word32
deriving (Eq, Ord, Ix, Enum, Show)
type IOBatchIx = Int
type IOOpIx = Int

{-# INLINE packIOOpId #-}
packIOOpId :: IOBatchIx -> IOOpIx -> URing.IOOpId
packIOOpId (IOBatchIx batchix) (IOOpIx opix) =
packIOOpId batchix opix =
URing.IOOpId $ unsafeShiftL (fromIntegral batchix) 32
.|. fromIntegral opix

{-# INLINE unpackIOOpId #-}
unpackIOOpId :: URing.IOOpId -> (IOBatchIx, IOOpIx)
unpackIOOpId (URing.IOOpId w64) =
(IOBatchIx batchix, IOOpIx opix)
(batchix, opix)
where
batchix :: Word32
batchix :: Int
batchix = fromIntegral (unsafeShiftR w64 32)

opix :: Word32
opix = fromIntegral w64
opix :: Int
opix = fromIntegral (w64 .&. 0xffffffff)

completionThread :: URing.URing
-> MVar ()
Expand All @@ -299,44 +267,42 @@ completionThread :: URing.URing
-> Chan IOBatch
-> Chan IOBatchIx
-> IO ()
completionThread uring done maxc qsem chaniobatch chaniobatchix = do
let iobatchixBounds :: (IOBatchIx, IOBatchIx)
iobatchixBounds = (IOBatchIx 0, IOBatchIx (fromIntegral maxc-1))
counts <- newArray iobatchixBounds (-1)
results <- newArray iobatchixBounds invalidEntry
completions <- newArray iobatchixBounds invalidEntry
keepAlives <- newArray iobatchixBounds invalidEntry
completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do
counts <- VUM.replicate maxc (-1)
results <- VM.replicate maxc invalidEntry
completions <- VM.replicate maxc invalidEntry
keepAlives <- VM.replicate maxc invalidEntry
collectCompletion counts results completions keepAlives
`finally` putMVar done ()
where
collectCompletion :: IOUArray IOBatchIx Int
-> IOArray IOBatchIx (IOUArray IOOpIx Int32)
-> IOArray IOBatchIx (MVar (UArray IOOpIx Int32))
-> IOArray IOBatchIx [IOOp IO]
collectCompletion :: VUM.MVector RealWorld Int
-> VM.MVector RealWorld (VUM.MVector RealWorld IOResult)
-> VM.MVector RealWorld (MVar (VU.Vector IOResult))
-> VM.MVector RealWorld (V.Vector (IOOp IO))
-> IO ()
collectCompletion counts results completions keepAlives = do
collectCompletion !counts !results !completions !keepAlives = do
iocompletion <- URing.awaitIO uring
let (URing.IOCompletion ioopid iores) = iocompletion
let (URing.IOCompletion !ioopid !iores) = iocompletion
unless (ioopid == URing.IOOpId maxBound) $ do
let (iobatchix, ioopix) = unpackIOOpId ioopid
let (!iobatchix, !ioopix) = unpackIOOpId ioopid
count <- do
c <- readArray counts iobatchix
c <- VUM.read counts iobatchix
if c < 0 then collectIOBatches iobatchix
else return c
assert (count > 0) (return ())
writeArray counts iobatchix (count-1)
result <- readArray results iobatchix
writeArray result ioopix (coerce iores)
VUM.write counts iobatchix (count-1)
result <- VM.read results iobatchix
VUM.write result ioopix iores
when (count == 1) $ do
completion <- readArray completions iobatchix
writeArray counts iobatchix (-1)
writeArray results iobatchix invalidEntry
writeArray completions iobatchix invalidEntry
writeArray keepAlives iobatchix invalidEntry
result' <- freeze result
putMVar completion (result' :: UArray IOOpIx Int32)
completion <- VM.read completions iobatchix
VUM.write counts iobatchix (-1)
VM.write results iobatchix invalidEntry
VM.write completions iobatchix invalidEntry
VM.write keepAlives iobatchix invalidEntry
result' <- VU.unsafeFreeze result
putMVar completion (result' :: VU.Vector IOResult)
writeChan chaniobatchix iobatchix
let !qrelease = rangeSize (bounds result')
let !qrelease = VU.length result'
signalQSemN qsem qrelease
collectCompletion counts results completions keepAlives

Expand All @@ -348,20 +314,20 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
-- reset count to -1, and result entries to undefined
where
collectIOBatches :: IOBatchIx -> IO Int
collectIOBatches iobatchixNeeded = do
collectIOBatches !iobatchixNeeded = do
IOBatch{
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
} <- readChan chaniobatch
oldcount <- readArray counts iobatchIx
oldcount <- VUM.read counts iobatchIx
assert (oldcount == (-1)) (return ())
writeArray counts iobatchIx (fromIntegral iobatchOpCount)
result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1)
writeArray results iobatchIx result
writeArray completions iobatchIx iobatchCompletion
writeArray keepAlives iobatchIx iobatchKeepAlives
VUM.write counts iobatchIx (fromIntegral iobatchOpCount)
result <- VUM.replicate iobatchOpCount (IOResult (-1))
VM.write results iobatchIx result
VM.write completions iobatchIx iobatchCompletion
VM.write keepAlives iobatchIx iobatchKeepAlives
if iobatchIx == iobatchixNeeded
then return $! fromIntegral iobatchOpCount
else collectIOBatches iobatchixNeeded
Expand Down
Loading

0 comments on commit fdb1a51

Please sign in to comment.