From c55994b4654d807991c67e19ea8c37cfe04c7753 Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Mon, 3 Aug 2020 01:55:22 -0700
Subject: [PATCH 1/9] More efficient and correct database pruning

---
 chainweb.cabal                           |   2 +-
 src/Chainweb/BlockHeaderDB/PruneForks.hs | 385 +++++++++++++++--------
 src/Chainweb/Chainweb/ChainResources.hs  |  23 +-
 3 files changed, 262 insertions(+), 148 deletions(-)

diff --git a/chainweb.cabal b/chainweb.cabal
index 378d29b5c8..a3f7a57632 100644
--- a/chainweb.cabal
+++ b/chainweb.cabal
@@ -318,7 +318,6 @@ library
     , base64-bytestring >= 1.0 && < 1.1
     -- due to change in error message that is inlined in pact
     , binary >= 0.8
-    , bloomfilter >= 2.0
    , bytes >= 0.15
    , bytestring >= 0.10
    , case-insensitive >= 1.2
@@ -330,6 +329,7 @@ library
    , connection >=0.2
    , containers >= 0.5
    , cryptonite >= 0.25
+    , cuckoo >= 0.2
    , data-default >=0.7
    , data-dword >= 0.3
    , deepseq >= 1.4
diff --git a/src/Chainweb/BlockHeaderDB/PruneForks.hs b/src/Chainweb/BlockHeaderDB/PruneForks.hs
index 5be305c3fd..5c266c7ebf 100644
--- a/src/Chainweb/BlockHeaderDB/PruneForks.hs
+++ b/src/Chainweb/BlockHeaderDB/PruneForks.hs
@@ -1,7 +1,14 @@
-{-# LANGUAGE FlexibleContexts #-}
 {-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DerivingStrategies #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE LambdaCase #-}
 {-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PolyKinds #-}
 {-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}

 -- |
 -- Module: Chainweb.BlockHeaderDB.PruneForks
@@ -13,21 +20,32 @@
 -- Prune old forks from BlockHeader DB
 --
 module Chainweb.BlockHeaderDB.PruneForks
-( pruneForks
+( pruneForksLogg
+, pruneForks
 , pruneForks_
+
+-- * Mark and sweep GC for payloads
+, mkFilter
+, markPayload
 ) where

+import Chainweb.MerkleLogHash
+
 import Control.Monad
-import Control.Monad.ST
+import Control.Monad.Catch
+import Control.Monad.IO.Class

-import qualified Data.BloomFilter.Easy as BF (suggestSizing)
-import qualified Data.BloomFilter.Hash as BF
-import qualified Data.BloomFilter.Mutable as BF
+import Data.Aeson
+import qualified Data.ByteArray as BA
+import Data.Coerce
+import Data.Cuckoo
 import Data.Function
-import qualified Data.HashSet as HS
+import qualified Data.List as L
 import Data.Maybe
 import Data.Semigroup

+import Foreign.Ptr
+
 import Numeric.Natural

 import Prelude hiding (lookup)
@@ -41,7 +59,11 @@ import System.LogLevel
 import Chainweb.BlockHash
 import Chainweb.BlockHeader
 import Chainweb.BlockHeaderDB.Internal
+import Chainweb.BlockHeight
+import Chainweb.ChainId
 import Chainweb.Logger
+import Chainweb.Payload
+import Chainweb.Payload.PayloadStore
 import Chainweb.TreeDB

 import Chainweb.Utils hiding (Codec)
@@ -50,7 +72,32 @@
 import Data.CAS.RocksDB
 import Data.LogMessage

 -- -------------------------------------------------------------------------- --
--- Prune Old Forks
+-- Chain Database Pruning
+
+pruneForksLogg
+    :: Logger logger
+    => logger
+    -> BlockHeaderDb
+    -> Natural
+    -- ^ The depth at which deletion starts. Note that the max rank isn't
+    -- necessarily included in the current best cut, so one should choose a
+    -- depth for which one is confident that all forks are resolved.
+
+    -> (BlockHeader -> IO ())
+    -- ^ Deletion callback. This hook is called /after/ the entry is
+    -- deleted from the database. Its main purpose is to delete any
+    -- resources that were related to the deleted header and that are not
+    -- needed any more.
+
+    -> (BlockPayloadHash -> IO ())
+    -- ^ Deletion callback. This hook is called /after/ the entry is
+    -- deleted from the database. Its main purpose is to delete any
+    -- resources that were related to the deleted header and that are not
+    -- needed any more.
+    -> IO Int
+pruneForksLogg logger = pruneForks logg
+  where
+    logg = logFunctionText (setComponent "ChainDatabasePruning" logger)

 -- | Prunes most block headers and block payloads from forks that are older than
 -- the given number of blocks.
 --
 -- This function guarantees that the predecessors of all remaining nodes also
 -- remain in the database and that there are no tangling references.
 --
 -- This function doesn't guarantee to delete all blocks on forks. A small number
 -- of fork blocks may not get deleted.
 --
 -- The function takes a callback that is invoked on each deleted block header.
 -- The callback takes a parameter that indicates whether the block payload hash
 -- is shared with any non-deleted block header. There is a small rate of false
 -- positives of block payload hashes that are marked in use.
 --
 -- This doesn't update the cut db or the payload db.
 --
 pruneForks
-    :: Logger logger
-    => logger
+    :: LogFunctionText
     -> BlockHeaderDb
     -> Natural
-    -- ^ The depth at which the roots are collected for marking block
-    -- headers that are kept. Any fork that was active that the given depth
-    -- is kept.
-    --
-    -- The depth is computed based on the entry of maximum rank. This entry
-    -- is not necessarily included in the overall consensus of the chainweb.
-    -- In particular the the higest ranking entry is not necessarily the
-    -- entry of largest POW weight.
-    --
-    -- Usually this number would be defined in terms of the chainweb
-    -- diameter and/or the difficulty adjustment window.
-    -> (BlockHeader -> Bool -> IO ())
+    -- ^ The depth at which deletion starts. Note that the max rank isn't
+    -- necessarily included in the current best cut, so one should choose a
+    -- depth for which one is confident that all forks are resolved.
+
+    -> (BlockHeader -> IO ())
     -- ^ Deletion callback. This hook is called /after/ the entry is
     -- deleted from the database. Its main purpose is to delete any
     -- resources that were related to the deleted header and that are not
-    -- needed any more. The Boolean argument indicates whether the payload
-    -- of the block is shared with any block header that isn't marked for
-    -- deletion.
-    -> IO Int
-pruneForks logger = pruneForks_ logg
-  where
-    logg = logFunctionText (setComponent "pact-tx-replay" logger)
+    -- needed any more.
+    -> (BlockPayloadHash -> IO ())
+    -- ^ Deletion callback. This hook is called /after/ the entry is
+    -- deleted from the database. Its main purpose is to delete any
+    -- resources that were related to the deleted header and that are not
+    -- needed any more.
+    -> IO Int
+pruneForks logg cdb depth headerCallback payloadCallback = do
+    hdr <- maxEntry cdb
+    let mar = MaxRank $ Max $ int (_blockHeight hdr) - depth
+    pruneForks_ logg cdb mar (MinRank $ Min 0) headerCallback payloadCallback

+-- | TODO add option to also validate the block headers
+--
 pruneForks_
     :: LogFunctionText
     -> BlockHeaderDb
-    -> Natural
-    -> (BlockHeader -> Bool -> IO ())
+    -> MaxRank
+    -> MinRank
+    -> (BlockHeader -> IO ())
+    -> (BlockPayloadHash -> IO ())
     -> IO Int
-pruneForks_ logg cdb limit callback = do
+pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do

-    -- find all roots at \(maxEntry - limit\)
-    --
-    m <- maxRank cdb
-    let rootHeight = m - min m limit
-
-    !roots <- keys cdb Nothing Nothing
-        (Just $ MinRank $ Min rootHeight)
-        Nothing
-        -- include all block headers in the root set, so that all headers
-        -- are included in the marking phase. PayloadBlockHashes can be
-        -- shared between blocks at different height, so we must make sure
-        -- that they are retained if a root depends on them.
-        streamToHashSet_
-
-    -- Bloom filter for marking block headers
-    --
-    let s = max (int rootHeight + 10 * HS.size roots) (int rootHeight * 2)
-    let (size, hashNum) = BF.suggestSizing s 0.0001
-    !marked <- stToIO $ BF.new (BF.cheapHashes hashNum) size
+    !pivots <- entries cdb Nothing Nothing
+        (Just $ MinRank $ Min $ _getMaxRank mar)
+        (Just mar)
+        S.toList_

-    -- Bloom filter for marking block payload hashes
-    --
-    -- Payloads can be shared between different blocks. Thus, to be on the safe
-    -- side, we are not deleting any marked payload.
-    --
-    -- Note that we could use cuckoo hashes or counting bloom filters to do
-    -- reference counting and collect more unreferenced payloads.
-    --
-    -- TODO: would it be better to use a single shared filter?
-    --
-    let (psize, phashNum) = BF.suggestSizing s 0.0001
-    !markedPayloads <- stToIO $ BF.new (BF.cheapHashes phashNum) psize
+    when (null pivots) $ do
+        logg Warn
+            $ "Skipping database pruning because of an empty set of block headers at upper pruning bound " <> sshow mar
+            <> ". This would otherwise delete the complete database."

-    -- Iterate backwards and mark all predecessors of the roots
-    --
-    void $ branchEntries cdb Nothing Nothing Nothing Nothing mempty (HS.map UpperBound roots)
-        $ S.mapM_ $ \h -> stToIO $ do
-            BF.insert marked $ runPut $ encodeBlockHash $ _blockHash h
-            BF.insert markedPayloads
-                $ runPut $ encodeBlockPayloadHash $ _blockPayloadHash h
-
-    -- Iterate forward and delete all non-marked block headers that are not a
-    -- predecessor of a block header that is kept.
-    --
-    entries cdb Nothing Nothing Nothing (Just $ MaxRank $ Max rootHeight)
-        $ S.foldM_ (go marked markedPayloads) (return (mempty, 0)) (return . snd)
+    withReverseHeaderStream cdb (mar - 1) mir $
+        S.foldM_ go (return (pivots, [], 0)) (\(_, _, n) -> return n)
+  where
+
+    go
+        :: ([BlockHeader], [BlockPayloadHash], Int)
+        -> BlockHeader
+        -> IO ([BlockHeader], [BlockPayloadHash], Int)
+
+    go ([], _, _) _ = error "impossible" -- FIXME
+
+    -- Note that almost always `pivots` is a singleton list and `bs` is empty.
+    -- Also `payloads` is almost always empty.
+    --
+    go (!pivots, !payloads, !n) cur = do
+
+        -- Sanity Check: make sure we didn't miss the pivot:
+        --
+        when (_blockHeight cur + 1 < maximum (fmap _blockHeight pivots)) $ do
+            let pivot = head pivots
+            throwM
+                $ TreeDbAncestorMissing @BlockHeaderDb pivot (int (_blockHeight cur))
+                $ "Corrupted chain database for chain " <> toText (_chainId cdb)
+                <> ". The chain db must be deleted and re-synchronized."
+
+            -- FIXME: try to repair the database by fetching the missing
+            -- block from remote peers?
+            --
+            -- It's probably best to write an independent repair
+            -- program or module
+
+        case L.partition (\p -> _blockHash cur == _blockParent p) pivots of
+
+            -- Delete element
+            ([], _) -> do
+                deleteHdr cur
+                return (pivots, _blockPayloadHash cur : payloads, n+1)
+
+            -- We've got a new pivot. This case happens almost always.
+            --
+            (_, bs) -> do
+                -- TODO: add intrinsic and inductive validation?
+
+                let newPivots = cur : bs
+
+                -- When after adding this pivot all pivots have the same block
+                -- height we can delete the pending payloads, since we've seen
+                -- the payloads of all pivots down to the current height.
+                --
+                -- This check is fast when bs is empty.
+                --
+                when (all (((==) `on` _blockHeight) cur) bs) $
+                    mapM_ deletePayload (payloads L.\\ fmap _blockPayloadHash newPivots)
+
+                return (newPivots, [], n)
+
+    deleteHdr k = do
+        -- TODO: make this atomic (create boilerplate to combine queries for
+        -- different tables)
+        casDelete (_chainDbCas cdb) (casKey $ RankedBlockHeader k)
+        tableDelete (_chainDbRankTable cdb) (_blockHash k)
+        logg Debug
+            $ "pruned block header " <> encodeToText (_blockHash k)
+            <> " at height " <> sshow (_blockHeight k)
+        hdrCallback k
+
+    deletePayload p = do
+        logg Debug $ "call payload pruning callback for hash: " <> encodeToText p
+        payloadCallback p
+
+-- -------------------------------------------------------------------------- --
+-- Utils
+-- TODO: provide this function in chainweb-storage:
+--
+-- Returns the stream of key-value pairs of a 'RocksDbTableIter' in reverse
+-- order.
+--
+-- The iterator must be released after the stream is consumed. Releasing the
+-- iterator too early while the stream is still in use results in a runtime
+-- error. Not releasing the iterator after the processing of the stream has
+-- finished results in a memory leak.
+--
+iterToReverseValueStream :: RocksDbTableIter k v -> S.Stream (S.Of v) IO ()
+iterToReverseValueStream it = liftIO (tableIterValue it) >>= \case
+    Nothing -> return ()
+    Just x -> S.yield x >> liftIO (tableIterPrev it) >> iterToReverseValueStream it
+{-# INLINE iterToReverseValueStream #-}
+
+withReverseHeaderStream
+    :: BlockHeaderDb
+    -> MaxRank
+    -> MinRank
+    -> (S.Stream (S.Of BlockHeader) IO () -> IO a)
+    -> IO a
+withReverseHeaderStream db mar mir inner = withTableIter headerTbl $ \it -> do
+    tableIterSeek it $ RankedBlockHash (BlockHeight $ int $ _getMaxRank mar + 1) nullBlockHash
+    tableIterPrev it
+    inner $ iterToReverseValueStream it
+        & S.map _getRankedBlockHeader
+        & S.takeWhile (\a -> int (_blockHeight a) >= mir)
+  where
+    headerTbl = _chainDbCas db
+{-# INLINE withReverseHeaderStream #-}
+
+-- -------------------------------------------------------------------------- --
+-- Mark and sweep GC for Payloads
+--
+
+newtype GcHash = GcHash MerkleLogHash
+    deriving newtype (Show, ToJSON)
+
+instance CuckooFilterHash GcHash where
+    cuckooHash (Salt s) (GcHash a) = fnv1a_bytes s a -- $ BA.takeView a 32
+    cuckooFingerprint (Salt s) (GcHash a) = sip_bytes s a -- s $ BA.takeView a 32
+    {-# INLINE cuckooHash #-}
+    {-# INLINE cuckooFingerprint #-}
+
+gcHash :: Coercible a MerkleLogHash => a -> GcHash
+gcHash = coerce
+{-# INLINE gcHash #-}
+
+type Filter = CuckooFilterIO 4 8 GcHash
+
+mkFilter :: IO Filter
+mkFilter = do
+    newCuckooFilter 0 80000000
+    -- 100 items per block (as of summer 2020)
+    -- TODO: should depend on current block height
+
+markPayload
+    :: HasCasLookupConstraint cas BlockPayload
+    => PayloadDb cas
+    -> Filter
+    -> BlockPayloadHash
+    -> IO ()
+markPayload db cf h = do
+    casLookup pdb h >>= \case
+        Nothing -> error "corrupted database: payload not found"
+        Just payload -> do
+            tryInsert "payload hash" (gcHash $ _blockPayloadPayloadHash payload)
+            tryInsert "transactions hash" (gcHash $ _blockPayloadTransactionsHash payload)
+            tryInsert "outputs hash" (gcHash $ _blockPayloadOutputsHash payload)
+  where
+    tryInsert k a = do
+        -- inserting a large number of equal elements causes the filter to fail.
+ m <- member cf a + if m + then + print $ "member found for " <> k <> ": " <> encodeToText a + else + unlessM (insert cf a) $ + error "failed to insert item in cuckoo filter: increase the size of the filter and try again" - -- The falsePositiveSet collects all deleted nodes that are known to be - -- false positives. + pdb = _transactionDbBlockPayloads $ _transactionDb db + + -- Tables -- - -- We know that a node is false positive when it is in the filter but any of - -- its ancestors is neither in the filter nor in the falsePositiveSet. - -- (Because for a true positive all ancestors are in the filter.) We also - -- know that a node is false positive if it's payload isn't marked, because - -- we mark the payload of all nodes that are true positives. + -- BlockPayloadStore - BlockPayload: + -- *BlockPayloadHash, BlockTransactionsHash, BlockOutputsHash -- - -- Note that this doesn't capture all false positives, but only those that - -- are not connected to the main chain. + -- BlockTransactionStore - BlockTransactions: + -- *BlockTransactionsHash, Vector Transactions, MinerData -- - -- Nodes that are known to be false positives are safe to remove, if also - -- all of its successors are removed. + -- BlockOutputsStore - BlockOutputs: + -- *BlockOutputsHash, Vector TransactionOutput, CoinbaseOutput -- - -- Nodes that are known to be false postives must be removed, if any of their - -- predecessors got removed or if their payload got removed. + -- TransactionTreeStore - TransactionTree: + -- *BlockTransactionsHash, MerkleTree + -- + -- OutputTreeStore - OutputTree + -- *BlockOutputsHash, MerkleTree -- - go marked markedPayloads (!falsePositiveSet, !i) !h = do - let k = runPut $ encodeBlockHash $ _blockHash h - let p = runPut $ encodeBlockHash $ _blockParent h - isMarked <- stToIO $ BF.elem k marked - - let payloadHashBytes = runPut $ encodeBlockPayloadHash $ _blockPayloadHash h - isPayloadMarked <- stToIO $ BF.elem payloadHashBytes markedPayloads - - -- Delete nodes not in the filter - if not isMarked - then do - deleteKey h isPayloadMarked - return (falsePositiveSet, succ i) - else do - -- Delete nodes which parent isn't in the filter or is in the - -- falsePositiveSet - parentIsMarked <- stToIO $ BF.elem p marked - if not (isGenesisBlockHeader h) && (not parentIsMarked || HS.member p falsePositiveSet || not isPayloadMarked) - then do - -- We know that this a false positive. We keep track of this for - -- future reference. - -- - -- TODO: consider using cuckoo filters because entries can be - -- deleted from them. So we wouldn't need to keep track of - -- deleted falsePositives. However, with cuckoo filters we'd - -- have to re-hash or keep track of failing inserts. - deleteKey h isPayloadMarked - return (HS.insert k falsePositiveSet, succ i) - else - -- The key is either - -- 1. in the chain or - -- 2. is a false positive that has a payload and is connected to - -- the chain (i.e. has all of its predecessors). - -- - -- We accept a small number of nodes of the second case. - -- - return (falsePositiveSet, i) - deleteKey h isPayloadMarked = do - -- TODO: make this atomic (create boilerplate to combine queries for - -- different tables) - casDelete (_chainDbCas cdb) (casKey $ RankedBlockHeader h) - tableDelete (_chainDbRankTable cdb) (_blockHash h) - logg Debug $ "deleted block header at height " <> sshow (_blockHeight h) <> " with payload mark " <> sshow isPayloadMarked - callback h isPayloadMarked + -- 1. Delete payloads hashes + -- 2. 
do payload mark and sweep gc after pruning all chain databases
diff --git a/src/Chainweb/Chainweb/ChainResources.hs b/src/Chainweb/Chainweb/ChainResources.hs
index 35f20264e3..78349c9129 100644
--- a/src/Chainweb/Chainweb/ChainResources.hs
+++ b/src/Chainweb/Chainweb/ChainResources.hs
@@ -135,23 +135,16 @@ withChainResources
             -- prune block header db
             when prune $ do
                 logg Info "start pruning block header database"
-                x <- pruneForks logger cdb (diam * 3) $ \_h _payloadInUse ->
-
-                    -- FIXME At the time of writing his payload hashes are not
-                    -- unique. The pruning algorithm can handle non-uniquness
-                    -- between within a chain between forks, but not accross
-                    -- chains. Also cas-deletion is sound for payload hashes if
-                    -- outputs are unique for payload hashes.
-                    --
-                    -- Renable this code once pact
+                x <- pruneForksLogg logger cdb (diam * 3) (\_ -> return ()) (\_ -> return ())
+                    -- FIXME The pruning algorithm can handle non-uniqueness
+                    -- between forks on the same chain, but not across chains.
+                    -- Payload hashes are unique between chains except for
+                    -- genesis blocks.
                     --
+                    -- However, it's not clear if uniqueness holds for all
+                    -- components of the payloads. Presumably they are unique
+                    -- for all but transaction hashes of empty blocks.
                     --
-                    -- includes the parent hash into the coinbase hash,
-                    -- includes the transaction hash into the respective output hash, and
-                    -- guarantees that transaction hashes are unique.
-                    --
-                    -- unless payloadInUse
-                    -- $ casDelete payloadDb (_blockPayloadHash h)
-                    return ()
                 logg Info $ "finished pruning block header database. Deleted " <> sshow x <> " block headers."
             let pex = pes requestQ
             putMVar pexMv pex

From 761ffac67b18b45f9d82a984e7ac6ae91b74d190 Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Mon, 3 Aug 2020 22:55:26 -0700
Subject: [PATCH 2/9] implement payload gc

---
 chainweb.cabal                              |   1 +
 src/Chainweb/BlockHeaderDB/PruneForks.hs    | 125 +++-----------------
 src/Chainweb/Chainweb.hs                    |  18 ++-
 src/Chainweb/Chainweb/ChainResources.hs     |  23 +---
 src/Chainweb/Chainweb/PruneChainDatabase.hs | Bin 0 -> 13826 bytes
 5 files changed, 35 insertions(+), 132 deletions(-)
 create mode 100644 src/Chainweb/Chainweb/PruneChainDatabase.hs

diff --git a/chainweb.cabal b/chainweb.cabal
index a3f7a57632..373449eebd 100644
--- a/chainweb.cabal
+++ b/chainweb.cabal
@@ -158,6 +158,7 @@ library
         , Chainweb.Chainweb.CutResources
         , Chainweb.Chainweb.MinerResources
         , Chainweb.Chainweb.PeerResources
+        , Chainweb.Chainweb.PruneChainDatabase
        , Chainweb.Counter
        , Chainweb.Crypto.MerkleLog
        , Chainweb.Cut
diff --git a/src/Chainweb/BlockHeaderDB/PruneForks.hs b/src/Chainweb/BlockHeaderDB/PruneForks.hs
index 5c266c7ebf..aacf34ad55 100644
--- a/src/Chainweb/BlockHeaderDB/PruneForks.hs
+++ b/src/Chainweb/BlockHeaderDB/PruneForks.hs
@@ -23,29 +23,17 @@ module Chainweb.BlockHeaderDB.PruneForks
 ( pruneForksLogg
 , pruneForks
 , pruneForks_
-
--- * Mark and sweep GC for payloads
-, mkFilter
-, markPayload
 ) where

-import Chainweb.MerkleLogHash
-
 import Control.Monad
 import Control.Monad.Catch
 import Control.Monad.IO.Class

-import Data.Aeson
-import qualified Data.ByteArray as BA
-import Data.Coerce
-import Data.Cuckoo
 import Data.Function
 import qualified Data.List as L
 import Data.Maybe
 import Data.Semigroup

-import Foreign.Ptr
-
 import Numeric.Natural

 import Prelude hiding (lookup)
@@ -62,8 +50,6 @@ import Chainweb.BlockHeaderDB.Internal
 import Chainweb.BlockHeight
 import Chainweb.ChainId
 import Chainweb.Logger
-import Chainweb.Payload
-import Chainweb.Payload.PayloadStore
 import Chainweb.TreeDB

 import Chainweb.Utils hiding (Codec)
@@ -83,13 +69,13 @@ pruneForksLogg
     -- necessarily included in the current best cut, so one should choose a
     -- depth for which one is confident that all forks are resolved.

-    -> (BlockHeader -> IO ())
+    -> (Bool -> BlockHeader -> IO ())
     -- ^ Deletion callback. This hook is called /after/ the entry is
     -- deleted from the database. Its main purpose is to delete any
     -- resources that were related to the deleted header and that are not
     -- needed any more.

-    -> (BlockPayloadHash -> IO ())
+    -> (Bool -> BlockPayloadHash -> IO ())
     -- ^ Deletion callback. This hook is called /after/ the entry is
     -- deleted from the database. Its main purpose is to delete any
     -- resources that were related to the deleted header and that are not
@@ -108,12 +94,9 @@ pruneForksLogg logger = pruneForks logg
 -- This function doesn't guarantee to delete all blocks on forks. A small number
 -- of fork blocks may not get deleted.
 --
--- The function takes a callback that is invoked on each deleted block header.
--- The callback takes a parameter that indicates whether the block payload hash
--- is shared with any non-deleted block header. There is a small rate of false
--- positives of block payload hashes that are marked in use.
---
--- This doesn't update the cut db or the payload db.
+-- The function takes callbacks that are invoked on each block header and
+-- payload hash. Each callback takes a parameter that indicates whether the
+-- related block is pruned or not.
 --
 pruneForks
     :: LogFunctionText
     -> BlockHeaderDb
     -> Natural
     -- ^ The depth at which deletion starts. Note that the max rank isn't
     -- necessarily included in the current best cut, so one should choose a
     -- depth for which one is confident that all forks are resolved.

-    -> (BlockHeader -> IO ())
+    -> (Bool -> BlockHeader -> IO ())
     -- ^ Deletion callback. This hook is called /after/ the entry is
     -- deleted from the database. Its main purpose is to delete any
     -- resources that were related to the deleted header and that are not
     -- needed any more.

-    -> (BlockPayloadHash -> IO ())
+    -> (Bool -> BlockPayloadHash -> IO ())
     -- ^ Deletion callback. This hook is called /after/ the entry is
     -- deleted from the database. Its main purpose is to delete any
     -- resources that were related to the deleted header and that are not
     -- needed any more.
@@ -147,8 +130,8 @@ pruneForks_
     :: LogFunctionText
     -> BlockHeaderDb
     -> MaxRank
     -> MinRank
-    -> (BlockHeader -> IO ())
-    -> (BlockPayloadHash -> IO ())
+    -> (Bool -> BlockHeader -> IO ())
+    -> (Bool -> BlockPayloadHash -> IO ())
     -> IO Int
 pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do

@@ -198,6 +181,7 @@ pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do
             -- Delete element
             ([], _) -> do
                 deleteHdr cur
+                hdrCallback True cur
                 return (pivots, _blockPayloadHash cur : payloads, n+1)

             -- We've got a new pivot. This case happens almost always.
             --
@@ -213,9 +197,15 @@ pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do
                 --
                 -- This check is fast when bs is empty.
                 --
-                when (all (((==) `on` _blockHeight) cur) bs) $
-                    mapM_ deletePayload (payloads L.\\ fmap _blockPayloadHash newPivots)
-
+                when (all (((==) `on` _blockHeight) cur) bs) $ do
+                    let pivotPayloads = _blockPayloadHash <$> newPivots
+                    forM_ (_blockPayloadHash cur : payloads) $ \p -> do
+                        let del = not $ elem p pivotPayloads
+                        when del $ logg Debug
+                            $ "call payload callback for deleted payload: " <> encodeToText p
+                        payloadCallback del p
+
+                hdrCallback False cur
                 return (newPivots, [], n)

     deleteHdr k = do
@@ -226,11 +216,6 @@ pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do
         logg Debug
             $ "pruned block header " <> encodeToText (_blockHash k)
             <> " at height " <> sshow (_blockHeight k)
-        hdrCallback k
-
-    deletePayload p = do
-        logg Debug $ "call payload pruning callback for hash: " <> encodeToText p
-        payloadCallback p

 -- -------------------------------------------------------------------------- --
 -- Utils
 -- TODO: provide this function in chainweb-storage:
 --
 -- Returns the stream of key-value pairs of a 'RocksDbTableIter' in reverse
 -- order.
 --
 -- The iterator must be released after the stream is consumed. Releasing the
 -- iterator too early while the stream is still in use results in a runtime
 -- error. Not releasing the iterator after the processing of the stream has
 -- finished results in a memory leak.
 --
 iterToReverseValueStream :: RocksDbTableIter k v -> S.Stream (S.Of v) IO ()
 iterToReverseValueStream it = liftIO (tableIterValue it) >>= \case
     Nothing -> return ()
     Just x -> S.yield x >> liftIO (tableIterPrev it) >> iterToReverseValueStream it
 {-# INLINE iterToReverseValueStream #-}

 withReverseHeaderStream
     :: BlockHeaderDb
     -> MaxRank
     -> MinRank
     -> (S.Stream (S.Of BlockHeader) IO () -> IO a)
     -> IO a
 withReverseHeaderStream db mar mir inner = withTableIter headerTbl $ \it -> do
     tableIterSeek it $ RankedBlockHash (BlockHeight $ int $ _getMaxRank mar + 1) nullBlockHash
     tableIterPrev it
     inner $ iterToReverseValueStream it
         & S.map _getRankedBlockHeader
         & S.takeWhile (\a -> int (_blockHeight a) >= mir)
   where
     headerTbl = _chainDbCas db
 {-# INLINE withReverseHeaderStream #-}

--- -------------------------------------------------------------------------- --
--- Mark and sweep GC for Payloads
---
-
-newtype GcHash = GcHash MerkleLogHash
-    deriving newtype (Show, ToJSON)
-
-instance CuckooFilterHash GcHash where
-    cuckooHash (Salt s) (GcHash a) = fnv1a_bytes s a -- $ BA.takeView a 32
-    cuckooFingerprint (Salt s) (GcHash a) = sip_bytes s a -- s $ BA.takeView a 32
-    {-# INLINE cuckooHash #-}
-    {-# INLINE cuckooFingerprint #-}
-
-gcHash :: Coercible a MerkleLogHash => a -> GcHash
-gcHash = coerce
-{-# INLINE gcHash #-}
-
-type Filter = CuckooFilterIO 4 8 GcHash
-
-mkFilter :: IO Filter
-mkFilter = do
-    newCuckooFilter 0 80000000
-    -- 100 items per block (as of summer 2020)
-    -- TODO: should depend on current block height
-
-markPayload
-    :: HasCasLookupConstraint cas BlockPayload
-    => PayloadDb cas
-    -> Filter
-    -> BlockPayloadHash
-    -> IO ()
-markPayload db cf h = do
-    casLookup pdb h >>= \case
-        Nothing -> error "corrupted database: payload not found"
-        Just payload -> do
-            tryInsert "payload hash" (gcHash $ _blockPayloadPayloadHash payload)
-            tryInsert "transactions hash" (gcHash $ _blockPayloadTransactionsHash payload)
-            tryInsert "outputs hash" (gcHash $ _blockPayloadOutputsHash payload)
-  where
-    tryInsert k a = do
-        -- inserting a large number of equal elements causes the filter to fail.
-        m <- member cf a
-        if m
-          then
-            print $ "member found for " <> k <> ": " <> encodeToText a
-          else
-            unlessM (insert cf a) $
-                error "failed to insert item in cuckoo filter: increase the size of the filter and try again"
-
-    pdb = _transactionDbBlockPayloads $ _transactionDb db
-
-    -- Tables
-    --
-    -- BlockPayloadStore - BlockPayload:
-    --     *BlockPayloadHash, BlockTransactionsHash, BlockOutputsHash
-    --
-    -- BlockTransactionStore - BlockTransactions:
-    --     *BlockTransactionsHash, Vector Transactions, MinerData
-    --
-    -- BlockOutputsStore - BlockOutputs:
-    --     *BlockOutputsHash, Vector TransactionOutput, CoinbaseOutput
-    --
-    -- TransactionTreeStore - TransactionTree:
-    --     *BlockTransactionsHash, MerkleTree
-    --
-    -- OutputTreeStore - OutputTree
-    --     *BlockOutputsHash, MerkleTree
-    --
-
-
-    -- 1. Delete payloads hashes
-    -- 2. 
do payload mark and sweep gc after pruning all chain databases diff --git a/src/Chainweb/Chainweb.hs b/src/Chainweb/Chainweb.hs index d7e8a0562f..127245d99d 100644 --- a/src/Chainweb/Chainweb.hs +++ b/src/Chainweb/Chainweb.hs @@ -158,10 +158,6 @@ import System.LogLevel -- internal modules -import qualified Pact.Types.ChainId as P -import qualified Pact.Types.ChainMeta as P -import qualified Pact.Types.Command as P - import Chainweb.BlockHeader import Chainweb.BlockHeaderDB (BlockHeaderDb) import Chainweb.BlockHeight @@ -170,6 +166,7 @@ import Chainweb.Chainweb.ChainResources import Chainweb.Chainweb.CutResources import Chainweb.Chainweb.MinerResources import Chainweb.Chainweb.PeerResources +import Chainweb.Chainweb.PruneChainDatabase import Chainweb.Cut import Chainweb.CutDB import Chainweb.HostAddress @@ -202,6 +199,10 @@ import P2P.Node.Configuration import P2P.Node.PeerDB (PeerDb) import P2P.Peer +import qualified Pact.Types.ChainId as P +import qualified Pact.Types.ChainMeta as P +import qualified Pact.Types.Command as P + -- -------------------------------------------------------------------------- -- -- TransactionIndexConfig @@ -571,14 +572,19 @@ withChainwebInternal -> (forall cas' . PayloadCasLookup cas' => Chainweb logger cas' -> IO a) -> IO a withChainwebInternal conf logger peer rocksDb dbDir nodeid resetDb inner = do + + -- TODO distinguish between "prune: headers" and "prune: full" + when prune $ fullGc (setComponent "database-gc" logger) rocksDb v + initializePayloadDb v payloadDb + concurrentWith -- initialize chains concurrently (\cid -> do let mcfg = validatingMempoolConfig cid v (_configBlockGasLimit conf) withChainResources v cid rocksDb peer (chainLogger cid) - mcfg payloadDb prune dbDir nodeid - pactConfig) + mcfg payloadDb dbDir nodeid pactConfig + ) -- initialize global resources after all chain resources are initialized (\cs -> global (HM.fromList $ zip cidsList cs)) diff --git a/src/Chainweb/Chainweb/ChainResources.hs b/src/Chainweb/Chainweb/ChainResources.hs index 78349c9129..75d6533da7 100644 --- a/src/Chainweb/Chainweb/ChainResources.hs +++ b/src/Chainweb/Chainweb/ChainResources.hs @@ -51,8 +51,8 @@ import System.LogLevel -- internal modules import Chainweb.BlockHeaderDB -import Chainweb.BlockHeight import Chainweb.BlockHeaderDB.PruneForks +import Chainweb.BlockHeight import Chainweb.ChainId import Chainweb.Chainweb.PeerResources import Chainweb.Graph @@ -68,6 +68,7 @@ import Chainweb.NodeId import Chainweb.Pact.Service.PactInProcApi import Chainweb.Pact.Service.Types import Chainweb.Payload.PayloadStore +import Chainweb.Payload.PayloadStore.RocksDB import Chainweb.RestAPI.NetworkID import Chainweb.Transaction import Chainweb.Utils @@ -115,8 +116,6 @@ withChainResources -> logger -> (MVar PactExecutionService -> Mempool.InMemConfig ChainwebTransaction) -> PayloadDb cas - -> Bool - -- ^ whether to prune the chain database -> Maybe FilePath -- ^ database directory for checkpointer -> Maybe NodeId @@ -124,7 +123,7 @@ withChainResources -> (ChainResources logger -> IO a) -> IO a withChainResources - v cid rdb peer logger mempoolCfg0 payloadDb prune dbDir nodeid pactConfig inner = + v cid rdb peer logger mempoolCfg0 payloadDb dbDir nodeid pactConfig inner = withBlockHeaderDb rdb v cid $ \cdb -> do pexMv <- newEmptyMVar let mempoolCfg = mempoolCfg0 pexMv @@ -132,20 +131,6 @@ withChainResources mpc <- MPCon.mkMempoolConsensus mempool cdb $ Just payloadDb withPactService v cid (setComponent "pact" logger) mpc cdb payloadDb dbDir nodeid pactConfig $ \requestQ -> do - -- 
prune block header db
-            when prune $ do
-                logg Info "start pruning block header database"
-                x <- pruneForksLogg logger cdb (diam * 3) (\_ -> return ()) (\_ -> return ())
-                    -- FIXME The pruning algorithm can handle non-uniqueness
-                    -- between forks on the same chain, but not across chains.
-                    -- Payload hashes are unique between chains except for
-                    -- genesis blocks.
-                    --
-                    -- However, it's not clear if uniqueness holds for all
-                    -- components of the payloads. Presumably they are unique
-                    -- for all but transaction hashes of empty blocks.
-                    --
-                logg Info $ "finished pruning block header database. Deleted " <> sshow x <> " block headers."
             let pex = pes requestQ
             putMVar pexMv pex
@@ -158,8 +143,6 @@ withChainResources
             , _chainResPact = pex
             }
   where
-    logg = logFunctionText (setComponent "pact-tx-replay" logger)
-    diam = diameter (_chainGraph (v, maxBound @BlockHeight))
     pes requestQ = case v of
         Test{} -> emptyPactExecutionService
         TimedConsensus{} -> emptyPactExecutionService
diff --git a/src/Chainweb/Chainweb/PruneChainDatabase.hs b/src/Chainweb/Chainweb/PruneChainDatabase.hs
new file mode 100644
index 0000000000000000000000000000000000000000..4f41b1600a800ddb6d99a5029e53853833d483c1
GIT binary patch
literal 13826
[base85-encoded contents of the new PruneChainDatabase.hs module omitted]

literal 0
HcmV?d00001

From 145dd951468210ea92be57d5a79a8e920f112c19 Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Wed, 5 Aug 2020 22:42:53 -0700
Subject: [PATCH 3/9] fix pruning

---
 src/Chainweb/BlockHeaderDB/PruneForks.hs      | 105 +++++++++---------
 src/Chainweb/Chainweb.hs                      |  75 +++++++++++--
 src/Chainweb/Chainweb/PruneChainDatabase.hs   | Bin 13826 -> 14201 bytes
 src/Chainweb/Pact/PactService/Checkpointer.hs |   9 +-
 4 files changed, 121 insertions(+), 68 deletions(-)

diff --git a/src/Chainweb/BlockHeaderDB/PruneForks.hs b/src/Chainweb/BlockHeaderDB/PruneForks.hs
index aacf34ad55..ddeebcba26 100644
--- a/src/Chainweb/BlockHeaderDB/PruneForks.hs
+++ b/src/Chainweb/BlockHeaderDB/PruneForks.hs
@@ -1,5 +1,6 @@
 {-# LANGUAGE BangPatterns #-}
 {-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveGeneric #-}
 {-# LANGUAGE DerivingStrategies #-}
 {-# LANGUAGE FlexibleContexts #-}
 {-# LANGUAGE GADTs #-}
@@ -33,6 +34,10 @@ import Data.Function
 import qualified Data.List as L
 import Data.Maybe
 import Data.Semigroup
+import qualified Data.Text as T
+
+import GHC.Generics
+import GHC.Stack

 import Numeric.Natural

@@ -123,10 +128,22 @@ pruneForks logg cdb depth headerCallback payloadCallback = do
     let mar = MaxRank $ Max $ int (_blockHeight hdr) - depth
     pruneForks_ logg cdb mar (MinRank $ Min 0) headerCallback payloadCallback

--- | TODO add option to also validate the block headers
+data PruneForksException
+    = PruneForksDbInvariantViolation BlockHeight [BlockHeight] T.Text
+    deriving (Show, Eq, Ord, Generic)
+
+instance Exception PruneForksException
+
+-- | Prune forks between the given min rank and max rank.
+--
+-- Only block headers that are ancestors of a block header at block height
+-- max rank are kept.
+--
+-- TODO add option to also validate the block headers
 --
 pruneForks_
-    :: LogFunctionText
+    :: HasCallStack
+    => LogFunctionText
     -> BlockHeaderDb
     -> MaxRank
     -> MinRank
@@ -145,68 +162,46 @@ pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do
         $ "Skipping database pruning because of an empty set of block headers at upper pruning bound " <> sshow mar
         <> ". This would otherwise delete the complete database."

-    withReverseHeaderStream cdb (mar - 1) mir $
-        S.foldM_ go (return (pivots, [], 0)) (\(_, _, n) -> return n)
-  where
-
-    go
-        :: ([BlockHeader], [BlockPayloadHash], Int)
-        -> BlockHeader
-        -> IO ([BlockHeader], [BlockPayloadHash], Int)
-
-    go ([], _, _) _ = error "impossible" -- FIXME
+    withReverseHeaderStream cdb (mar - 1) mir $ \s -> s
+        & S.groupBy ((==) `on` _blockHeight)
+        & S.mapped S.toList
+        & S.foldM_ go (return (_blockParent <$> pivots, 0)) (return . snd)
+  where

     -- Note that almost always `pivots` is a singleton list and `bs` is empty.
     -- Also `payloads` is almost always empty.
     --
-    go (!pivots, !payloads, !n) cur = do
+    go :: ([BlockHash], Int) -> [BlockHeader] -> IO ([BlockHash], Int)
+    go ([], _) _ = throwM $ InternalInvariantViolation "PruneForks.pruneForks_: impossible case"
+    go (!pivots, !n) curs = do
+
+        let (newPivots, toDelete) = L.partition (\h -> _blockHash h `elem` pivots) curs
+            pivotPayloads = _blockPayloadHash <$> newPivots

-        -- Sanity Check: make sure we didn't miss the pivot:
+        -- TODO: try to repair the database by fetching the missing block from
+        -- remote peers?
         --
-        when (_blockHeight cur + 1 < maximum (fmap _blockHeight pivots)) $ do
-            let pivot = head pivots
-            throwM
-                $ TreeDbAncestorMissing @BlockHeaderDb pivot (int (_blockHeight cur))
+        -- It's probably best to write an independent repair program or
+        -- module
+        when (null newPivots) $ do
+            logg Error
                 $ "Corrupted chain database for chain " <> toText (_chainId cdb)
                 <> ". The chain db must be deleted and re-synchronized."
+            throwM $ TreeDbKeyNotFound @BlockHeaderDb (head pivots)
+            -- note that the use of `head` here is safe
+
+        forM_ toDelete $ \b -> do
+            deleteHdr b
+            hdrCallback True b
+            let payload = _blockPayloadHash b
+            payloadCallback (payload `notElem` pivotPayloads) payload
+
+        forM_ newPivots $ \b -> do
+            hdrCallback False b
+            payloadCallback False (_blockPayloadHash b)
+
+        return (_blockParent <$> newPivots, n + length toDelete)

-            -- FIXME: try to repair the database by fetching the missing
-            -- block from remote peers?
-            --
-            -- It's probably best to write an independent repair
-            -- program or module
-
-        case L.partition (\p -> _blockHash cur == _blockParent p) pivots of
-
-            -- Delete element
-            ([], _) -> do
-                deleteHdr cur
-                hdrCallback True cur
-                return (pivots, _blockPayloadHash cur : payloads, n+1)
-
-            -- We've got a new pivot. This case happens almost always.
-            --
-            (_, bs) -> do
-                -- TODO: add intrinsic and inductive validation?
-
-                let newPivots = cur : bs
-
-                -- When after adding this pivot all pivots have the same block
-                -- height we can delete the pending payloads, since we've seen
-                -- the payloads of all pivots down to the current height.
-                --
-                -- This check is fast when bs is empty.
-                --
-                when (all (((==) `on` _blockHeight) cur) bs) $ do
-                    let pivotPayloads = _blockPayloadHash <$> newPivots
-                    forM_ (_blockPayloadHash cur : payloads) $ \p -> do
-                        let del = not $ elem p pivotPayloads
-                        when del $ logg Debug
-                            $ "call payload callback for deleted payload: " <> encodeToText p
-                        payloadCallback del p
-
-                hdrCallback False cur
-                return (newPivots, [], n)

     deleteHdr k = do
         -- TODO: make this atomic (create boilerplate to combine queries for
         -- different tables)
         casDelete (_chainDbCas cdb) (casKey $ RankedBlockHeader k)
         tableDelete (_chainDbRankTable cdb) (_blockHash k)
         logg Debug
             $ "pruned block header " <> encodeToText (_blockHash k)
             <> " at height " <> sshow (_blockHeight k)
diff --git a/src/Chainweb/Chainweb.hs b/src/Chainweb/Chainweb.hs
index 127245d99d..aa21d8b895 100644
--- a/src/Chainweb/Chainweb.hs
+++ b/src/Chainweb/Chainweb.hs
@@ -126,7 +126,7 @@ import Control.Concurrent.MVar (MVar, readMVar)
 import Control.Error.Util (note)
 import Control.Lens hiding ((.=), (<.>))
 import Control.Monad
-import Control.Monad.Catch (throwM)
+import Control.Monad.Catch (MonadThrow, throwM)

 import Data.Bifunctor (second)
 import Data.CAS (casLookupM)
@@ -269,11 +269,45 @@ instance FromJSON (ThrottlingConfig -> ThrottlingConfig) where
         <*< throttlingPeerRate ..: "putPeer" % o
         <*< throttlingLocalRate ..: "local" % o

---
+-- -------------------------------------------------------------------------- --
+-- Cut Configuration
+
+data ChainDatabaseGcConfig = GcNone | GcPruneForks | GcFull
+    deriving (Show, Eq, Ord, Enum, Bounded, Generic)
+
+chainDatabaseGcToText :: ChainDatabaseGcConfig -> T.Text
+chainDatabaseGcToText GcNone = "none"
+chainDatabaseGcToText GcPruneForks = "forks"
+chainDatabaseGcToText GcFull = "full"
+
+chainDatabaseGcFromText :: MonadThrow m => T.Text -> m ChainDatabaseGcConfig
+chainDatabaseGcFromText t = case T.toCaseFold t of
+    "none" -> return GcNone
+    "forks" -> return GcPruneForks
+    "full" -> return GcFull
+    x -> throwM $ TextFormatException $ "unknown value for database pruning configuration: " <> sshow x
+
+instance HasTextRepresentation ChainDatabaseGcConfig where
+    toText = chainDatabaseGcToText
+    fromText = chainDatabaseGcFromText
+    {-# INLINE toText #-}
+    {-# INLINE fromText #-}
+
+instance ToJSON ChainDatabaseGcConfig where
+    toJSON = toJSON . chainDatabaseGcToText
+    {-# INLINE toJSON #-}
+
+instance FromJSON ChainDatabaseGcConfig where
+    parseJSON v = parseJsonFromText "ChainDatabaseGcConfig" v <|> legacy v
+      where
+        legacy = withBool "ChainDatabaseGcConfig" $ \case
+            True -> return GcPruneForks
+            False -> return GcNone
+    {-# INLINE parseJSON #-}

 data CutConfig = CutConfig
     { _cutIncludeOrigin :: !Bool
-    , _cutPruneChainDatabase :: !Bool
+    , _cutPruneChainDatabase :: !ChainDatabaseGcConfig
     , _cutFetchTimeout :: !Int
     , _cutInitialCutHeightLimit :: !(Maybe CutHeight)
     } deriving (Eq, Show)
@@ -296,9 +330,26 @@ instance FromJSON (CutConfig -> CutConfig) where

 defaultCutConfig :: CutConfig
 defaultCutConfig = CutConfig
     { _cutIncludeOrigin = True
-    , _cutPruneChainDatabase = True
+    , _cutPruneChainDatabase = GcPruneForks
     , _cutFetchTimeout = 3_000_000
-    , _cutInitialCutHeightLimit = Nothing }
+    , _cutInitialCutHeightLimit = Nothing
+    }
+
+pCutConfig :: MParser CutConfig
+pCutConfig = id
+    <$< cutIncludeOrigin .:: boolOption_
+        % long "cut-include-origin"
+        <> hidden
+        <> internal
+        <> help "whether to include the origin when sending cuts"
+    <*< cutPruneChainDatabase .:: textOption
+        % long "prune-chain-database"
+        <> help "How to prune the chain database on startup. Pruning forks takes about 1-2 minutes. A full GC takes several minutes."
+        <> metavar "none|forks|full"
+    <*< cutFetchTimeout .:: option auto
+        % long "cut-fetch-timeout"
+        <> help "The timeout for processing new cuts in microseconds"
+    -- cutInitialCutHeightLimit isn't supported on the command line

 -- -------------------------------------------------------------------------- --
 -- Chainweb Configuration
@@ -433,6 +484,7 @@ pChainwebConfiguration = id
     <*< configRosetta .:: boolOption_
         % long "rosetta"
         <> help "Enable the Rosetta endpoints."
+    <*< configCuts %:: pCutConfig

 -- -------------------------------------------------------------------------- --
 -- Chainweb Resources
@@ -573,11 +625,15 @@ withChainwebInternal
     -> IO a
 withChainwebInternal conf logger peer rocksDb dbDir nodeid resetDb inner = do

-    -- TODO distinguish between "prune: headers" and "prune: full"
-    when prune $ fullGc (setComponent "database-gc" logger) rocksDb v
-
     initializePayloadDb v payloadDb
+
+    -- Garbage Collection
+    -- performed before PayloadDb and BlockHeaderDb are used by other components
+    case _cutPruneChainDatabase (_configCuts conf) of
+        GcFull -> fullGc (setComponent "database-gc" logger) rocksDb v
+        GcPruneForks -> pruneAllChains (setComponent "database-prune" logger) rocksDb v
+        GcNone -> return ()
+
     concurrentWith
       -- initialize chains concurrently
       (\cid -> do
@@ -598,9 +654,6 @@ withChainwebInternal conf logger peer rocksDb dbDir nodeid resetDb inner = do
             , _pactAllowReadsInLocal = _configAllowReadsInLocal conf
             }

-    prune :: Bool
-    prune = _cutPruneChainDatabase $ _configCuts conf
-
     cidsList :: [ChainId]
     cidsList = toList cids
diff --git a/src/Chainweb/Chainweb/PruneChainDatabase.hs b/src/Chainweb/Chainweb/PruneChainDatabase.hs
index 4f41b1600a800ddb6d99a5029e53853833d483c1..1028a45fc389537d1444fb04f8a885fd8b44943d 100644
GIT binary patch
delta 512
[base85-encoded binary delta omitted]

delta 112
[base85-encoded binary delta omitted]

diff --git a/src/Chainweb/Pact/PactService/Checkpointer.hs b/src/Chainweb/Pact/PactService/Checkpointer.hs
index bf8c3d0268..fa6f8d73a1 100644
--- a/src/Chainweb/Pact/PactService/Checkpointer.hs
+++ b/src/Chainweb/Pact/PactService/Checkpointer.hs
@@ -93,7 +93,7 @@ import Chainweb.Payload
 import Chainweb.Payload.PayloadStore
 import Chainweb.TreeDB (collectForkBlocks, lookup, lookupM)
 import Chainweb.Utils hiding (check)
-import Data.CAS (casLookupM)
+import Data.CAS (casLookup)

 exitOnRewindLimitExceeded :: PactServiceM cas a -> PactServiceM cas a
 exitOnRewindLimitExceeded = handle $ \case
@@ -349,7 +349,12 @@ rewindTo rewindLimit (Just (ParentHeader parent)) = do
         -- This does a restore, i.e. it rewinds the checkpointer back in
         -- history, if needed.
         withCheckpointerWithoutRewind (Just target) "fastForward" $ \pdbenv -> do
-            payload <- liftIO (payloadWithOutputsToPayloadData <$> casLookupM payloadDb bpHash)
+            payload <- liftIO $ casLookup payloadDb bpHash >>= \case
+                Nothing -> throwM $ PactInternalError
+                    $ "Checkpointer.rewindTo.fastForward: lookup of payload failed"
+                    <> ". BlockPayloadHash: " <> encodeToText bpHash
+                    <> ". Block: " <> encodeToText (ObjectEncoded block)
+                Just x -> return $ payloadWithOutputsToPayloadData x
             void $ execBlock block payload pdbenv
             return $! Save block ()
         -- double check output hash here?

From ca5fa31be90f43fa472ef87ff634ac7da0a4eecf Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Wed, 5 Aug 2020 23:08:45 -0700
Subject: [PATCH 4/9] ...

---
 src/Chainweb/Chainweb/PruneChainDatabase.hs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Chainweb/Chainweb/PruneChainDatabase.hs b/src/Chainweb/Chainweb/PruneChainDatabase.hs
index 1028a45fc3..e18c338ef6 100644
--- a/src/Chainweb/Chainweb/PruneChainDatabase.hs
+++ b/src/Chainweb/Chainweb/PruneChainDatabase.hs
@@ -222,7 +222,7 @@ fullGc logger rdb v = do
     markedPayloads <- mkFilter (round $ int @_ @Double m * 1.1)

     logg Info $ "Allocated "
-        <> sshow (sizeInAllocatedBytes markedTrans * sizeInAllocatedBytes markedPayloads `div` (1024 * 1024))
+        <> sshow ((sizeInAllocatedBytes markedTrans + sizeInAllocatedBytes markedPayloads) `div` (1024 * 1024))
         <> "MB for marking database entries"

     -- TODO mark all entries above a depth of depth, so it doesn't get GCed

From 71f8b4131076673c667f33644cf48c8b9381cf65 Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Thu, 6 Aug 2020 00:07:58 -0700
Subject: [PATCH 5/9] ...

---
 src/Chainweb/Chainweb/PruneChainDatabase.hs | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/Chainweb/Chainweb/PruneChainDatabase.hs b/src/Chainweb/Chainweb/PruneChainDatabase.hs
index e18c338ef6..02d960333c 100644
--- a/src/Chainweb/Chainweb/PruneChainDatabase.hs
+++ b/src/Chainweb/Chainweb/PruneChainDatabase.hs
@@ -238,6 +238,11 @@ fullGc logger rdb v = do
         chainLogg Info $ "finished pruning block header database. Deleted " <> sshow x <> " block headers."
         return (markedPayloads, markedTrans)

+    -- TODO: consider using bloom filters instead, which can be merged. Alternatively,
+    -- implement concurrent insertion for cuckoo filters, where the hashing is done
+    -- concurrently and a lock is used only for the actual modification of the
+    -- underlying buffer. Or do fine-grained locking on the filter.
+    --
     checkMark :: BA.ByteArrayAccess a => [(Filter a)] -> a -> IO Bool
     checkMark [] _ = return False
     checkMark (h : t) a = member h (GcHash a) >>= \case
@@ -319,11 +324,12 @@ payloadGcCallback logg db markedPayloads markedTrans isDelete h = do
         tryInsert cf "transactions hash" (gcHash $ _blockPayloadTransactionsHash payload)
   where
     tryInsert cf k a = do
-        -- inserting a large number of equal elements causes the filter to fail.
+        -- inserting a somewhat larger number (I think, it's actually 7) of
+        -- equal elements causes the filter to fail.
         unlessM (member cf a) $
             unlessM (insert cf a) $ error
-            $ "failed to insert item in cuckoo filter " <> k
-            <> ": increase the size of the filter and try again"
+            $ "failed to insert item " <> k <> " in cuckoo filter"
+            <> ": while very rare this can happen. Usually it is resolved by retrying."

     pdb = _transactionDbBlockPayloads $ _transactionDb db

From eea2cab10cce3a87e257823fec991b19faee817f Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Thu, 6 Aug 2020 17:53:55 -0700
Subject: [PATCH 6/9] update stack and nix for cuckoo

---
 project.nix | 7 ++++++-
 stack.yaml  | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/project.nix b/project.nix
index 4006a29e72..030e12775d 100644
--- a/project.nix
+++ b/project.nix
@@ -16,9 +16,14 @@ proj = kpkgs.rp.project ({ pkgs, hackGet, ... }: with pkgs.haskell.lib;
     });
     in {
       name = "chainweb";
-      overrides = self: super: {
+      overrides = self: super: with pkgs.haskell.lib; {
         chainweb = enableCabalFlag (
           justStaticExecutables (enableDWARFDebugging (convertCabalTestsAndBenchmarksToExecutables super.chainweb))) "use_systemd";
+        cuckoo = dontCheck (callHackageDirect {
+          pkg = "cuckoo";
+          ver = "0.2";
+          sha256 = "0rn4kjlxbfj3336rwrwcvfavf7y7a9hmgsm5hd1mdqw86haxjhg0";
+        });
       };

       packages = {
diff --git a/stack.yaml b/stack.yaml
index 5b8624ba94..7bb334c348 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -6,7 +6,6 @@ build:

 extra-deps:

 # --- Missing from Stackage --- #
-- bloomfilter-2.0.1.0
 - configuration-tools-0.5.0
 - digraph-0.2
 - fake-0.1.1.3
@@ -24,6 +23,7 @@ extra-deps:
 - token-bucket-0.1.0.1
 - wai-middleware-throttle-0.3.0.1
 - yet-another-logger-0.4.0
+- cuckoo-0.2.0.1

 # --- Forced Downgrades --- #
 - network-3.1.0.1

From 785558d9e323c438cab45da896f53e020d0f6eee Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Thu, 6 Aug 2020 19:36:04 -0700
Subject: [PATCH 7/9] remove some redundant imports

---
 project.nix                             | 1 +
 src/Chainweb/Chainweb/ChainResources.hs | 5 -----
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/project.nix b/project.nix
index 030e12775d..440cefd587 100644
--- a/project.nix
+++ b/project.nix
@@ -19,6 +19,7 @@ proj = kpkgs.rp.project ({ pkgs, hackGet, ...
}: with pkgs.haskell.lib; overrides = self: super: with pkgs.haskell.lib; { chainweb = enableCabalFlag ( justStaticExecutables (enableDWARFDebugging (convertCabalTestsAndBenchmarksToExecutables super.chainweb))) "use_systemd"; + cuckoo = dontCheck (callHackageDirect { pkg = "cuckoo"; ver = "0.2"; diff --git a/src/Chainweb/Chainweb/ChainResources.hs b/src/Chainweb/Chainweb/ChainResources.hs index 75d6533da7..7b61a51653 100644 --- a/src/Chainweb/Chainweb/ChainResources.hs +++ b/src/Chainweb/Chainweb/ChainResources.hs @@ -39,7 +39,6 @@ import Control.Monad import Control.Monad.Catch import Data.Maybe -import Data.Semigroup import qualified Data.Text as T import qualified Network.HTTP.Client as HTTP @@ -51,11 +50,8 @@ import System.LogLevel -- internal modules import Chainweb.BlockHeaderDB -import Chainweb.BlockHeaderDB.PruneForks -import Chainweb.BlockHeight import Chainweb.ChainId import Chainweb.Chainweb.PeerResources -import Chainweb.Graph import Chainweb.Logger import qualified Chainweb.Mempool.Consensus as MPCon import qualified Chainweb.Mempool.InMem as Mempool @@ -68,7 +64,6 @@ import Chainweb.NodeId import Chainweb.Pact.Service.PactInProcApi import Chainweb.Pact.Service.Types import Chainweb.Payload.PayloadStore -import Chainweb.Payload.PayloadStore.RocksDB import Chainweb.RestAPI.NetworkID import Chainweb.Transaction import Chainweb.Utils From 3f102c2ad26ae5ee97a5209ef0ecb05eaeb675be Mon Sep 17 00:00:00 2001 From: Lars Kuhtz Date: Thu, 6 Aug 2020 22:41:36 -0700 Subject: [PATCH 8/9] more nix --- project.nix | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/project.nix b/project.nix index 440cefd587..168e251d27 100644 --- a/project.nix +++ b/project.nix @@ -16,15 +16,19 @@ proj = kpkgs.rp.project ({ pkgs, hackGet, ... 
}: with pkgs.haskell.lib;
     in {
       name = "chainweb";
-      overrides = self: super: with pkgs.haskell.lib; {
+      overrides = self: super: {
         chainweb = enableCabalFlag (
           justStaticExecutables (enableDWARFDebugging (convertCabalTestsAndBenchmarksToExecutables super.chainweb))) "use_systemd";
-
-        cuckoo = dontCheck (callHackageDirect {
-          pkg = "cuckoo";
-          ver = "0.2";
-          sha256 = "0rn4kjlxbfj3336rwrwcvfavf7y7a9hmgsm5hd1mdqw86haxjhg0";
-        });
+        cuckoo = dontBenchmark (dontCheck (self.callHackageDirect {
+          pkg = "cuckoo";
+          ver = "0.2.1";
+          sha256 = "1dsac9qc90aagcgvznzfjd4wl8wgxhq1m8f5h556ys72nkm1ablx";
+        } {}));
+        quickcheck-classes-base = dontCheck (self.callHackageDirect {
+          pkg = "quickcheck-classes-base";
+          ver = "0.6.0.0";
+          sha256 = "1mmhfp95wqg6i5gzap4b4g87zgbj46nnpir56hqah97igsbvis70";
+        } {});
       };

       packages = {

From 00f838ca2bc547ee355222627bb860a51fa2378c Mon Sep 17 00:00:00 2001
From: Lars Kuhtz
Date: Fri, 7 Aug 2020 09:58:48 -0700
Subject: [PATCH 9/9] fix arithmetic underflow on empty databases

---
 src/Chainweb/BlockHeaderDB/PruneForks.hs    | 15 +++++++++++++--
 src/Chainweb/Chainweb/PruneChainDatabase.hs |  4 ++--
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/Chainweb/BlockHeaderDB/PruneForks.hs b/src/Chainweb/BlockHeaderDB/PruneForks.hs
index ddeebcba26..cf1c3f554d 100644
--- a/src/Chainweb/BlockHeaderDB/PruneForks.hs
+++ b/src/Chainweb/BlockHeaderDB/PruneForks.hs
@@ -125,8 +125,16 @@ pruneForks
     -> IO Int
 pruneForks logg cdb depth headerCallback payloadCallback = do
     hdr <- maxEntry cdb
-    let mar = MaxRank $ Max $ int (_blockHeight hdr) - depth
-    pruneForks_ logg cdb mar (MinRank $ Min 0) headerCallback payloadCallback
+    if int (_blockHeight hdr) < depth + 1
+      then do
+        logg Warn
+            $ "Skipping database pruning because the maximum block height of "
+            <> sshow (_blockHeight hdr) <> " is smaller than the requested depth "
+            <> sshow depth <> " plus one."
+        return 0
+      else do
+        let mar = MaxRank $ Max $ int (_blockHeight hdr) - depth
+        pruneForks_ logg cdb mar (MinRank $ Min 0) headerCallback payloadCallback

 data PruneForksException
     = PruneForksDbInvariantViolation BlockHeight [BlockHeight] T.Text
@@ -150,6 +158,9 @@ pruneForks_
     -> (Bool -> BlockHeader -> IO ())
     -> (Bool -> BlockPayloadHash -> IO ())
     -> IO Int
+pruneForks_ logg _ mar mir _ _
+    | mar <= 1 = 0 <$ logg Warn ("Skipping database pruning for max bound of " <> sshow mar)
+    | mir <= 1 = 0 <$ logg Warn ("Skipping database pruning for min bound of " <> sshow mir)
 pruneForks_ logg cdb mar mir hdrCallback payloadCallback = do

     !pivots <- entries cdb Nothing Nothing
diff --git a/src/Chainweb/Chainweb/PruneChainDatabase.hs b/src/Chainweb/Chainweb/PruneChainDatabase.hs
index 02d960333c..5822697ce4 100644
--- a/src/Chainweb/Chainweb/PruneChainDatabase.hs
+++ b/src/Chainweb/Chainweb/PruneChainDatabase.hs
@@ -218,8 +218,8 @@ fullGc logger rdb v = do
         chainLogg = logFunctionText chainLogger

     m <- maxRank cdb
-    markedTrans <- mkFilter (round $ int @_ @Double m * 1.1)
-    markedPayloads <- mkFilter (round $ int @_ @Double m * 1.1)
+    markedTrans <- mkFilter (round $ (1024 + int @_ @Double m) * 1.1)
+    markedPayloads <- mkFilter (round $ (1024 + int @_ @Double m) * 1.1)

     logg Info $ "Allocated "
         <> sshow ((sizeInAllocatedBytes markedTrans + sizeInAllocatedBytes markedPayloads) `div` (1024 * 1024))
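
For reference, the pruning pass and the payload mark filter above compose
roughly as follows. This is a minimal sketch under stated assumptions, not
code from the patches: it assumes a variant of `mkFilter` that takes the
expected entry count (as the fullGc excerpts suggest), assumes `markPayload`
is in scope from the new PruneChainDatabase module, and the helper name
`markSurvivingPayloads` is hypothetical.

-- Sketch: prune forks at the given depth and mark the payload components of
-- every surviving header, so that a later sweep over the payload tables can
-- delete unmarked entries. Needs Control.Monad (unless) in addition to the
-- imports already used in the patched modules.
markSurvivingPayloads
    :: HasCasLookupConstraint cas BlockPayload
    => LogFunctionText
    -> BlockHeaderDb
    -> PayloadDb cas
    -> Natural
    -> IO Int
markSurvivingPayloads logg cdb pdb depth = do
    m <- maxRank cdb
    -- size the filter generously, as patch 9 does, to avoid insert failures
    marked <- mkFilter (round $ (1024 + int @_ @Double m) * 1.1)
    pruneForks logg cdb depth
        (\_ _ -> return ())
        -- the Bool argument indicates whether the related block is pruned
        (\pruned p -> unless pruned $ markPayload pdb marked p)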