diff --git a/System/IO/BlockIO.hs b/System/IO/BlockIO.hs index 60630b6..e941564 100644 --- a/System/IO/BlockIO.hs +++ b/System/IO/BlockIO.hs @@ -1,13 +1,7 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE ViewPatterns #-} {-# LANGUAGE LambdaCase #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE TypeSynonymInstances #-} module System.IO.BlockIO ( @@ -26,15 +20,12 @@ module System.IO.BlockIO ( ) where -import Data.Word -import Data.Int import Data.Bits -import Data.Ix -import Data.Array.IArray -import Data.Array.IO -import Data.Array.Unboxed -import Data.Coerce import Data.Primitive.ByteArray +import qualified Data.Vector as V +import qualified Data.Vector.Mutable as VM +import qualified Data.Vector.Unboxed as VU +import qualified Data.Vector.Unboxed.Mutable as VUM import Control.Monad import Control.Monad.Primitive @@ -47,13 +38,15 @@ import Control.Exception (mask_, throw, ArrayException(UndefinedElement), import System.IO.Error import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument)) -import Foreign.Ptr +import Foreign.Ptr (plusPtr) import Foreign.C.Error (Errno(..)) -import Foreign.C.Types (CInt(..), CSize) import System.Posix.Types (Fd, FileOffset, ByteCount) +#if MIN_VERSION_base(4,16,0) import System.Posix.Internals (hostIsThreaded) +#endif import qualified System.IO.BlockIO.URing as URing +import System.IO.BlockIO.URing (IOResult(..)) -- | IO context: a handle used by threads submitting IO batches. @@ -96,7 +89,9 @@ defaultIOCtxParams = initIOCtx :: IOCtxParams -> IO IOCtx initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do +#if MIN_VERSION_base(4,16,0) unless hostIsThreaded $ throwIO rtrsNotThreaded +#endif mask_ $ do ioctxQSemN <- newQSemN ioctxConcurrencyLimit uring <- URing.setupURing (URing.URingParams ioctxBatchSizeLimit) @@ -113,8 +108,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do ioctxChanIOBatch ioctxChanIOBatchIx let initialBatchIxs :: [IOBatchIx] - initialBatchIxs = - [IOBatchIx 0 .. IOBatchIx (fromIntegral (ioctxConcurrencyLimit-1))] + initialBatchIxs = [0 .. ioctxConcurrencyLimit-1] writeList2Chan ioctxChanIOBatchIx initialBatchIxs return IOCtx { ioctxQSemN, @@ -123,6 +117,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do ioctxChanIOBatchIx, ioctxCloseSync } +#if MIN_VERSION_base(4,16,0) where rtrsNotThreaded = mkIOError @@ -130,6 +125,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do "The run-time system should be threaded, make sure you are passing the -threaded flag" Nothing Nothing +#endif closeIOCtx :: IOCtx -> IO () closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do @@ -149,30 +145,6 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount | IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount -newtype IOResult = IOResult_ URing.IOResult - -{-# COMPLETE IOResult, IOError #-} - -pattern IOResult :: ByteCount -> IOResult -pattern IOResult c <- (viewIOResult -> Just c) - where - IOResult count = IOResult_ ((fromIntegral :: CSize -> CInt) count) - -pattern IOError :: Errno -> IOResult -pattern IOError e <- (viewIOError -> Just e) - where - IOError (Errno e) = IOResult_ (-e) - -viewIOResult :: IOResult -> Maybe ByteCount -viewIOResult (IOResult_ c) - | c >= 0 = Just ((fromIntegral :: CInt -> CSize) c) - | otherwise = Nothing - -viewIOError :: IOResult -> Maybe Errno -viewIOError (IOResult_ e) - | e < 0 = Just (Errno e) - | otherwise = Nothing - -- | Submit a batch of I\/O operations, and wait for them all to complete. -- The sequence of results matches up with the sequence of operations. @@ -210,7 +182,7 @@ viewIOError (IOResult_ e) -- the target depth, fill it up to double again. This way there is always -- at least the target number in flight at once. -- -submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult] +submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult) submitIO IOCtx { ioctxQSemN, ioctxURing, @@ -218,9 +190,8 @@ submitIO IOCtx { ioctxChanIOBatchIx } ioops = do - let iobatchOpCount :: Word32 - !iobatchOpCount = fromIntegral (length ioops) - waitQSemN ioctxQSemN (fromIntegral iobatchOpCount) + let !iobatchOpCount = V.length ioops + waitQSemN ioctxQSemN iobatchOpCount iobatchIx <- readChan ioctxChanIOBatchIx iobatchCompletion <- newEmptyMVar let iobatchKeepAlives = ioops @@ -235,24 +206,24 @@ submitIO IOCtx { Nothing -> throwIO closed Just uring -> do -- print ("submitIO", iobatchOpCount) - sequence_ - [ --print ioop >> - case ioop of - IOOpRead fd off buf bufOff cnt -> do - guardPinned buf - URing.prepareRead uring fd off - (mutableByteArrayContents buf `plusPtr` bufOff) - cnt ioopid - IOOpWrite fd off buf bufOff cnt -> do - guardPinned buf - URing.prepareWrite uring fd off - (mutableByteArrayContents buf `plusPtr` bufOff) - cnt ioopid - | (ioop, ioopix) <- zip ioops [IOOpIx 0 ..] - , let !ioopid = packIOOpId iobatchIx ioopix ] + V.iforM_ ioops $ \ioopix ioop -> + let !ioopid = packIOOpId iobatchIx ioopix + in + --print ioop >> + case ioop of + IOOpRead fd off buf bufOff cnt -> do + guardPinned buf + URing.prepareRead uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt ioopid + IOOpWrite fd off buf bufOff cnt -> do + guardPinned buf + URing.prepareWrite uring fd off + (mutableByteArrayContents buf `plusPtr` bufOff) + cnt ioopid URing.submitIO uring -- print ("submitIO", "submitting done") - map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion + takeMVar iobatchCompletion where closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing guardPinned mba = do @@ -261,36 +232,33 @@ submitIO IOCtx { data IOBatch = IOBatch { iobatchIx :: !IOBatchIx, - iobatchOpCount :: !Word32, - iobatchCompletion :: MVar (UArray IOOpIx Int32), + iobatchOpCount :: !Int, + iobatchCompletion :: MVar (VU.Vector IOResult), -- | The list of I\/O operations is sent to the completion -- thread so that the buffers are kept alive while the kernel -- is using them. - iobatchKeepAlives :: [IOOp IO] + iobatchKeepAlives :: V.Vector (IOOp IO) } -newtype IOBatchIx = IOBatchIx Word32 - deriving (Eq, Ord, Ix, Enum, Show) - -newtype IOOpIx = IOOpIx Word32 - deriving (Eq, Ord, Ix, Enum, Show) +type IOBatchIx = Int +type IOOpIx = Int {-# INLINE packIOOpId #-} packIOOpId :: IOBatchIx -> IOOpIx -> URing.IOOpId -packIOOpId (IOBatchIx batchix) (IOOpIx opix) = +packIOOpId batchix opix = URing.IOOpId $ unsafeShiftL (fromIntegral batchix) 32 .|. fromIntegral opix {-# INLINE unpackIOOpId #-} unpackIOOpId :: URing.IOOpId -> (IOBatchIx, IOOpIx) unpackIOOpId (URing.IOOpId w64) = - (IOBatchIx batchix, IOOpIx opix) + (batchix, opix) where - batchix :: Word32 + batchix :: Int batchix = fromIntegral (unsafeShiftR w64 32) - opix :: Word32 - opix = fromIntegral w64 + opix :: Int + opix = fromIntegral (w64 .&. 0xffffffff) completionThread :: URing.URing -> MVar () @@ -299,44 +267,42 @@ completionThread :: URing.URing -> Chan IOBatch -> Chan IOBatchIx -> IO () -completionThread uring done maxc qsem chaniobatch chaniobatchix = do - let iobatchixBounds :: (IOBatchIx, IOBatchIx) - iobatchixBounds = (IOBatchIx 0, IOBatchIx (fromIntegral maxc-1)) - counts <- newArray iobatchixBounds (-1) - results <- newArray iobatchixBounds invalidEntry - completions <- newArray iobatchixBounds invalidEntry - keepAlives <- newArray iobatchixBounds invalidEntry +completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do + counts <- VUM.replicate maxc (-1) + results <- VM.replicate maxc invalidEntry + completions <- VM.replicate maxc invalidEntry + keepAlives <- VM.replicate maxc invalidEntry collectCompletion counts results completions keepAlives `finally` putMVar done () where - collectCompletion :: IOUArray IOBatchIx Int - -> IOArray IOBatchIx (IOUArray IOOpIx Int32) - -> IOArray IOBatchIx (MVar (UArray IOOpIx Int32)) - -> IOArray IOBatchIx [IOOp IO] + collectCompletion :: VUM.MVector RealWorld Int + -> VM.MVector RealWorld (VUM.MVector RealWorld IOResult) + -> VM.MVector RealWorld (MVar (VU.Vector IOResult)) + -> VM.MVector RealWorld (V.Vector (IOOp IO)) -> IO () - collectCompletion counts results completions keepAlives = do + collectCompletion !counts !results !completions !keepAlives = do iocompletion <- URing.awaitIO uring - let (URing.IOCompletion ioopid iores) = iocompletion + let (URing.IOCompletion !ioopid !iores) = iocompletion unless (ioopid == URing.IOOpId maxBound) $ do - let (iobatchix, ioopix) = unpackIOOpId ioopid + let (!iobatchix, !ioopix) = unpackIOOpId ioopid count <- do - c <- readArray counts iobatchix + c <- VUM.read counts iobatchix if c < 0 then collectIOBatches iobatchix else return c assert (count > 0) (return ()) - writeArray counts iobatchix (count-1) - result <- readArray results iobatchix - writeArray result ioopix (coerce iores) + VUM.write counts iobatchix (count-1) + result <- VM.read results iobatchix + VUM.write result ioopix iores when (count == 1) $ do - completion <- readArray completions iobatchix - writeArray counts iobatchix (-1) - writeArray results iobatchix invalidEntry - writeArray completions iobatchix invalidEntry - writeArray keepAlives iobatchix invalidEntry - result' <- freeze result - putMVar completion (result' :: UArray IOOpIx Int32) + completion <- VM.read completions iobatchix + VUM.write counts iobatchix (-1) + VM.write results iobatchix invalidEntry + VM.write completions iobatchix invalidEntry + VM.write keepAlives iobatchix invalidEntry + result' <- VU.unsafeFreeze result + putMVar completion (result' :: VU.Vector IOResult) writeChan chaniobatchix iobatchix - let !qrelease = rangeSize (bounds result') + let !qrelease = VU.length result' signalQSemN qsem qrelease collectCompletion counts results completions keepAlives @@ -348,20 +314,20 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do -- reset count to -1, and result entries to undefined where collectIOBatches :: IOBatchIx -> IO Int - collectIOBatches iobatchixNeeded = do + collectIOBatches !iobatchixNeeded = do IOBatch{ iobatchIx, iobatchOpCount, iobatchCompletion, iobatchKeepAlives } <- readChan chaniobatch - oldcount <- readArray counts iobatchIx + oldcount <- VUM.read counts iobatchIx assert (oldcount == (-1)) (return ()) - writeArray counts iobatchIx (fromIntegral iobatchOpCount) - result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1) - writeArray results iobatchIx result - writeArray completions iobatchIx iobatchCompletion - writeArray keepAlives iobatchIx iobatchKeepAlives + VUM.write counts iobatchIx (fromIntegral iobatchOpCount) + result <- VUM.replicate iobatchOpCount (IOResult (-1)) + VM.write results iobatchIx result + VM.write completions iobatchIx iobatchCompletion + VM.write keepAlives iobatchIx iobatchKeepAlives if iobatchIx == iobatchixNeeded then return $! fromIntegral iobatchOpCount else collectIOBatches iobatchixNeeded diff --git a/System/IO/BlockIO/URing.hs b/System/IO/BlockIO/URing.hs index 4d6853e..64948ab 100644 --- a/System/IO/BlockIO/URing.hs +++ b/System/IO/BlockIO/URing.hs @@ -1,7 +1,35 @@ -{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE BangPatterns #-} - -module System.IO.BlockIO.URing where +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE ViewPatterns #-} + +module System.IO.BlockIO.URing ( + URing, + URingParams(..), + setupURing, + closeURing, + withURing, + IOOpId(..), + prepareRead, + prepareWrite, + prepareNop, + submitIO, + IOCompletion(..), + IOResult(IOResult, IOError), + awaitIO, + ) where + +import qualified Data.Vector.Generic as VG +import qualified Data.Vector.Generic.Mutable as VGM +import qualified Data.Vector.Primitive as VP +import qualified Data.Vector.Unboxed as VU +import qualified Data.Vector.Unboxed.Mutable as VUM +import qualified Data.Vector.Unboxed.Base import Foreign import Foreign.C @@ -81,11 +109,53 @@ submitIO (URing uringptr) = -- --- Completing I/O +-- Types for completing I/O -- data IOCompletion = IOCompletion !IOOpId !IOResult -type IOResult = CInt + +newtype IOResult = IOResult_ Int + deriving (Eq, Show) + +{-# COMPLETE IOResult, IOError #-} + +pattern IOResult :: ByteCount -> IOResult +pattern IOResult c <- (viewIOResult -> Just c) + where + IOResult count = IOResult_ ((fromIntegral :: CSize -> Int) count) + +pattern IOError :: Errno -> IOResult +pattern IOError e <- (viewIOError -> Just e) + where + IOError (Errno e) = IOResult_ (fromIntegral (-e)) + +viewIOResult :: IOResult -> Maybe ByteCount +viewIOResult (IOResult_ c) + | c >= 0 = Just ((fromIntegral :: Int -> CSize) c) + | otherwise = Nothing + +viewIOError :: IOResult -> Maybe Errno +viewIOError (IOResult_ e) + | e < 0 = Just (Errno (fromIntegral e)) + | otherwise = Nothing + + +-- +-- Unboxed vector support for IOResult +-- + +newtype instance VUM.MVector s IOResult = MV_IOResult (VP.MVector s Int) +newtype instance VU.Vector IOResult = V_IOResult (VP.Vector Int) + +deriving newtype instance VGM.MVector VUM.MVector IOResult +deriving newtype instance VG.Vector VU.Vector IOResult + +instance VU.Unbox IOResult + + +-- +-- Completing I/O +-- awaitIO :: URing -> IO IOCompletion awaitIO (URing uringptr) = @@ -105,8 +175,8 @@ awaitIO (URing uringptr) = cqeptr <- peek cqeptrptr FFI.URingCQE { FFI.cqe_data, FFI.cqe_res } <- peek cqeptr FFI.io_uring_cqe_seen uringptr cqeptr - let !opid = IOOpId (fromIntegral cqe_data) - !res = fromIntegral cqe_res + let opid = IOOpId (fromIntegral cqe_data) + res = IOResult_ (fromIntegral cqe_res) return $! IOCompletion opid res diff --git a/benchmark/Bench.hs b/benchmark/Bench.hs index db05dbf..4ee967b 100644 --- a/benchmark/Bench.hs +++ b/benchmark/Bench.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} {- HLINT ignore "Use camelCase" -} module Main (main) where @@ -21,6 +22,7 @@ import Data.Time import System.IO.BlockIO import System.IO.BlockIO.URing hiding (submitIO) import qualified System.IO.BlockIO.URing as URing +import qualified Data.Vector as V main :: IO () main = do @@ -35,7 +37,11 @@ main = do main_lowlevel :: FilePath -> IO () main_lowlevel filename = do putStrLn "Low-level API benchmark" +#if MIN_VERSION_unix(2,8,0) fd <- openFd filename ReadOnly defaultFileFlags +#else + fd <- openFd filename ReadOnly Nothing defaultFileFlags +#endif status <- getFdStatus fd let size = fileSize status lastBlock :: Int @@ -59,7 +65,7 @@ main_lowlevel filename = do collectBatch n = replicateM_ n $ do (IOCompletion i count) <- awaitIO uring - when (count /= 4096) $ + when (count /= IOResult 4096) $ fail $ "I/O failure: I/O " ++ show i ++ " returned " ++ show count @@ -89,7 +95,11 @@ main_lowlevel filename = do main_highlevel :: FilePath -> IO () main_highlevel filename = do putStrLn "High-level API benchmark" +#if MIN_VERSION_unix(2,8,0) fd <- openFd filename ReadOnly defaultFileFlags +#else + fd <- openFd filename ReadOnly Nothing defaultFileFlags +#endif status <- getFdStatus fd rng <- initStdGen let size = fileSize status @@ -100,18 +110,16 @@ main_highlevel filename = do ioctxBatchSizeLimit = 64, ioctxConcurrencyLimit = 64 * 4 } - blocks = zip [0..] (randomPermute rng [0..lastBlock]) + blocks = V.fromList $ zip [0..] (randomPermute rng [0..lastBlock]) bracket (initIOCtx params) closeIOCtx $ \ioctx -> do buf <- newPinnedByteArray (4096 * nbufs) before <- getCurrentTime forConcurrently_ (groupsOfN 32 blocks) $ \batch -> - submitIO ioctx - [ IOOpRead fd blockoff buf bufOff 4096 - | (i, block) <- batch - , let bufOff = (i `mod` nbufs) * 4096 - blockoff = fromIntegral (block * 4096) - ] + submitIO ioctx $ flip fmap batch $ \ (i, block) -> + let bufOff = (i `mod` nbufs) * 4096 + blockoff = fromIntegral (block * 4096) + in IOOpRead fd blockoff buf bufOff 4096 after <- getCurrentTime let total = lastBlock + 1 report before after total @@ -127,9 +135,9 @@ report before after total = do iops :: Int iops = round (fromIntegral total / realToFrac elapsed :: Double) -groupsOfN :: Int -> [a] -> [[a]] -groupsOfN _ [] = [] -groupsOfN n xs = take n xs : groupsOfN n (drop n xs) +groupsOfN :: Int -> V.Vector a -> [V.Vector a] +groupsOfN n xs | V.null xs = [] + | otherwise = V.take n xs : groupsOfN n (V.drop n xs) randomPermute :: Ord a => StdGen -> [a] -> [a] randomPermute rng0 xs0 = diff --git a/blockio-uring.cabal b/blockio-uring.cabal index cc46c14..8f4c4a5 100644 --- a/blockio-uring.cabal +++ b/blockio-uring.cabal @@ -40,10 +40,9 @@ library System.IO.BlockIO.URingFFI build-depends: - , array ^>=0.5 - , base >=4.16 && <4.20 + base >=4.12 && <4.20 , primitive ^>=0.9 - , unix ^>=2.8 + , vector ^>=0.13 pkgconfig-depends: liburing >=2.0 && <2.7 default-language: Haskell2010 @@ -62,6 +61,7 @@ benchmark bench , primitive , random , time + , vector , unix pkgconfig-depends: liburing @@ -79,9 +79,10 @@ test-suite test main-is: Main.hs build-depends: , array - , base >=4.16 && <4.20 + , base , primitive , tasty + , vector , tasty-hunit pkgconfig-depends: liburing diff --git a/test/Main.hs b/test/Main.hs index a17ff50..aed0182 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -33,7 +33,7 @@ example_simpleNoop n = do URing.submitIO uring completion <- awaitIO uring closeURing uring - IOCompletion (IOOpId n) 0 @=? completion + IOCompletion (IOOpId n) (IOResult 0) @=? completion deriving instance Eq IOCompletion deriving instance Show IOCompletion