Skip to content

Commit

Permalink
Merge pull request #1351 from SundaeSwap-finance/SB-1352-persistence-…
Browse files Browse the repository at this point in the history
…types

EventSource and EventSink abstractions for Hydra Extensibility
  • Loading branch information
ch1bo authored Mar 18, 2024
2 parents 65e15b2 + 724f6e8 commit 89f8fb5
Show file tree
Hide file tree
Showing 25 changed files with 914 additions and 376 deletions.
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@ changes.
- Add support for `Conway` in `hydra-chain-observer`.

- **BREAKING** Change to the `hydra-node` logs, monitoring and removal of `log-filter` executable:
- We renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
- In course of this, we also removed the `log-filter` executable as nobody is actively using it and we recommend using other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful).
- Renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
- Changed structure of `LogicOutcome` entries.
- Added node-level log entry when an input was `DroppedFromQueue`.
- In course of this, the `log-filter` executable was removed as nobody is actively using it and other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful) are recommended.
- Renamed prometheus metric `hydra_head_events -> hydra_head_inputs`.

- Introduce `EventSource` and `EventSink` interfaces in `hydra-node`:
- These handles can now be used as "extension points" to make the `hydra-node` store and load its state differently or expose `StateEvent`s to other, external services.
- Internal refactoring of persistence mechanism as event source and sink in a backward-compatible way.
- More details can be found in [ADR21](https://hydra.family/head-protocol/adr/21/)

## [0.15.0] - 2024-01-18

- Tested with `cardano-node 8.7.3` and `cardano-cli 8.17.0.0`.
Expand Down
10 changes: 5 additions & 5 deletions docs/adr/2023-11-07_029-event-source-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ slug: 29
title: |
29. EventSource & EventSink abstractions
authors: [cardenaso11, quantumplation, ch1bo]
tags: [Draft]
tags: [Accepted]
---

## Status
Draft
Accepted

## Context

Expand Down Expand Up @@ -42,12 +42,12 @@ Draft
```hs
data HydraNode tx m = HydraNode
{ -- ...
, eventSource :: EventSource (StateChanged tx) m
, eventSinks :: [EventSink (StateChanged tx) m]
, eventSource :: EventSource (StateEvent tx) m
, eventSinks :: [EventSink (StateEvent tx) m]
}
```

* The `hydra-node` will load events and __hydra_te its `HeadState` using `getEvents` of the single `eventSource`.
* The `hydra-node` will load events and `hydrate` its `HeadState` using `getEvents` of the single `eventSource`.

* The `stepHydraNode` main loop does call `putEvent` on all `eventSinks` in sequence. Any failure will make the `hydra-node` process terminate and require a restart.

Expand Down
2 changes: 1 addition & 1 deletion hydra-cluster/src/Hydra/Cluster/Scenarios.hs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ threeNodesNoErrorsOnOpen tracer tmpDir node@RunningNode{nodeSocket} hydraScripts

let contestationPeriod = UnsafeContestationPeriod 2
let hydraTracer = contramap FromHydraNode tracer
withHydraCluster hydraTracer tmpDir nodeSocket 0 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
withHydraCluster hydraTracer tmpDir nodeSocket 1 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
let clients = leader : rest
waitForNodesConnected hydraTracer 20 clients

Expand Down
4 changes: 4 additions & 0 deletions hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ library
Hydra.Chain.Offline
Hydra.ContestationPeriod
Hydra.Crypto
Hydra.Environment
Hydra.Events
Hydra.Events.FileBased
Hydra.HeadId
Hydra.HeadLogic
Hydra.HeadLogic.Error
Expand Down Expand Up @@ -290,6 +293,7 @@ test-suite tests
Hydra.Chain.Direct.WalletSpec
Hydra.ContestationPeriodSpec
Hydra.CryptoSpec
Hydra.Events.FileBasedSpec
Hydra.FireForgetSpec
Hydra.HeadLogicSnapshotSpec
Hydra.HeadLogicSpec
Expand Down
27 changes: 23 additions & 4 deletions hydra-node/json-schemas/logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,25 @@ definitions:
The Party emitting the log entry.
outcome:
$ref: "logs.yaml#/definitions/Outcome"
- title: DroppedFromQueue
description: >-
An input event has been dropped from the input queue as its
time-to-live ran out.
type: object
additionalProperties: false
required:
- tag
- inputId
- input
properties:
tag:
type: string
enum: ["DroppedFromQueue"]
inputId:
type: integer
minimum: 0
input:
"$ref": "logs.yaml#/definitions/Input"
- title: LoadedState
description: >-
Loaded state events from persistence.
Expand Down Expand Up @@ -1909,15 +1928,15 @@ definitions:
additionalProperties: false
required:
- tag
- events
- stateChanges
- effects
description: >-
Continue with the given state update events and side effects.
properties:
tag:
type: string
enum: ["Continue"]
events:
stateChanges:
type: array
items:
type: object
Expand All @@ -1933,7 +1952,7 @@ definitions:
required:
- tag
- reason
- events
- stateChanges
description: >-
Wait for some condition to be met with optional state updates.
properties:
Expand All @@ -1943,7 +1962,7 @@ definitions:
reason:
type: object
$ref: "logs.yaml#/definitions/WaitReason"
events:
stateChanges:
type: array
items:
type: object
Expand Down
6 changes: 6 additions & 0 deletions hydra-node/src/Hydra/Chain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Hydra.Cardano.Api (
Witness,
)
import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Environment (Environment (..))
import Hydra.HeadId (HeadId, HeadSeed)
import Hydra.Ledger (ChainSlot, IsTx, UTxOType)
import Hydra.OnChainId (OnChainId)
Expand Down Expand Up @@ -60,6 +61,11 @@ instance Arbitrary HeadParameters where
dedupParties HeadParameters{contestationPeriod, parties} =
HeadParameters{contestationPeriod, parties = nub parties}

-- | Make 'HeadParameters' that are consistent with the given 'Environment'.
mkHeadParameters :: Environment -> HeadParameters
mkHeadParameters Environment{party, otherParties, contestationPeriod} =
HeadParameters{contestationPeriod, parties = party : otherParties}

-- | Data type used to post transactions on chain. It holds everything to
-- construct corresponding Head protocol transactions.
data PostChainTx tx
Expand Down
36 changes: 36 additions & 0 deletions hydra-node/src/Hydra/Environment.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Hydra.Environment where

import Hydra.Prelude

import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Crypto (HydraKey, SigningKey)
import Hydra.OnChainId (OnChainId)
import Hydra.Party (Party, deriveParty)

data Environment = Environment
{ party :: Party
-- ^ This is the p_i from the paper
, -- XXX: In the long run we would not want to keep the signing key in memory,
-- i.e. have an 'Effect' for signing or so.
signingKey :: SigningKey HydraKey
, otherParties :: [Party]
, -- XXX: Improve naming
participants :: [OnChainId]
, contestationPeriod :: ContestationPeriod
}
deriving stock (Show, Eq)

instance Arbitrary Environment where
arbitrary = do
signingKey <- arbitrary
otherParties <- arbitrary
participants <- arbitrary
contestationPeriod <- arbitrary
pure $
Environment
{ signingKey
, party = deriveParty signingKey
, otherParties
, contestationPeriod
, participants
}
67 changes: 67 additions & 0 deletions hydra-node/src/Hydra/Events.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
-- | This module defines the types and functions for creating 'EventSource' and
-- 'EventSink' instances and is intended to be used as an extension point.
--
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
-- main 'HydraNode' handle to load and send out events.
--
-- See 'Hydra.Events.FileBased' for an example implementation and
-- 'Hydra.Events.FileBasedSpec' for the corresponding test suite.
--
-- Custom implementations should be located under Hydra.Events to avoid
-- conflicts.
module Hydra.Events where

import Hydra.Prelude

import Hydra.Chain (IsChainState)
import Hydra.HeadLogic.Outcome (StateChanged)

type EventId = Word64

class HasEventId a where
getEventId :: a -> EventId

instance HasEventId (EventId, a) where
getEventId = fst

newtype EventSource e m = EventSource
{ getEvents :: HasEventId e => m [e]
-- ^ Retrieve all events from the event source.
}

newtype EventSink e m = EventSink
{ putEvent :: HasEventId e => e -> m ()
-- ^ Send a single event to the event sink.
}

-- | Put a list of events to a list of event sinks in a round-robin fashion.
putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks events =
forM_ events $ \event ->
forM_ sinks $ \sink ->
putEvent sink event

-- * State change events as used by Hydra.Node

-- | A state change event with an event id that is the common entity to be
-- loaded from an 'EventSource' and sent to 'EventSink's.
data StateEvent tx = StateEvent
{ eventId :: EventId
, stateChanged :: StateChanged tx
}
deriving (Generic)

instance HasEventId (StateEvent tx) where
getEventId = eventId

deriving instance IsChainState tx => Show (StateEvent tx)
deriving instance IsChainState tx => Eq (StateEvent tx)
deriving instance IsChainState tx => ToJSON (StateEvent tx)
deriving instance IsChainState tx => FromJSON (StateEvent tx)

instance IsChainState tx => Arbitrary (StateEvent tx) where
arbitrary = genericArbitrary
shrink = genericShrink

genStateEvent :: StateChanged tx -> Gen (StateEvent tx)
genStateEvent sc = StateEvent <$> arbitrary <*> pure sc
86 changes: 86 additions & 0 deletions hydra-node/src/Hydra/Events/FileBased.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-- | A file-based event source and sink using JSON encoding.
--
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
module Hydra.Events.FileBased where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Chain (IsChainState)
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
import Hydra.HeadLogic.Outcome (StateChanged)
import Hydra.Persistence (PersistenceIncremental (..))

-- | A basic file based event source and sink defined using an
-- 'PersistenceIncremental' handle.
--
-- The complexity in this implementation mostly stems from the fact that we want
-- to be backward-compatible with the old, plain format of storing
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
--
-- If any 'Legacy StateChanged' items are discovered, a running index is used
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
--
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
-- be defined more generically with constraints:
--
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
eventPairFromPersistenceIncremental ::
(IsChainState tx, MonadSTM m) =>
PersistenceIncremental (PersistedStateChange tx) m ->
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
eventIdV <- newTVarIO Nothing
let
getLastSeenEventId = readTVar eventIdV

setLastSeenEventId StateEvent{eventId} = do
writeTVar eventIdV (Just eventId)

getNextEventId =
maybe 0 (+ 1) <$> readTVar eventIdV

-- Keep track of the last seen event id when loading
getEvents = do
items <- loadAll
atomically . forM items $ \i -> do
event <- case i of
New e -> pure e
Legacy sc -> do
eventId <- getNextEventId
pure $ StateEvent eventId sc

setLastSeenEventId event
pure event

-- Filter events that are already stored
putEvent e@StateEvent{eventId} = do
atomically getLastSeenEventId >>= \case
Nothing -> store e
Just lastSeenEventId
| eventId > lastSeenEventId -> store e
| otherwise -> pure ()

store e = do
append (New e)
atomically $ setLastSeenEventId e

pure (EventSource{getEvents}, EventSink{putEvent})

-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
-- compatible with plain usage of 'PersistenceIncrementa' using plain
-- 'StateChanged' items to the new 'StateEvent' persisted items.
data PersistedStateChange tx
= Legacy (StateChanged tx)
| New (StateEvent tx)
deriving stock (Generic, Show, Eq)

instance IsChainState tx => ToJSON (PersistedStateChange tx) where
toJSON = \case
Legacy sc -> toJSON sc
New e -> toJSON e

instance IsChainState tx => FromJSON (PersistedStateChange tx) where
parseJSON v =
New <$> parseJSON v
<|> Legacy <$> parseJSON v
Loading

0 comments on commit 89f8fb5

Please sign in to comment.