diff --git a/bench/Chainweb/Pact/Backend/ForkingBench.hs b/bench/Chainweb/Pact/Backend/ForkingBench.hs index 104e904ec2..8df4b4f14f 100644 --- a/bench/Chainweb/Pact/Backend/ForkingBench.hs +++ b/bench/Chainweb/Pact/Backend/ForkingBench.hs @@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils import Chainweb.Pact.PactService -import Chainweb.Pact.Service.BlockValidation +import Chainweb.Pact.Service.BlockValidation as BlockValidation import Chainweb.Pact.Service.PactQueue import Chainweb.Pact.Service.Types import Chainweb.Pact.Types @@ -259,7 +259,8 @@ data Resources , coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability])))) , nonceCounter :: !(IORef Word64) , txPerBlock :: !(IORef Int) - , sqlEnv :: !SQLiteEnv + , writeSqlEnv :: !SQLiteEnv + , readSqlEnv :: !SQLiteEnv } type RunPactService = @@ -290,46 +291,38 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr coinAccounts <- newMVar mempty nonceCounter <- newIORef 1 txPerBlock <- newIORef 10 - sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + + writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas + readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas mp <- testMemPoolAccess txPerBlock coinAccounts pactService <- - startPact testVer logger blockHeaderDb payloadDb mp sqlEnv + startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv) mainTrunkBlocks <- playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter when (compact == DoCompact) $ do C.withDefaultLogger Error $ \lgr -> do - void $ C.compact (BlockHeight trunkLength) lgr sqlEnv [] + void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv [] return $ NoopNFData $ Resources {..} destroy (NoopNFData (Resources {..})) = do stopPact pactService - stopSqliteDb sqlEnv + stopSqliteDb writeSqlEnv + stopSqliteDb readSqlEnv pactQueueSize = 2000 logger = genericLogger logLevel T.putStrLn - startPact version l bhdb pdb mempool sqlEnv = do + startPact version l bhdb pdb mempool sqlEnvs = do reqQ <- newPactQueue pactQueueSize - a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig + a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig { _pactBlockGasLimit = 180_000 } - return (a, reqQ) stopPact (a, _) = cancel a - chainwebBenchPragmas = - [ "synchronous = NORMAL" - , "journal_mode = WAL" - , "locking_mode = EXCLUSIVE" - -- this is different from the prodcution database that uses @NORMAL@ - , "temp_store = MEMORY" - , "auto_vacuum = NONE" - , "page_size = 1024" - ] - genesisBlock :: BlockHeader genesisBlock = genesisBlockHeader testVer cid @@ -366,7 +359,7 @@ testMemPoolAccess txsPerBlock accounts = do getTestBlock mVarAccounts txOrigTime validate bHeight hash | bHeight == 1 = do meta <- setTime txOrigTime <$> makeMeta cid - (as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta + (as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames case traverse validateCommand cmds of Left err -> throwM $ userError err Right !r -> do @@ -461,15 +454,20 @@ stockKey s = do stockKeyFile :: ByteString stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml") -createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)) -createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names +createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)] +createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names' where go a m = do (b,c) <- m return (Account a,b,c) -names :: NonEmpty String -names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" +twoNames :: [String] +twoNames = take 2 names + +names :: [String] +names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]] + where + names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" formatB16PubKey :: DynKeyPair -> Text formatB16PubKey = \case diff --git a/src/Chainweb/Pact/PactService.hs b/src/Chainweb/Pact/PactService.hs index d996aea23f..cc2e154e13 100644 --- a/src/Chainweb/Pact/PactService.hs +++ b/src/Chainweb/Pact/PactService.hs @@ -101,7 +101,7 @@ import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpoin import Chainweb.Pact.Backend.Types import Chainweb.Pact.PactService.ExecBlock import Chainweb.Pact.PactService.Checkpointer -import Chainweb.Pact.Service.PactQueue (PactQueue, getNextRequest) +import Chainweb.Pact.Service.PactQueue (PactQueue, getNextWriteRequest, getNextReadRequest) import Chainweb.Pact.Service.Types import Chainweb.Pact.SPV import Chainweb.Pact.TransactionExec @@ -128,13 +128,18 @@ runPactService -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> IO () -runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb sqlenv config = - void $ withPactService ver cid chainwebLogger bhDb pdb sqlenv config $ do +runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb (writeSqlEnv, readSqlEnv) config = + void $ withPactService ver cid chainwebLogger bhDb pdb writeSqlEnv config True $ do initialPayloadState mempoolAccess ver cid - serviceRequests mempoolAccess reqQ + pst <- get + pse <- ask + liftIO $ race_ + (runPactServiceM pst pse $ serviceWriteRequests mempoolAccess reqQ) + (threadDelay 1_000_000 >> (withPactService ver cid chainwebLogger bhDb pdb readSqlEnv config False $ + serviceReadRequests mempoolAccess reqQ)) withPactService :: (Logger logger, CanReadablePayloadCas tbl) @@ -145,9 +150,10 @@ withPactService -> PayloadDb tbl -> SQLiteEnv -> PactServiceConfig + -> Bool -> PactServiceM logger tbl a -> IO (T2 a PactServiceState) -withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = +withPactService ver cid chainwebLogger bhDb pdb sqlenv config initLatest act = withProdRelationalCheckpointer checkpointerLogger (_pactModuleCacheLimit config) sqlenv ver cid $ \checkpointer -> do let !rs = readRewards let !pse = PactServiceEnv @@ -197,7 +203,7 @@ withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = -- 'initalPayloadState.readContracts'. We therefore rewind to the latest -- avaliable header in the block header database. -- - exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) + when initLatest $ exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) act where pactServiceLogger = setComponent "pact" chainwebLogger @@ -280,40 +286,29 @@ lookupBlockHeader bhash ctx = do throwM $ BlockHeaderLookupFailure $ "failed lookup of parent header in " <> ctx <> ": " <> sshow e --- | Loop forever, serving Pact execution requests and reponses from the queues -serviceRequests +-- | Loop forever, serving Pact execution Write-requests +serviceWriteRequests :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) => MemPoolAccess -> PactQueue -> PactServiceM logger tbl () -serviceRequests memPoolAccess reqQ = do - logInfo "Starting service" - go `finally` logInfo "Stopping service" +serviceWriteRequests memPoolAccess reqQ = do + logInfo "Starting write-requests handling service" + go `finally` logInfo "Stopping write-requests handling service" where go :: PactServiceM logger tbl () go = do PactServiceEnv{_psLogger} <- ask - logDebug "serviceRequests: wait" - SubmittedRequestMsg msg statusRef <- liftIO $ getNextRequest reqQ + logDebug "serviceWriteRequests: wait" + SubmittedRequestMsg msg statusRef <- liftIO $ getNextWriteRequest reqQ requestId <- liftIO $ UUID.toText <$> UUID.nextRandom let logFn :: LogFunction logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger - logDebug $ "serviceRequests: " <> sshow msg + logDebug $ "serviceWriteRequests: " <> sshow msg case msg of CloseMsg -> tryOne "execClose" statusRef $ return () - LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth) -> do - trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ - tryOne "execLocal" statusRef $ - execLocal localRequest preflight sigVerify rewindDepth - go - NewBlockMsg NewBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execNewBlock" - () 1 $ - tryOne "execNewBlock" statusRef $ - execNewBlock memPoolAccess _newMiner - go ValidateBlockMsg ValidateBlockReq {..} -> do tryOne "execValidateBlock" statusRef $ fmap fst $ trace' logFn "Chainweb.Pact.PactService.execValidateBlock" @@ -321,6 +316,44 @@ serviceRequests memPoolAccess reqQ = do (\(_, g) -> fromIntegral g) (execValidateBlock memPoolAccess _valBlockHeader _valCheckablePayload) go + SyncToBlockMsg SyncToBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ + tryOne "syncToBlockBlock" statusRef $ + execSyncToBlock _syncToBlockHeader + go + _ -> error $ "impossible: unexpected request " ++ show msg + +-- | Loop forever, serving Pact execution Read-requests +serviceReadRequests + :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) + => MemPoolAccess + -> PactQueue + -> PactServiceM logger tbl () +serviceReadRequests memPoolAccess reqQ = do + logInfo "Starting read-requests handling service" + go `finally` (logInfo "Stopping read-requests handling service") + where + go = do + logDebug "serviceReadRequests: wait" + SubmittedRequestMsg msg statusRef <- liftIO $ getNextReadRequest reqQ + requestId <- liftIO $ UUID.toText <$> UUID.nextRandom + PactServiceEnv{_psLogger} <- ask + let + logFn :: LogFunction + logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger + logDebug $ "serviceReadRequests: " <> sshow msg + case msg of + NewBlockMsg NewBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execNewBlock" + () 1 $ + tryOne "execNewBlock" statusRef $ + execNewBlock memPoolAccess _newMiner + go + LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth) -> do + trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ + tryOne "execLocal" statusRef $ + execLocal localRequest preflight sigVerify rewindDepth + go LookupPactTxsMsg (LookupPactTxsReq confDepth txHashes) -> do trace logFn "Chainweb.Pact.PactService.execLookupPactTxs" () (length txHashes) $ @@ -343,97 +376,94 @@ serviceRequests memPoolAccess reqQ = do tryOne "execHistoricalLookup" statusRef $ execHistoricalLookup bh d k go - SyncToBlockMsg SyncToBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ - tryOne "syncToBlockBlock" statusRef $ - execSyncToBlock _syncToBlockHeader - go ReadOnlyReplayMsg ReadOnlyReplayReq {..} -> do trace logFn "Chainweb.Pact.PactService.execReadOnlyReplay" (_readOnlyReplayLowerBound, _readOnlyReplayUpperBound) 1 $ tryOne "readOnlyReplayBlock" statusRef $ execReadOnlyReplay _readOnlyReplayLowerBound _readOnlyReplayUpperBound go + _ -> error $ "impossible: unexpected request " ++ show msg - tryOne - :: forall a. Text - -> TVar (RequestStatus a) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne which statusRef act = - evalPactOnThread - `catches` - [ Handler $ \(e :: SomeException) -> do - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ throwIO e - ] - where - -- here we start a thread to service the request - evalPactOnThread :: PactServiceM logger tbl () - evalPactOnThread = do - maybeException <- withPactState $ \run -> do - goLock <- newEmptyMVar - finishedLock <- newEmptyMVar - -- fork a thread to service the request - bracket - (forkIO $ - flip finally (tryPutMVar finishedLock ()) $ do - -- wait until we've been told to start. - -- we don't want to start if the request was cancelled - -- already - takeMVar goLock - -- run and report the answer. - tryAny (run act) >>= \case - Left ex -> atomically $ writeTVar statusRef (RequestFailed ex) - Right r -> atomically $ writeTVar statusRef (RequestDone r) - ) - -- if Pact itself is killed, kill the request thread too. - (\tid -> throwTo tid RequestCancelled >> takeMVar finishedLock) - (\_tid -> do - -- check first if the request has been cancelled before - -- starting work on it - beforeStarting <- atomically $ do - readTVar statusRef >>= \case - RequestInProgress -> - error "PactService internal error: request in progress before starting" - RequestDone _ -> - error "PactService internal error: request finished before starting" - RequestFailed e -> - return (Left e) - RequestNotStarted -> do - writeTVar statusRef RequestInProgress - return (Right ()) - case beforeStarting of - -- the request has already been cancelled, don't - -- start work on it. - Left ex -> return (Left ex) - Right () -> do - -- let the request thread start working - putMVar goLock () - -- wait until the request thread has finished - atomically $ readTVar statusRef >>= \case - RequestInProgress -> retry - RequestDone _ -> return (Right ()) - RequestFailed e -> return (Left e) - RequestNotStarted -> error "PactService internal error: request not started after starting" - ) - case maybeException of - Left (fromException -> Just AsyncCancelled) -> - logDebug "Pact action was cancelled" - Left (fromException -> Just ThreadKilled) -> - logWarn "Pact action thread was killed" - Left (exn :: SomeException) -> - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow exn - ] - Right () -> return () +tryOne + :: forall logger tbl a. (Logger logger, CanReadablePayloadCas tbl) + => Text + -> TVar (RequestStatus a) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne which statusRef act = + evalPactOnThread + `catches` + [ Handler $ \(e :: SomeException) -> do + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ throwIO e + ] + where + -- here we start a thread to service the request + evalPactOnThread :: PactServiceM logger tbl () + evalPactOnThread = do + maybeException <- withPactState $ \run -> do + goLock <- newEmptyMVar + finishedLock <- newEmptyMVar + -- fork a thread to service the request + bracket + (forkIO $ + flip finally (tryPutMVar finishedLock ()) $ do + -- wait until we've been told to start. + -- we don't want to start if the request was cancelled + -- already + takeMVar goLock + -- run and report the answer. + tryAny (run act) >>= \case + Left ex -> atomically $ writeTVar statusRef (RequestFailed ex) + Right r -> atomically $ writeTVar statusRef (RequestDone r) + ) + -- if Pact itself is killed, kill the request thread too. + (\tid -> throwTo tid RequestCancelled >> takeMVar finishedLock) + (\_tid -> do + -- check first if the request has been cancelled before + -- starting work on it + beforeStarting <- atomically $ do + readTVar statusRef >>= \case + RequestInProgress -> + error "PactService internal error: request in progress before starting" + RequestDone _ -> + error "PactService internal error: request finished before starting" + RequestFailed e -> + return (Left e) + RequestNotStarted -> do + writeTVar statusRef RequestInProgress + return (Right ()) + case beforeStarting of + -- the request has already been cancelled, don't + -- start work on it. + Left ex -> return (Left ex) + Right () -> do + -- let the request thread start working + putMVar goLock () + -- wait until the request thread has finished + atomically $ readTVar statusRef >>= \case + RequestInProgress -> retry + RequestDone _ -> return (Right ()) + RequestFailed e -> return (Left e) + RequestNotStarted -> error "PactService internal error: request not started after starting" + ) + case maybeException of + Left (fromException -> Just AsyncCancelled) -> + logDebug "Pact action was cancelled" + Left (fromException -> Just ThreadKilled) -> + logWarn "Pact action thread was killed" + Left (exn :: SomeException) -> + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow exn + ] + Right () -> return () execNewBlock :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) diff --git a/src/Chainweb/Pact/Service/PactInProcApi.hs b/src/Chainweb/Pact/Service/PactInProcApi.hs index 132f70ff91..adbb24420f 100644 --- a/src/Chainweb/Pact/Service/PactInProcApi.hs +++ b/src/Chainweb/Pact/Service/PactInProcApi.hs @@ -67,8 +67,9 @@ withPactService -> (PactQueue -> IO a) -> IO a withPactService ver cid logger mpc bhdb pdb pactDbDir config action = - withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \sqlenv -> - withPactService' ver cid logger mpa bhdb pdb sqlenv config action + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \writeSqlEnv -> + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \readSqlEnv -> + withPactService' ver cid logger mpa bhdb pdb (writeSqlEnv, readSqlEnv) config action where mpa = pactMemPoolAccess mpc $ addLabel ("sub-component", "MempoolAccess") logger @@ -84,18 +85,18 @@ withPactService' -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> (PactQueue -> IO a) -> IO a -withPactService' ver cid logger memPoolAccess bhDb pdb sqlenv config action = do +withPactService' ver cid logger memPoolAccess bhDb pdb sqlenvs config action = do reqQ <- newPactQueue (_pactQueueSize config) race (concurrently_ (monitor reqQ) (server reqQ)) (action reqQ) >>= \case Left () -> error "Chainweb.Pact.Service.PactInProcApi: pact service terminated unexpectedly" Right a -> return a where server reqQ = runForever logg "pact-service" - $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenv config + $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenvs config logg = logFunction logger monitor = runPactServiceQueueMonitor $ addLabel ("sub-component", "PactQueue") logger diff --git a/src/Chainweb/Pact/Service/PactQueue.hs b/src/Chainweb/Pact/Service/PactQueue.hs index b06ca0c66d..114e9d4628 100644 --- a/src/Chainweb/Pact/Service/PactQueue.hs +++ b/src/Chainweb/Pact/Service/PactQueue.hs @@ -22,7 +22,8 @@ module Chainweb.Pact.Service.PactQueue , waitForSubmittedRequest , submitRequestAnd , submitRequestAndWait -, getNextRequest +, getNextWriteRequest +, getNextReadRequest , getPactQueueStats , newPactQueue , resetPactQueueStats @@ -57,9 +58,11 @@ import Chainweb.Utils -- other requests. -- data PactQueue = PactQueue - { _pactQueueValidateBlock :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) - , _pactQueueNewBlock :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) - , _pactQueueOtherMsg :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + { _pactQueueWriteRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + , _pactQueueCloseRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + -- NewBlock requests are Read-requests as well but prioritize them with their own queue + , _pactQueueNewBlockRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + , _pactQueueReadRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) , _pactQueuePactQueueValidateBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueNewBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueOtherMsgCounters :: !(IORef PactQueueCounters) @@ -78,6 +81,7 @@ newPactQueue sz = PactQueue <$> newTBQueueIO sz <*> newTBQueueIO sz <*> newTBQueueIO sz + <*> newTBQueueIO sz <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters @@ -93,9 +97,13 @@ addRequest q msg = do return statusRef where priority = case msg of - ValidateBlockMsg {} -> _pactQueueValidateBlock q - NewBlockMsg {} -> _pactQueueNewBlock q - _ -> _pactQueueOtherMsg q + -- Write-requests + ValidateBlockMsg {} -> _pactQueueWriteRequests q + SyncToBlockMsg {} -> _pactQueueWriteRequests q + CloseMsg -> _pactQueueCloseRequests q + -- Read-requests + NewBlockMsg {} -> _pactQueueNewBlockRequests q + _ -> _pactQueueReadRequests q -- | Cancel a request that's already been submitted to the Pact queue. -- @@ -135,14 +143,12 @@ submitRequestAnd q msg k = mask $ \restore -> do submitRequestAndWait :: PactQueue -> RequestMsg r -> IO r submitRequestAndWait q msg = submitRequestAnd q msg waitForSubmittedRequest --- | Get the next available request from the Pact execution queue +-- | Get the next available Write-request from the Pact execution queue -- -getNextRequest :: PactQueue -> IO SubmittedRequestMsg -getNextRequest q = do - T2 req entranceTime <- atomically - $ tryReadTBQueueOrRetry (_pactQueueValidateBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueNewBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueOtherMsg q) +getNextWriteRequest :: PactQueue -> IO SubmittedRequestMsg +getNextWriteRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueWriteRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueCloseRequests q) requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime updatePactQueueCounters (counters req q) requestTime return req @@ -152,6 +158,23 @@ getNextRequest q = do Just msg -> return msg counters (SubmittedRequestMsg ValidateBlockMsg{} _) = _pactQueuePactQueueValidateBlockMsgCounters + counters (SubmittedRequestMsg NewBlockMsg{} _) = error "getNextWriteRequest.counters.impossible" + counters _ = _pactQueuePactQueueOtherMsgCounters + +-- | Get the next available Read-request from the Pact execution queue +getNextReadRequest :: PactQueue -> IO SubmittedRequestMsg +getNextReadRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueNewBlockRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueReadRequests q) + requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime + updatePactQueueCounters (counters req q) requestTime + return req + where + tryReadTBQueueOrRetry = tryReadTBQueue >=> \case + Nothing -> retry + Just msg -> return msg + + counters (SubmittedRequestMsg ValidateBlockMsg{} _) = error "getNextReadRequest.counters.impossible" counters (SubmittedRequestMsg NewBlockMsg{} _) = _pactQueuePactQueueNewBlockMsgCounters counters _ = _pactQueuePactQueueOtherMsgCounters diff --git a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs index d436a071f6..3ac0d9e999 100644 --- a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs +++ b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs @@ -296,7 +296,7 @@ withPact' bdbio ioSqlEnv r (ps, cacheTest) tastylog = do let pdb = _bdbPayloadDb bdb sqlEnv <- ioSqlEnv T2 _ pstate <- withPactService - testVer testChainId logger bhdb pdb sqlEnv testPactServiceConfig ps + testVer testChainId logger bhdb pdb sqlEnv testPactServiceConfig True ps cacheTest r (_psInitCache pstate) where logger = genericLogger Quiet (tastylog . T.unpack) diff --git a/test/Chainweb/Test/Pact/PactSingleChainTest.hs b/test/Chainweb/Test/Pact/PactSingleChainTest.hs index b553470348..e1b705ef63 100644 --- a/test/Chainweb/Test/Pact/PactSingleChainTest.hs +++ b/test/Chainweb/Test/Pact/PactSingleChainTest.hs @@ -18,7 +18,7 @@ module Chainweb.Test.Pact.PactSingleChainTest ) where import Control.Arrow ((&&&)) -import Control.Concurrent (forkIO) +import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar import Control.DeepSeq import Control.Lens hiding ((.=), matching) @@ -264,12 +264,13 @@ rosettaFailsWithoutFullHistory rdb = pactQueue <- newPactQueue 2000 blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- fmap snd dm let payloadDb = _bdbPayloadDb blockDb let cfg = testPactServiceConfig { _pactFullHistoryRequired = True } let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv cfg + e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv, readSqlEnv) cfg case e of Left (FullHistoryRequired {}) -> do pure () @@ -963,7 +964,8 @@ compactionSetup pat rdb pactCfg f = blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid let payloadDb = _bdbPayloadDb blockDb - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO (mempoolRef, mempool) <- do (ref, nonRef) <- dm pure (pure ref, nonRef) @@ -971,14 +973,15 @@ compactionSetup pat rdb pactCfg f = let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv pactCfg + void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv,readSqlEnv) pactCfg + threadDelay 1_000_000 setOneShotMempool mempoolRef goldenMemPool f $ CompactionResources { mempoolRef = mempoolRef , mempool = mempool - , sqlEnv = sqlEnv + , sqlEnv = writeSqlEnv , pactQueue = pactQueue , blockDb = blockDb } diff --git a/test/Chainweb/Test/Pact/TTL.hs b/test/Chainweb/Test/Pact/TTL.hs index a00bf24861..6d98d2efbd 100644 --- a/test/Chainweb/Test/Pact/TTL.hs +++ b/test/Chainweb/Test/Pact/TTL.hs @@ -241,9 +241,10 @@ doValidateBlock doValidateBlock ctxIO header payload = do ctx <- ctxIO _mv' <- validateBlock header (CheckablePayloadWithOutputs payload) $ _ctxQueue ctx + addNewPayload (_ctxPdb ctx) (_blockHeight header) payload unsafeInsertBlockHeaderDb (_ctxBdb ctx) header - -- FIXME FIXME FIXME: do at least some checks? + -- FIXME FIXME: do at least some checks? -- -------------------------------------------------------------------------- -- -- Misc Utils diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index c22f54b002..fabd911c51 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -904,13 +904,14 @@ withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = startPact bdbio = do reqQ <- newPactQueue 2000 bdb <- bdbio - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, _) = cancel a @@ -963,10 +964,11 @@ withPactTestBlockDb version cid rdb mempoolIO pactConfig f = mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb - sqlEnv <- startSqliteDb cid logger dir False + writeSqlEnv <- startSqliteDb cid logger dir False + readSqlEnv <- startSqliteDb cid logger dir False a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, (sqlEnv, _, _)) = cancel a >> stopSqliteDb sqlEnv diff --git a/tools/ea/Ea.hs b/tools/ea/Ea.hs index bce1bef14c..d5aa58e31d 100644 --- a/tools/ea/Ea.hs +++ b/tools/ea/Ea.hs @@ -179,7 +179,7 @@ genPayloadModule v tag cid cwTxs = pdb <- newPayloadDb withSystemTempDirectory "ea-pact-db" $ \pactDbDir -> do T2 payloadWO _ <- withSqliteDb cid logger pactDbDir False $ \env -> - withPactService v cid logger bhdb pdb env testPactServiceConfig $ + withPactService v cid logger bhdb pdb env testPactServiceConfig True $ execNewGenesisBlock noMiner (V.fromList cwTxs) return $ TL.toStrict $ TB.toLazyText $ payloadModuleCode tag payloadWO