Skip to content

Commit

Permalink
txgen-mvar: rework AsyncBenchmarkControl
Browse files Browse the repository at this point in the history
I. make AsyncBenchmarkControl a record
    The type alias of a tuple was not very mnemonic or self-explanatory.
    This replaces it with a record and haddock documents its fields.
II. use ABC to cancel threads
    The AsyncBenchmarkControl that should be initialized by the time a
    signal is received is fetched from the TVar and unpacked to be used
    to throw exceptions to the other threads. The other threads can now
    catch the exceptions in order to carry out orderly shutdowns in the
    sequel.
III. use TVar for Env AsyncBenchmarkControl
    In order to thread the AsyncBenchmarkControl through the contexts
    surrounding the creation and destruction of the Async structures and
    overall container, this stores a TVar (Maybe AsyncBenchmarkControl)
    as a value in a Map where previously it was just
    AsyncBenchmarkControl. The idea is to use the reference to it to be able
    to use it in the context of a signal handler by packaging the
    reference data with the code pointer in a partial application or
    monadic context or similar. With that data in hand, it's just a
    matter of iterating over the threads and reaping them all.
  • Loading branch information
NadiaYvette committed Jun 24, 2024
1 parent c83eed1 commit 3c89de4
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 83 deletions.
13 changes: 6 additions & 7 deletions bench/tx-generator/src/Cardano/Benchmarking/Command.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ where
#endif

import Cardano.Benchmarking.Compiler (compileOptions)
import Cardano.Benchmarking.LogTypes (AsyncBenchmarkControl (..))
import Cardano.Benchmarking.Script (parseScriptFileAeson, runScript)
import Cardano.Benchmarking.Script.Aeson (parseJSONFile, prettyPrint)
import Cardano.Benchmarking.Script.Env as Env (Env (Env, envThreads), mkNewEnv)
Expand Down Expand Up @@ -48,7 +49,6 @@ import Control.Monad.STM as STM (atomically)

import Data.Foldable as Fold (forM_)
import Data.List as List (unwords)
import Data.Map as Map (lookup)
import Data.Time.Format as Time (defaultTimeLocale, formatTime)
import Data.Time.Clock.System as Time (getSystemTime, systemToUTCTime)
import System.Posix.Signals as Sig (Handler (CatchInfoOnce), SignalInfo (..), SignalSpecificInfo (..), fullSignalSet, installHandler, sigINT, sigTERM)
Expand Down Expand Up @@ -108,14 +108,13 @@ runCommand = withIOManager $ \iocp -> do
installSignalHandler :: IO Env
installSignalHandler = do
env@Env { .. } <- STM.atomically mkNewEnv
Just abcTVar <- pure $ "tx-submit-benchmark" `Map.lookup` envThreads
abc <- STM.atomically $ STM.readTVar abcTVar
abc <- STM.atomically $ STM.readTVar envThreads
_ <- pure abc
#ifdef UNIX
let signalHandler = Sig.CatchInfoOnce signalHandler'
signalHandler' sigInfo = do
tid <- Conc.myThreadId
Just (throttler, workers, _, _) <- STM.atomically $ STM.readTVar abcTVar
Just AsyncBenchmarkControl { .. } <- STM.atomically $ STM.readTVar envThreads
utcTime <- Time.systemToUTCTime <$> Time.getSystemTime
-- It's meant to match Cardano.Tracers.Handlers.Logs.Utils
-- The hope was to avoid the package dependency.
Expand All @@ -139,9 +138,9 @@ runCommand = withIOManager $ \iocp -> do
errorToThrow = userError labelStr

Prelude.putStrLn labelStr
Async.cancelWith throttler errorToThrow
Fold.forM_ workers \work -> do
Async.cancelWith work errorToThrow
abcFeeder `Async.cancelWith` errorToThrow
Fold.forM_ abcWorkers \work -> do
work `Async.cancelWith` errorToThrow
Fold.forM_ [Sig.sigINT, Sig.sigTERM] $ \sig ->
Sig.installHandler sig signalHandler $ Just fullSignalSet
#endif
Expand Down
21 changes: 10 additions & 11 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ import Network.Socket (AddrInfo (..), AddrInfoFlag (..), Family (..),


waitBenchmark :: Trace IO (TraceBenchTxSubmit TxId) -> AsyncBenchmarkControl -> ExceptT TxGenError IO ()
waitBenchmark traceSubmit (feeder, workers, mkSummary, _) = liftIO $ do
mapM_ waitCatch $ feeder : workers
traceWith traceSubmit . TraceBenchTxSubSummary =<< mkSummary
waitBenchmark traceSubmit AsyncBenchmarkControl { .. } = liftIO $ do
mapM_ waitCatch $ abcFeeder : abcWorkers
traceWith traceSubmit . TraceBenchTxSubSummary =<< abcSummary

lookupNodeAddress :: NodeIPv4Address -> IO AddrInfo
lookupNodeAddress node = do
Expand Down Expand Up @@ -156,8 +156,7 @@ walletBenchmark

reportRefs <- atomically do replicateM (fromIntegral numTargets) STM.newEmptyTMVar
let asyncList = zip reportRefs $ NE.toList remoteAddresses

allAsyncs <- forM asyncList \(reportRef, remoteInfo@(remoteName, remoteAddrInfo)) -> do
abcWorkers <- forM asyncList \(reportRef, remoteInfo@(remoteName, remoteAddrInfo)) -> do
let errorHandler = handleTxSubmissionClientError traceSubmit remoteInfo reportRef errorPolicy
client = txSubmissionClient
traceN2N
Expand All @@ -171,19 +170,19 @@ walletBenchmark
" servicing " ++ remoteName ++ " (" ++ remoteAddrString ++ ")"
pure asyncThread

tpsThrottleThread <- async $ do
abcFeeder <- async $ do
startSending tpsThrottle
traceWith traceSubmit $ TraceBenchTxSubDebug "tpsLimitedFeeder : transmitting done"
STM.atomically $ sendStop tpsThrottle
traceWith traceSubmit $ TraceBenchTxSubDebug "tpsLimitedFeeder : shutdown done"
let tid = asyncThreadId tpsThrottleThread
let tid = asyncThreadId abcFeeder
labelThread tid $ "tpsThrottleThread " ++ show tid

let tpsFeederShutdown = do
cancel tpsThrottleThread
liftIO $ STM.atomically $ sendStop tpsThrottle
let abcShutdown = do
cancel abcFeeder
liftIO . STM.atomically $ sendStop tpsThrottle

return (tpsThrottleThread, allAsyncs, mkSubmissionSummary threadName startTime reportRefs, tpsFeederShutdown)
pure AsyncBenchmarkControl { abcSummary = mkSubmissionSummary threadName startTime reportRefs, .. }
where
traceDebug :: String -> IO ()
traceDebug = traceWith traceSubmit . TraceBenchTxSubDebug
14 changes: 12 additions & 2 deletions bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Cardano.Benchmarking.LogTypes
( AsyncBenchmarkControl
( AsyncBenchmarkControl (..)
, BenchTracers(..)
, NodeToNodeSubmissionTrace(..)
, SendRecvConnect
Expand Down Expand Up @@ -49,7 +49,17 @@ import Data.Time.Clock (DiffTime, NominalDiffTime)
import GHC.Generics
import Network.Mux (WithMuxBearer (..))

type AsyncBenchmarkControl = (Async.Async (), [Async.Async ()], IO SubmissionSummary, IO ())
data AsyncBenchmarkControl =
AsyncBenchmarkControl
{ abcFeeder :: Async.Async ()
-- ^ The thread to feed transactions, also called a throttler.
, abcWorkers :: [Async.Async ()]
-- ^ The per-node transaction submission threads.
, abcSummary :: IO SubmissionSummary
-- ^ IO action to emit a summary.
, abcShutdown :: IO ()
-- ^ IO action to shut down the feeder thread.
}

data BenchTracers =
BenchTracers
Expand Down
16 changes: 5 additions & 11 deletions bench/tx-generator/src/Cardano/Benchmarking/Script.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import Control.Monad.IO.Class
import Control.Monad.STM as STM (atomically)
import Control.Monad.Trans.Except as Except (throwE)
import qualified Data.List as List (unwords)
import qualified Data.Map as Map (lookup)
import System.Mem (performGC)

type Script = [Action]
Expand All @@ -48,25 +47,20 @@ runScript env script iom = do
pure (Right (), abc)
(Left err, env'@Env.Env { .. }, ()) -> do
cleanup env' (Env.traceError (show err) >> shutDownLogging)
case "tx-submit-benchmark" `Map.lookup` envThreads of
Just abcTVar -> do
abcMaybe <- STM.atomically $ STM.readTVar abcTVar
case abcMaybe of
Just abc -> pure (Left err, abc)
Nothing -> error $ List.unwords
[ "Cardano.Benchmarking.Script.runScript:"
, "AsyncBenchmarkControl uninitialized" ]
abcMaybe <- STM.atomically $ STM.readTVar envThreads
case abcMaybe of
Just abc -> pure (Left err, abc)
Nothing -> error $ List.unwords
[ "Cardano.Benchmarking.Script.runScript:"
, "AsyncBenchmarkControl absent from map" ]
, "AsyncBenchmarkControl uninitialized" ]
where
cleanup :: Env.Env -> Env.ActionM () -> IO ()
cleanup env' acts = void $ Env.runActionMEnv env' acts iom
execScript :: Env.ActionM AsyncBenchmarkControl
execScript = do
setProtocolParameters QueryLocalNode
forM_ script action
abcMaybe <- Env.getEnvThreads "tx-submit-benchmark"
abcMaybe <- Env.getEnvThreads
case abcMaybe of
Nothing -> throwE $ Env.TxGenError $ Types.TxGenError $
List.unwords
Expand Down
13 changes: 6 additions & 7 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ module Cardano.Benchmarking.Script.Action
)
where

import qualified Data.Text as Text (unpack)

import Control.Monad.IO.Class
import Control.Monad.Trans.Except.Extra

import Cardano.Benchmarking.OuroborosImports as Core (protocolToNetworkId)
import Cardano.Benchmarking.Script.Core
import Cardano.Benchmarking.Script.Env
Expand All @@ -26,6 +21,10 @@ import Cardano.Benchmarking.Tracer
import Cardano.TxGenerator.Setup.NodeConfig
import Cardano.TxGenerator.Types (TxGenError)

import Control.Monad.IO.Class
import Control.Monad.Trans.Except.Extra
import qualified Data.Text as Text (unpack)


-- | 'action' has as its sole callers
-- 'Cardano.Benchmark.Script.runScript' from "Cardano.Benchmark.Script"
Expand All @@ -47,8 +46,8 @@ action a = case a of
AddFund era wallet txIn lovelace keyName -> addFund era wallet txIn lovelace keyName
Delay t -> delay t
Submit era submitMode txParams generator -> submitAction era submitMode generator txParams
WaitBenchmark thread -> waitBenchmark thread
CancelBenchmark thread -> cancelBenchmark thread
WaitBenchmark _thread -> waitBenchmark
CancelBenchmark _thread -> cancelBenchmark
WaitForEra era -> waitForEra era
LogMsg txt -> traceDebug $ Text.unpack txt
Reserved options -> reserved options
Expand Down
22 changes: 11 additions & 11 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import qualified Cardano.Benchmarking.GeneratorTx as GeneratorTx (waitBenchmark,
import Cardano.Benchmarking.GeneratorTx.NodeToNode (ConnectClient,
benchmarkConnectTxSubmit)
import Cardano.Benchmarking.GeneratorTx.SizedMetadata (mkMetadata)
import Cardano.Benchmarking.LogTypes as Core (TraceBenchTxSubmit (..), btConnect_, btN2N_,
btSubmission2_, btTxSubmit_)
import Cardano.Benchmarking.LogTypes as Core (AsyncBenchmarkControl (..),
TraceBenchTxSubmit (..), btConnect_, btN2N_, btSubmission2_, btTxSubmit_)
import Cardano.Benchmarking.OuroborosImports as Core (LocalSubmitTx, SigningKeyFile,
makeLocalConnectInfo, protocolToCodecConfig)
import Cardano.Benchmarking.Script.Aeson (prettyPrintOrdered, readProtocolParametersFile)
Expand Down Expand Up @@ -143,20 +143,20 @@ getConnectClient = do
mempty -- (btSubmission2_ tracers)
(protocolToCodecConfig protocol)
networkMagic
waitBenchmark :: String -> ActionM ()
waitBenchmark n = do
abcMaybe <- getEnvThreads n
waitBenchmark :: ActionM ()
waitBenchmark = do
abcMaybe <- getEnvThreads
case abcMaybe of
Just abc -> waitBenchmarkCore abc
Nothing -> do
throwE . Env.TxGenError . TxGenError $
("waitBenchmark: missing AsyncBenchmarkControl" :: String)

cancelBenchmark :: String -> ActionM ()
cancelBenchmark n = do
Just ctl@(_, _ , _ , shutdownAction) <- getEnvThreads n
liftIO shutdownAction
waitBenchmarkCore ctl
cancelBenchmark :: ActionM ()
cancelBenchmark = do
Just abc@AsyncBenchmarkControl { .. } <- getEnvThreads
liftIO abcShutdown
waitBenchmarkCore abc

getLocalConnectInfo :: ActionM LocalNodeConnectInfo
getLocalConnectInfo = makeLocalConnectInfo <$> getEnvNetworkId <*> getEnvSocketPath
Expand Down Expand Up @@ -284,7 +284,7 @@ benchmarkTxStream txStream targetNodes threadName tps txCount era = do
ret <- liftIO $ runExceptT $ coreCall era
case ret of
Left err -> liftTxGenError err
Right ctl -> setEnvThreads threadName ctl
Right ctl -> setEnvThreads ctl

evalGenerator :: IsShelleyBasedEra era => Generator -> TxGenTxParams -> AsType era -> ActionM (TxStream IO era)
evalGenerator generator txParams@TxGenTxParams{txParamFee = fee} era = do
Expand Down
35 changes: 13 additions & 22 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
Expand All @@ -30,7 +31,6 @@ module Cardano.Benchmarking.Script.Env (
, Env (Env, envThreads)
, Error (..)
, mkNewEnv
, runActionM
, runActionMEnv
, liftTxGenError
, liftIOSafe
Expand Down Expand Up @@ -102,7 +102,7 @@ data Env = Env { -- | 'Cardano.Api.ProtocolParameters' is ultimately
, envNetworkId :: Maybe NetworkId
, envSocketPath :: Maybe FilePath
, envKeys :: Map String (SigningKey PaymentKey)
, envThreads :: Map String (STM.TVar (Maybe AsyncBenchmarkControl))
, envThreads :: STM.TVar (Maybe AsyncBenchmarkControl)
, envWallets :: Map String WalletRef
, envSummary :: Maybe PlutusBudgetSummary
}
Expand All @@ -118,26 +118,21 @@ emptyEnv = Env { protoParams = Nothing
, envProtocol = Nothing
, envNetworkId = Nothing
, envSocketPath = Nothing
, envThreads = Map.empty
-- This never escapes: it's always overridden.
, envThreads = undefined
, envWallets = Map.empty
, envSummary = Nothing
}

mkNewEnv :: STM Env
mkNewEnv = do
ctl <- STM.newTVar Nothing
pure emptyEnv { envThreads = "tx-submit-benchmark" `Map.singleton` ctl }
envThreads <- STM.newTVar Nothing
pure emptyEnv { envThreads }

-- | This abbreviates an `ExceptT` and `RWST` with particular types
-- used as parameters.
type ActionM a = ExceptT Error (RWST IOManager () Env IO) a

-- | This runs an `ActionM` starting with an empty `Env`.
runActionM :: ActionM ret -> IOManager -> IO (Either Error ret, Env, ())
runActionM actions ioManager = do
env <- STM.atomically mkNewEnv
runActionMEnv env actions ioManager

-- | This runs an `ActionM` starting with the `Env` being passed.
runActionMEnv :: Env -> ActionM ret -> IOManager -> IO (Either Error ret, Env, ())
runActionMEnv env action iom = RWS.runRWST (runExceptT action) iom env
Expand Down Expand Up @@ -206,14 +201,10 @@ setEnvSocketPath :: FilePath -> ActionM ()
setEnvSocketPath val = modifyEnv (\e -> e { envSocketPath = Just val })

-- | Write accessor for `envThreads`.
setEnvThreads :: String -> AsyncBenchmarkControl -> ActionM ()
setEnvThreads key val = do
threadMap <- lift $ RWS.gets envThreads
case Map.lookup key threadMap of
Nothing -> do
abcTVar <- liftIO do STM.atomically do STM.newTVar $ Just val
modifyEnv (\env -> env { envThreads = Map.insert key abcTVar threadMap })
Just abcTVar -> liftIO do STM.atomically $ abcTVar `STM.writeTVar` Just val
setEnvThreads :: AsyncBenchmarkControl -> ActionM ()
setEnvThreads abc = do
abcTVar <- lift $ RWS.gets envThreads
liftIO do STM.atomically $ abcTVar `STM.writeTVar` Just abc

-- | Write accessor for `envWallets`.
setEnvWallets :: String -> WalletRef -> ActionM ()
Expand Down Expand Up @@ -267,9 +258,9 @@ getEnvSocketPath :: ActionM SocketPath
getEnvSocketPath = File <$> getEnvVal envSocketPath "SocketPath"

-- | Read accessor for `envThreads`.
getEnvThreads :: String -> ActionM (Maybe AsyncBenchmarkControl)
getEnvThreads key = do
abcTVar <- getEnvMap envThreads key
getEnvThreads :: ActionM (Maybe AsyncBenchmarkControl)
getEnvThreads = do
abcTVar <- lift $ RWS.gets envThreads
liftIO do STM.atomically $ STM.readTVar abcTVar

-- | Read accessor for `envWallets`.
Expand Down
17 changes: 5 additions & 12 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Selftest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BSL
import Data.Either (fromRight)
import qualified Data.List as List (unwords)
import qualified Data.Map as Map (lookup)
import Data.String

import Paths_tx_generator
Expand All @@ -50,19 +49,13 @@ runSelftest env iom outFile = do
Env.setBenchTracers initNullTracers
forM_ (testScript protocolFile submitMode) action
(result, Env { envThreads }, ()) <- Env.runActionMEnv env fullScript iom
case "tx-submit-benchmark" `Map.lookup` envThreads of
Nothing -> do
error $ List.unwords
[ "Cardano.Benchmarking.Script.Selftest.runSelftest:"
, "thread state uninitialized" ]
Just abcTVar -> do
abcMaybe <- STM.atomically $ STM.readTVar abcTVar
case abcMaybe of
Nothing -> pure result
Just _ -> do
error $ List.unwords
abcMaybe <- STM.atomically $ STM.readTVar envThreads
case abcMaybe of
Just _ -> error $
List.unwords
[ "Cardano.Benchmarking.Script.Selftest.runSelftest:"
, "thread state spuriously initialized" ]
Nothing -> pure result

-- | 'printJSON' prints out the list of actions using Aeson.
-- It has no callers within @cardano-node@.
Expand Down

0 comments on commit 3c89de4

Please sign in to comment.