diff --git a/System/IO/BlockIO.hs b/System/IO/BlockIO.hs index d24c015..0c18f2e 100644 --- a/System/IO/BlockIO.hs +++ b/System/IO/BlockIO.hs @@ -3,7 +3,7 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ViewPatterns #-} - +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -24,6 +24,8 @@ module System.IO.BlockIO ( IOResult(IOResult, IOError), ByteCount, Errno(..), + -- * Internal + pattern IOResult_ ) where import Data.Word @@ -64,7 +66,7 @@ data IOCtx = IOCtx { -- | Locking of the writer end of the URing: used by writers -- while they are modifying the uring submission queue. - ioctxURing :: !(MVar URing.URing), + ioctxURing :: !(MVar (Maybe URing.URing)), -- | Communication channel from writers to the completion thread: -- letting it know about new batches of IO that they have @@ -98,7 +100,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do mask_ $ do ioctxQSemN <- newQSemN ioctxConcurrencyLimit uring <- URing.setupURing (URing.URingParams ioctxBatchSizeLimit) - ioctxURing <- newMVar uring + ioctxURing <- newMVar (Just uring) ioctxChanIOBatch <- newChan ioctxChanIOBatchIx <- newChan ioctxCloseSync <- newEmptyMVar @@ -131,14 +133,15 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do closeIOCtx :: IOCtx -> IO () closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do - uring <- takeMVar ioctxURing - URing.prepareNop uring (URing.IOOpId maxBound) - URing.submitIO uring - takeMVar ioctxCloseSync - URing.closeURing uring - putMVar ioctxURing (throw closed) - where - closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing + uringMay <- takeMVar ioctxURing + case uringMay of + Nothing -> putMVar ioctxURing Nothing + Just uring -> do + URing.prepareNop uring (URing.IOOpId maxBound) + URing.submitIO uring + takeMVar ioctxCloseSync + URing.closeURing uring + putMVar ioctxURing Nothing data IOOp = IOOpRead !Fd !FileOffset !(Ptr Word8) !ByteCount | IOOpWrite !Fd !FileOffset !(Ptr Word8) !ByteCount @@ -224,21 +227,24 @@ submitIO IOCtx { iobatchOpCount, iobatchCompletion } - withMVar ioctxURing $ \uring -> do + withMVar ioctxURing $ \case + Nothing -> throwIO closed + Just uring -> do -- print ("submitIO", iobatchOpCount) - sequence_ - [ --print ioop >> - case ioop of - IOOpRead fd off buf cnt -> - URing.prepareRead uring fd off buf cnt ioopid - - IOOpWrite fd off buf cnt -> - URing.prepareWrite uring fd off buf cnt ioopid - | (ioop, ioopix) <- zip ioops [IOOpIx 0 ..] - , let !ioopid = packIOOpId iobatchIx ioopix ] - URing.submitIO uring + sequence_ + [ --print ioop >> + case ioop of + IOOpRead fd off buf cnt -> + URing.prepareRead uring fd off buf cnt ioopid + + IOOpWrite fd off buf cnt -> + URing.prepareWrite uring fd off buf cnt ioopid + | (ioop, ioopix) <- zip ioops [IOOpIx 0 ..] + , let !ioopid = packIOOpId iobatchIx ioopix ] + URing.submitIO uring -- print ("submitIO", "submitting done") map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion + where closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing data IOBatch = IOBatch { iobatchIx :: !IOBatchIx,