diff --git a/.gitignore b/.gitignore index aafa48011d..0259a974f7 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,4 @@ rosetta/logs/* .ghci_history .direnv/ .envrc +massif.out.* diff --git a/chainweb.cabal b/chainweb.cabal index 57f95d768f..dc34e38d77 100644 --- a/chainweb.cabal +++ b/chainweb.cabal @@ -299,6 +299,7 @@ library , Chainweb.Pact.Backend.ChainwebPactDb , Chainweb.Pact.Backend.DbCache , Chainweb.Pact.Backend.Compaction + , Chainweb.Pact.Backend.CompactionInMemory , Chainweb.Pact.Backend.PactState , Chainweb.Pact.Backend.PactState.Diff , Chainweb.Pact.Backend.PactState.EmbeddedSnapshot @@ -373,9 +374,12 @@ library , chronos >= 1.1 , clock >= 0.7 , configuration-tools >= 0.6 - , crypton-connection >=0.2 , containers >= 0.5 , crypton >= 0.31 + , crypton-connection >=0.2 + , crypton-x509 >=1.7 + , crypton-x509-system >=1.6 + , crypton-x509-validation >=1.6 , cuckoo >= 0.3 , data-default >=0.7 , data-dword >= 0.3 @@ -402,13 +406,14 @@ library , ixset-typed >= 0.4 , lens >= 4.17 , loglevel >= 0.1 + , lrucaching >= 0.3 , memory >=0.14 , merkle-log >=0.2 , mmorph >= 1.1 , monad-control >= 1.0 , mtl >= 2.3 - , mwc-random >= 0.13 , mwc-probability >= 2.0 && <2.4 + , mwc-random >= 0.13 , network >= 3.1.2 , optparse-applicative >= 0.14 , pact >= 4.2.0.1 @@ -417,6 +422,7 @@ library , patience >= 0.3 , pem >=0.2 , primitive >= 0.7.1.0 + , process >= 1.6 , random >= 1.2 , rosetta >= 1.0 , safe-exceptions >= 0.1 @@ -432,29 +438,26 @@ library , streaming-commons >= 0.2 , template-haskell >= 2.14 , text >= 2.0 - , trifecta >= 2.1 , these >= 1.0 , time >= 1.12.2 , tls >=1.9 , tls-session-manager >= 0.0 , token-bucket >= 0.1 , transformers >= 0.5 + , trifecta >= 2.1 , unliftio ^>= 0.2 , unordered-containers >= 0.2.16 , uuid >= 1.3.15 + , vector >= 0.12.2 + , vector-algorithms >= 0.7 , wai >= 3.2.2.1 , wai-app-static >= 3.1.6.3 , wai-cors >= 0.2.7 , wai-extra >= 3.0.28 - , wai-middleware-validation - , vector >= 0.12.2 - , vector-algorithms >= 0.7 , wai-middleware-throttle >= 0.3 + , wai-middleware-validation , warp >= 3.3.6 , warp-tls >= 3.4 - , crypton-x509 >=1.7 - , crypton-x509-system >=1.6 - , crypton-x509-validation >=1.6 , yaml >= 0.11 , yet-another-logger >= 0.4.1 diff --git a/default.nix b/default.nix index c04e975b1c..cd670c1ef4 100644 --- a/default.nix +++ b/default.nix @@ -76,6 +76,9 @@ let haskellSrc = with nix-filter.lib; filter { shell.buildInputs = with pkgs; [ zlib pkgconfig + sqlite + valgrind + massif-visualizer ]; modules = [ { diff --git a/src/Chainweb/Pact/Backend/CompactionInMemory.hs b/src/Chainweb/Pact/Backend/CompactionInMemory.hs new file mode 100644 index 0000000000..b56354f546 --- /dev/null +++ b/src/Chainweb/Pact/Backend/CompactionInMemory.hs @@ -0,0 +1,629 @@ +{-# language + BangPatterns + , DerivingStrategies + , FlexibleContexts + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , NumericUnderscores + , OverloadedRecordDot + , OverloadedStrings + , PackageImports + , ScopedTypeVariables + , TypeApplications +#-} + +{-# options_ghc -fno-warn-unused-imports #-} + +module Chainweb.Pact.Backend.CompactionInMemory + ( main + ) + where + +import "base" System.Exit (ExitCode(..), exitFailure, exitWith) +import "loglevel" System.LogLevel qualified as LL +import "yet-another-logger" System.Logger hiding (Logger) +import "yet-another-logger" System.Logger qualified as YAL +import Chainweb.BlockHeight (BlockHeight(..)) +import Chainweb.Logger (Logger, l2l, setComponent, logFunctionText) +import Chainweb.Pact.Backend.ChainwebPactDb () +import Chainweb.Pact.Backend.PactState +import Chainweb.Pact.Backend.Utils (fromUtf8, toUtf8, withSqliteDb) +import Chainweb.Utils (T2(..), sshow, fromText, toText, int) +import Chainweb.Version (ChainId, ChainwebVersion(..), unsafeChainId, chainIdToText) +import Chainweb.Version.Development (devnet) +import Chainweb.Version.Mainnet (mainnet) +import Chainweb.Version.Registry (lookupVersionByName) +import Chainweb.Version.Testnet (testnet) +import Chainweb.Version.Utils (chainIdsAt) +import Chronos qualified +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.MVar (swapMVar, readMVar, newMVar) +import Control.Exception (Exception, SomeException(..), bracket_, finally) +import Control.Lens (makeLenses, set, over, view, (^.), _2, (^?!), ix) +import Control.Monad (forM, forM_, unless, void, when) +import Control.Monad.Catch (MonadCatch(catch), MonadThrow(throwM)) +import Control.Monad.IO.Class (MonadIO(liftIO)) +import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, local) +import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp) +import Data.Bifunctor (second) +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Builder qualified as BB +import Data.ByteString.Lazy qualified as BL +import Data.Coerce (coerce) +import Data.Csv (ToRecord(..)) +import Data.Csv qualified as Csv +import Data.Csv.Builder qualified as Csv +import Data.Foldable qualified as F +import Data.Function (fix, (&)) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.IORef (IORef, readIORef, newIORef, atomicModifyIORef') +import Data.Int (Int64) +import Data.List qualified as List +import Data.LogMessage +import Data.LruCache (LruCache) +import Data.LruCache qualified as Lru +import Data.Map (Map) +import Data.Map.Strict qualified as M +import Data.Maybe (fromMaybe) +import Data.Ord (Down(..)) +import Data.Set (Set) +import Data.Set qualified as Set +import Data.String (IsString) +import Data.Text (Text) +import Data.Text qualified as Text +import Data.Text.Encoding qualified as Text +import Data.Vector (Vector) +import Data.Vector qualified as V +import Data.Word (Word64) +import Database.SQLite3 qualified as Lite +import Database.SQLite3 qualified as SQL +import Database.SQLite3.Direct (Utf8(..), Database) +import Database.SQLite3.Direct qualified as SQL +import GHC.Stack (HasCallStack) +import Options.Applicative qualified as O +import Pact.Types.Persistence (TxId(..)) +import Pact.Types.SQLite (SType(..), RType(..)) +import Pact.Types.SQLite qualified as Pact +import Prelude hiding (log) +import Streaming (Stream, Of) +import Streaming qualified as S +import Streaming.Prelude qualified as S +import System.Directory (createDirectoryIfMissing, doesDirectoryExist, removeFile) +import System.FilePath ((), addExtension) +import System.IO (Handle) +import System.IO qualified as IO +import System.IO.Unsafe (unsafePerformIO) +import System.Logger.Backend.ColorOption (useColor) +import System.Process (shell, readCreateProcessWithExitCode) +import UnliftIO.Async (pooledForConcurrently_) + +withDefaultLogger :: LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a +withDefaultLogger ll f = withHandleBackend_ logText handleCfg $ \b -> + withLogger defaultLoggerConfig b $ \l -> f (set setLoggerLevel (l2l ll) l) + where + handleCfg = defaultHandleBackendConfig + { _handleBackendConfigHandle = StdErr + } + +withPerChainFileLogger :: FilePath -> ChainId -> LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a +withPerChainFileLogger ld chainId ll f = do + createDirectoryIfMissing True {- do create parents -} ld + let logFile = ld ("chain-" <> cid <> ".log") + let handleConfig = defaultHandleBackendConfig + { _handleBackendConfigHandle = FileHandle logFile + } + withHandleBackend_' logText handleConfig $ \h b -> do + + done <- newMVar False + void $ forkIO $ fix $ \go -> do + doneYet <- readMVar done + let flush = do + w <- IO.hIsOpen h + when w (IO.hFlush h) + unless doneYet $ do + flush + threadDelay 5_000_000 + go + flush + + withLogger defaultLoggerConfig b $ \l -> do + let logger = setComponent "compaction" + $ over setLoggerScope (("chain", chainIdToText chainId) :) + $ set setLoggerLevel (l2l ll) l + a <- f logger + void $ swapMVar done True + pure a + where + cid = Text.unpack (chainIdToText chainId) + +withHandleBackend_' :: (MonadIO m, MonadBaseControl IO m) + => (msg -> Text) + -> HandleBackendConfig + -> (Handle -> LoggerBackend msg -> m a) + -> m a +withHandleBackend_' format conf inner = + case conf ^. handleBackendConfigHandle of + StdErr -> run IO.stderr + StdOut -> run IO.stdout + FileHandle file -> liftBaseOp (IO.withFile file IO.AppendMode) run + where + run h = do + colored <- liftIO $ useColor (conf ^. handleBackendConfigColor) h + inner h (handleBackend_ format h colored) + +newtype TableName = TableName { getTableName :: Utf8 } + deriving newtype (Eq, Ord) + deriving stock (Show) + +data Config = Config + { chainwebVersion :: ChainwebVersion + , sourceDir :: FilePath + , targetDir :: FilePath + , concurrent :: ConcurrentChains + , logDir :: FilePath + } + +data ConcurrentChains = SingleChain | ManyChainsAtOnce + +getConfig :: IO Config +getConfig = do + O.execParser opts + where + opts :: O.ParserInfo Config + opts = O.info (parser O.<**> O.helper) + (O.fullDesc <> O.progDesc "Pact DB Compaction Tool - create a compacted copy of the source directory Pact DB into the target directory.") + + parser :: O.Parser Config + parser = Config + <$> (parseVersion <$> O.strOption (O.long "chainweb-version" <> O.value "mainnet01")) + <*> O.strOption (O.long "source-directory") + <*> O.strOption (O.long "target-directory") + <*> O.flag SingleChain ManyChainsAtOnce (O.long "parallel") + <*> O.strOption (O.long "log-dir") + + parseVersion :: Text -> ChainwebVersion + parseVersion = + lookupVersionByName + . fromMaybe (error "ChainwebVersion parse failed") + . fromText + +main :: IO () +main = do + compact =<< getConfig + +compact :: Config -> IO () +compact cfg = do + let cids = allChains cfg.chainwebVersion + + targetBlockHeight <- withDefaultLogger LL.Debug $ \logger -> do + targetBlockHeight <- locateLatestSafeTarget logger cfg.chainwebVersion cfg.sourceDir cids + + targetDirExists <- doesDirectoryExist cfg.targetDir + when targetDirExists $ do + exitLog logger "Target directory already exists. Aborting." + + -- Create the target directory, and its parents too (if missing) + createDirectoryIfMissing True cfg.targetDir + + pure targetBlockHeight + + -- TODO (chessai): these may need tuning + -- + -- journal_mode = OFF is terrible for prod but probably OK here + -- since we are just doing a bunch of bulk inserts + let fastBulkInsertPragmas = + [ "journal_mode = OFF" + , "synchronous = OFF" + -- , "cache_size = 488281" -- in pages. desiredMax = 2.0GB. defaultPageSize = 4096 bytes/page. divide the two -- TODO: calculate from a flag that takes cache_size + , "temp_store = FILE" + , "shrink_memory" + ] + + -- TODO: attempt to apply pragmas to the source database for faster reading + --let fastReadPragmas = + -- [ + -- ] + + -- TODO: add progress meter or something? + forChains_ cfg.concurrent cids $ \cid -> do + withPerChainFileLogger cfg.logDir cid LL.Debug $ \logger -> do + withChainDb cid logger cfg.sourceDir $ \_ srcDb -> do + withChainDb cid logger cfg.targetDir $ \_ targetDb -> do + let log = logFunctionText logger + + -- Establish pragmas for bulk insert performance + -- + -- Note that we can't apply pragmas to the src + -- because we can't guarantee it's not being accessed + -- by another process. + Pact.runPragmas targetDb fastBulkInsertPragmas + + -- Create checkpointer tables on the target + createCheckpointerTables targetDb logger + + -- Compact BlockHistory + -- This is extremely fast and low residency + do + log LL.Info "Compacting BlockHistory" + activeRow <- getBlockHistoryRowAt logger srcDb targetBlockHeight + Pact.exec' targetDb "INSERT INTO BlockHistory VALUES (?1, ?2, ?3)" activeRow + + -- Compact VersionedTableMutation + -- This is extremely fast and low residency + do + log LL.Info "Compacting VersionedTableMutation" + activeRows <- getVersionedTableMutationRowsAt logger srcDb targetBlockHeight + Lite.withStatement targetDb "INSERT INTO VersionedTableMutation VALUES (?1, ?2)" $ \stmt -> do + forM_ activeRows $ \row -> do + Pact.bindParams stmt row + void $ stepThenReset stmt + + -- Copy over VersionedTableCreation (it isn't compacted) + -- This is pretty fast and low residency + do + log LL.Info "Copying over VersionedTableCreation" + let wholeTableQuery = "SELECT tablename, createBlockheight FROM VersionedTableCreation" + throwSqlError $ qryStream srcDb wholeTableQuery [] [RText, RInt] $ \tblRows -> do + Lite.withStatement targetDb "INSERT INTO VersionedTableCreation VALUES (?1, ?2)" $ \stmt -> do + flip S.mapM_ tblRows $ \row -> do + Pact.bindParams stmt row + void $ stepThenReset stmt + + -- Copy over TransactionIndex + -- + -- It isn't (currently) compacted so the amount of data is quite large + -- This, SYS:Pacts, and coin_coin-table were all used as benchmarks + -- for optimisations. + -- + -- Maybe consider + -- https://tableplus.com/blog/2018/07/sqlite-how-to-copy-table-to-another-database.html + do + log LL.Info "Copying over TransactionIndex" + let wholeTableQuery = "SELECT txhash, blockheight FROM TransactionIndex ORDER BY blockheight" + + throwSqlError $ qryStream srcDb wholeTableQuery [] [RBlob, RInt] $ \tblRows -> do + Lite.withStatement targetDb "INSERT INTO TransactionIndex VALUES (?1, ?2)" $ \stmt -> do + -- I experimented a bunch with chunk sizes, to keep transactions + -- small. As far as I can tell, there isn't really much + -- difference in any of them wrt residency, but there is wrt + -- speed. More experimentation may be needed here, but 10k is + -- fine so far. + S.chunksOf 10_000 tblRows + & S.mapsM_ (\chunk -> do + inTx targetDb $ flip S.mapM_ chunk $ \row -> do + Pact.bindParams stmt row + void (stepThenReset stmt) + ) + + -- Vacuuming after copying over all of the TransactionIndex data, + -- but before creating its indices, makes a big differences in + -- memory residency (~0.5G), at the expense of speed (~20s increase) + Pact.exec_ targetDb "VACUUM;" + + -- Create the checkpointer table indices after bulk-inserting into them + -- This is faster than creating the indices before + createCheckpointerIndexes targetDb logger + + -- Compact all user tables + log LL.Info "Starting user tables" + getLatestPactTableNamesAt srcDb targetBlockHeight + & S.mapM_ (\tblname -> do + compactTable logger srcDb targetDb (fromUtf8 tblname) targetBlockHeight + ) + + log LL.Info "Compaction done" + +compactTable :: (Logger logger) + => logger -- ^ logger + -> Database -- ^ source database (where we get the active pact state) + -> Database -- ^ target database (where we put the compacted state, + use as a rowkey cache) + -> Text -- ^ the table we are compacting + -> BlockHeight -- ^ target blockheight + -> IO () +compactTable logger srcDb targetDb tblname targetBlockHeight = do + let log = logFunctionText logger + let tblnameUtf8 = toUtf8 tblname + + log LL.Info $ "Creating table " <> tblname + createUserTable targetDb tblnameUtf8 + + -- We create the user table indices before inserting into the table. + -- This makes the insertions slower, but it's for good reason. + -- + -- The query that grabs the pact state from the source db groups rowkeys + -- in descending order by txid. We then simply need to keep only the first + -- appearance of each rowkey. A simple in-memory cache does not suffice, + -- because we have strict max residency requirements. So in order to fully + -- stream with minimal residency, we use the target database as a rowkey cache. + -- For each rowkey, we check if it appears in the target, and if it does, we + -- discard that row and move on to the next. This is why we need the indices, + -- because this membership check is extremely slow without it, and it far + -- outweighs the insert slowdowns imposed by the indices. However, because + -- the SQLite-based rowkey cache is still somewhat slow, we also employ + -- an in-memory LRU cache that is checked beforehand. The LRU cache stores + -- hashes of the rowkeys, so the unbounded size of rowkeys is not a problem + -- for its residency. Though, at any point we could encounter a very large + -- rowkey. + log LL.Info $ "Creating table indices for " <> tblname + createUserTableIndex targetDb tblnameUtf8 + + -- Grab the endingtxid for determining latest state at the + -- target height + endingTxId <- getEndingTxId srcDb targetBlockHeight + + let getActiveState :: () + => Lite.Statement + -> LruCache ByteString () + -> Stream (Of [SType]) IO (Either SQL.Error ()) + -> Stream (Of PactRow) IO () + getActiveState rkSqliteCache = go + where + go rkMemCache s = do + e <- liftIO (S.next s) + case e of + Left (Left sqlErr) -> do + liftIO $ exitLog logger $ "Encountered SQL Error during getActiveState: " <> sshow sqlErr + Left (Right ()) -> do + pure () + Right (row, rest) -> do + case row of + [SText (Utf8 rk), SInt tid, SBlob rd] -> do + -- Lookup in the LRU Cache, then fall back to the SQLite + -- cache + let useCache :: IO (Maybe PactRow, LruCache ByteString ()) + useCache = do + case Lru.lookup rk rkMemCache of + Nothing -> do + inTargetDb <- do + Pact.qrys rkSqliteCache [SText (Utf8 rk)] [RInt] >>= \case + [] -> pure False + [[SInt 1]] -> pure True + _ -> exitLog logger "getActiveState: invalid membership query" + let !newCache = Lru.insert rk () rkMemCache + if inTargetDb + then do + pure (Nothing, newCache) + else do + pure (Just (PactRow rk rd tid), newCache) + Just ((), newCache) -> do + pure (Nothing, newCache) + + (maybeRow, newRkCache) <- liftIO useCache + forM_ maybeRow $ \r -> S.yield r + go newRkCache rest + _ -> do + liftIO $ exitLog logger "Encountered invalid row shape during getActiveState" + + -- This query gets all rows at or below (older than) the target blockheight + let activeStateQryText = "SELECT rowkey, txid, rowdata FROM " + <> "[" <> tblnameUtf8 <> "]" + <> " WHERE txid < ?1" + <> " ORDER BY rowid DESC" -- txid ordering agrees with rowid ordering, but rowid is much faster + let activeStateQryArgs = [SInt endingTxId] + let activeStateQryRetTypes = [RText, RInt, RBlob] + + -- This query checks for rowkey membership in the target db (implements the + -- SQLite part of the rowkey cache) + let checkTargetForRkText = "SELECT 1 FROM [" <> tblname <> "] WHERE rowkey = ?1" + -- This query inserts rows into the target + -- TODO: Investigate if bulkInsert matters + let insertQryText = "INSERT INTO " <> fromUtf8 (tbl tblnameUtf8) <> " (rowkey, txid, rowdata) VALUES (?1, ?2, ?3)" + -- The LRU rowkey cache. Right now a capacity of 500 is used. This was just + -- determined by vibes. It can probably be larger because it just contains + -- hashes. + let rkMemCache = Lru.empty @ByteString @() 500 + qryStream srcDb activeStateQryText activeStateQryArgs activeStateQryRetTypes $ \rs -> do + Lite.withStatement targetDb checkTargetForRkText $ \rkSqliteCacheStmt -> do + Lite.withStatement targetDb insertQryText $ \insertStmt -> do + log LL.Info $ "Inserting compacted rows into table " <> tblname + getActiveState rkSqliteCacheStmt rkMemCache rs + -- Need to experiment more with chunk sizes. Trying to keep + -- transaction residency low while balancing speed. + -- + -- Perhaps instead of number of rows, this needs to be based + -- on the cumulative size (in bytes) of the rowkey+rowdata + & S.chunksOf 10_000 + & S.mapsM_ (\chunk -> do + inTx targetDb $ flip S.mapM_ chunk $ \pr -> do + let row = [SText (Utf8 pr.rowKey), SInt pr.txId, SBlob pr.rowData] + Pact.bindParams insertStmt row + void $ stepThenReset insertStmt + ) + -- Maybe this makes sense some of the time? Speed vs mem tradeoff + {- & (\s -> inTx targetDb $ flip S.mapM_ s (\pr -> do + let row = [SText (Utf8 pr.rowKey), SInt pr.txId, SBlob pr.rowData] + Pact.bindParams insertStmt row + void $ stepThenReset insertStmt + )) -} + + log LL.Info $ "Done compacting table " <> tblname + +createCheckpointerTables :: (Logger logger) + => Database + -> logger + -> IO () +createCheckpointerTables db logger = do + let log = logFunctionText logger LL.Info + + log "Creating Checkpointer table BlockHistory" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS BlockHistory " + , "(blockheight UNSIGNED BIGINT NOT NULL" + , ", hash BLOB NOT NULL" + , ", endingtxid UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table VersionedTableCreation" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS VersionedTableCreation " + , "(tablename TEXT NOT NULL" + , ", createBlockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table VersionedTableMutation" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS VersionedTableMutation " + , "(tablename TEXT NOT NULL" + , ", blockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table TransactionIndex" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS TransactionIndex " + , "(txhash BLOB NOT NULL" + , ", blockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + forM_ ["BlockHistory", "VersionedTableCreation", "VersionedTableMutation", "TransactionIndex"] $ \tblname -> do + log $ "Deleting from table " <> fromUtf8 tblname + Pact.exec_ db $ "DELETE FROM " <> tbl tblname + +createCheckpointerIndexes :: (Logger logger) => Database -> logger -> IO () +createCheckpointerIndexes db logger = do + let log = logFunctionText logger LL.Info + + log "Creating BlockHistory index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS BlockHistory_blockheight_unique_ix ON BlockHistory (blockheight)" + + log "Creating VersionedTableCreation index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS VersionedTableCreation_createBlockheight_tablename_unique_ix ON VersionedTableCreation (createBlockheight, tablename)" + + log "Creating VersionedTableMutation index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS VersionedTableMutation_blockheight_tablename_unique_ix ON VersionedTableMutation (blockheight, tablename)" + + log "Creating TransactionIndex indexes" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS TransactionIndex_txhash_unique_ix ON TransactionIndex (txhash)" + inTx db $ Pact.exec_ db + "CREATE INDEX IF NOT EXISTS TransactionIndex_blockheight_ix ON TransactionIndex (blockheight)" + +createUserTable :: Database -> Utf8 -> IO () +createUserTable db tblname = do + Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS ", tbl tblname, " " + , "(rowkey TEXT" -- This should probably be NOT NULL, but we have no proof of that, so for now this is just kept the same as chainweb-node's implementation. + , ", txid UNSIGNED BIGINT NOT NULL" + , ", rowdata BLOB NOT NULL" + , ");" + ] + + Pact.exec_ db $ "DELETE FROM " <> tbl tblname + +createUserTableIndex :: Database -> Utf8 -> IO () +createUserTableIndex db tblname = do + inTx db $ do + Pact.exec_ db $ mconcat + [ "CREATE UNIQUE INDEX IF NOT EXISTS ", tbl (tblname <> "_rowkey_txid_unique_ix"), " ON " + , tbl tblname, " (rowkey, txid)" + ] + Pact.exec_ db $ mconcat + [ "CREATE INDEX IF NOT EXISTS ", tbl (tblname <> "_txid_ix"), " ON " + , tbl tblname, " (txid DESC)" + ] + +-- | Returns the active @(blockheight, hash, endingtxid)@ from BlockHistory +getBlockHistoryRowAt :: (Logger logger) + => logger + -> Database + -> BlockHeight + -> IO [SType] +getBlockHistoryRowAt logger db target = do + r <- Pact.qry db "SELECT blockheight, hash, endingtxid FROM BlockHistory WHERE blockheight = ?1" [SInt (int target)] [RInt, RBlob, RInt] + case r of + [row@[SInt bh, SBlob _hash, SInt _endingTxId]] -> do + unless (target == int bh) $ do + exitLog logger "BlockHeight mismatch in BlockHistory query. This is a bug in the compaction tool. Please report it on the issue tracker or discord." + pure row + _ -> do + exitLog logger "getBlockHistoryRowAt query: invalid query" + +-- | Returns active @[(tablename, blockheight)]@ from VersionedTableMutation +getVersionedTableMutationRowsAt :: (Logger logger) + => logger + -> Database + -> BlockHeight + -> IO [[SType]] +getVersionedTableMutationRowsAt logger db target = do + r <- Pact.qry db "SELECT tablename, blockheight FROM VersionedTableMutation WHERE blockheight = ?1" [SInt (int target)] [RText, RInt] + forM r $ \case + row@[SText _, SInt bh] -> do + unless (target == int bh) $ do + exitLog logger "BlockHeight mismatch in VersionedTableMutation query. This is a bug in the compaction tool. Please report it." + pure row + _ -> do + exitLog logger "getVersionedTableMutationRowsAt query: invalid query" + +tbl :: Utf8 -> Utf8 +tbl u = "[" <> u <> "]" + +locateLatestSafeTarget :: (Logger logger) + => logger + -> ChainwebVersion + -> FilePath + -> [ChainId] + -> IO BlockHeight +locateLatestSafeTarget logger v dbDir cids = do + let log = logFunctionText logger + + let logger' = set setLoggerLevel (l2l LL.Error) logger + latestCommon <- getLatestCommonBlockHeight logger' dbDir cids + earliestCommon <- getEarliestCommonBlockHeight logger' dbDir cids + + log LL.Debug $ "Latest Common BlockHeight: " <> sshow latestCommon + log LL.Debug $ "Earliest Common BlockHeight: " <> sshow earliestCommon + + -- Make sure we have at least 1k blocks of depth for prod. + -- In devnet or testing versions we don't care. + let safeDepth :: BlockHeight + safeDepth + | v == mainnet || v == testnet = BlockHeight 1_000 + | otherwise = BlockHeight 0 + + when (latestCommon - earliestCommon < safeDepth) $ do + exitLog logger "locateLatestSafeTarget: Not enough history to safely compact. Aborting." + + let target = latestCommon - safeDepth + log LL.Debug $ "Compaction target blockheight is: " <> sshow target + pure target + +exitLog :: (Logger logger) + => logger + -> Text + -> IO a +exitLog logger msg = do + logFunctionText logger LL.Error msg + exitFailure + +stepThenReset :: Lite.Statement -> IO Lite.StepResult +stepThenReset stmt = do + Lite.stepNoCB stmt `finally` (Lite.clearBindings stmt >> Lite.reset stmt) + +forChains_ :: ConcurrentChains -> [ChainId] -> (ChainId -> IO a) -> IO () +forChains_ = \case + SingleChain -> forM_ + ManyChainsAtOnce -> pooledForConcurrently_ + +throwSqlError :: IO (Either Lite.Error a) -> IO a +throwSqlError ioe = do + e <- ioe + case e of + Left err -> error (show err) + Right a -> pure a + +inTx :: Database -> IO a -> IO a +inTx db io = do + bracket_ + (Pact.exec_ db "BEGIN;") + (Pact.exec_ db "COMMIT;") + io + diff --git a/src/Chainweb/Pact/Backend/PactState.hs b/src/Chainweb/Pact/Backend/PactState.hs index 9808ffa53f..1d3d2ed0dc 100644 --- a/src/Chainweb/Pact/Backend/PactState.hs +++ b/src/Chainweb/Pact/Backend/PactState.hs @@ -1,11 +1,14 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ImportQualifiedPost #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StrictData #-} @@ -26,6 +29,8 @@ module Chainweb.Pact.Backend.PactState , getPactTables , getLatestPactStateDiffable , getLatestPactStateAt + , withLatestPactStateAt + , getLatestPactTableNamesAt , getLatestPactStateAtDiffable , getLatestBlockHeight , getEarliestBlockHeight @@ -36,7 +41,10 @@ module Chainweb.Pact.Backend.PactState , withChainDb , addChainIdLabel , doesPactDbExist + , chainDbFileName , allChains + , qryStream + , bulkInsert , PactRow(..) , PactRowContents(..) @@ -45,42 +53,41 @@ module Chainweb.Pact.Backend.PactState ) where -import Control.Exception (bracket) -import Control.Monad (forM, forM_, when) +import Chainweb.BlockHeight (BlockHeight(..)) +import Chainweb.Logger (Logger, addLabel) +import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Utils (fromUtf8, toUtf8, withSqliteDb) +import Chainweb.Utils (T2(..), int) +import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText) +import Chainweb.Version.Utils (chainIdsAt) +import Control.Exception (bracket, finally) +import Control.Monad (forM, forM_, when, void) +import Control.Monad.Except (ExceptT(..), runExceptT, throwError) import Control.Monad.IO.Class (MonadIO(liftIO)) import Control.Monad.Trans.Class (lift) -import Control.Monad.Except (ExceptT(..), runExceptT, throwError) import Data.Aeson (ToJSON(..), (.=)) import Data.Aeson qualified as Aeson -import Data.Vector (Vector) -import Data.Vector qualified as Vector import Data.ByteString (ByteString) -import Data.Int (Int64) import Data.Foldable qualified as F +import Data.Function ((&)) +import Data.Int (Int64) import Data.List qualified as List import Data.Map (Map) import Data.Map.Strict qualified as M +import Data.Set (Set) +import Data.Set qualified as Set import Data.Text (Text) import Data.Text qualified as Text import Data.Text.Encoding qualified as Text +import Database.SQLite3 qualified as SQL import Database.SQLite3.Direct (Utf8(..), Database) -import Database.SQLite3.Direct qualified as SQL - -import Chainweb.BlockHeight (BlockHeight(..)) -import Chainweb.Logger (Logger, addLabel) -import Chainweb.Pact.Backend.Types (SQLiteEnv) -import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb) -import Chainweb.Utils (int) -import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText) -import Chainweb.Version.Utils (chainIdsAt) - -import System.Directory (doesFileExist) -import System.FilePath (()) - +import Database.SQLite3.Direct qualified as Direct import Pact.Types.SQLite (SType(..), RType(..)) import Pact.Types.SQLite qualified as Pact import Streaming.Prelude (Stream, Of) import Streaming.Prelude qualified as S +import System.Directory (doesFileExist) +import System.FilePath (()) excludedTables :: [Utf8] excludedTables = checkpointerTables ++ compactionTables @@ -148,14 +155,14 @@ withChainDb cid logger' path f = do withSqliteDb cid logger path resetDb (f logger) -- | Get all Pact table names in the database. -getPactTableNames :: Database -> IO (Vector Utf8) -getPactTableNames db = do +getPactTableNames :: Database -> Stream (Of Utf8) IO () +getPactTableNames db = eachIO $ do let sortedTableNames :: [[SType]] -> [Utf8] sortedTableNames rows = List.sortOn (Text.toLower . fromUtf8) $ flip List.map rows $ \case [SText u] -> u _ -> error "getPactTableNames.sortedTableNames: expected text" - tables <- fmap sortedTableNames $ do + fmap sortedTableNames $ do let qryText = "SELECT name FROM sqlite_schema \ \WHERE \ @@ -163,8 +170,11 @@ getPactTableNames db = do \AND \ \ name NOT LIKE 'sqlite_%'" Pact.qry db qryText [] [RText] - - pure (Vector.fromList tables) + where + eachIO :: (Foldable f) => IO (f a) -> Stream (Of a) IO () + eachIO m_xs = do + xs <- liftIO m_xs + S.each xs -- | Get all of the rows for each table. The tables will be appear sorted -- lexicographically by table name. @@ -172,59 +182,82 @@ getPactTables :: Database -> Stream (Of Table) IO () getPactTables db = do let fmtTable x = "\"" <> x <> "\"" - tables <- liftIO $ getPactTableNames db - - forM_ tables $ \tbl -> do - if tbl `notElem` excludedTables - then do - let qryText = "SELECT rowkey, rowdata, txid FROM " - <> fmtTable tbl - userRows <- liftIO $ Pact.qry db qryText [] [RText, RBlob, RInt] - shapedRows <- forM userRows $ \case - [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> do - pure $ PactRow {..} - _ -> error "getPactTableNames: unexpected shape of user table row" - S.yield $ Table (fromUtf8 tbl) shapedRows - else do - pure () + getPactTableNames db + & S.filter (\tbl -> tbl `notElem` excludedTables) + & S.mapM (\tbl -> do + let qryText = "SELECT rowkey, rowdata, txid FROM " + <> fmtTable tbl + userRows <- liftIO $ Pact.qry db qryText [] [RText, RBlob, RInt] + shapedRows <- forM userRows $ \case + [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> do + pure $ PactRow {..} + _ -> error "getPactTableNames: unexpected shape of user table row" + pure $ Table (fromUtf8 tbl) shapedRows + ) -- streaming SQLite step; see Pact SQLite module -stepStatement :: SQL.Statement -> [RType] -> Stream (Of [SType]) IO (Either SQL.Error ()) +stepStatement :: Direct.Statement -> [RType] -> Stream (Of [SType]) IO (Either Direct.Error ()) stepStatement stmt rts = runExceptT $ do -- todo: rename from acc - let acc :: SQL.StepResult -> ExceptT SQL.Error (Stream (Of [SType]) IO) () + let acc :: Direct.StepResult -> ExceptT Direct.Error (Stream (Of [SType]) IO) () acc = \case - SQL.Done -> do + Direct.Done -> do pure () - SQL.Row -> do + Direct.Row -> do as <- forM (List.zip [0..] rts) $ \(colIx, expectedColType) -> do liftIO $ case expectedColType of - RInt -> SInt <$> SQL.columnInt64 stmt colIx - RDouble -> SDouble <$> SQL.columnDouble stmt colIx - RText -> SText <$> SQL.columnText stmt colIx - RBlob -> SBlob <$> SQL.columnBlob stmt colIx + RInt -> SInt <$> Direct.columnInt64 stmt colIx + RDouble -> SDouble <$> Direct.columnDouble stmt colIx + RText -> SText <$> Direct.columnText stmt colIx + RBlob -> SBlob <$> Direct.columnBlob stmt colIx lift $ S.yield as - liftIO (SQL.step stmt) >>= \case + liftIO (Direct.step stmt) >>= \case Left err -> do throwError err Right sr -> do acc sr -- maybe use stepNoCB - ExceptT (liftIO (SQL.step stmt)) >>= acc + ExceptT (liftIO (Direct.step stmt)) >>= acc -- | Prepare/execute query with params; stream the results -qry :: () +qryStream :: () => Database -> Utf8 -> [SType] -> [RType] - -> (Stream (Of [SType]) IO (Either SQL.Error ()) -> IO x) + -> (Stream (Of [SType]) IO (Either Direct.Error ()) -> IO x) -> IO x -qry db qryText args returnTypes k = do - bracket (Pact.prepStmt db qryText) SQL.finalize $ \stmt -> do - Pact.bindParams stmt args +qryStream db qryText args returnTypes k = do + bracket (SQL.prepareUtf8 db qryText) Direct.finalize $ \stmt -> do + bindParams stmt args k (stepStatement stmt returnTypes) + where + bindParams :: Direct.Statement -> [SType] -> IO () + bindParams s as = forM_ (List.zip [1..] as) $ \(argIndex, arg) -> do + case arg of + SInt a -> Direct.bindInt64 s argIndex a + SDouble a -> Direct.bindDouble s argIndex a + SText a -> Direct.bindText s argIndex a + SBlob a -> Direct.bindBlob s argIndex a + +bulkInsert :: Database -> Text -> [[SType]] -> IO () +bulkInsert db tblname values = do + let oneRow rowSize = "(" <> Text.intercalate ", " (List.replicate rowSize "?") <> ")" + let allRows rows = Text.intercalate ", " (List.map (\row -> oneRow (List.length row)) rows) + case values of + [] -> do + pure () + rows -> do + let q = toUtf8 $ "INSERT INTO [" <> tblname <> "] VALUES " <> allRows rows + bracket (SQL.prepareUtf8 db q) Direct.finalize $ \s -> do + forM_ (List.zip [1..] (List.concat values)) $ \(argIndex, arg) -> do + case arg of + SInt a -> Direct.bindInt64 s argIndex a + SDouble a -> Direct.bindDouble s argIndex a + SText a -> Direct.bindText s argIndex a + SBlob a -> Direct.bindBlob s argIndex a + void $ SQL.stepNoCB s `finally` (SQL.clearBindings s >> SQL.reset s) -- | Get the latest Pact state (in a ready-to-diff form). getLatestPactStateDiffable :: Database -> Stream (Of TableDiffable) IO () @@ -262,8 +295,26 @@ getLatestPactStateAt :: () getLatestPactStateAt db bh = do endingTxId <- liftIO $ getEndingTxId db bh - tables <- liftIO $ getPactTableNames db - + getLatestPactTableNamesAt db bh + & S.mapM (\tbl -> do + let qryText = "SELECT rowkey, rowdata, txid FROM " + <> "\"" <> tbl <> "\"" + <> " WHERE txid do + let go :: Map ByteString PactRowContents -> [SType] -> Map ByteString PactRowContents + go m = \case + [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> + M.insertWith (\prc1 prc2 -> if prc1.txId > prc2.txId then prc1 else prc2) rowKey (PactRowContents rowData txId) m + _ -> error "getLatestPactState: unexpected shape of user table row" + S.fold_ go M.empty id rows + pure (fromUtf8 tbl, latestState) + ) + +getLatestPactTableNamesAt :: () + => Database + -> BlockHeight + -> Stream (Of Utf8) IO () +getLatestPactTableNamesAt db bh = do tablesCreatedAfter <- liftIO $ do let qryText = "SELECT tablename FROM VersionedTableCreation WHERE createBlockheight > ?1" rows <- Pact.qry db qryText [SInt (int bh)] [RText] @@ -271,19 +322,63 @@ getLatestPactStateAt db bh = do [SText tbl] -> pure tbl _ -> error "getLatestPactStateAt.tablesCreatedAfter: expected text" - forM_ tables $ \tbl -> do - when (tbl `notElem` (excludedTables ++ tablesCreatedAfter)) $ do - let qryText = "SELECT rowkey, rowdata, txid FROM " - <> "\"" <> tbl <> "\"" - <> " WHERE txid do - let go :: Map ByteString PactRowContents -> [SType] -> Map ByteString PactRowContents - go m = \case - [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> - M.insertWith (\prc1 prc2 -> if prc1.txId > prc2.txId then prc1 else prc2) rowKey (PactRowContents rowData txId) m - _ -> error "getLatestPactState: unexpected shape of user table row" - S.fold_ go M.empty id rows - S.yield (fromUtf8 tbl, latestState) + let excludeThese = excludedTables ++ tablesCreatedAfter + getPactTableNames db + & S.filter (\tbl -> tbl `notElem` excludeThese) + +-- | Use the Pact State at the given height. +withLatestPactStateAt :: () + => Database + -> BlockHeight + -> (forall r. Text -> Stream (Of (T2 Int64 PactRow)) IO r -> IO ()) + -> IO () +withLatestPactStateAt db bh withTable = do + endingTxId <- liftIO $ getEndingTxId db bh + + getLatestPactTableNamesAt db bh + & S.mapM_ (\tbl -> do + -- ❯ sqlite3 pact-v1-chain-0.sqlite 'EXPLAIN QUERY PLAN SELECT rowid, rowkey, rowdata, txid FROM [coin_coin-table] WHERE txid < 100 ORDER BY rowid DESC' + -- QUERY PLAN + -- `--SCAN coin_coin-table + + let qryText = "SELECT rowid, rowkey, txid, rowdata FROM " + <> "[" <> tbl <> "]" + <> " WHERE txid < ?1" + <> " ORDER BY rowid DESC" + + qryStream db qryText [SInt endingTxId] [RInt, RText, RInt, RBlob] $ \rows -> do + let go :: () + => Set ByteString + -> Stream (Of [SType]) IO (Either Direct.Error ()) + -> Stream (Of (T2 Int64 PactRow)) IO () + go !seen s = do + e <- liftIO (S.next s) + case e of + Left (Left sqlErr) -> do + error $ "withLatestPactStateAt: Encountered SQLite error: " <> show sqlErr + Left (Right ()) -> do + pure () + Right (row, rest) -> do + case row of + [SInt rowid, SText (Utf8 rk), SInt tid, SBlob rd] -> do + let element = T2 rowid $ PactRow + { rowKey = rk + , rowData = rd + , txId = tid + } + if | tbl == "SYS:Pacts" && rd == "null" -> do + S.yield element + go (Set.insert rk seen) rest + | rk `Set.member` seen -> do + go seen rest + | otherwise -> do + S.yield element + go (Set.insert rk seen) rest + _ -> do + error "getLatestPactState: invalid query" + + withTable (fromUtf8 tbl) (go Set.empty rows) + ) -- | A pact table - just its name and its rows. data Table = Table @@ -330,13 +425,17 @@ data PactRowContents = PactRowContents -- contains the pact db for the given ChainId. doesPactDbExist :: ChainId -> FilePath -> IO Bool doesPactDbExist cid dbDir = do - let chainDbFileName = mconcat - [ "pact-v1-chain-" - , Text.unpack (chainIdToText cid) - , ".sqlite" - ] - let file = dbDir chainDbFileName - doesFileExist file + doesFileExist (chainDbFileName cid dbDir) + +-- | Given a pact database directory, return the SQLite +-- path chainweb uses for the given ChainId. +chainDbFileName :: ChainId -> FilePath -> FilePath +chainDbFileName cid dbDir = dbDir mconcat + [ "pact-v1-chain-" + , Text.unpack (chainIdToText cid) + , ".sqlite" + ] + addChainIdLabel :: (Logger logger) => ChainId diff --git a/src/Chainweb/Pact/Backend/PactState/Diff.hs b/src/Chainweb/Pact/Backend/PactState/Diff.hs index 7930752991..103e7e1b10 100644 --- a/src/Chainweb/Pact/Backend/PactState/Diff.hs +++ b/src/Chainweb/Pact/Backend/PactState/Diff.hs @@ -22,36 +22,33 @@ module Chainweb.Pact.Backend.PactState.Diff ) where -import Data.IORef (newIORef, readIORef, atomicModifyIORef') +import Chainweb.Logger (logFunctionText, logFunctionJson) +import Chainweb.Pact.Backend.Compaction (TargetBlockHeight(..)) +import Chainweb.Pact.Backend.Compaction qualified as C +import Chainweb.Pact.Backend.PactState (TableDiffable(..), getLatestPactStateAtDiffable, getLatestPactStateDiffable, doesPactDbExist, withChainDb, allChains) +import Chainweb.Utils (fromText, toText) +import Chainweb.Version (ChainwebVersion(..), ChainId, chainIdToText) +import Chainweb.Version.Mainnet (mainnet) +import Chainweb.Version.Registry (lookupVersionByName) import Control.Monad (forM_, when, void) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.Aeson ((.=)) import Data.Aeson qualified as Aeson import Data.ByteString (ByteString) +import Data.IORef (newIORef, readIORef, atomicModifyIORef') import Data.Map (Map) import Data.Map.Merge.Strict qualified as Merge import Data.Map.Strict qualified as M import Data.Maybe (fromMaybe) import Data.Text (Text) import Data.Text qualified as Text -import Data.Text.IO qualified as Text import Data.Text.Encoding qualified as Text +import Data.Text.IO qualified as Text import Options.Applicative - -import Chainweb.Logger (logFunctionText, logFunctionJson) -import Chainweb.Utils (fromText, toText) -import Chainweb.Version (ChainwebVersion(..), ChainId, chainIdToText) -import Chainweb.Version.Mainnet (mainnet) -import Chainweb.Version.Registry (lookupVersionByName) -import Chainweb.Pact.Backend.Compaction (TargetBlockHeight(..)) -import Chainweb.Pact.Backend.Compaction qualified as C -import Chainweb.Pact.Backend.PactState (TableDiffable(..), getLatestPactStateAtDiffable, getLatestPactStateDiffable, doesPactDbExist, withChainDb, allChains) - -import System.Exit (exitFailure) -import System.LogLevel (LogLevel(..)) - import Streaming.Prelude (Stream, Of) import Streaming.Prelude qualified as S +import System.Exit (exitFailure) +import System.LogLevel (LogLevel(..)) data PactDiffConfig = PactDiffConfig { firstDbDir :: FilePath @@ -235,6 +232,6 @@ diffLatestPactState = go error "right stream longer than left" (Right (t1, next1), Right (t2, next2)) -> do when (t1.name /= t2.name) $ do - error "diffLatestPactState: mismatched table names" + error $ "diffLatestPactState: mismatched table names: " <> Text.unpack t1.name <> " vs. " <> Text.unpack t2.name S.yield (t1.name, diffTables t1 t2) go next1 next2 diff --git a/tools/cwtool/CwTool.hs b/tools/cwtool/CwTool.hs index 837b85fe07..f984871cde 100644 --- a/tools/cwtool/CwTool.hs +++ b/tools/cwtool/CwTool.hs @@ -10,6 +10,7 @@ import System.Exit import Text.Printf import Chainweb.Pact.Backend.Compaction (main) +import Chainweb.Pact.Backend.CompactionInMemory (main) import Chainweb.Pact.Backend.PactState.Diff (pactDiffMain) import Chainweb.Pact.Backend.PactState.GrandHash.Calc (pactCalcMain) import Chainweb.Pact.Backend.PactState.GrandHash.Import (pactImportMain) @@ -108,6 +109,10 @@ topLevelCommands = "compact" "Compact pact database" Chainweb.Pact.Backend.Compaction.main + , CommandSpec + "sigma-compact" + "Compaction pact database (read-only)" + Chainweb.Pact.Backend.CompactionInMemory.main , CommandSpec "pact-diff" "Diff the latest state of two pact databases"