Skip to content

Commit

Permalink
Merge pull request #980 from input-output-hk/ensemble/emit-snapshot
Browse files Browse the repository at this point in the history
Refactor emit snapshot
  • Loading branch information
ch1bo authored Jul 20, 2023
2 parents 9a2c386 + 8f567f2 commit e0ea210
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 273 deletions.
2 changes: 1 addition & 1 deletion hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ test-suite tests
Hydra.OptionsSpec
Hydra.PartySpec
Hydra.PersistenceSpec
Hydra.SnapshotStrategySpec
Hydra.HeadLogicSnapshotSpec
Paths_hydra_node
Spec
Test.Hydra.Fixture
Expand Down
199 changes: 96 additions & 103 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ data RequirementFailure tx
| InvalidMultisignature {multisig :: Text, vkeys :: [VerificationKey HydraKey]}
| SnapshotAlreadySigned {knownSignatures :: [Party], receivedSignature :: Party}
| AckSnNumberInvalid {requestedSn :: SnapshotNumber, lastSeenSn :: SnapshotNumber}
| SnapshotDoesNotApply {requestedSn :: SnapshotNumber, txid :: TxIdType tx, error :: ValidationError }
| SnapshotDoesNotApply {requestedSn :: SnapshotNumber, txid :: TxIdType tx, error :: ValidationError}
deriving stock (Generic)

deriving instance (Eq (TxIdType tx)) => Eq (RequirementFailure tx)
Expand Down Expand Up @@ -427,6 +427,15 @@ collectWaits = \case
Effects _ -> []
Combined l r -> collectWaits l <> collectWaits r

collectState :: Outcome tx -> [HeadState tx]
collectState = \case
NoOutcome -> []
Error _ -> []
Wait _ -> []
NewState s -> [s]
Effects _ -> []
Combined l r -> collectState l <> collectState r

-- * The Coordinated Head protocol

-- ** Opening the Head
Expand Down Expand Up @@ -647,35 +656,67 @@ onOpenNetworkReqTx env ledger st ttl tx =
case applyTransactions currentSlot seenUTxO [tx] of
Left (_, err)
| ttl <= 0 ->
NewState (Open st{coordinatedHeadState = untrackTxInState})
`Combined` Effects [ClientEffect $ TxInvalid headId seenUTxO tx err]
NewState (Open st{coordinatedHeadState = untrackTxInState})
`Combined` Effects [ClientEffect $ TxInvalid headId seenUTxO tx err]
| otherwise ->
NewState (Open st{coordinatedHeadState = trackTxInState})
`Combined` Wait (WaitOnNotApplicableTx err)
Right utxo' ->
NewState
( Open
st
{ coordinatedHeadState =
trackTxInState
{ seenTxs = seenTxs <> [tx]
, seenUTxO = utxo'
Effects [ClientEffect $ TxValid headId tx]
`Combined` if isLeader parameters party nextSn && not snapshotInFlight
then
NewState
( Open
st
{ coordinatedHeadState =
trackTxInState
{ seenTxs = seenTxs'
, seenUTxO = utxo'
, seenSnapshot =
RequestedSnapshot
{ lastSeen = seenSnapshotNumber seenSnapshot
, requested = nextSn
}
}
}
}
)
`Combined` Effects [ClientEffect $ TxValid headId tx]
& emitSnapshot env
)
`Combined` Effects [NetworkEffect (ReqSn nextSn (txId <$> seenTxs'))]
else
NewState
( Open
st
{ coordinatedHeadState =
trackTxInState
{ seenTxs = seenTxs'
, seenUTxO = utxo'
}
}
)
where
Ledger{applyTransactions} = ledger

CoordinatedHeadState{allTxs, seenTxs, seenUTxO} = coordinatedHeadState
Environment{party} = env

CoordinatedHeadState{allTxs, seenTxs, seenUTxO, confirmedSnapshot, seenSnapshot} = coordinatedHeadState

Snapshot{number = confirmedSn} = getSnapshot confirmedSnapshot

OpenState{coordinatedHeadState, headId, currentSlot} = st
OpenState{coordinatedHeadState, headId, currentSlot, parameters} = st

trackTxInState = coordinatedHeadState{allTxs = Map.insert (txId tx) tx allTxs}

untrackTxInState = coordinatedHeadState{allTxs = Map.delete (txId tx) allTxs}

snapshotInFlight = case seenSnapshot of
NoSeenSnapshot -> False
LastSeenSnapshot{} -> False
RequestedSnapshot{} -> True
SeenSnapshot{} -> True

nextSn = confirmedSn + 1

seenTxs' = seenTxs <> [tx]

-- | Process a snapshot request ('ReqSn') from party.
--
-- This checks that s is the next snapshot number and that the party is
Expand Down Expand Up @@ -811,7 +852,7 @@ onOpenNetworkAckSn ::
-- | Snapshot number of this AckSn.
SnapshotNumber ->
Outcome tx
onOpenNetworkAckSn env openState otherParty snapshotSignature sn =
onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn =
-- TODO: verify authenticity of message and whether otherParty is part of the head
-- Spec: require s ∈ {ŝ, ŝ + 1}
requireValidAckSn $ do
Expand All @@ -824,21 +865,41 @@ onOpenNetworkAckSn env openState otherParty snapshotSignature sn =
-- Spec: σ̃ ← MS-ASig(k_H, ̂Σ̂)
let multisig = aggregateInOrder sigs' parties
let allTxs' = foldr Map.delete allTxs confirmed
let nextSn = sn + 1
requireVerifiedMultisignature multisig snapshot $
NewState
( onlyUpdateCoordinatedHeadState $
coordinatedHeadState
{ confirmedSnapshot =
ConfirmedSnapshot
{ snapshot
, signatures = multisig
Effects [ClientEffect $ SnapshotConfirmed headId snapshot multisig]
`Combined` if isLeader parameters party nextSn && not (null seenTxs)
then
NewState
( onlyUpdateCoordinatedHeadState $
coordinatedHeadState
{ confirmedSnapshot =
ConfirmedSnapshot
{ snapshot
, signatures = multisig
}
, seenSnapshot =
RequestedSnapshot
{ lastSeen = sn
, requested = nextSn
}
, allTxs = allTxs'
}
, seenSnapshot = LastSeenSnapshot (number snapshot)
, allTxs = allTxs'
}
)
`Combined` Effects [ClientEffect $ SnapshotConfirmed headId snapshot multisig]
& emitSnapshot env
)
`Combined` Effects [NetworkEffect (ReqSn nextSn (txId <$> seenTxs))]
else
NewState
( onlyUpdateCoordinatedHeadState $
coordinatedHeadState
{ confirmedSnapshot =
ConfirmedSnapshot
{ snapshot
, signatures = multisig
}
, seenSnapshot = LastSeenSnapshot sn
, allTxs = allTxs'
}
)
where
seenSn = seenSnapshotNumber seenSnapshot

Expand Down Expand Up @@ -881,21 +942,22 @@ onOpenNetworkAckSn env openState otherParty snapshotSignature sn =
RequireFailed $
InvalidMultisignature{multisig = show multisig, vkeys}


vkeys = vkey <$> parties

-- XXX: Data structures become unwieldy -> helper functions or lenses
onlyUpdateCoordinatedHeadState chs' =
Open openState{coordinatedHeadState = chs'}

CoordinatedHeadState{seenSnapshot, allTxs} = coordinatedHeadState

OpenState
{ parameters = HeadParameters{parties}
{ parameters
, coordinatedHeadState
, headId
} = openState

CoordinatedHeadState{seenSnapshot, allTxs, seenTxs} = coordinatedHeadState

HeadParameters{parties} = parameters

-- ** Closing the Head

-- | Client request to close the head. This leads to a close transaction on
Expand Down Expand Up @@ -1103,77 +1165,8 @@ update env ledger st ev = case (st, ev) of

-- * Snapshot helper functions

data SnapshotOutcome tx
= ShouldSnapshot SnapshotNumber [tx] -- TODO(AB) : should really be a Set (TxId tx)
| ShouldNotSnapshot NoSnapshotReason
deriving (Eq, Show, Generic)

data NoSnapshotReason
= NotLeader SnapshotNumber
| SnapshotInFlight SnapshotNumber
| NoTransactionsToSnapshot
deriving (Eq, Show, Generic)

isLeader :: HeadParameters -> Party -> SnapshotNumber -> Bool
isLeader HeadParameters{parties} p sn =
case p `elemIndex` parties of
Just i -> ((fromIntegral sn - 1) `mod` length parties) == i
_ -> False

-- | Snapshot emission decider
newSn :: Environment -> HeadParameters -> CoordinatedHeadState tx -> SnapshotOutcome tx
newSn Environment{party} parameters CoordinatedHeadState{confirmedSnapshot, seenSnapshot, seenTxs} =
if
| not (isLeader parameters party nextSn) ->
ShouldNotSnapshot $ NotLeader nextSn
| -- NOTE: This is different than in the spec. If we use seenSn /=
-- confirmedSn here, we implicitly require confirmedSn <= seenSn. Which
-- may be an acceptable invariant, but we have property tests which are
-- more strict right now. Anyhow, we can be more expressive.
snapshotInFlight ->
ShouldNotSnapshot $ SnapshotInFlight nextSn
| null seenTxs ->
ShouldNotSnapshot NoTransactionsToSnapshot
| otherwise ->
ShouldSnapshot nextSn seenTxs
where
nextSn = confirmedSn + 1

snapshotInFlight = case seenSnapshot of
NoSeenSnapshot -> False
LastSeenSnapshot{} -> False
RequestedSnapshot{} -> True
SeenSnapshot{} -> True

Snapshot{number = confirmedSn} = getSnapshot confirmedSnapshot

-- | Emit a snapshot if we are the next snapshot leader. 'Outcome' modifying
-- signature so it can be chained with other 'update' functions.
emitSnapshot :: IsTx tx => Environment -> Outcome tx -> Outcome tx
emitSnapshot env outcome =
case outcome of
NewState (Open OpenState{parameters, coordinatedHeadState, chainState, headId, currentSlot}) ->
case newSn env parameters coordinatedHeadState of
ShouldSnapshot sn txs -> do
let CoordinatedHeadState{seenSnapshot} = coordinatedHeadState
NewState
( Open
OpenState
{ parameters
, coordinatedHeadState =
coordinatedHeadState
{ seenSnapshot =
RequestedSnapshot
{ lastSeen = seenSnapshotNumber seenSnapshot
, requested = sn
}
}
, chainState
, headId
, currentSlot
}
)
`Combined` Effects [NetworkEffect (ReqSn sn (txId <$> txs))]
_ -> outcome
Combined l r -> Combined (emitSnapshot env l) (emitSnapshot env r)
_ -> outcome
59 changes: 39 additions & 20 deletions hydra-node/test/Hydra/BehaviorSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ spec = parallel $ do
withHydraNode bobSk [alice] chain $ \n2 -> do
openHead n1 n2

send n1 (NewTx (aValidTx 42))
send n1 (NewTx $ aValidTx 42)
waitUntil [n1, n2] $ TxValid testHeadId (aValidTx 42)

let snapshot = Snapshot 1 (utxoRefs [1, 2, 42]) [42]
Expand All @@ -268,6 +268,44 @@ spec = parallel $ do
send n1 Close
waitForNext n1 >>= assertHeadIsClosedWith 1

it "snapshots are created as long as transactions to snapshot exist" $
shouldRunInSim $
withSimulatedChainAndNetwork $ \chain ->
withHydraNode aliceSk [bob] chain $ \n1 ->
withHydraNode bobSk [alice] chain $ \n2 -> do
openHead n1 n2

-- Load the "ingest queue" of the head enough to have still
-- pending transactions after a first snapshot request by
-- alice. Note that we are in a deterministic simulation here.
send n1 (NewTx $ aValidTx 40)
send n1 (NewTx $ aValidTx 41)
send n1 (NewTx $ aValidTx 42)

-- Expect alice to create a snapshot from the first requested
-- transaction right away which is the current snapshot policy.
waitUntilMatch [n1, n2] $ \case
SnapshotConfirmed{snapshot = Snapshot{number, confirmed}} ->
number == 1 && confirmed == [40]
_ -> False

-- Expect bob to also snapshot what did "not fit" into the first
-- snapshot.
waitUntilMatch [n1, n2] $ \case
SnapshotConfirmed{snapshot = Snapshot{number, confirmed}} ->
-- NOTE: We sort the confirmed to be clear that the order may
-- be freely picked by the leader.
number == 2 && sort confirmed == [41, 42]
_ -> False

-- As there are no pending transactions and snapshots anymore
-- we expect to continue normally on seeing just another tx.
send n1 (NewTx $ aValidTx 44)
waitUntilMatch [n1, n2] $ \case
SnapshotConfirmed{snapshot = Snapshot{number, confirmed}} ->
number == 3 && confirmed == [44]
_ -> False

it "depending transactions stay pending and are confirmed in order" $
shouldRunInSim $
withSimulatedChainAndNetwork $ \chain ->
Expand Down Expand Up @@ -341,25 +379,6 @@ spec = parallel $ do
TxInvalid{transaction} -> transaction == tx''
_ -> False

it "multiple transactions get snapshotted" $ do
pendingWith "This test is not longer true after recent changes which simplify the snapshot construction."
shouldRunInSim $ do
withSimulatedChainAndNetwork $ \chain ->
withHydraNode aliceSk [bob] chain $ \n1 ->
withHydraNode bobSk [alice] chain $ \n2 -> do
openHead n1 n2

send n1 (NewTx (aValidTx 42))
send n1 (NewTx (aValidTx 43))

waitUntil [n1] $ TxValid testHeadId (aValidTx 42)
waitUntil [n1] $ TxValid testHeadId (aValidTx 43)

let snapshot = Snapshot 1 (utxoRefs [1, 2, 42, 43]) [42, 43]
sigs = aggregate [sign aliceSk snapshot, sign bobSk snapshot]

waitUntil [n1] $ SnapshotConfirmed testHeadId snapshot sigs

it "outputs utxo from confirmed snapshot when client requests it" $
shouldRunInSim $ do
withSimulatedChainAndNetwork $ \chain ->
Expand Down
Loading

0 comments on commit e0ea210

Please sign in to comment.