Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use vectors in submitIO #11

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 74 additions & 108 deletions System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeSynonymInstances #-}

module System.IO.BlockIO (

Expand All @@ -26,15 +20,12 @@ module System.IO.BlockIO (

) where

import Data.Word
import Data.Int
import Data.Bits
import Data.Ix
import Data.Array.IArray
import Data.Array.IO
import Data.Array.Unboxed
import Data.Coerce
import Data.Primitive.ByteArray
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM

import Control.Monad
import Control.Monad.Primitive
Expand All @@ -47,13 +38,15 @@ import Control.Exception (mask_, throw, ArrayException(UndefinedElement),
import System.IO.Error
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument))

import Foreign.Ptr
import Foreign.Ptr (plusPtr)
import Foreign.C.Error (Errno(..))
import Foreign.C.Types (CInt(..), CSize)
import System.Posix.Types (Fd, FileOffset, ByteCount)
#if MIN_VERSION_base(4,16,0)
import System.Posix.Internals (hostIsThreaded)
#endif

import qualified System.IO.BlockIO.URing as URing
import System.IO.BlockIO.URing (IOResult(..))


-- | IO context: a handle used by threads submitting IO batches.
Expand Down Expand Up @@ -96,7 +89,9 @@ defaultIOCtxParams =

initIOCtx :: IOCtxParams -> IO IOCtx
initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
#if MIN_VERSION_base(4,16,0)
unless hostIsThreaded $ throwIO rtrsNotThreaded
#endif
mask_ $ do
ioctxQSemN <- newQSemN ioctxConcurrencyLimit
uring <- URing.setupURing (URing.URingParams ioctxBatchSizeLimit)
Expand All @@ -113,8 +108,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
ioctxChanIOBatch
ioctxChanIOBatchIx
let initialBatchIxs :: [IOBatchIx]
initialBatchIxs =
[IOBatchIx 0 .. IOBatchIx (fromIntegral (ioctxConcurrencyLimit-1))]
initialBatchIxs = [0 .. ioctxConcurrencyLimit-1]
writeList2Chan ioctxChanIOBatchIx initialBatchIxs
return IOCtx {
ioctxQSemN,
Expand All @@ -123,13 +117,15 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
ioctxChanIOBatchIx,
ioctxCloseSync
}
#if MIN_VERSION_base(4,16,0)
where
rtrsNotThreaded =
mkIOError
illegalOperationErrorType
"The run-time system should be threaded, make sure you are passing the -threaded flag"
Nothing
Nothing
#endif

closeIOCtx :: IOCtx -> IO ()
closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
Expand All @@ -149,30 +145,6 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount
| IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount

newtype IOResult = IOResult_ URing.IOResult

{-# COMPLETE IOResult, IOError #-}

pattern IOResult :: ByteCount -> IOResult
pattern IOResult c <- (viewIOResult -> Just c)
where
IOResult count = IOResult_ ((fromIntegral :: CSize -> CInt) count)

pattern IOError :: Errno -> IOResult
pattern IOError e <- (viewIOError -> Just e)
where
IOError (Errno e) = IOResult_ (-e)

viewIOResult :: IOResult -> Maybe ByteCount
viewIOResult (IOResult_ c)
| c >= 0 = Just ((fromIntegral :: CInt -> CSize) c)
| otherwise = Nothing

viewIOError :: IOResult -> Maybe Errno
viewIOError (IOResult_ e)
| e < 0 = Just (Errno e)
| otherwise = Nothing


-- | Submit a batch of I\/O operations, and wait for them all to complete.
-- The sequence of results matches up with the sequence of operations.
Expand Down Expand Up @@ -210,17 +182,16 @@ viewIOError (IOResult_ e)
-- the target depth, fill it up to double again. This way there is always
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult]
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
submitIO IOCtx {
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
ioops = do
let iobatchOpCount :: Word32
!iobatchOpCount = fromIntegral (length ioops)
waitQSemN ioctxQSemN (fromIntegral iobatchOpCount)
let !iobatchOpCount = V.length ioops
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
Expand All @@ -235,24 +206,24 @@ submitIO IOCtx {
Nothing -> throwIO closed
Just uring -> do
-- print ("submitIO", iobatchOpCount)
sequence_
[ --print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
| (ioop, ioopix) <- zip ioops [IOOpIx 0 ..]
, let !ioopid = packIOOpId iobatchIx ioopix ]
V.iforM_ ioops $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix
in
--print ioop >>
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring
-- print ("submitIO", "submitting done")
map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion
takeMVar iobatchCompletion
where
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
guardPinned mba = do
Expand All @@ -261,36 +232,33 @@ submitIO IOCtx {

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Word32,
iobatchCompletion :: MVar (UArray IOOpIx Int32),
iobatchOpCount :: !Int,
iobatchCompletion :: MVar (VU.Vector IOResult),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
iobatchKeepAlives :: [IOOp IO]
iobatchKeepAlives :: V.Vector (IOOp IO)
}

newtype IOBatchIx = IOBatchIx Word32
deriving (Eq, Ord, Ix, Enum, Show)

newtype IOOpIx = IOOpIx Word32
deriving (Eq, Ord, Ix, Enum, Show)
type IOBatchIx = Int
type IOOpIx = Int

{-# INLINE packIOOpId #-}
packIOOpId :: IOBatchIx -> IOOpIx -> URing.IOOpId
packIOOpId (IOBatchIx batchix) (IOOpIx opix) =
packIOOpId batchix opix =
URing.IOOpId $ unsafeShiftL (fromIntegral batchix) 32
.|. fromIntegral opix

{-# INLINE unpackIOOpId #-}
unpackIOOpId :: URing.IOOpId -> (IOBatchIx, IOOpIx)
unpackIOOpId (URing.IOOpId w64) =
(IOBatchIx batchix, IOOpIx opix)
(batchix, opix)
where
batchix :: Word32
batchix :: Int
batchix = fromIntegral (unsafeShiftR w64 32)

opix :: Word32
opix = fromIntegral w64
opix :: Int
opix = fromIntegral (w64 .&. 0xffffffff)

completionThread :: URing.URing
-> MVar ()
Expand All @@ -299,44 +267,42 @@ completionThread :: URing.URing
-> Chan IOBatch
-> Chan IOBatchIx
-> IO ()
completionThread uring done maxc qsem chaniobatch chaniobatchix = do
let iobatchixBounds :: (IOBatchIx, IOBatchIx)
iobatchixBounds = (IOBatchIx 0, IOBatchIx (fromIntegral maxc-1))
counts <- newArray iobatchixBounds (-1)
results <- newArray iobatchixBounds invalidEntry
completions <- newArray iobatchixBounds invalidEntry
keepAlives <- newArray iobatchixBounds invalidEntry
completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do
counts <- VUM.replicate maxc (-1)
results <- VM.replicate maxc invalidEntry
completions <- VM.replicate maxc invalidEntry
keepAlives <- VM.replicate maxc invalidEntry
collectCompletion counts results completions keepAlives
`finally` putMVar done ()
where
collectCompletion :: IOUArray IOBatchIx Int
-> IOArray IOBatchIx (IOUArray IOOpIx Int32)
-> IOArray IOBatchIx (MVar (UArray IOOpIx Int32))
-> IOArray IOBatchIx [IOOp IO]
collectCompletion :: VUM.MVector RealWorld Int
-> VM.MVector RealWorld (VUM.MVector RealWorld IOResult)
-> VM.MVector RealWorld (MVar (VU.Vector IOResult))
-> VM.MVector RealWorld (V.Vector (IOOp IO))
-> IO ()
collectCompletion counts results completions keepAlives = do
collectCompletion !counts !results !completions !keepAlives = do
iocompletion <- URing.awaitIO uring
let (URing.IOCompletion ioopid iores) = iocompletion
let (URing.IOCompletion !ioopid !iores) = iocompletion
unless (ioopid == URing.IOOpId maxBound) $ do
let (iobatchix, ioopix) = unpackIOOpId ioopid
let (!iobatchix, !ioopix) = unpackIOOpId ioopid
count <- do
c <- readArray counts iobatchix
c <- VUM.read counts iobatchix
if c < 0 then collectIOBatches iobatchix
else return c
assert (count > 0) (return ())
writeArray counts iobatchix (count-1)
result <- readArray results iobatchix
writeArray result ioopix (coerce iores)
VUM.write counts iobatchix (count-1)
result <- VM.read results iobatchix
VUM.write result ioopix iores
when (count == 1) $ do
completion <- readArray completions iobatchix
writeArray counts iobatchix (-1)
writeArray results iobatchix invalidEntry
writeArray completions iobatchix invalidEntry
writeArray keepAlives iobatchix invalidEntry
result' <- freeze result
putMVar completion (result' :: UArray IOOpIx Int32)
completion <- VM.read completions iobatchix
VUM.write counts iobatchix (-1)
VM.write results iobatchix invalidEntry
VM.write completions iobatchix invalidEntry
VM.write keepAlives iobatchix invalidEntry
result' <- VU.unsafeFreeze result
putMVar completion (result' :: VU.Vector IOResult)
writeChan chaniobatchix iobatchix
let !qrelease = rangeSize (bounds result')
let !qrelease = VU.length result'
signalQSemN qsem qrelease
collectCompletion counts results completions keepAlives

Expand All @@ -348,20 +314,20 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
-- reset count to -1, and result entries to undefined
where
collectIOBatches :: IOBatchIx -> IO Int
collectIOBatches iobatchixNeeded = do
collectIOBatches !iobatchixNeeded = do
IOBatch{
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
} <- readChan chaniobatch
oldcount <- readArray counts iobatchIx
oldcount <- VUM.read counts iobatchIx
assert (oldcount == (-1)) (return ())
writeArray counts iobatchIx (fromIntegral iobatchOpCount)
result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1)
writeArray results iobatchIx result
writeArray completions iobatchIx iobatchCompletion
writeArray keepAlives iobatchIx iobatchKeepAlives
VUM.write counts iobatchIx (fromIntegral iobatchOpCount)
result <- VUM.replicate iobatchOpCount (IOResult (-1))
VM.write results iobatchIx result
VM.write completions iobatchIx iobatchCompletion
VM.write keepAlives iobatchIx iobatchKeepAlives
if iobatchIx == iobatchixNeeded
then return $! fromIntegral iobatchOpCount
else collectIOBatches iobatchixNeeded
Expand Down
Loading
Loading