From c84dcbe5e2bea7bb34c50e8aa4c9d5c87c6d038a Mon Sep 17 00:00:00 2001 From: Evgenii Akentev Date: Mon, 4 Mar 2024 21:23:17 +0400 Subject: [PATCH] Process PactQueue concurrently with read-only connection. Signed-off-by: Evgenii Akentev Change-Id: Ia007a565d75c625ddb243534a546d57584ec8e7d --- bench/Chainweb/Pact/Backend/Bench.hs | 2 +- bench/Chainweb/Pact/Backend/ForkingBench.hs | 46 ++- src/Chainweb/Chainweb.hs | 1 - src/Chainweb/Pact/Backend/ChainwebPactDb.hs | 12 +- src/Chainweb/Pact/Backend/PactState.hs | 3 +- .../Pact/Backend/PactState/GrandHash/Calc.hs | 4 +- .../Backend/PactState/GrandHash/Import.hs | 6 +- .../Pact/Backend/PactState/GrandHash/Utils.hs | 18 +- .../Pact/Backend/RelationalCheckpointer.hs | 38 +-- src/Chainweb/Pact/Backend/Types.hs | 11 +- src/Chainweb/Pact/Backend/Utils.hs | 86 ++++-- src/Chainweb/Pact/PactService.hs | 266 ++++++++++-------- src/Chainweb/Pact/Service/PactInProcApi.hs | 13 +- src/Chainweb/Pact/Service/PactQueue.hs | 50 +++- test/Chainweb/Test/Pact/Checkpointer.hs | 8 +- .../Test/Pact/ModuleCacheOnRestart.hs | 4 +- test/Chainweb/Test/Pact/PactMultiChainTest.hs | 4 +- test/Chainweb/Test/Pact/PactReplay.hs | 13 +- .../Chainweb/Test/Pact/PactSingleChainTest.hs | 46 +-- test/Chainweb/Test/Pact/RemotePactTest.hs | 2 +- test/Chainweb/Test/Pact/SQLite.hs | 20 +- test/Chainweb/Test/Pact/TTL.hs | 3 +- test/Chainweb/Test/Pact/Utils.hs | 44 +-- test/Chainweb/Test/Utils.hs | 11 +- tools/cwtool/TxSimulator.hs | 2 +- tools/db-checksum/CheckpointerDBChecksum.hs | 5 +- tools/ea/Ea.hs | 3 +- 27 files changed, 416 insertions(+), 305 deletions(-) diff --git a/bench/Chainweb/Pact/Backend/Bench.hs b/bench/Chainweb/Pact/Backend/Bench.hs index 8bd9f98ae1..1645b1bbc9 100644 --- a/bench/Chainweb/Pact/Backend/Bench.hs +++ b/bench/Chainweb/Pact/Backend/Bench.hs @@ -146,7 +146,7 @@ cpWithBench torun = let neverLogger = genericLogger Error (\_ -> return ()) !sqliteEnv <- openSQLiteConnection dbFile chainwebPragmas !cenv <- - initRelationalCheckpointer defaultModuleCacheLimit sqliteEnv neverLogger testVer testChainId + initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) neverLogger testVer testChainId return $ NoopNFData (sqliteEnv, cenv) teardown (NoopNFData (sqliteEnv, _cenv)) = closeSQLiteConnection sqliteEnv diff --git a/bench/Chainweb/Pact/Backend/ForkingBench.hs b/bench/Chainweb/Pact/Backend/ForkingBench.hs index 104e904ec2..a6a4d6e0b3 100644 --- a/bench/Chainweb/Pact/Backend/ForkingBench.hs +++ b/bench/Chainweb/Pact/Backend/ForkingBench.hs @@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils import Chainweb.Pact.PactService -import Chainweb.Pact.Service.BlockValidation +import Chainweb.Pact.Service.BlockValidation as BlockValidation import Chainweb.Pact.Service.PactQueue import Chainweb.Pact.Service.Types import Chainweb.Pact.Types @@ -259,7 +259,8 @@ data Resources , coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability])))) , nonceCounter :: !(IORef Word64) , txPerBlock :: !(IORef Int) - , sqlEnv :: !SQLiteEnv + , writeSqlEnv :: !Database + , readSqlEnv :: !Database } type RunPactService = @@ -290,46 +291,38 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr coinAccounts <- newMVar mempty nonceCounter <- newIORef 1 txPerBlock <- newIORef 10 - sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + + writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas + readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas mp <- testMemPoolAccess txPerBlock coinAccounts pactService <- - startPact testVer logger blockHeaderDb payloadDb mp sqlEnv + startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv) mainTrunkBlocks <- playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter when (compact == DoCompact) $ do C.withDefaultLogger Error $ \lgr -> do - void $ C.compact (BlockHeight trunkLength) lgr sqlEnv [] + void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv [] return $ NoopNFData $ Resources {..} destroy (NoopNFData (Resources {..})) = do stopPact pactService - stopSqliteDb sqlEnv + stopSqliteDb writeSqlEnv + stopSqliteDb readSqlEnv pactQueueSize = 2000 logger = genericLogger logLevel T.putStrLn - startPact version l bhdb pdb mempool sqlEnv = do + startPact version l bhdb pdb mempool sqlEnvs = do reqQ <- newPactQueue pactQueueSize - a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig + a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig { _pactBlockGasLimit = 180_000 } - return (a, reqQ) stopPact (a, _) = cancel a - chainwebBenchPragmas = - [ "synchronous = NORMAL" - , "journal_mode = WAL" - , "locking_mode = EXCLUSIVE" - -- this is different from the prodcution database that uses @NORMAL@ - , "temp_store = MEMORY" - , "auto_vacuum = NONE" - , "page_size = 1024" - ] - genesisBlock :: BlockHeader genesisBlock = genesisBlockHeader testVer cid @@ -366,7 +359,7 @@ testMemPoolAccess txsPerBlock accounts = do getTestBlock mVarAccounts txOrigTime validate bHeight hash | bHeight == 1 = do meta <- setTime txOrigTime <$> makeMeta cid - (as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta + (as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames case traverse validateCommand cmds of Left err -> throwM $ userError err Right !r -> do @@ -461,15 +454,20 @@ stockKey s = do stockKeyFile :: ByteString stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml") -createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)) -createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names +createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)] +createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names' where go a m = do (b,c) <- m return (Account a,b,c) -names :: NonEmpty String -names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" +twoNames :: [String] +twoNames = take 2 names + +names :: [String] +names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]] + where + names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" formatB16PubKey :: DynKeyPair -> Text formatB16PubKey = \case diff --git a/src/Chainweb/Chainweb.hs b/src/Chainweb/Chainweb.hs index a3e2c1f5bb..58b790d6a9 100644 --- a/src/Chainweb/Chainweb.hs +++ b/src/Chainweb/Chainweb.hs @@ -362,7 +362,6 @@ withChainwebInternal -> (StartedChainweb logger -> IO ()) -> IO () withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir resetDb inner = do - unless (_configOnlySyncPact conf || _configReadOnlyReplay conf) $ initializePayloadDb v payloadDb diff --git a/src/Chainweb/Pact/Backend/ChainwebPactDb.hs b/src/Chainweb/Pact/Backend/ChainwebPactDb.hs index e33fa2a0f4..c00c230431 100644 --- a/src/Chainweb/Pact/Backend/ChainwebPactDb.hs +++ b/src/Chainweb/Pact/Backend/ChainwebPactDb.hs @@ -639,7 +639,7 @@ createVersionedTable tablename db = do -- | Delete any state from the database newer than the input parent header. rewindDbTo - :: SQLiteEnv + :: Database -> Maybe ParentHeader -> IO () rewindDbTo db Nothing = rewindDbToGenesis db @@ -649,7 +649,7 @@ rewindDbTo db mh@(Just (ParentHeader ph)) = do -- rewind before genesis, delete all user tables and all rows in all tables rewindDbToGenesis - :: SQLiteEnv + :: Database -> IO () rewindDbToGenesis db = do exec_ db "DELETE FROM BlockHistory;" @@ -735,7 +735,7 @@ rewindDbToBlock db bh endingTxId = do exec' db "DELETE FROM TransactionIndex WHERE blockheight > ?;" [ SInt (fromIntegral bh) ] -commitBlockStateToDatabase :: SQLiteEnv -> BlockHash -> BlockHeight -> BlockState -> IO () +commitBlockStateToDatabase :: Database -> BlockHash -> BlockHeight -> BlockState -> IO () commitBlockStateToDatabase db hsh bh blockState = do let newTables = _pendingTableCreation $ _bsPendingBlock blockState mapM_ (\tn -> createUserTable (Utf8 tn)) newTables @@ -810,7 +810,7 @@ commitBlockStateToDatabase db hsh bh blockState = do -- | Create all tables that exist pre-genesis -initSchema :: (Logger logger) => logger -> SQLiteEnv -> IO () +initSchema :: (Logger logger) => logger -> Database -> IO () initSchema logger sql = withSavepoint sql DbTransaction $ do createBlockHistoryTable @@ -862,12 +862,12 @@ initSchema logger sql = "CREATE INDEX IF NOT EXISTS \ \ transactionIndexByBH ON TransactionIndex(blockheight)"; -getEndTxId :: Text -> SQLiteEnv -> Maybe ParentHeader -> IO TxId +getEndTxId :: Text -> Database -> Maybe ParentHeader -> IO TxId getEndTxId msg sql pc = case pc of Nothing -> return 0 Just (ParentHeader ph) -> getEndTxId' msg sql (_blockHeight ph) (_blockHash ph) -getEndTxId' :: Text -> SQLiteEnv -> BlockHeight -> BlockHash -> IO TxId +getEndTxId' :: Text -> Database -> BlockHeight -> BlockHash -> IO TxId getEndTxId' msg sql bh bhsh = do r <- qry sql "SELECT endingtxid FROM BlockHistory WHERE blockheight = ? and hash = ?;" diff --git a/src/Chainweb/Pact/Backend/PactState.hs b/src/Chainweb/Pact/Backend/PactState.hs index 9808ffa53f..7a73e71348 100644 --- a/src/Chainweb/Pact/Backend/PactState.hs +++ b/src/Chainweb/Pact/Backend/PactState.hs @@ -68,7 +68,6 @@ import Database.SQLite3.Direct qualified as SQL import Chainweb.BlockHeight (BlockHeight(..)) import Chainweb.Logger (Logger, addLabel) -import Chainweb.Pact.Backend.Types (SQLiteEnv) import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb) import Chainweb.Utils (int) import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText) @@ -140,7 +139,7 @@ withChainDb :: (Logger logger) => ChainId -> logger -> FilePath - -> (logger -> SQLiteEnv -> IO x) + -> (logger -> Database -> IO x) -> IO x withChainDb cid logger' path f = do let logger = addChainIdLabel cid logger' diff --git a/src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs b/src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs index fea74d363a..374834b3dc 100644 --- a/src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs +++ b/src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs @@ -32,7 +32,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..)) import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshot import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (ChainGrandHash(..)) import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeights, computeGrandHashesAt, withConnections, hex, rocksParser, cwvParser) -import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Types (Database) import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions) import Chainweb.Utils (sshow) import Chainweb.Version (ChainwebVersion(..), ChainwebVersionName(..)) @@ -75,7 +75,7 @@ data BlockHeightTargets pactCalc :: (Logger logger) => logger -> ChainwebVersion - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -- ^ pact database dir -> RocksDb -- ^ rocksdb dir diff --git a/src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs b/src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs index 5cf5946df0..1db25fd0d4 100644 --- a/src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs +++ b/src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs @@ -62,7 +62,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..)) import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshots import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeight, computeGrandHashesAt, exitLog, withConnections, chainwebDbFilePath, rocksParser, cwvParser) import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpointer) -import Chainweb.Pact.Backend.Types (SQLiteEnv, _cpRewindTo) +import Chainweb.Pact.Backend.Types (Database, _cpRewindTo, SQLiteEnv(..), SQLiteConnectionType(..)) import Chainweb.Pact.Types (defaultModuleCacheLimit) import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions) import Chainweb.Utils (sshow) @@ -92,7 +92,7 @@ import System.LogLevel (LogLevel(..)) pactVerify :: (Logger logger) => logger -> ChainwebVersion - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -- ^ pact connections -> RocksDb -- ^ rocksDb @@ -186,7 +186,7 @@ pactDropPostVerified logger v srcDir tgtDir snapshotBlockHeight snapshotChainHas let logger' = addChainIdLabel cid logger logFunctionText logger' Info $ "Dropping anything post verified state (BlockHeight " <> sshow snapshotBlockHeight <> ")" - withProdRelationalCheckpointer logger defaultModuleCacheLimit sqliteEnv v cid $ \cp -> do + withProdRelationalCheckpointer logger defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) v cid $ \cp -> do _cpRewindTo cp (Just $ ParentHeader $ blockHeader $ snapshotChainHashes ^?! ix cid) data PactImportConfig = PactImportConfig diff --git a/src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs b/src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs index 532fc98482..4aaf438fbc 100644 --- a/src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs +++ b/src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs @@ -33,7 +33,7 @@ import Chainweb.Logger (Logger, logFunctionText) import Chainweb.Pact.Backend.PactState (getLatestPactStateAt, getLatestBlockHeight, addChainIdLabel) import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..)) import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (computeGrandHash) -import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Types (Database) import Chainweb.Pact.Backend.Utils (startSqliteDb, stopSqliteDb) import Chainweb.Storage.Table.RocksDB (RocksDb) import Chainweb.TreeDB (seekAncestor) @@ -69,7 +69,7 @@ limitCut :: (Logger logger) => logger -> WebBlockHeaderDb -> HashMap ChainId BlockHeader -- ^ latest cut headers - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -> BlockHeight -> IO (HashMap ChainId BlockHeader) limitCut logger wbhdb latestCutHeaders pactConns blockHeight = do @@ -117,7 +117,7 @@ getLatestCutHeaders v rocksDb = do resolveLatestCutHeaders :: (Logger logger) => logger -> ChainwebVersion - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -> RocksDb -> IO (BlockHeight, HashMap ChainId BlockHeader) resolveLatestCutHeaders logger v pactConns rocksDb = do @@ -131,7 +131,7 @@ resolveLatestCutHeaders logger v pactConns rocksDb = do resolveCutHeadersAtHeight :: (Logger logger) => logger -> ChainwebVersion - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -> RocksDb -> BlockHeight -> IO (HashMap ChainId BlockHeader) @@ -146,7 +146,7 @@ resolveCutHeadersAtHeight logger v pactConns rocksDb target = do resolveCutHeadersAtHeights :: (Logger logger) => logger -> ChainwebVersion - -> HashMap ChainId SQLiteEnv + -> HashMap ChainId Database -> RocksDb -> [BlockHeight] -- ^ targets -> IO [(BlockHeight, HashMap ChainId BlockHeader)] @@ -159,7 +159,7 @@ resolveCutHeadersAtHeights logger v pactConns rocksDb targets = do -- a 'BlockHeader' with the computed 'ChainGrandHash' at the header's -- 'BlockHeight'. computeGrandHashesAt :: () - => HashMap ChainId SQLiteEnv + => HashMap ChainId Database -- ^ pact connections -> HashMap ChainId BlockHeader -- ^ Resolved targets, i.e, blockheights that are accessible per each @@ -202,17 +202,17 @@ withConnections :: (Logger logger) => logger -> FilePath -> [ChainId] - -> (HashMap ChainId SQLiteEnv -> IO x) + -> (HashMap ChainId Database -> IO x) -> IO x withConnections logger pactDir cids f = do checkPactDbsExist pactDir cids bracket openConnections closeConnections f where - openConnections :: IO (HashMap ChainId SQLiteEnv) + openConnections :: IO (HashMap ChainId Database) openConnections = fmap HM.fromList $ forM cids $ \cid -> do (cid, ) <$> startSqliteDb cid logger pactDir False - closeConnections :: HashMap ChainId SQLiteEnv -> IO () + closeConnections :: HashMap ChainId Database -> IO () closeConnections = mapM_ stopSqliteDb hex :: ByteString -> Text diff --git a/src/Chainweb/Pact/Backend/RelationalCheckpointer.hs b/src/Chainweb/Pact/Backend/RelationalCheckpointer.hs index f789cc8cb5..b09b4275c6 100644 --- a/src/Chainweb/Pact/Backend/RelationalCheckpointer.hs +++ b/src/Chainweb/Pact/Backend/RelationalCheckpointer.hs @@ -113,20 +113,22 @@ initRelationalCheckpointer' -> ChainwebVersion -> ChainId -> IO (MVar (DbCache PersistModuleData), Checkpointer logger) -initRelationalCheckpointer' dbCacheLimit sqlenv loggr v cid = do - initSchema loggr sqlenv +initRelationalCheckpointer' dbCacheLimit (SQLiteEnv connType db) loggr v cid = do + when (connType == ReadWrite) $ initSchema loggr db + moduleCacheVar <- newMVar (emptyDbCache dbCacheLimit) let checkpointer = Checkpointer - { _cpRestoreAndSave = doRestoreAndSave loggr v cid sqlenv moduleCacheVar + { _cpRestoreAndSave = doRestoreAndSave loggr v cid db moduleCacheVar + , _cpReadCp = ReadCheckpointer - { _cpReadFrom = doReadFrom loggr v cid sqlenv moduleCacheVar - , _cpGetBlockHistory = doGetBlockHistory sqlenv - , _cpGetHistoricalLookup = doGetHistoricalLookup sqlenv - , _cpGetEarliestBlock = doGetEarliestBlock sqlenv - , _cpGetLatestBlock = doGetLatestBlock sqlenv - , _cpLookupBlockInCheckpointer = doLookupBlock sqlenv - , _cpGetBlockParent = doGetBlockParent v cid sqlenv + { _cpReadFrom = doReadFrom loggr v cid db moduleCacheVar + , _cpGetBlockHistory = doGetBlockHistory db + , _cpGetHistoricalLookup = doGetHistoricalLookup db + , _cpGetEarliestBlock = doGetEarliestBlock db + , _cpGetLatestBlock = doGetLatestBlock db + , _cpLookupBlockInCheckpointer = doLookupBlock db + , _cpGetBlockParent = doGetBlockParent v cid db , _cpLogger = loggr } } @@ -139,7 +141,7 @@ doReadFrom => logger -> ChainwebVersion -> ChainId - -> SQLiteEnv + -> Database -> MVar (DbCache PersistModuleData) -> Maybe ParentHeader -> (CurrentBlockDbEnv logger -> IO a) @@ -190,7 +192,7 @@ doRestoreAndSave => logger -> ChainwebVersion -> ChainId - -> SQLiteEnv + -> Database -> MVar (DbCache PersistModuleData) -> Maybe ParentHeader -> Stream (Of (RunnableBlock logger q)) IO r @@ -257,7 +259,7 @@ doRestoreAndSave logger v cid sql moduleCacheVar parent blocks = return blocks -doGetEarliestBlock :: HasCallStack => SQLiteEnv -> IO (Maybe (BlockHeight, BlockHash)) +doGetEarliestBlock :: HasCallStack => Database -> IO (Maybe (BlockHeight, BlockHash)) doGetEarliestBlock db = do r <- qry_ db qtext [RInt, RBlob] >>= mapM go case r of @@ -272,7 +274,7 @@ doGetEarliestBlock db = do in return (fromIntegral hgt, hash) go _ = fail "Chainweb.Pact.Backend.RelationalCheckpointer.doGetEarliest: impossible. This is a bug in chainweb-node." -doGetLatestBlock :: HasCallStack => SQLiteEnv -> IO (Maybe (BlockHeight, BlockHash)) +doGetLatestBlock :: HasCallStack => Database -> IO (Maybe (BlockHeight, BlockHash)) doGetLatestBlock db = do r <- qry_ db qtext [RInt, RBlob] >>= mapM go case r of @@ -287,7 +289,7 @@ doGetLatestBlock db = do in return (fromIntegral hgt, hash) go _ = fail "Chainweb.Pact.Backend.RelationalCheckpointer.doGetLatest: impossible. This is a bug in chainweb-node." -doLookupBlock :: SQLiteEnv -> (BlockHeight, BlockHash) -> IO Bool +doLookupBlock :: Database -> (BlockHeight, BlockHash) -> IO Bool doLookupBlock db (bheight, bhash) = do r <- qry db qtext [SInt $ fromIntegral bheight, SBlob (runPutS (encodeBlockHash bhash))] [RInt] @@ -298,7 +300,7 @@ doLookupBlock db (bheight, bhash) = do qtext = "SELECT COUNT(*) FROM BlockHistory WHERE blockheight = ? \ \ AND hash = ?;" -doGetBlockParent :: ChainwebVersion -> ChainId -> SQLiteEnv -> (BlockHeight, BlockHash) -> IO (Maybe BlockHash) +doGetBlockParent :: ChainwebVersion -> ChainId -> Database -> (BlockHeight, BlockHash) -> IO (Maybe BlockHash) doGetBlockParent v cid db (bh, hash) | bh == genesisHeight v cid = return Nothing | otherwise = do @@ -351,7 +353,7 @@ doLookupSuccessful curHeight hashes = do return $! T3 txhash' (fromIntegral blockheight) blockhash' go _ = fail "impossible" -doGetBlockHistory :: SQLiteEnv -> BlockHeader -> Domain RowKey RowData -> IO BlockTxHistory +doGetBlockHistory :: Database -> BlockHeader -> Domain RowKey RowData -> IO BlockTxHistory doGetBlockHistory db blockHeader d = do endTxId <- fmap fromIntegral $ getEndTxId "doGetBlockHistory" db (Just $ ParentHeader blockHeader) @@ -406,7 +408,7 @@ doGetBlockHistory db blockHeader d = do doGetHistoricalLookup - :: SQLiteEnv + :: Database -> BlockHeader -> Domain RowKey RowData -> RowKey diff --git a/src/Chainweb/Pact/Backend/Types.hs b/src/Chainweb/Pact/Backend/Types.hs index 8d49567243..4e7d857d38 100644 --- a/src/Chainweb/Pact/Backend/Types.hs +++ b/src/Chainweb/Pact/Backend/Types.hs @@ -66,7 +66,9 @@ module Chainweb.Pact.Backend.Types , benvBlockState , blockHandlerEnv , runBlockEnv - , SQLiteEnv + , SQLiteConnectionType(..) + , SQLiteEnv(..) + , Database , BlockHandler(..) , blockHandlerBlockHeight , blockHandlerModuleNameFix @@ -217,7 +219,8 @@ data SQLitePendingData = SQLitePendingData makeLenses ''SQLitePendingData -type SQLiteEnv = Database +data SQLiteConnectionType = ReadWrite | ReadOnly deriving Eq +data SQLiteEnv = SQLiteEnv SQLiteConnectionType Database -- | Monad state for 'BlockHandler. -- This notably contains all of the information that's being mutated during @@ -252,7 +255,7 @@ initBlockState cl txid = BlockState makeLenses ''BlockState data BlockHandlerEnv logger = BlockHandlerEnv - { _blockHandlerDb :: !SQLiteEnv + { _blockHandlerDb :: !Database , _blockHandlerLogger :: !logger , _blockHandlerBlockHeight :: !BlockHeight , _blockHandlerModuleNameFix :: !Bool @@ -262,7 +265,7 @@ data BlockHandlerEnv logger = BlockHandlerEnv mkBlockHandlerEnv :: ChainwebVersion -> ChainId -> BlockHeight - -> SQLiteEnv -> logger -> BlockHandlerEnv logger + -> Database -> logger -> BlockHandlerEnv logger mkBlockHandlerEnv v cid bh sql logger = BlockHandlerEnv { _blockHandlerDb = sql , _blockHandlerLogger = logger diff --git a/src/Chainweb/Pact/Backend/Utils.hs b/src/Chainweb/Pact/Backend/Utils.hs index c5546b154f..82b96bd610 100644 --- a/src/Chainweb/Pact/Backend/Utils.hs +++ b/src/Chainweb/Pact/Backend/Utils.hs @@ -47,6 +47,7 @@ module Chainweb.Pact.Backend.Utils , execMulti -- * SQLite runners , withSqliteDb + , withReadOnlySqliteDb , startSqliteDb , stopSqliteDb , withSQLiteConnection @@ -158,7 +159,7 @@ callDb callerName action = do Right r -> return r withSavepoint - :: SQLiteEnv + :: Database -> SavepointName -> IO a -> IO a @@ -176,11 +177,11 @@ withSavepoint db name action = mask $ \resetMask -> do , Handler $ \(e :: SomeException) -> throwErr ("non-pact exception: " <> sshow e) ] -beginSavepoint :: SQLiteEnv -> SavepointName -> IO () +beginSavepoint :: Database -> SavepointName -> IO () beginSavepoint db name = exec_ db $ "SAVEPOINT [" <> convSavepointName name <> "];" -commitSavepoint :: SQLiteEnv -> SavepointName -> IO () +commitSavepoint :: Database -> SavepointName -> IO () commitSavepoint db name = exec_ db $ "RELEASE SAVEPOINT [" <> convSavepointName name <> "];" @@ -194,13 +195,13 @@ commitSavepoint db name = -- Cf. for details about -- savepoints. -- -rollbackSavepoint :: SQLiteEnv -> SavepointName -> IO () +rollbackSavepoint :: Database -> SavepointName -> IO () rollbackSavepoint db name = exec_ db $ "ROLLBACK TRANSACTION TO SAVEPOINT [" <> convSavepointName name <> "];" -- | @abortSavepoint n@ rolls back all database updates since the most recent -- savepoint with the name @n@ and removes it from the savepoint stack. -abortSavepoint :: SQLiteEnv -> SavepointName -> IO () +abortSavepoint :: Database -> SavepointName -> IO () abortSavepoint db name = do rollbackSavepoint db name commitSavepoint db name @@ -281,19 +282,49 @@ withSqliteDb -> logger -> FilePath -> Bool - -> (SQLiteEnv -> IO a) + -> (Database -> IO a) -> IO a withSqliteDb cid logger dbDir resetDb = bracket (startSqliteDb cid logger dbDir resetDb) stopSqliteDb +withReadOnlySqliteDb + :: Logger logger + => ChainId + -> logger + -> FilePath + -> (Database -> IO a) + -> IO a +withReadOnlySqliteDb cid logger dbDir = bracket + (startReadOnlySqliteDb cid logger dbDir) + stopSqliteDb + +startReadOnlySqliteDb + :: Logger logger + => ChainId + -> logger + -> FilePath + -> IO Database +startReadOnlySqliteDb cid logger dbDir = do + textLog Info $ mconcat + [ "opened sqlitedb for " + , sshow cid + , " in directory " + , sshow dbDir + ] + textLog Info $ "opening sqlitedb named " <> T.pack sqliteFile + openReadOnlySQLiteConnection sqliteFile chainwebPragmas + where + textLog = logFunctionText logger + sqliteFile = dbDir chainDbFileName cid + startSqliteDb :: Logger logger => ChainId -> logger -> FilePath -> Bool - -> IO SQLiteEnv + -> IO Database startSqliteDb cid logger dbDir doResetDb = do when doResetDb resetDb createDirectoryIfMissing True dbDir @@ -317,15 +348,16 @@ chainDbFileName cid = fold , ".sqlite" ] -stopSqliteDb :: SQLiteEnv -> IO () +stopSqliteDb :: Database -> IO () stopSqliteDb = closeSQLiteConnection -withSQLiteConnection :: String -> [Pragma] -> (SQLiteEnv -> IO c) -> IO c +withSQLiteConnection :: String -> [Pragma] -> (Database -> IO c) -> IO c withSQLiteConnection file ps = bracket (openSQLiteConnection file ps) closeSQLiteConnection -openSQLiteConnection :: String -> [Pragma] -> IO SQLiteEnv -openSQLiteConnection file ps = open2 file >>= \case +openSQLiteConnection :: String -> [Pragma] -> IO Database +openSQLiteConnection file ps = do + open2 file >>= \case Left (err, msg) -> internalError $ "withSQLiteConnection: Can't open db with " @@ -334,7 +366,18 @@ openSQLiteConnection file ps = open2 file >>= \case runPragmas r ps return r -closeSQLiteConnection :: SQLiteEnv -> IO () +openReadOnlySQLiteConnection :: String -> [Pragma] -> IO Database +openReadOnlySQLiteConnection file ps = do + openReadOnly2 file >>= \case + Left (err, msg) -> + internalError $ + "withSQLiteConnection: Can't open db with " + <> asString (show err) <> ": " <> asString (show msg) + Right r -> do + runPragmas r ps + return r + +closeSQLiteConnection :: Database -> IO () closeSQLiteConnection c = void $ close_v2 c -- passing the empty string as filename causes sqlite to use a temporary file @@ -343,7 +386,7 @@ closeSQLiteConnection c = void $ close_v2 c -- -- Cf. https://www.sqlite.org/inmemorydb.html -- -withTempSQLiteConnection :: [Pragma] -> (SQLiteEnv -> IO c) -> IO c +withTempSQLiteConnection :: [Pragma] -> (Database -> IO c) -> IO c withTempSQLiteConnection = withSQLiteConnection "" -- Using the special file name @:memory:@ causes sqlite to create a temporary in-memory @@ -351,21 +394,30 @@ withTempSQLiteConnection = withSQLiteConnection "" -- -- Cf. https://www.sqlite.org/inmemorydb.html -- -withInMemSQLiteConnection :: [Pragma] -> (SQLiteEnv -> IO c) -> IO c +withInMemSQLiteConnection :: [Pragma] -> (Database -> IO c) -> IO c withInMemSQLiteConnection = withSQLiteConnection ":memory:" open2 :: String -> IO (Either (SQ3.Error, SQ3.Utf8) SQ3.Database) open2 file = open_v2 (fromString file) - (collapseFlags [sqlite_open_readwrite , sqlite_open_create , sqlite_open_fullmutex]) + (collapseFlags [sqlite_open_readwrite , sqlite_open_create, sqlite_open_nomutex]) Nothing -- Nothing corresponds to the nullPtr +openReadOnly2 :: String -> IO (Either (SQ3.Error, SQ3.Utf8) SQ3.Database) +openReadOnly2 file = open_v2 + (fromString file) + (collapseFlags [sqlite_open_readonly , sqlite_open_nomutex]) + Nothing -- Nothing corresponds to the nullPtr + + collapseFlags :: [SQLiteFlag] -> SQLiteFlag collapseFlags xs = if Prelude.null xs then error "collapseFlags: You must pass a non-empty list" else Prelude.foldr1 (.|.) xs -sqlite_open_readwrite, sqlite_open_create, sqlite_open_fullmutex :: SQLiteFlag +sqlite_open_readonly, sqlite_open_readwrite, sqlite_open_create, sqlite_open_nomutex :: SQLiteFlag +sqlite_open_readonly = 0x00000001 sqlite_open_readwrite = 0x00000002 sqlite_open_create = 0x00000004 -sqlite_open_fullmutex = 0x00010000 +sqlite_open_nomutex = 0x00008000 +-- sqlite_open_fullmutex = 0x00010000 diff --git a/src/Chainweb/Pact/PactService.hs b/src/Chainweb/Pact/PactService.hs index b86b857efe..cd9cf37ffc 100644 --- a/src/Chainweb/Pact/PactService.hs +++ b/src/Chainweb/Pact/PactService.hs @@ -101,7 +101,7 @@ import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpoin import Chainweb.Pact.Backend.Types import Chainweb.Pact.PactService.ExecBlock import Chainweb.Pact.PactService.Checkpointer -import Chainweb.Pact.Service.PactQueue (PactQueue, getNextRequest) +import Chainweb.Pact.Service.PactQueue (PactQueue, getNextWriteRequest, getNextReadRequest) import Chainweb.Pact.Service.Types import Chainweb.Pact.SPV import Chainweb.Pact.TransactionExec @@ -128,13 +128,26 @@ runPactService -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (Database, Database) -> PactServiceConfig -> IO () -runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb sqlenv config = - void $ withPactService ver cid chainwebLogger bhDb pdb sqlenv config $ do +runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb (writeSqlEnv, readSqlEnv) config = do + void $ withPactService ver cid chainwebLogger bhDb pdb (SQLiteEnv ReadWrite writeSqlEnv) config $ do + -- If the latest header that is stored in the checkpointer was on an + -- orphaned fork, there is no way to recover it in the call of + -- 'initalPayloadState.readContracts'. We therefore rewind to the latest + -- avaliable header in the block header database. + -- + exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) + initialPayloadState mempoolAccess ver cid - serviceRequests mempoolAccess reqQ + + pst <- get + pse <- ask + liftIO $ race_ + (runPactServiceM pst pse $ serviceWriteRequests mempoolAccess reqQ) + ((withPactService ver cid chainwebLogger bhDb pdb (SQLiteEnv ReadOnly readSqlEnv) config $ + serviceReadRequests mempoolAccess reqQ)) withPactService :: (Logger logger, CanReadablePayloadCas tbl) @@ -147,7 +160,7 @@ withPactService -> PactServiceConfig -> PactServiceM logger tbl a -> IO (T2 a PactServiceState) -withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = +withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = do withProdRelationalCheckpointer checkpointerLogger (_pactModuleCacheLimit config) sqlenv ver cid $ \checkpointer -> do let !rs = readRewards let !pse = PactServiceEnv @@ -191,14 +204,7 @@ withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = logError_ chainwebLogger (J.encodeText msg) throwM e - runPactServiceM pst pse $ do - -- If the latest header that is stored in the checkpointer was on an - -- orphaned fork, there is no way to recover it in the call of - -- 'initalPayloadState.readContracts'. We therefore rewind to the latest - -- avaliable header in the block header database. - -- - exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) - act + runPactServiceM pst pse act where pactServiceLogger = setComponent "pact" chainwebLogger checkpointerLogger = addLabel ("sub-component", "checkpointer") pactServiceLogger @@ -280,40 +286,29 @@ lookupBlockHeader bhash ctx = do throwM $ BlockHeaderLookupFailure $ "failed lookup of parent header in " <> ctx <> ": " <> sshow e --- | Loop forever, serving Pact execution requests and reponses from the queues -serviceRequests +-- | Loop forever, serving Pact execution Write-requests +serviceWriteRequests :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) => MemPoolAccess -> PactQueue -> PactServiceM logger tbl () -serviceRequests memPoolAccess reqQ = do - logInfo "Starting service" - go `finally` logInfo "Stopping service" +serviceWriteRequests memPoolAccess reqQ = do + logInfo "Starting write-requests handling service" + go `finally` logInfo "Stopping write-requests handling service" where go :: PactServiceM logger tbl () go = do PactServiceEnv{_psLogger} <- ask - logDebug "serviceRequests: wait" - SubmittedRequestMsg msg statusRef <- liftIO $ getNextRequest reqQ + logDebug "serviceWriteRequests: wait" + SubmittedRequestMsg msg statusRef <- liftIO $ getNextWriteRequest reqQ requestId <- liftIO $ UUID.toText <$> UUID.nextRandom let logFn :: LogFunction logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger - logDebug $ "serviceRequests: " <> sshow msg + logDebug $ "serviceWriteRequests: " <> sshow msg case msg of CloseMsg -> tryOne "execClose" statusRef $ return () - LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth) -> do - trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ - tryOne "execLocal" statusRef $ - execLocal localRequest preflight sigVerify rewindDepth - go - NewBlockMsg NewBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execNewBlock" - () 1 $ - tryOne "execNewBlock" statusRef $ - execNewBlock memPoolAccess _newMiner - go ValidateBlockMsg ValidateBlockReq {..} -> do tryOne "execValidateBlock" statusRef $ fmap fst $ trace' logFn "Chainweb.Pact.PactService.execValidateBlock" @@ -321,6 +316,44 @@ serviceRequests memPoolAccess reqQ = do (\(_, g) -> fromIntegral g) (execValidateBlock memPoolAccess _valBlockHeader _valCheckablePayload) go + SyncToBlockMsg SyncToBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ + tryOne "syncToBlockBlock" statusRef $ + execSyncToBlock _syncToBlockHeader + go + _ -> error $ "impossible: unexpected request " ++ show msg + +-- | Loop forever, serving Pact execution Read-requests +serviceReadRequests + :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) + => MemPoolAccess + -> PactQueue + -> PactServiceM logger tbl () +serviceReadRequests memPoolAccess reqQ = do + logInfo "Starting read-requests handling service" + go `finally` (logInfo "Stopping read-requests handling service") + where + go = do + logDebug "serviceReadRequests: wait" + SubmittedRequestMsg msg statusRef <- liftIO $ getNextReadRequest reqQ + requestId <- liftIO $ UUID.toText <$> UUID.nextRandom + PactServiceEnv{_psLogger} <- ask + let + logFn :: LogFunction + logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger + logDebug $ "serviceReadRequests: " <> sshow msg + case msg of + NewBlockMsg NewBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execNewBlock" + () 1 $ + tryOne "execNewBlock" statusRef $ + execNewBlock memPoolAccess _newMiner + go + LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth) -> do + trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ + tryOne "execLocal" statusRef $ + execLocal localRequest preflight sigVerify rewindDepth + go LookupPactTxsMsg (LookupPactTxsReq confDepth txHashes) -> do trace logFn "Chainweb.Pact.PactService.execLookupPactTxs" () (length txHashes) $ @@ -343,97 +376,94 @@ serviceRequests memPoolAccess reqQ = do tryOne "execHistoricalLookup" statusRef $ execHistoricalLookup bh d k go - SyncToBlockMsg SyncToBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ - tryOne "syncToBlockBlock" statusRef $ - execSyncToBlock _syncToBlockHeader - go ReadOnlyReplayMsg ReadOnlyReplayReq {..} -> do trace logFn "Chainweb.Pact.PactService.execReadOnlyReplay" (_readOnlyReplayLowerBound, _readOnlyReplayUpperBound) 1 $ tryOne "readOnlyReplayBlock" statusRef $ execReadOnlyReplay _readOnlyReplayLowerBound _readOnlyReplayUpperBound go + _ -> error $ "impossible: unexpected request " ++ show msg - tryOne - :: forall a. Text - -> TVar (RequestStatus a) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne which statusRef act = - evalPactOnThread - `catches` - [ Handler $ \(e :: SomeException) -> do - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ throwIO e - ] - where - -- here we start a thread to service the request - evalPactOnThread :: PactServiceM logger tbl () - evalPactOnThread = do - maybeException <- withPactState $ \run -> do - goLock <- newEmptyMVar - finishedLock <- newEmptyMVar - -- fork a thread to service the request - bracket - (forkIO $ - flip finally (tryPutMVar finishedLock ()) $ do - -- wait until we've been told to start. - -- we don't want to start if the request was cancelled - -- already - takeMVar goLock - -- run and report the answer. - tryAny (run act) >>= \case - Left ex -> atomically $ writeTVar statusRef (RequestFailed ex) - Right r -> atomically $ writeTVar statusRef (RequestDone r) - ) - -- if Pact itself is killed, kill the request thread too. - (\tid -> throwTo tid RequestCancelled >> takeMVar finishedLock) - (\_tid -> do - -- check first if the request has been cancelled before - -- starting work on it - beforeStarting <- atomically $ do - readTVar statusRef >>= \case - RequestInProgress -> - error "PactService internal error: request in progress before starting" - RequestDone _ -> - error "PactService internal error: request finished before starting" - RequestFailed e -> - return (Left e) - RequestNotStarted -> do - writeTVar statusRef RequestInProgress - return (Right ()) - case beforeStarting of - -- the request has already been cancelled, don't - -- start work on it. - Left ex -> return (Left ex) - Right () -> do - -- let the request thread start working - putMVar goLock () - -- wait until the request thread has finished - atomically $ readTVar statusRef >>= \case - RequestInProgress -> retry - RequestDone _ -> return (Right ()) - RequestFailed e -> return (Left e) - RequestNotStarted -> error "PactService internal error: request not started after starting" - ) - case maybeException of - Left (fromException -> Just AsyncCancelled) -> - logDebug "Pact action was cancelled" - Left (fromException -> Just ThreadKilled) -> - logWarn "Pact action thread was killed" - Left (exn :: SomeException) -> - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow exn - ] - Right () -> return () +tryOne + :: forall logger tbl a. (Logger logger, CanReadablePayloadCas tbl) + => Text + -> TVar (RequestStatus a) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne which statusRef act = + evalPactOnThread + `catches` + [ Handler $ \(e :: SomeException) -> do + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ throwIO e + ] + where + -- here we start a thread to service the request + evalPactOnThread :: PactServiceM logger tbl () + evalPactOnThread = do + maybeException <- withPactState $ \run -> do + goLock <- newEmptyMVar + finishedLock <- newEmptyMVar + -- fork a thread to service the request + bracket + (forkIO $ + flip finally (tryPutMVar finishedLock ()) $ do + -- wait until we've been told to start. + -- we don't want to start if the request was cancelled + -- already + takeMVar goLock + -- run and report the answer. + tryAny (run act) >>= \case + Left ex -> atomically $ writeTVar statusRef (RequestFailed ex) + Right r -> atomically $ writeTVar statusRef (RequestDone r) + ) + -- if Pact itself is killed, kill the request thread too. + (\tid -> throwTo tid RequestCancelled >> takeMVar finishedLock) + (\_tid -> do + -- check first if the request has been cancelled before + -- starting work on it + beforeStarting <- atomically $ do + readTVar statusRef >>= \case + RequestInProgress -> + error "PactService internal error: request in progress before starting" + RequestDone _ -> + error "PactService internal error: request finished before starting" + RequestFailed e -> + return (Left e) + RequestNotStarted -> do + writeTVar statusRef RequestInProgress + return (Right ()) + case beforeStarting of + -- the request has already been cancelled, don't + -- start work on it. + Left ex -> return (Left ex) + Right () -> do + -- let the request thread start working + putMVar goLock () + -- wait until the request thread has finished + atomically $ readTVar statusRef >>= \case + RequestInProgress -> retry + RequestDone _ -> return (Right ()) + RequestFailed e -> return (Left e) + RequestNotStarted -> error "PactService internal error: request not started after starting" + ) + case maybeException of + Left (fromException -> Just AsyncCancelled) -> + logDebug "Pact action was cancelled" + Left (fromException -> Just ThreadKilled) -> + logWarn "Pact action thread was killed" + Left (exn :: SomeException) -> + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow exn + ] + Right () -> return () execNewBlock :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) diff --git a/src/Chainweb/Pact/Service/PactInProcApi.hs b/src/Chainweb/Pact/Service/PactInProcApi.hs index 132f70ff91..b759dbbdad 100644 --- a/src/Chainweb/Pact/Service/PactInProcApi.hs +++ b/src/Chainweb/Pact/Service/PactInProcApi.hs @@ -66,9 +66,10 @@ withPactService -> PactServiceConfig -> (PactQueue -> IO a) -> IO a -withPactService ver cid logger mpc bhdb pdb pactDbDir config action = - withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \sqlenv -> - withPactService' ver cid logger mpa bhdb pdb sqlenv config action +withPactService ver cid logger mpc bhdb pdb pactDbDir config action = do + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \writeSqlEnv -> do + withReadOnlySqliteDb cid logger pactDbDir $ \readSqlEnv -> do + withPactService' ver cid logger mpa bhdb pdb (writeSqlEnv, readSqlEnv) config action where mpa = pactMemPoolAccess mpc $ addLabel ("sub-component", "MempoolAccess") logger @@ -84,18 +85,18 @@ withPactService' -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (Database, Database) -> PactServiceConfig -> (PactQueue -> IO a) -> IO a -withPactService' ver cid logger memPoolAccess bhDb pdb sqlenv config action = do +withPactService' ver cid logger memPoolAccess bhDb pdb sqlenvs config action = do reqQ <- newPactQueue (_pactQueueSize config) race (concurrently_ (monitor reqQ) (server reqQ)) (action reqQ) >>= \case Left () -> error "Chainweb.Pact.Service.PactInProcApi: pact service terminated unexpectedly" Right a -> return a where server reqQ = runForever logg "pact-service" - $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenv config + $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenvs config logg = logFunction logger monitor = runPactServiceQueueMonitor $ addLabel ("sub-component", "PactQueue") logger diff --git a/src/Chainweb/Pact/Service/PactQueue.hs b/src/Chainweb/Pact/Service/PactQueue.hs index b06ca0c66d..c94d19dddf 100644 --- a/src/Chainweb/Pact/Service/PactQueue.hs +++ b/src/Chainweb/Pact/Service/PactQueue.hs @@ -22,7 +22,8 @@ module Chainweb.Pact.Service.PactQueue , waitForSubmittedRequest , submitRequestAnd , submitRequestAndWait -, getNextRequest +, getNextWriteRequest +, getNextReadRequest , getPactQueueStats , newPactQueue , resetPactQueueStats @@ -57,9 +58,10 @@ import Chainweb.Utils -- other requests. -- data PactQueue = PactQueue - { _pactQueueValidateBlock :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) - , _pactQueueNewBlock :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) - , _pactQueueOtherMsg :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + { _pactQueueWriteRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + , _pactQueueCloseRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + , _pactQueueNewBlockRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) + , _pactQueueReadRequests :: !(TBQueue (T2 SubmittedRequestMsg (Time Micros))) , _pactQueuePactQueueValidateBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueNewBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueOtherMsgCounters :: !(IORef PactQueueCounters) @@ -78,6 +80,7 @@ newPactQueue sz = PactQueue <$> newTBQueueIO sz <*> newTBQueueIO sz <*> newTBQueueIO sz + <*> newTBQueueIO sz <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters @@ -93,9 +96,13 @@ addRequest q msg = do return statusRef where priority = case msg of - ValidateBlockMsg {} -> _pactQueueValidateBlock q - NewBlockMsg {} -> _pactQueueNewBlock q - _ -> _pactQueueOtherMsg q + -- Write-requests + ValidateBlockMsg {} -> _pactQueueWriteRequests q + SyncToBlockMsg {} -> _pactQueueWriteRequests q + CloseMsg -> _pactQueueCloseRequests q + -- Read-requests + NewBlockMsg {} -> _pactQueueNewBlockRequests q + _ -> _pactQueueReadRequests q -- | Cancel a request that's already been submitted to the Pact queue. -- @@ -135,14 +142,12 @@ submitRequestAnd q msg k = mask $ \restore -> do submitRequestAndWait :: PactQueue -> RequestMsg r -> IO r submitRequestAndWait q msg = submitRequestAnd q msg waitForSubmittedRequest --- | Get the next available request from the Pact execution queue +-- | Get the next available Write-request from the Pact execution queue -- -getNextRequest :: PactQueue -> IO SubmittedRequestMsg -getNextRequest q = do - T2 req entranceTime <- atomically - $ tryReadTBQueueOrRetry (_pactQueueValidateBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueNewBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueOtherMsg q) +getNextWriteRequest :: PactQueue -> IO SubmittedRequestMsg +getNextWriteRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueWriteRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueCloseRequests q) requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime updatePactQueueCounters (counters req q) requestTime return req @@ -152,6 +157,23 @@ getNextRequest q = do Just msg -> return msg counters (SubmittedRequestMsg ValidateBlockMsg{} _) = _pactQueuePactQueueValidateBlockMsgCounters + counters (SubmittedRequestMsg NewBlockMsg{} _) = error "getNextWriteRequest.counters.impossible" + counters _ = _pactQueuePactQueueOtherMsgCounters + +-- | Get the next available Read-request from the Pact execution queue +getNextReadRequest :: PactQueue -> IO SubmittedRequestMsg +getNextReadRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueNewBlockRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueReadRequests q) + requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime + updatePactQueueCounters (counters req q) requestTime + return req + where + tryReadTBQueueOrRetry = tryReadTBQueue >=> \case + Nothing -> retry + Just msg -> return msg + + counters (SubmittedRequestMsg ValidateBlockMsg{} _) = error "getNextReadRequest.counters.impossible" counters (SubmittedRequestMsg NewBlockMsg{} _) = _pactQueuePactQueueNewBlockMsgCounters counters _ = _pactQueuePactQueueOtherMsgCounters diff --git a/test/Chainweb/Test/Pact/Checkpointer.hs b/test/Chainweb/Test/Pact/Checkpointer.hs index 1c9a1a96c7..8d0848e8bd 100644 --- a/test/Chainweb/Test/Pact/Checkpointer.hs +++ b/test/Chainweb/Test/Pact/Checkpointer.hs @@ -639,18 +639,18 @@ runTwice step action = do runSQLite :: (Logger logger, logger ~ GenericLogger) => (IO (Checkpointer logger) -> TestTree) - -> IO SQLiteEnv + -> IO Database -> TestTree runSQLite f = runSQLite' (f . fmap fst) runSQLite' :: (Logger logger, logger ~ GenericLogger) - => (IO (Checkpointer logger, SQLiteEnv) -> TestTree) - -> IO SQLiteEnv + => (IO (Checkpointer logger, Database) -> TestTree) + -> IO Database -> TestTree runSQLite' runTest sqlEnvIO = runTest $ do sqlenv <- sqlEnvIO - cp <- initRelationalCheckpointer defaultModuleCacheLimit sqlenv logger testVer testChainId + cp <- initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqlenv) logger testVer testChainId return (cp, sqlenv) where logger = addLabel ("sub-component", "relational-checkpointer") $ dummyLogger diff --git a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs index d436a071f6..71dd72eb4e 100644 --- a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs +++ b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs @@ -285,7 +285,7 @@ snapshotCache iomcache initCache = do withPact' :: (Logger logger, logger ~ GenericLogger) => IO TestBlockDb - -> IO SQLiteEnv + -> IO Database -> IO (MVar ModuleInitCache) -> CacheTest logger RocksDbTable -> (String -> IO ()) @@ -296,7 +296,7 @@ withPact' bdbio ioSqlEnv r (ps, cacheTest) tastylog = do let pdb = _bdbPayloadDb bdb sqlEnv <- ioSqlEnv T2 _ pstate <- withPactService - testVer testChainId logger bhdb pdb sqlEnv testPactServiceConfig ps + testVer testChainId logger bhdb pdb (SQLiteEnv ReadWrite sqlEnv) testPactServiceConfig ps cacheTest r (_psInitCache pstate) where logger = genericLogger Quiet (tastylog . T.unpack) diff --git a/test/Chainweb/Test/Pact/PactMultiChainTest.hs b/test/Chainweb/Test/Pact/PactMultiChainTest.hs index ed3f6cc7ac..57addab144 100644 --- a/test/Chainweb/Test/Pact/PactMultiChainTest.hs +++ b/test/Chainweb/Test/Pact/PactMultiChainTest.hs @@ -90,7 +90,7 @@ cid = unsafeChainId 9 data MultiEnv = MultiEnv { _menvBdb :: !TestBlockDb , _menvPact :: !WebPactExecutionService - , _menvPacts :: !(HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) + , _menvPacts :: !(HM.HashMap ChainId (Database, PactExecutionService)) , _menvMpa :: !(IO (IORef MemPoolAccess)) , _menvMiner :: !Miner , _menvChainId :: !ChainId @@ -390,7 +390,7 @@ runLocalWithDepth nonce depth cid' cmd = do cwCmd <- buildCwCmd nonce testVersion cmd liftIO $ try @_ @PactException $ _pactLocal pact Nothing Nothing depth cwCmd -getSqlite :: ChainId -> PactTestM SQLiteEnv +getSqlite :: ChainId -> PactTestM Database getSqlite cid' = do HM.lookup cid' <$> view menvPacts >>= \case Just (dbEnv, _) -> return dbEnv diff --git a/test/Chainweb/Test/Pact/PactReplay.hs b/test/Chainweb/Test/Pact/PactReplay.hs index 92139f3910..8677bfc8b8 100644 --- a/test/Chainweb/Test/Pact/PactReplay.hs +++ b/test/Chainweb/Test/Pact/PactReplay.hs @@ -9,6 +9,7 @@ module Chainweb.Test.Pact.PactReplay where +-- import Control.Concurrent (threadDelay) import Control.Monad (forM_, unless, void) import Control.Monad.Catch import Control.Monad.Reader @@ -86,7 +87,7 @@ tests rdb = onRestart :: IO (IORef MemPoolAccess) - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> (String -> IO ()) -> Assertion onRestart mpio iop step = do @@ -160,7 +161,7 @@ dupegenMemPoolAccess = do serviceInitializationAfterFork :: IO (IORef MemPoolAccess) -> BlockHeader - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> Assertion serviceInitializationAfterFork mpio genesisBlock iop = do setOneShotMempool mpio testMemPoolAccess @@ -204,7 +205,7 @@ serviceInitializationAfterFork mpio genesisBlock iop = do firstPlayThrough :: IO (IORef MemPoolAccess) -> BlockHeader - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> Assertion firstPlayThrough mpio genesisBlock iop = do setOneShotMempool mpio testMemPoolAccess @@ -238,7 +239,7 @@ firstPlayThrough mpio genesisBlock iop = do testDupes :: IO (IORef MemPoolAccess) - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> Assertion testDupes mpio iop = do setMempool mpio =<< dupegenMemPoolAccess @@ -269,7 +270,7 @@ testDupes mpio iop = do testDeepForkLimit :: IO (IORef MemPoolAccess) -> RewindLimit - -> IO (SQLiteEnv, PactQueue,TestBlockDb) + -> IO (Database, PactQueue,TestBlockDb) -> (String -> IO ()) -> Assertion testDeepForkLimit mpio (RewindLimit deepForkLimit) iop step = do @@ -313,7 +314,7 @@ testDeepForkLimit mpio (RewindLimit deepForkLimit) iop step = do mineBlock :: Nonce - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> IO (T3 ParentHeader BlockHeader PayloadWithOutputs) mineBlock nonce iop = timeout 5000000 go >>= \case Nothing -> error "PactReplay.mineBlock: Test timeout. Most likely a test case caused a pact service failure that wasn't caught, and the test was blocked while waiting for the result" diff --git a/test/Chainweb/Test/Pact/PactSingleChainTest.hs b/test/Chainweb/Test/Pact/PactSingleChainTest.hs index 509f49ec9f..4589a2ad10 100644 --- a/test/Chainweb/Test/Pact/PactSingleChainTest.hs +++ b/test/Chainweb/Test/Pact/PactSingleChainTest.hs @@ -18,7 +18,7 @@ module Chainweb.Test.Pact.PactSingleChainTest ) where import Control.Arrow ((&&&)) -import Control.Concurrent (forkIO) +import Control.Concurrent (forkIO) -- , threadDelay) import Control.Concurrent.MVar import Control.DeepSeq import Control.Lens hiding ((.=), matching) @@ -138,7 +138,7 @@ tests rdb = testGroup testName testWithConf' :: () => RocksDb - -> (IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree) + -> (IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree) -> PactServiceConfig -> TestTree testWithConf' rdb f conf = @@ -148,13 +148,13 @@ testWithConf' rdb f conf = test' :: () => RocksDb - -> (IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree) + -> (IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree) -> TestTree test' rdb f = testWithConf' rdb f testPactServiceConfig testTimeout' :: () => RocksDb - -> (IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree) + -> (IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree) -> TestTree testTimeout' rdb f = testWithConf' rdb f (testPactServiceConfig { _pactPreInsertCheckTimeout = 1 }) @@ -185,13 +185,13 @@ runBlock q bdb timeOffset = do forSuccess "newBlockAndValidate: validate" $ runBlockE q bdb timeOffset -newBlockAndValidate :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +newBlockAndValidate :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree newBlockAndValidate refIO reqIO = testCase "newBlockAndValidate" $ do (_, q, bdb) <- reqIO setOneShotMempool refIO goldenMemPool void $ runBlock q bdb second -newBlockAndValidationFailure :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +newBlockAndValidationFailure :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree newBlockAndValidationFailure refIO reqIO = testCase "newBlockAndValidationFailure" $ do (_, q, bdb) <- reqIO setOneShotMempool refIO goldenMemPool @@ -264,12 +264,13 @@ rosettaFailsWithoutFullHistory rdb = pactQueue <- newPactQueue 2000 blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- fmap snd dm let payloadDb = _bdbPayloadDb blockDb let cfg = testPactServiceConfig { _pactFullHistoryRequired = True } let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv cfg + e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv, readSqlEnv) cfg case e of Left (FullHistoryRequired {}) -> do pure () @@ -552,7 +553,7 @@ compactionGrandHashUnchanged rdb = assertEqual "GrandHash pre- and post-compaction are the same" hashPreCompaction hashPostCompaction -getHistory :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +getHistory :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree getHistory refIO reqIO = testCase "getHistory" $ do (_, q, bdb) <- reqIO setOneShotMempool refIO goldenMemPool @@ -596,7 +597,7 @@ getHistoricalLookupNoTxs :: T.Text -> (Maybe (TxLog RowData) -> IO ()) -> IO (IORef MemPoolAccess) - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> TestTree getHistoricalLookupNoTxs key assertF refIO reqIO = testCase (T.unpack ("getHistoricalLookupNoTxs: " <> key)) $ do @@ -610,7 +611,7 @@ getHistoricalLookupWithTxs :: T.Text -> (Maybe (TxLog RowData) -> IO ()) -> IO (IORef MemPoolAccess) - -> IO (SQLiteEnv, PactQueue, TestBlockDb) + -> IO (Database, PactQueue, TestBlockDb) -> TestTree getHistoricalLookupWithTxs key assertF refIO reqIO = testCase (T.unpack ("getHistoricalLookupWithTxs: " <> key)) $ do @@ -648,7 +649,7 @@ setFromHeader bh = -- this test relies on block gas errors being thrown before other Pact errors. -blockGasLimitTest :: HasCallStack => IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +blockGasLimitTest :: HasCallStack => IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree blockGasLimitTest _ reqIO = testCase "blockGasLimitTest" $ do (_, q, _) <- reqIO @@ -703,7 +704,7 @@ blockGasLimitTest _ reqIO = testCase "blockGasLimitTest" $ do _ -> return () -mempoolRefillTest :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +mempoolRefillTest :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree mempoolRefillTest mpRefIO reqIO = testCase "mempoolRefillTest" $ do (_, q, bdb) <- reqIO @@ -752,7 +753,7 @@ mempoolRefillTest mpRefIO reqIO = testCase "mempoolRefillTest" $ do $ set cbRPC (mkExec' "(+ 1 2)") $ defaultCmd -moduleNameFork :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +moduleNameFork :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree moduleNameFork mpRefIO reqIO = testCase "moduleNameFork" $ do (_, q, bdb) <- reqIO @@ -792,7 +793,7 @@ moduleNameMempool ns mn = mempty defaultCmd -mempoolCreationTimeTest :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +mempoolCreationTimeTest :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree mempoolCreationTimeTest mpRefIO reqIO = testCase "mempoolCreationTimeTest" $ do (_, q, bdb) <- reqIO @@ -831,7 +832,7 @@ mempoolCreationTimeTest mpRefIO reqIO = testCase "mempoolCreationTimeTest" $ do unless (V.and oks) $ throwM $ userError "Insert failed" return txs -preInsertCheckTimeoutTest :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +preInsertCheckTimeoutTest :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree preInsertCheckTimeoutTest _ reqIO = testCase "preInsertCheckTimeoutTest" $ do (_, q, _) <- reqIO @@ -865,7 +866,7 @@ preInsertCheckTimeoutTest _ reqIO = testCase "preInsertCheckTimeoutTest" $ do (V.all (== Left InsertErrorTimedOut)) rs -badlistNewBlockTest :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +badlistNewBlockTest :: IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree badlistNewBlockTest mpRefIO reqIO = testCase "badlistNewBlockTest" $ do (_, reqQ, _) <- reqIO let hashToTxHashList = V.singleton . requestKeyToTransactionHash . RequestKey . toUntypedHash @'Blake2b_256 @@ -889,7 +890,7 @@ badlistNewBlockTest mpRefIO reqIO = testCase "badlistNewBlockTest" $ do } -goldenNewBlock :: String -> MemPoolAccess -> IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree +goldenNewBlock :: String -> MemPoolAccess -> IO (IORef MemPoolAccess) -> IO (Database, PactQueue, TestBlockDb) -> TestTree goldenNewBlock name mp mpRefIO reqIO = golden name $ do (_, reqQ, _) <- reqIO setOneShotMempool mpRefIO mp @@ -948,7 +949,7 @@ goldenMemPool = mempty data CompactionResources = CompactionResources { mempoolRef :: IO (IORef MemPoolAccess) , mempool :: MemPoolAccess - , sqlEnv :: SQLiteEnv + , sqlEnv :: Database , pactQueue :: PactQueue , blockDb :: TestBlockDb } @@ -968,7 +969,8 @@ compactionSetup pat rdb pactCfg f = blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid let payloadDb = _bdbPayloadDb blockDb - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO (mempoolRef, mempool) <- do (ref, nonRef) <- dm pure (pure ref, nonRef) @@ -976,14 +978,14 @@ compactionSetup pat rdb pactCfg f = let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv pactCfg + void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv,readSqlEnv) pactCfg setOneShotMempool mempoolRef goldenMemPool f $ CompactionResources { mempoolRef = mempoolRef , mempool = mempool - , sqlEnv = sqlEnv + , sqlEnv = writeSqlEnv , pactQueue = pactQueue , blockDb = blockDb } diff --git a/test/Chainweb/Test/Pact/RemotePactTest.hs b/test/Chainweb/Test/Pact/RemotePactTest.hs index ccbe95f0a6..551dd8a9ac 100644 --- a/test/Chainweb/Test/Pact/RemotePactTest.hs +++ b/test/Chainweb/Test/Pact/RemotePactTest.hs @@ -141,7 +141,7 @@ withRequestKeys t cenv = do -- random chain sampling works with our test harnesses. -- tests :: RocksDb -> TestTree -tests rdb = testGroup "Chainweb.Test.Pact.RemotePactTest" +tests rdb = sequentialTestGroup "Chainweb.Test.Pact.RemotePactTest" AllFinish [ withResourceT (withNodeDbDirs rdb nNodes) $ \dbDirs -> withResourceT (withNodesAtLatestBehavior v id =<< liftIO dbDirs) $ \net -> let cenv = _getServiceClientEnv <$> net diff --git a/test/Chainweb/Test/Pact/SQLite.hs b/test/Chainweb/Test/Pact/SQLite.hs index a3294d7d91..3bb6045c50 100644 --- a/test/Chainweb/Test/Pact/SQLite.hs +++ b/test/Chainweb/Test/Pact/SQLite.hs @@ -138,13 +138,13 @@ shaa i = "sha3a_" <> fromString (show i) -- -------------------------------------------------------------------------- -- -- -runMsgTest :: IO (MVar SQLiteEnv) -> [Int] -> Int -> MsgFile -> IO () +runMsgTest :: IO (MVar Database) -> [Int] -> Int -> MsgFile -> IO () runMsgTest dbVarIO splitArg n f = do dbVar <- dbVarIO withMVar dbVar $ \db -> do msgAssert (\_ a b -> a @?= b) (sqliteSha3 db n splitArg) f -runMonteTest :: IO (MVar SQLiteEnv) -> [Int] -> Int -> MonteFile -> IO () +runMonteTest :: IO (MVar Database) -> [Int] -> Int -> MonteFile -> IO () runMonteTest dbVarIO splitArg n f = do dbVar <- dbVarIO withMVar dbVar $ \db -> do @@ -153,7 +153,7 @@ runMonteTest dbVarIO splitArg n f = do -- -------------------------------------------------------------------------- -- -- Repeated use in a query -msgTableTest :: IO (MVar SQLiteEnv) -> Int -> MsgFile -> IO () +msgTableTest :: IO (MVar Database) -> Int -> MsgFile -> IO () msgTableTest dbVarIO n msgFile = do dbVar <- dbVarIO withMVar dbVar $ \db -> do @@ -170,7 +170,7 @@ msgTableTest dbVarIO n msgFile = do query = "SELECT sum(" <> sha n <> "(substr(msg,1,len)) != md) FROM " <> fromString name name = "msgTable_" <> show n -msgTable :: SQLiteEnv -> String -> MsgFile -> IO () +msgTable :: Database -> String -> MsgFile -> IO () msgTable db name msgFile = do exec_ db ("CREATE TABLE " <> tbl <> " (len INT, msg BLOB, md BLOB)") forM_ (_msgVectors msgFile) $ \i -> do @@ -185,13 +185,13 @@ msgTable db name msgFile = do -- -------------------------------------------------------------------------- -- -- Repeated use in query for MonteFile -monteTableTest :: IO (MVar SQLiteEnv) -> Int -> MonteFile -> IO () +monteTableTest :: IO (MVar Database) -> Int -> MonteFile -> IO () monteTableTest dbVarIO n monteFile = do dbVar <- dbVarIO withMVar dbVar $ \db -> monteTableTest_ db n monteFile -monteTableTest_ :: SQLiteEnv -> Int -> MonteFile -> IO () +monteTableTest_ :: Database -> Int -> MonteFile -> IO () monteTableTest_ db n monteFile = do monteTable db monteTableName monteFile let query = fromString $ unwords @@ -219,7 +219,7 @@ monteTableTest_ db n monteFile = do where monteTableName = "monteTable_" <> show n -monteTable :: SQLiteEnv -> String -> MonteFile -> IO () +monteTable :: Database -> String -> MonteFile -> IO () monteTable db name monteFile = do exec_ db ("CREATE TABLE " <> tbl <> " (count INT, md BLOB)") forM_ (_monteVectors monteFile) $ \i -> do @@ -236,7 +236,7 @@ monteTable db name monteFile = do -- split a large input accross table rows withAggTable - :: IO (MVar SQLiteEnv) + :: IO (MVar Database) -> Int -> Int -> (IO (String, [B.ByteString]) -> TestTree) @@ -255,7 +255,7 @@ withAggTable dbVarIO rowCount chunkSize = exec' db ("INSERT INTO " <> fromString tbl <> " VALUES(?)") [SBlob i] return (tbl, input) -testAgg :: Int -> IO (MVar SQLiteEnv) -> IO (String, [B.ByteString]) -> IO () +testAgg :: Int -> IO (MVar Database) -> IO (String, [B.ByteString]) -> IO () testAgg n dbVarIO tblIO = do dbVar <- dbVarIO (tbl, input) <- first fromString <$> tblIO @@ -282,7 +282,7 @@ hashToByteString = BS.fromShort . coerce -- -------------------------------------------------------------------------- -- -- SHA3 Implementation -sqliteSha3 :: SQLiteEnv -> Int -> [Int] -> B.ByteString -> B.ByteString +sqliteSha3 :: Database -> Int -> [Int] -> B.ByteString -> B.ByteString sqliteSha3 db n argSplit arg = unsafePerformIO $ do rows <- qry db queryStr params [RBlob] case rows of diff --git a/test/Chainweb/Test/Pact/TTL.hs b/test/Chainweb/Test/Pact/TTL.hs index a00bf24861..6d98d2efbd 100644 --- a/test/Chainweb/Test/Pact/TTL.hs +++ b/test/Chainweb/Test/Pact/TTL.hs @@ -241,9 +241,10 @@ doValidateBlock doValidateBlock ctxIO header payload = do ctx <- ctxIO _mv' <- validateBlock header (CheckablePayloadWithOutputs payload) $ _ctxQueue ctx + addNewPayload (_ctxPdb ctx) (_blockHeight header) payload unsafeInsertBlockHeaderDb (_ctxBdb ctx) header - -- FIXME FIXME FIXME: do at least some checks? + -- FIXME FIXME: do at least some checks? -- -------------------------------------------------------------------------- -- -- Misc Utils diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index c22f54b002..1df132126c 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -154,8 +154,6 @@ import qualified Data.Text.Encoding as T import Data.String import qualified Data.Vector as V -import Database.SQLite3.Direct (Database) - import GHC.Generics import GHC.IO.Exception(IOException(..)) @@ -667,12 +665,12 @@ testPactCtxSQLite -> Version.ChainId -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> Database -> PactServiceConfig -> (TxContext -> GasModel) -> IO (TestPactCtx logger tbl) testPactCtxSQLite logger v cid bhdb pdb sqlenv conf gasmodel = do - cp <- initRelationalCheckpointer defaultModuleCacheLimit sqlenv cpLogger v cid + cp <- initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqlenv) cpLogger v cid let rs = readRewards !ctx <- TestPactCtx <$!> newMVar (PactServiceState mempty) @@ -720,7 +718,7 @@ withWebPactExecutionService -> TestBlockDb -> MemPoolAccess -> (TxContext -> GasModel) - -> ((WebPactExecutionService,HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) -> IO a) + -> ((WebPactExecutionService,HM.HashMap ChainId (Database, PactExecutionService)) -> IO a) -> IO a withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = withDbs $ \sqlenvs -> do @@ -734,7 +732,7 @@ withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = withDbs f = foldl' (\soFar _ -> withDb soFar) f (chainIds v) [] withDb g envs = withTempSQLiteConnection chainwebPragmas $ \s -> g (s : envs) - mkPact :: SQLiteEnv -> ChainId -> IO PactExecutionService + mkPact :: Database -> ChainId -> IO PactExecutionService mkPact sqlenv c = do bhdb <- getBlockHeaderDb c bdb ctx <- testPactCtxSQLite logger v c bhdb (_bdbPayloadDb bdb) sqlenv pactConfig gasmodel @@ -785,7 +783,7 @@ runCut v bdb pact genTime noncer miner = h <- getParentTestBlockDb bdb cid void $ _webPactValidateBlock pact h (CheckablePayloadWithOutputs pout) -initializeSQLite :: IO SQLiteEnv +initializeSQLite :: IO Database initializeSQLite = open2 file >>= \case Left (_err, _msg) -> internalError "initializeSQLite: A connection could not be opened." @@ -793,7 +791,7 @@ initializeSQLite = open2 file >>= \case where file = "" {- temporary sqlitedb -} -freeSQLiteResource :: SQLiteEnv -> IO () +freeSQLiteResource :: Database -> IO () freeSQLiteResource sqlenv = void $ close_v2 sqlenv type WithPactCtxSQLite logger tbl = forall a . PactServiceM logger tbl a -> IO a @@ -884,7 +882,7 @@ withTemporaryDir = withResource -- | Single-chain Pact via service queue. -- -- The difference between this and 'withPactTestBlockDb' is that, --- this function takes a `SQLiteEnv` resource which it then exposes +-- this function takes a `Database` resource which it then exposes -- to the test function. -- -- TODO: Consolidate these two functions. @@ -892,10 +890,10 @@ withPactTestBlockDb' :: ChainwebVersion -> ChainId -> RocksDb - -> IO SQLiteEnv + -> IO Database -> IO MemPoolAccess -> PactServiceConfig - -> (IO (SQLiteEnv,PactQueue,TestBlockDb) -> TestTree) + -> (IO (Database,PactQueue,TestBlockDb) -> TestTree) -> TestTree withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = withResource' (mkTestBlockDb version rdb) $ \bdbio -> @@ -904,13 +902,14 @@ withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = startPact bdbio = do reqQ <- newPactQueue 2000 bdb <- bdbio - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, _) = cancel a @@ -924,7 +923,7 @@ withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = withSqliteDb :: () => ChainId -> IO FilePath - -> (IO SQLiteEnv -> TestTree) + -> (IO Database -> TestTree) -> TestTree withSqliteDb cid iodir s = withResource start stop s where @@ -949,7 +948,7 @@ withPactTestBlockDb -> RocksDb -> IO MemPoolAccess -> PactServiceConfig - -> (IO (SQLiteEnv,PactQueue,TestBlockDb) -> TestTree) + -> (IO (Database,PactQueue,TestBlockDb) -> TestTree) -> TestTree withPactTestBlockDb version cid rdb mempoolIO pactConfig f = withTemporaryDir $ \iodir -> @@ -963,10 +962,11 @@ withPactTestBlockDb version cid rdb mempoolIO pactConfig f = mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb - sqlEnv <- startSqliteDb cid logger dir False + writeSqlEnv <- startSqliteDb cid logger dir False + readSqlEnv <- startSqliteDb cid logger dir False a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, (sqlEnv, _, _)) = cancel a >> stopSqliteDb sqlEnv @@ -1030,7 +1030,7 @@ getLatestPactState db = do (PactState.getLatestPactStateDiffable db) locateTarget :: () - => SQLiteEnv + => Database -> C.TargetBlockHeight -> IO BlockHeight locateTarget db = \case @@ -1055,7 +1055,7 @@ locateTarget db = \case compact :: () => LogLevel -> [C.CompactFlag] - -> SQLiteEnv + -> Database -> C.TargetBlockHeight -> IO () compact logLevel cFlags db target = do @@ -1067,7 +1067,7 @@ compact logLevel cFlags db target = do compactUntilAvailable :: C.TargetBlockHeight -> YAL.Logger SomeLogMessage - -> SQLiteEnv + -> Database -> [C.CompactFlag] -> IO () compactUntilAvailable target logger db flags = do diff --git a/test/Chainweb/Test/Utils.hs b/test/Chainweb/Test/Utils.hs index 2096e4f3ae..50797b4b9a 100644 --- a/test/Chainweb/Test/Utils.hs +++ b/test/Chainweb/Test/Utils.hs @@ -204,7 +204,7 @@ import Chainweb.Mempool.Mempool (MempoolBackend(..), TransactionHash(..), BlockF import Chainweb.MerkleUniverse import Chainweb.Miner.Config import Chainweb.Miner.Pact -import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Types (Database) import Chainweb.Pact.Backend.Utils (openSQLiteConnection, closeSQLiteConnection, chainwebPragmas) import Chainweb.Payload.PayloadStore import Chainweb.RestAPI @@ -308,15 +308,15 @@ withRocksResource = view _2 . snd <$> allocate create destroy -- withSQLiteResource :: String - -> ResourceT IO SQLiteEnv + -> ResourceT IO Database withSQLiteResource file = snd <$> allocate (openSQLiteConnection file chainwebPragmas) closeSQLiteConnection -withTempSQLiteResource :: ResourceT IO SQLiteEnv +withTempSQLiteResource :: ResourceT IO Database withTempSQLiteResource = withSQLiteResource "" -withInMemSQLiteResource :: ResourceT IO SQLiteEnv +withInMemSQLiteResource :: ResourceT IO Database withInMemSQLiteResource = withSQLiteResource ":memory:" -- -------------------------------------------------------------------------- -- @@ -966,7 +966,8 @@ awaitBlockHeight v cenv i = do result <- retrying testRetryPolicy checkRetry $ const $ runClientM (cutGetClient v) cenv case result of - Left e -> throwM e + Left e -> + throwM e Right x | all (\bh -> _bhwhHeight bh >= i) (_cutHashes x) -> return () | otherwise -> error diff --git a/tools/cwtool/TxSimulator.hs b/tools/cwtool/TxSimulator.hs index bdf6029364..58bc1f80a1 100644 --- a/tools/cwtool/TxSimulator.hs +++ b/tools/cwtool/TxSimulator.hs @@ -106,7 +106,7 @@ simulate sc@(SimConfig dbDir txIdx' _ _ cid ver gasLog doTypecheck) = do pwos <- fetchOutputs sc cenv hdrs withSqliteDb cid cwLogger dbDir False $ \sqlenv -> do cp <- - initRelationalCheckpointer defaultModuleCacheLimit sqlenv logger ver cid + initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqlenv) logger ver cid case (txIdx',doTypecheck) of (Just txIdx,_) -> do -- single-tx simulation let pwo = head pwos diff --git a/tools/db-checksum/CheckpointerDBChecksum.hs b/tools/db-checksum/CheckpointerDBChecksum.hs index 16844ccab2..f24daefe65 100644 --- a/tools/db-checksum/CheckpointerDBChecksum.hs +++ b/tools/db-checksum/CheckpointerDBChecksum.hs @@ -45,7 +45,6 @@ import Pact.Types.SQLite -- chainweb imports import Chainweb.BlockHeight -import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils hiding (callDb) import Chainweb.Pact.Service.Types import Chainweb.Utils hiding (check) @@ -197,14 +196,14 @@ work args = withSQLiteConnection (_sqliteFile args) chainwebPragmas (runReaderT <> Utf8 tbl <> "] WHERE txid > ? AND txid <= ? ORDER BY txid DESC, rowkey ASC, rowdata ASC;" -callDb :: T.Text -> (Database -> IO a) -> ReaderT SQLiteEnv IO a +callDb :: T.Text -> (Database -> IO a) -> ReaderT Database IO a callDb callerName action = do c <- ask tryAny (liftIO $ action c) >>= \case Left err -> internalError $ "callDb (" <> callerName <> "): " <> sshow err Right r -> return r -getUserTables :: BlockHeight -> BlockHeight -> ReaderT SQLiteEnv IO [B.ByteString] +getUserTables :: BlockHeight -> BlockHeight -> ReaderT Database IO [B.ByteString] getUserTables low high = callDb "getUserTables" $ \db -> do usertables <- (traverse toByteString . concat) <$> qry db stmt (SInt . fromIntegral <$> [low, high]) [RText] diff --git a/tools/ea/Ea.hs b/tools/ea/Ea.hs index bce1bef14c..89a57919a8 100644 --- a/tools/ea/Ea.hs +++ b/tools/ea/Ea.hs @@ -52,6 +52,7 @@ import Text.Printf import Chainweb.BlockHeaderDB import Chainweb.Logger (genericLogger) import Chainweb.Miner.Pact (noMiner) +import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils import Chainweb.Pact.PactService import Chainweb.Pact.Types (testPactServiceConfig) @@ -179,7 +180,7 @@ genPayloadModule v tag cid cwTxs = pdb <- newPayloadDb withSystemTempDirectory "ea-pact-db" $ \pactDbDir -> do T2 payloadWO _ <- withSqliteDb cid logger pactDbDir False $ \env -> - withPactService v cid logger bhdb pdb env testPactServiceConfig $ + withPactService v cid logger bhdb pdb (SQLiteEnv ReadWrite env) testPactServiceConfig $ execNewGenesisBlock noMiner (V.fromList cwTxs) return $ TL.toStrict $ TB.toLazyText $ payloadModuleCode tag payloadWO