Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Radically simplify scheduler and implement new scheduling strategy for short running actions #751

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Development/Shake/Internal/Core/Build.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ buildOne global@Global{..} stack database i k r = case addStack i k stack of
Right stack -> Later $ \continue -> do
setIdKeyStatus global database i k (Running (NoShow continue) r)
let go = buildRunMode global stack database r
fromLater go $ \mode -> liftIO $ addPool PoolStart globalPool $
priority = case r of
Nothing -> PoolStart
Just (execution -> t) -> PoolEstimate t (show k)
fromLater go $ \mode -> liftIO $ addPool priority globalPool $
runKey global stack k r mode $ \res -> do
runLocked database $ do
let val = fmap runValue res
Expand Down
85 changes: 34 additions & 51 deletions src/General/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import qualified Data.Heap as Heap
import qualified Data.HashSet as Set
import Data.IORef.Extra
import System.Random
import Debug.Trace
import GHC.Conc


---------------------------------------------------------------------
Expand All @@ -41,22 +43,15 @@ If any worker throws an exception, must signal to all the other workers
data S = S
{alive :: !Bool -- True until there's an exception, after which don't spawn more tasks
,threads :: !(Set.HashSet Thread) -- IMPORTANT: Must be strict or we leak thread stacks
,threadsLimit :: {-# UNPACK #-} !Int -- user supplied thread limit, Set.size threads <= threadsLimit
,threadsCount :: {-# UNPACK #-} !Int -- Set.size threads, but in O(1)
,threadsMax :: {-# UNPACK #-} !Int -- high water mark of Set.size threads (accounting only)
,threadsSum :: {-# UNPACK #-} !Int -- number of threads we have been through (accounting only)
,rand :: IO Int -- operation to give us the next random Int
,todo :: !(Heap.Heap (Heap.Entry (PoolPriority, Int) (IO ()))) -- operations waiting a thread
}


emptyS :: Int -> Bool -> IO S
emptyS n deterministic = do
rand <- if not deterministic then pure randomIO else do
ref <- newIORef 0
-- no need to be thread-safe - if two threads race they were basically the same time anyway
pure $ do i <- readIORef ref; writeIORef' ref (i+1); pure i
pure $ S True Set.empty n 0 0 0 rand Heap.empty
emptyS n deterministic =
pure $ S True Set.empty 0 0 0


data Pool = Pool
Expand All @@ -71,62 +66,51 @@ withPool (Pool var _) f = join $ modifyVar var $ \s ->
withPool_ :: Pool -> (S -> IO S) -> IO ()
withPool_ pool act = withPool pool $ fmap (, pure()) . act


worker :: Pool -> IO ()
worker pool = withPool pool $ \s -> pure $ case Heap.uncons $ todo s of
Nothing -> (s, pure ())
Just (Heap.Entry _ now, todo2) -> (s{todo = todo2}, now >> worker pool)

-- | Given a pool, and a function that breaks the S invariants, restore them.
-- They are only allowed to touch threadsLimit or todo.
-- Assumes only requires spawning a most one job (e.g. can't increase the pool by more than one at a time)
step :: Pool -> (S -> IO S) -> IO ()
-- mask_ is so we don't spawn and not record it
step pool@(Pool _ done) op = mask_ $ withPool_ pool $ \s -> do
s <- op s
case Heap.uncons $ todo s of
Just (Heap.Entry _ now, todo2) | threadsCount s < threadsLimit s -> do
-- spawn a new worker
t <- newThreadFinally (now >> worker pool) $ \t res -> case res of
Left e -> withPool_ pool $ \s -> do
signalBarrier done $ Left e
pure (remThread t s){alive = False}
Right _ ->
step pool $ pure . remThread t
pure (addThread t s){todo = todo2}
Nothing | threadsCount s == 0 -> do
signalBarrier done $ Right s
pure s{alive = False}
_ -> pure s
where
addThread t s = s{threads = Set.insert t $ threads s, threadsCount = threadsCount s + 1
,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` (threadsCount s + 1)}
remThread t s = s{threads = Set.delete t $ threads s, threadsCount = threadsCount s - 1}

threshold :: Float
threshold = 0.05

-- | Add a new task to the pool. See the top of the module for the relative ordering
-- and semantics.
addPool :: PoolPriority -> Pool -> IO a -> IO ()
addPool priority pool act = step pool $ \s -> do
i <- rand s
pure s{todo = Heap.insert (Heap.Entry (priority, i) $ void act) $ todo s}

addPool priority pool@(Pool _ done) act =
withPool_ pool $ \s -> do
traceEventIO $ "Scheduling event with priority: " ++ show priority
t <- newThreadFinally l mcap act $ \t res -> do
traceEventIO $ show l ++ " done."
case res of
Left e -> withPool_ pool $ \s -> do
signalBarrier done $ Left e
pure (remThread t s){alive = False}
Right _ -> withPool_ pool $ \s -> do
let s' = remThread t s
when (threadsCount s' == 0) $
signalBarrier done $ Right s'{alive = False}
pure $ s'{alive = threadsCount s' /= 0}
pure (addThread t s)
where
addThread t s = s{threads = Set.insert t $ threads s, threadsCount = threadsCount s + 1
,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` (threadsCount s + 1)}
remThread t s = s{threads = Set.delete t $ threads s, threadsCount = threadsCount s - 1}
mcap = case priority of
PoolEstimate t _ | t <= threshold -> Just 0
_ -> Nothing
l = case priority of
PoolEstimate _ s -> s
_ -> "Unknown"

data PoolPriority
= PoolException
| PoolResume
| PoolStart
| PoolBatch
| PoolDeprioritize Double
deriving (Eq,Ord)
| PoolEstimate { estimatedTime :: Float, label :: String }
deriving (Eq,Ord,Show)

-- | Temporarily increase the pool by 1 thread. Call the cleanup action to restore the value.
-- After calling cleanup you should requeue onto a new thread.
increasePool :: Pool -> IO (IO ())
increasePool pool = do
step pool $ \s -> pure s{threadsLimit = threadsLimit s + 1}
pure $ step pool $ \s -> pure s{threadsLimit = threadsLimit s - 1}

increasePool pool = pure (pure ())

-- | Make sure the pool cannot run out of tasks (and thus everything finishes) until after the cancel is called.
-- Ensures that a pool that will requeue in time doesn't go idle.
Expand All @@ -139,7 +123,6 @@ keepAlivePool pool = do
cancel
pure $ signalBarrier bar ()


-- | Run all the tasks in the pool on the given number of works.
-- If any thread throws an exception, the exception will be reraised.
runPool :: Bool -> Int -> (Pool -> IO ()) -> IO () -- run all tasks in the pool
Expand Down
18 changes: 13 additions & 5 deletions src/General/Thread.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}

-- | A bit like 'Fence', but not thread safe and optimised for avoiding taking the fence
module General.Thread(
Expand All @@ -14,6 +15,7 @@ import Control.Concurrent.Extra
import Control.Exception
import General.Extra
import Control.Monad.Extra
import GHC.Conc


data Thread = Thread ThreadId (Barrier ())
Expand All @@ -25,15 +27,21 @@ instance Hashable Thread where
hashWithSalt salt (Thread a _) = hashWithSalt salt a

-- | The inner thread is unmasked even if you started masked.
newThreadFinally :: IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread
newThreadFinally act cleanup = do
newThreadFinally :: String -> Maybe Int -> IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread
newThreadFinally label mcap act cleanup = do
bar <- newBarrier
t <- mask_ $ forkIOWithUnmask $ \unmask -> flip finally (signalBarrier bar ()) $ do
res <- try $ unmask act
t <- mask_ $ fork $ \unmask -> flip finally (signalBarrier bar ()) $ do
me <- myThreadId
res <- try $ unmask act
cleanup (Thread me bar) res
labelThread t $ label ++ labeltype
pure $ Thread t bar

where
labeltype = maybe "(Free)" (\i -> "(Restricted to "++show i++")") mcap
fork :: ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
fork = case mcap of
Nothing -> forkIOWithUnmask
Just n -> forkOnWithUnmask n

stopThreads :: [Thread] -> IO ()
stopThreads threads = do
Expand Down