Skip to content

Commit

Permalink
Process PactQueue concurrently with read-only connection.
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Akentev <[email protected]>
Change-Id: Ia007a565d75c625ddb243534a546d57584ec8e7d
  • Loading branch information
Evgenii Akentev committed May 28, 2024
1 parent e363858 commit 09d9a79
Show file tree
Hide file tree
Showing 27 changed files with 414 additions and 304 deletions.
2 changes: 1 addition & 1 deletion bench/Chainweb/Pact/Backend/Bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ cpWithBench torun =
let neverLogger = genericLogger Error (\_ -> return ())
!sqliteEnv <- openSQLiteConnection dbFile chainwebPragmas
!cenv <-
initRelationalCheckpointer defaultModuleCacheLimit sqliteEnv DoNotPersistIntraBlockWrites neverLogger testVer testChainId
initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) DoNotPersistIntraBlockWrites neverLogger testVer testChainId
return $ NoopNFData (sqliteEnv, cenv)

teardown (NoopNFData (sqliteEnv, _cenv)) = closeSQLiteConnection sqliteEnv
Expand Down
46 changes: 22 additions & 24 deletions bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.PactService
import Chainweb.Pact.Service.BlockValidation
import Chainweb.Pact.Service.BlockValidation as BlockValidation
import Chainweb.Pact.Service.PactQueue
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Types
Expand Down Expand Up @@ -264,7 +264,8 @@ data Resources
, coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability]))))
, nonceCounter :: !(IORef Word64)
, txPerBlock :: !(IORef Int)
, sqlEnv :: !SQLiteEnv
, writeSqlEnv :: !Database
, readSqlEnv :: !Database
}

type RunPactService =
Expand Down Expand Up @@ -296,47 +297,39 @@ withResources rdb trunkLength logLevel compact p f = C.envWithCleanup create des
coinAccounts <- newMVar mempty
nonceCounter <- newIORef 1
txPerBlock <- newIORef 10
sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas

writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
mp <- testMemPoolAccess txPerBlock coinAccounts
pactService <-
startPact testVer logger blockHeaderDb payloadDb mp sqlEnv
startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv)
mainTrunkBlocks <-
playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter
when (compact == DoCompact) $ do
C.withDefaultLogger Error $ \lgr -> do
void $ C.compact (BlockHeight trunkLength) lgr sqlEnv []
void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv []

return $ NoopNFData $ Resources {..}

destroy (NoopNFData (Resources {..})) = do
stopPact pactService
stopSqliteDb sqlEnv
stopSqliteDb writeSqlEnv
stopSqliteDb readSqlEnv

pactQueueSize = 2000

logger = genericLogger logLevel T.putStrLn

startPact version l bhdb pdb mempool sqlEnv = do
startPact version l bhdb pdb mempool sqlEnvs = do
reqQ <- newPactQueue pactQueueSize
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig
{ _pactBlockGasLimit = 180_000
, _pactPersistIntraBlockWrites = p
}

return (a, reqQ)

stopPact (a, _) = cancel a

chainwebBenchPragmas =
[ "synchronous = NORMAL"
, "journal_mode = WAL"
, "locking_mode = EXCLUSIVE"
-- this is different from the prodcution database that uses @NORMAL@
, "temp_store = MEMORY"
, "auto_vacuum = NONE"
, "page_size = 1024"
]

genesisBlock :: BlockHeader
genesisBlock = genesisBlockHeader testVer cid

Expand Down Expand Up @@ -373,7 +366,7 @@ testMemPoolAccess txsPerBlock accounts = do
getTestBlock mVarAccounts txOrigTime validate bHeight hash
| bHeight == 1 = do
meta <- setTime txOrigTime <$> makeMeta cid
(as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta
(as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames
case traverse validateCommand cmds of
Left err -> throwM $ userError err
Right !r -> do
Expand Down Expand Up @@ -468,15 +461,20 @@ stockKey s = do
stockKeyFile :: ByteString
stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml")

createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text))
createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names
createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)]
createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names'
where
go a m = do
(b,c) <- m
return (Account a,b,c)

names :: NonEmpty String
names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"
twoNames :: [String]
twoNames = take 2 names

names :: [String]
names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]]
where
names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"

formatB16PubKey :: DynKeyPair -> Text
formatB16PubKey = \case
Expand Down
1 change: 0 additions & 1 deletion src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ withChainwebInternal
-> (StartedChainweb logger -> IO ())
-> IO ()
withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir resetDb inner = do

unless (_configOnlySyncPact conf || _configReadOnlyReplay conf) $
initializePayloadDb v payloadDb

Expand Down
12 changes: 6 additions & 6 deletions src/Chainweb/Pact/Backend/ChainwebPactDb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ createVersionedTable tablename db = do

-- | Delete any state from the database newer than the input parent header.
rewindDbTo
:: SQLiteEnv
:: Database
-> Maybe ParentHeader
-> IO ()
rewindDbTo db Nothing = rewindDbToGenesis db
Expand All @@ -656,7 +656,7 @@ rewindDbTo db mh@(Just (ParentHeader ph)) = do

-- rewind before genesis, delete all user tables and all rows in all tables
rewindDbToGenesis
:: SQLiteEnv
:: Database
-> IO ()
rewindDbToGenesis db = do
exec_ db "DELETE FROM BlockHistory;"
Expand Down Expand Up @@ -742,7 +742,7 @@ rewindDbToBlock db bh endingTxId = do
exec' db "DELETE FROM TransactionIndex WHERE blockheight > ?;"
[ SInt (fromIntegral bh) ]

commitBlockStateToDatabase :: SQLiteEnv -> BlockHash -> BlockHeight -> BlockState -> IO ()
commitBlockStateToDatabase :: Database -> BlockHash -> BlockHeight -> BlockState -> IO ()
commitBlockStateToDatabase db hsh bh blockState = do
let newTables = _pendingTableCreation $ _bsPendingBlock blockState
mapM_ (\tn -> createUserTable (Utf8 tn)) newTables
Expand Down Expand Up @@ -808,7 +808,7 @@ commitBlockStateToDatabase db hsh bh blockState = do


-- | Create all tables that exist pre-genesis
initSchema :: (Logger logger) => logger -> SQLiteEnv -> IO ()
initSchema :: (Logger logger) => logger -> Database -> IO ()
initSchema logger sql =
withSavepoint sql DbTransaction $ do
createBlockHistoryTable
Expand Down Expand Up @@ -860,12 +860,12 @@ initSchema logger sql =
"CREATE INDEX IF NOT EXISTS \
\ transactionIndexByBH ON TransactionIndex(blockheight)";

getEndTxId :: Text -> SQLiteEnv -> Maybe ParentHeader -> IO TxId
getEndTxId :: Text -> Database -> Maybe ParentHeader -> IO TxId
getEndTxId msg sql pc = case pc of
Nothing -> return 0
Just (ParentHeader ph) -> getEndTxId' msg sql (_blockHeight ph) (_blockHash ph)

getEndTxId' :: Text -> SQLiteEnv -> BlockHeight -> BlockHash -> IO TxId
getEndTxId' :: Text -> Database -> BlockHeight -> BlockHash -> IO TxId
getEndTxId' msg sql bh bhsh = do
r <- qry sql
"SELECT endingtxid FROM BlockHistory WHERE blockheight = ? and hash = ?;"
Expand Down
3 changes: 1 addition & 2 deletions src/Chainweb/Pact/Backend/PactState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import Database.SQLite3.Direct qualified as SQL

import Chainweb.BlockHeight (BlockHeight(..))
import Chainweb.Logger (Logger, addLabel)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb)
import Chainweb.Utils (int)
import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText)
Expand Down Expand Up @@ -140,7 +139,7 @@ withChainDb :: (Logger logger)
=> ChainId
-> logger
-> FilePath
-> (logger -> SQLiteEnv -> IO x)
-> (logger -> Database -> IO x)
-> IO x
withChainDb cid logger' path f = do
let logger = addChainIdLabel cid logger'
Expand Down
4 changes: 2 additions & 2 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshot
import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (ChainGrandHash(..))
import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeights, computeGrandHashesAt, withConnections, hex, rocksParser, cwvParser)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Types (Database)
import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions)
import Chainweb.Utils (sshow)
import Chainweb.Version (ChainwebVersion(..), ChainwebVersionName(..))
Expand Down Expand Up @@ -75,7 +75,7 @@ data BlockHeightTargets
pactCalc :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-- ^ pact database dir
-> RocksDb
-- ^ rocksdb dir
Expand Down
6 changes: 3 additions & 3 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshots
import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeight, computeGrandHashesAt, exitLog, withConnections, chainwebDbFilePath, rocksParser, cwvParser)
import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpointer)
import Chainweb.Pact.Backend.Types (SQLiteEnv, _cpRewindTo)
import Chainweb.Pact.Backend.Types (Database, _cpRewindTo, SQLiteEnv(..), SQLiteConnectionType(..))
import Chainweb.Pact.Service.Types (IntraBlockPersistence(..))
import Chainweb.Pact.Types (defaultModuleCacheLimit)
import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions)
Expand Down Expand Up @@ -93,7 +93,7 @@ import System.LogLevel (LogLevel(..))
pactVerify :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-- ^ pact connections
-> RocksDb
-- ^ rocksDb
Expand Down Expand Up @@ -187,7 +187,7 @@ pactDropPostVerified logger v srcDir tgtDir snapshotBlockHeight snapshotChainHas
let logger' = addChainIdLabel cid logger
logFunctionText logger' Info
$ "Dropping anything post verified state (BlockHeight " <> sshow snapshotBlockHeight <> ")"
withProdRelationalCheckpointer logger defaultModuleCacheLimit sqliteEnv DoNotPersistIntraBlockWrites v cid $ \cp -> do
withProdRelationalCheckpointer logger defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) DoNotPersistIntraBlockWrites v cid $ \cp -> do
_cpRewindTo cp (Just $ ParentHeader $ blockHeader $ snapshotChainHashes ^?! ix cid)

data PactImportConfig = PactImportConfig
Expand Down
18 changes: 9 additions & 9 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import Chainweb.Logger (Logger, logFunctionText)
import Chainweb.Pact.Backend.PactState (getLatestPactStateAt, getLatestBlockHeight, addChainIdLabel)
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (computeGrandHash)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Types (Database)
import Chainweb.Pact.Backend.Utils (startSqliteDb, stopSqliteDb)
import Chainweb.Storage.Table.RocksDB (RocksDb)
import Chainweb.TreeDB (seekAncestor)
Expand Down Expand Up @@ -69,7 +69,7 @@ limitCut :: (Logger logger)
=> logger
-> WebBlockHeaderDb
-> HashMap ChainId BlockHeader -- ^ latest cut headers
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> BlockHeight
-> IO (HashMap ChainId BlockHeader)
limitCut logger wbhdb latestCutHeaders pactConns blockHeight = do
Expand Down Expand Up @@ -117,7 +117,7 @@ getLatestCutHeaders v rocksDb = do
resolveLatestCutHeaders :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> IO (BlockHeight, HashMap ChainId BlockHeader)
resolveLatestCutHeaders logger v pactConns rocksDb = do
Expand All @@ -131,7 +131,7 @@ resolveLatestCutHeaders logger v pactConns rocksDb = do
resolveCutHeadersAtHeight :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> BlockHeight
-> IO (HashMap ChainId BlockHeader)
Expand All @@ -146,7 +146,7 @@ resolveCutHeadersAtHeight logger v pactConns rocksDb target = do
resolveCutHeadersAtHeights :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> [BlockHeight] -- ^ targets
-> IO [(BlockHeight, HashMap ChainId BlockHeader)]
Expand All @@ -159,7 +159,7 @@ resolveCutHeadersAtHeights logger v pactConns rocksDb targets = do
-- a 'BlockHeader' with the computed 'ChainGrandHash' at the header's
-- 'BlockHeight'.
computeGrandHashesAt :: ()
=> HashMap ChainId SQLiteEnv
=> HashMap ChainId Database
-- ^ pact connections
-> HashMap ChainId BlockHeader
-- ^ Resolved targets, i.e, blockheights that are accessible per each
Expand Down Expand Up @@ -202,17 +202,17 @@ withConnections :: (Logger logger)
=> logger
-> FilePath
-> [ChainId]
-> (HashMap ChainId SQLiteEnv -> IO x)
-> (HashMap ChainId Database -> IO x)
-> IO x
withConnections logger pactDir cids f = do
checkPactDbsExist pactDir cids
bracket openConnections closeConnections f
where
openConnections :: IO (HashMap ChainId SQLiteEnv)
openConnections :: IO (HashMap ChainId Database)
openConnections = fmap HM.fromList $ forM cids $ \cid -> do
(cid, ) <$> startSqliteDb cid logger pactDir False

closeConnections :: HashMap ChainId SQLiteEnv -> IO ()
closeConnections :: HashMap ChainId Database -> IO ()
closeConnections = mapM_ stopSqliteDb

hex :: ByteString -> Text
Expand Down
Loading

0 comments on commit 09d9a79

Please sign in to comment.