Skip to content

Commit

Permalink
use a constant for min blockheight calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
chessai committed Jul 15, 2024
1 parent 30c62e5 commit 5b44f61
Showing 1 changed file with 37 additions and 55 deletions.
92 changes: 37 additions & 55 deletions src/Chainweb/Pact/Backend/Compaction.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
, TypeApplications
#-}

{-# OPTIONS_GHC -Wwarn #-}

module Chainweb.Pact.Backend.Compaction
(
-- * Compaction executable implementation
Expand All @@ -33,57 +31,56 @@ module Chainweb.Pact.Backend.Compaction
)
where

import "base" Control.Exception hiding (Handler)
import "base" Control.Monad (forM, forM_, unless, void, when)
import "base" Control.Monad.IO.Class (MonadIO(liftIO))
import "base" Data.Function ((&))
import "base" Data.Int (Int64)
import "base" Data.Maybe (fromMaybe)
import "base" Prelude hiding (log)
import "base" System.Exit (exitFailure)
import "base" System.IO (Handle)
import "base" System.IO qualified as IO
import "chainweb-storage" Chainweb.Storage.Table (Iterator(..), Entry(..), withTableIterator, unCasify, tableInsert)
import "chainweb-storage" Chainweb.Storage.Table.RocksDB (RocksDb, withRocksDb, withReadOnlyRocksDb, modernDefaultOptions)
import "direct-sqlite" Database.SQLite3 qualified as Lite
import "direct-sqlite" Database.SQLite3.Direct (Utf8(..), Database)
import "directory" System.Directory (createDirectoryIfMissing, doesDirectoryExist)
import "filepath" System.FilePath ((</>))
import "lens" Control.Lens (set, over, (^.), _3)
import "loglevel" System.LogLevel qualified as LL
import "monad-control" Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp)
import "optparse-applicative" Options.Applicative qualified as O
import "pact" Pact.Types.SQLite (SType(..), RType(..))
import "pact" Pact.Types.SQLite qualified as Pact
import "rocksdb-haskell-kadena" Database.RocksDB.Types (Options(..), Compression(..))
import "streaming" Streaming qualified as S
import "streaming" Streaming.Prelude qualified as S
import "text" Data.Text (Text)
import "text" Data.Text qualified as Text
import "unliftio" UnliftIO.Async (pooledForConcurrently_)
import "yet-another-logger" System.Logger hiding (Logger)
import "yet-another-logger" System.Logger qualified as YAL
import "yet-another-logger" System.Logger.Backend.ColorOption (useColor)
import Chainweb.BlockHeader (BlockHeader(..))
import Chainweb.BlockHeaderDB.Internal (BlockHeaderDb(..), RankedBlockHash(..), RankedBlockHeader(..))
import Chainweb.BlockHeight (BlockHeight(..))
import Chainweb.Cut.CutHashes (cutIdToText)
import Chainweb.CutDB (cutHashesTable)
import Chainweb.Graph (diameter)
import Chainweb.Logger (Logger, l2l, setComponent, logFunctionText)
import Chainweb.Pact.Backend.ChainwebPactDb ()
import Chainweb.Pact.Backend.PactState
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Utils (fromUtf8, toUtf8)
import Chainweb.Payload.PayloadStore (initializePayloadDb, addNewPayload, lookupPayloadWithHeight)
import Chainweb.Payload.PayloadStore.RocksDB (newPayloadDb)
import Chainweb.Storage.Table (Iterator(..), Entry(..), withTableIterator, unCasify, tableInsert)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Storage.Table.RocksDB (RocksDb, withRocksDb, withReadOnlyRocksDb, modernDefaultOptions)
import Chainweb.Utils (sshow, fromText, toText, int)
import Chainweb.Version (ChainId, ChainwebVersion(..), chainIdToText, chainGraphAt)
import Chainweb.Version (ChainId, ChainwebVersion(..), chainIdToText)
import Chainweb.Version.Mainnet (mainnet)
import Chainweb.Version.Registry (lookupVersionByName)
import Chainweb.Version.Testnet (testnet)
import Chainweb.WebBlockHeaderDB (getWebBlockHeaderDb, initWebBlockHeaderDb)
import Control.Exception hiding (Handler)
import Control.Lens (set, over, (^.), _3)
import Control.Monad (forM, forM_, unless, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp)
import Data.Function ((&))
import Data.Int (Int64)
import Data.LogMessage (SomeLogMessage, logText)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Text qualified as Text
import Database.RocksDB.Types (Options(..), Compression(..))
import Database.SQLite3 qualified as Lite
import Database.SQLite3.Direct (Utf8(..), Database)
import Options.Applicative qualified as O
import Pact.Types.SQLite (SType(..), RType(..))
import Pact.Types.SQLite qualified as Pact
import Prelude hiding (log)
import Streaming qualified as S
import Streaming.Prelude qualified as S
import System.Directory (createDirectoryIfMissing, doesDirectoryExist)
import System.FilePath ((</>))
import System.IO (Handle)
import System.IO qualified as IO
import System.Logger.Backend.ColorOption (useColor)
import UnliftIO.Async (pooledForConcurrently_)

withDefaultLogger :: LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a
withDefaultLogger ll f = withHandleBackend_ logText handleCfg $ \b ->
Expand Down Expand Up @@ -175,13 +172,10 @@ getConfig = do

main :: IO ()
main = do
--status <- newTVarIO Running
--installFatalSignalHandlers status [ sigHUP, sigTERM, sigXCPU, sigXFSZ ]

compact =<< getConfig

compactPactState :: (Logger logger) => logger -> ChainwebVersion -> BlockHeight -> SQLiteEnv -> SQLiteEnv -> IO ()
compactPactState logger v targetBlockHeight srcDb targetDb = do
compactPactState :: (Logger logger) => logger -> BlockHeight -> SQLiteEnv -> SQLiteEnv -> IO ()
compactPactState logger targetBlockHeight srcDb targetDb = do
let log = logFunctionText logger

-- These pragmas are tuned for fast insertion on systems with a wide range
Expand Down Expand Up @@ -239,7 +233,7 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do

-- Copy over TransactionIndex.
--
-- We compact this based on the RocksDB 'minBlockHeight'.
-- We compact this based on the RocksDB 'blockHeightKeepDepth'.
-- /poll and SPV rely on having this table synchronised with RocksDB.
--
-- We should let users have the option to keep this around, as an
Expand All @@ -251,8 +245,7 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do
log LL.Info "Copying over TransactionIndex"
let wholeTableQuery = "SELECT txhash, blockheight FROM TransactionIndex WHERE blockheight >= ?1 ORDER BY blockheight"

let (minBlockHeight, _) = blockHeightRange v targetBlockHeight
throwSqlError $ qryStream srcDb wholeTableQuery [SInt (int minBlockHeight)] [RBlob, RInt] $ \tblRows -> do
throwSqlError $ qryStream srcDb wholeTableQuery [SInt (int (targetBlockHeight - blockHeightKeepDepth))] [RBlob, RInt] $ \tblRows -> do
Lite.withStatement targetDb "INSERT INTO TransactionIndex VALUES (?1, ?2)" $ \stmt -> do
-- I experimented a bunch with chunk sizes, to keep transactions
-- small. As far as I can tell, there isn't really much
Expand Down Expand Up @@ -289,13 +282,6 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do

log LL.Info "Compaction done"

-- TODO: this logic is brittle right now, we need to make sure
-- we have at least this amount, or abort.
--
-- TODO: make 1k/3k constants; no magic numbers. perhaps these constants
-- should be shared with locateLatestSafeTarget. The 2k can be derived
-- from that and a constant for the 3k.

-- We are trying to make sure that we keep around at least 3k blocks.
-- The compaction target is 1k blocks prior to the latest common
-- blockheight (i.e. min (map blockHeight allChains)), so we take
Expand All @@ -308,10 +294,8 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do
--
-- Note that the number 3k was arbitrary but chosen to be a safe
-- amount of data more than what is in SQLite.
blockHeightRange :: ChainwebVersion -> BlockHeight -> (BlockHeight, BlockHeight)
blockHeightRange v target = (,)
(target - 2_000)
(target + 1_000 + int (diameter (chainGraphAt v target)))
blockHeightKeepDepth :: BlockHeight
blockHeightKeepDepth = 2_000

compact :: Config -> IO ()
compact cfg = do
Expand All @@ -338,11 +322,10 @@ compact cfg = do
pure targetBlockHeight

-- Trim RocksDB here
let (minBlockHeight, _) = blockHeightRange cfg.chainwebVersion targetBlockHeight
withRocksDbFileLogger cfg.logDir LL.Debug $ \logger -> do
withReadOnlyRocksDb (rocksDir cfg.fromDir) modernDefaultOptions $ \srcRocksDb -> do
withRocksDb (rocksDir cfg.toDir) (modernDefaultOptions { compression = NoCompression }) $ \targetRocksDb -> do
trimRocksDb (set setLoggerLevel (l2l LL.Info) logger) cfg.chainwebVersion cids minBlockHeight srcRocksDb targetRocksDb
trimRocksDb (set setLoggerLevel (l2l LL.Info) logger) cfg.chainwebVersion cids (targetBlockHeight - blockHeightKeepDepth) srcRocksDb targetRocksDb

-- Shallow Nodes notes:
-- - Users should bring up new nodes if they need maximal uptime
Expand All @@ -351,7 +334,7 @@ compact cfg = do
withPerChainFileLogger cfg.logDir cid LL.Debug $ \logger -> do
withChainDb cid logger (pactDir cfg.fromDir) $ \_ srcDb -> do
withChainDb cid logger (pactDir cfg.toDir) $ \_ targetDb -> do
compactPactState logger cfg.chainwebVersion targetBlockHeight srcDb targetDb
compactPactState logger targetBlockHeight srcDb targetDb

compactTable :: (Logger logger)
=> logger -- ^ logger
Expand Down Expand Up @@ -608,7 +591,6 @@ locateLatestSafeTarget logger v dbDir cids = do
-- In devnet or testing versions we don't care.
let safeDepth :: BlockHeight
safeDepth
-- TODO: Lars thinks 1_000 is too conservative
| v == mainnet || v == testnet = BlockHeight 1_000
| otherwise = BlockHeight 0

Expand Down

0 comments on commit 5b44f61

Please sign in to comment.