Skip to content


Fix rebase conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgenii Akentev committed May 31, 2024
1 parent e4beac7 commit 6d753e6
Showing 1 changed file with 55 additions and 40 deletions.
95 changes: 55 additions & 40 deletions src/Chainweb/Pact/Backend/ChainwebPactCoreDb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
import qualified Data.DList as DL
import Data.List(sort)
import Data.List.NonEmpty (NonEmpty(..))
import qualified Data.List.NonEmpty as NE
import qualified Data.HashMap.Strict as HashMap
import qualified Data.HashSet as HashSet
import qualified Data.Map.Strict as M
Expand Down Expand Up @@ -67,7 +69,7 @@ import Chainweb.Logger
import Chainweb.Pact.Backend.DbCache
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.Service.Types (PactException(..), internalError)
import Chainweb.Pact.Service.Types (PactException(..), internalError, IntraBlockPersistence(..))
import Chainweb.Pact.Types (logError_)
import Chainweb.Utils (sshow)

Expand Down Expand Up @@ -166,14 +168,10 @@ doReadRow mlim d k = forModuleNameFix $ \mnFix ->
-> SQLitePendingData
-> MaybeT (BlockHandler logger) v
lookupInPendingData (Utf8 rowkey) f p = do
let deltaKey = SQLiteDeltaKey tableNameBS rowkey
ddata <- fmap _deltaData <$> MaybeT (return $ HashMap.lookup deltaKey (_pendingWrites p))
if null ddata
-- should be impossible, but we'll check this case
then mzero
-- we merge with (++) which should produce txids most-recent-first
-- -- we care about the most recent update to this rowkey
else MaybeT $ return $! f $ DL.head ddata
-- we get the latest-written value at this rowkey
allKeys <- hoistMaybe $ HashMap.lookup tableNameBS (_pendingWrites p)
ddata <- _deltaData . NE.head <$> hoistMaybe (HashMap.lookup rowkey allKeys)
MaybeT $ return $! f ddata

:: forall logger v .
Expand Down Expand Up @@ -250,12 +248,12 @@ recordPendingUpdate
recordPendingUpdate (Utf8 key) (Utf8 tn) txid vs = modifyPendingData modf
delta = SQLiteRowDelta tn (coerce txid) key vs
deltaKey = SQLiteDeltaKey tn key

modf = over pendingWrites upd
{- "const" is used here because prefer the latest update of the rowkey for
the current transaction -}
upd = HashMap.insertWith const deltaKey (DL.singleton delta)
upd = HashMap.unionWith
(HashMap.singleton tn
(HashMap.singleton key (NE.singleton delta)))

:: Maybe (BlockHeight, TxId)
Expand Down Expand Up @@ -329,9 +327,8 @@ doKeys mlim d = do
pb <- use bsPendingBlock
mptx <- use bsPendingTx

let memKeys = DL.toList $
fmap (T.decodeUtf8 . _deltaRowKey) $
collect pb `DL.append` maybe DL.empty collect mptx
let memKeys = fmap (T.decodeUtf8 . _deltaRowKey)
$ collect pb ++ maybe [] collect mptx

let !allKeys = msort -- becomes available with Pact42Upgrade
$ LHM.sort
Expand Down Expand Up @@ -370,8 +367,9 @@ doKeys mlim d = do

tn@(Utf8 tnBS) = asStringUtf8 d
collect p =
let flt k _ = _dkTable k == tnBS
in DL.concat $ HashMap.elems $ HashMap.filterWithKey flt (_pendingWrites p)
concatMap NE.toList $ HashMap.elems $ fromMaybe mempty $ HashMap.lookup tnBS (_pendingWrites p)
-- let flt k _ = _dkTable k == tnBS
-- in DL.concat $ HashMap.elems $ HashMap.filterWithKey flt (_pendingWrites p)
{-# INLINE doKeys #-}

Expand Down Expand Up @@ -414,12 +412,11 @@ doTxIds tn _tid@(TxId tid) = do
stmt = "SELECT DISTINCT txid FROM " <> tbl (tableNameCore tn) <> " WHERE txid > ?"

collect p =
let flt k _ = _dkTable k == (T.encodeUtf8 $ asString tn)
txids = DL.toList $
fmap (coerce . _deltaTxId) $
DL.concat $
let txids = fmap (coerce . _deltaTxId) $
concatMap NE.toList $
HashMap.elems $
HashMap.filterWithKey flt (_pendingWrites p)
fromMaybe mempty $
HashMap.lookup (T.encodeUtf8 $ asString tn) (_pendingWrites p)
in filter (> _tid) txids
{-# INLINE doTxIds #-}

Expand Down Expand Up @@ -512,26 +509,29 @@ doCommit = use bsMode >>= \case
modify' $ over bsTxId succ
-- merge pending tx into block data
pending <- use bsPendingTx
modify' $ over bsPendingBlock (merge pending)
persistIntraBlockWrites <- view blockHandlerPersistIntraBlockWrites
modify' $ over bsPendingBlock (merge persistIntraBlockWrites pending)
blockLogs <- use $ bsPendingBlock . pendingTxLogMapCore
modify' $ set bsPendingTx Nothing
return blockLogs
else doRollback >> return mempty
return $! concatMap (reverse . DL.toList) txrs
merge Nothing a = a
merge (Just a) b = SQLitePendingData
{ _pendingTableCreation = HashSet.union (_pendingTableCreation a) (_pendingTableCreation b)
, _pendingWrites = HashMap.unionWith mergeW (_pendingWrites a) (_pendingWrites b)
, _pendingTxLogMap = _pendingTxLogMap a
, _pendingTxLogMapCore = _pendingTxLogMapCore a
, _pendingSuccessfulTxs = _pendingSuccessfulTxs b
merge _ Nothing a = a
merge persistIntraBlockWrites (Just txPending) blockPending = SQLitePendingData
{ _pendingTableCreation = HashSet.union (_pendingTableCreation txPending) (_pendingTableCreation blockPending)
, _pendingWrites = HashMap.unionWith (HashMap.unionWith mergeAtRowKey) (_pendingWrites txPending) (_pendingWrites blockPending)
, _pendingTxLogMap = _pendingTxLogMap txPending
, _pendingTxLogMapCore = _pendingTxLogMapCore txPending
, _pendingSuccessfulTxs = _pendingSuccessfulTxs blockPending

mergeW a b = case take 1 (DL.toList a) of
[] -> b
(x:_) -> DL.cons x b
mergeAtRowKey txWrites blockWrites =
let lastTxWrite = NE.head txWrites
in case persistIntraBlockWrites of
PersistIntraBlockWrites -> lastTxWrite `NE.cons` blockWrites
DoNotPersistIntraBlockWrites -> lastTxWrite :| []
{-# INLINE doCommit #-}

-- | Begin a Pact transaction
Expand Down Expand Up @@ -575,10 +575,26 @@ doGetTxLog tn txid@(TxId txid') = do
takeHead (a:_) = [a]

readFromPending = do
pd <- getPendingData
let deltas = pd >>= HashMap.elems . _pendingWrites >>= takeHead . DL.toList
let ourDeltas = filter predicate deltas
mapM (\x -> toTxLog (asString tn) (Utf8 $ _deltaRowKey x) (_deltaData x)) ourDeltas
allPendingData <- getPendingData
let deltas = do
-- grab all pending writes in this transaction and elsewhere in
-- this block
pending <- allPendingData
-- all writes to the table
let writesAtTableByKey =
fromMaybe mempty $ HashMap.lookup tableNameBS $ _pendingWrites pending
-- a list of all writes to the table for some particular key
allWritesForSomeKey <- HashMap.elems writesAtTableByKey
-- the single latest write to the table for that key which is
-- from this txid; the most recent writes are inserted at the
-- front of the pending data
latestWriteForSomeKey <- take 1
[ writeForSomeKey
| writeForSomeKey <- NE.toList allWritesForSomeKey
, _deltaTxId writeForSomeKey == coerce txid
return latestWriteForSomeKey
mapM (\x -> toTxLog (asString tn) (Utf8 $ _deltaRowKey x) (_deltaData x)) deltas

readFromDb = do
rows <- callDb "doGetTxLog" $ \db -> qry db stmt
Expand All @@ -591,7 +607,6 @@ doGetTxLog tn txid@(TxId txid') = do
\result, got: " <> T.pack (show err)
stmt = "SELECT rowkey, rowdata FROM " <> tbl tablename <> " WHERE txid = ?"

toTxLog :: MonadThrow m => T.Text -> Utf8 -> BS.ByteString -> m (TxLog RowData)
toTxLog d key value =
case fmap (view document) $ _decodeRowData serialisePact value of
Expand Down

0 comments on commit 6d753e6

Please sign in to comment.