Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventSource and EventSink abstractions for Hydra Extensibility #1351

Merged

Conversation

Quantumplation
Copy link
Contributor

@Quantumplation Quantumplation commented Mar 12, 2024

Implements EventSource and EventSink abstractions as outlined in this ADR.

A node hydrates from an EventSource.

Events are directed to any number of EventSinks, which is responsible for emitting them to external services, persisting them to disk, etc.

This abstracts further the notion of persistence, and allows forks of the hydra node to implement their own integrations, such as writing to a kinesis stream, an S3 bucket, or even some kind of decentralized storage provider.

While there is currently no plan to make this directly pluggable within the Hydra node in the near future, it does pin down the API between the hydra node and it's forks, so that any ongoing changes to the hydra node which don't interfere with this interface have very little impact on the upstream fork and can be easily merged.


  • CHANGELOG updated or not needed
  • Documentation updated or not needed
  • Haddocks updated or not needed
  • No new TODOs introduced or explained herafter

@ch1bo ch1bo changed the title InputSource and InputSink abstractions for Hydra Extensibility EventSource and EventSink abstractions for Hydra Extensibility Mar 13, 2024
@ch1bo ch1bo linked an issue Mar 13, 2024 that may be closed by this pull request
3 tasks
@ch1bo ch1bo marked this pull request as draft March 13, 2024 07:12
@ch1bo ch1bo force-pushed the SB-1352-persistence-types branch 2 times, most recently from 34087c0 to 9ac3f77 Compare March 13, 2024 07:35
hydra-node/src/Hydra/Node.hs Outdated Show resolved Hide resolved
@ch1bo ch1bo force-pushed the SB-1352-persistence-types branch 7 times, most recently from 5b3f2a4 to 255f843 Compare March 13, 2024 18:42
@ch1bo ch1bo force-pushed the SB-1352-persistence-types branch from 255f843 to 6070d07 Compare March 13, 2024 18:50
@ch1bo ch1bo marked this pull request as ready for review March 13, 2024 18:50
@ch1bo ch1bo requested a review from a team March 13, 2024 18:50
@ch1bo ch1bo mentioned this pull request Mar 13, 2024
3 tasks
@ch1bo ch1bo removed a link to an issue Mar 13, 2024
3 tasks
@ch1bo ch1bo linked an issue Mar 13, 2024 that may be closed by this pull request
3 tasks
@ch1bo
Copy link
Collaborator

ch1bo commented Mar 13, 2024

@Quantumplation @cardenaso11 Tested backward compatibility of <persistence dir>/state manually.

Using current master stores items in StateChanged format:

λ grep -B 2 'Initialized' devnet/persistence/alice/state
{"chainSlot":806,"tag":"TickObserved"}
{"chainSlot":807,"tag":"TickObserved"}
{"chainState":{"recordedAt":{"blockHash":"1035d142a15e9e38359c83c654c20a62436de536a33f1535916635d6819bd96d","slot":807,"tag":"ChainPoint"},"spendableUTxO":{"5d00bcf353d0b8a133933923cd204f8caf07f2b9c3ec9f28017d957fda987a32#0":{"address":"addr_test1wzrtl72m5g8f68gmxjye54kcdwave8ldnxfxpvnaejfdz2q6602qc","datum":null,"inlineDatum":{"constructor":0,"fields":[{"constructor":0,"fields":[{"int":10000}]},{"list":[{"bytes":"b37aabd81024c043f53a069c91e51a5b52e4ea399ae17ee1fe3cb9c44db707eb"}]},{"bytes":"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c"},{"constructor":0,"fields":[{"constructor":0,"fields":[{"bytes":"0b75608268e4b0950b9b86bdbdbf9b609d24b60394080e347f149eec75859e2c"}]},{"int":0}]}]},"inlineDatumhash":"a6836b96a615aad82fbcf78f0729a42ae5683df6b4f7b34b66aef51dd7aa781e","referenceScript":null,"value":{"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c":{"4879647261486561645631":1},"lovelace":1603320}},"5d00bcf353d0b8a133933923cd204f8caf07f2b9c3ec9f28017d957fda987a32#1":{"address":"addr_test1wz7v72jrpsqkhjtql0e3kqnfgqgu6wvayrdg3q42e8fnvygjcw8yh","datum":null,"inlineDatum":{"bytes":"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c"},"inlineDatumhash":"32398937e82b17146d69325132015ef90304329e0b4ea12f0fb04bd882ea6d33","referenceScript":null,"value":{"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c":{"f8a68cd18e59a6ace848155a0e967af64f4d00cf8acee8adc95a6b0d":1},"lovelace":1293000}},"5d00bcf353d0b8a133933923cd204f8caf07f2b9c3ec9f28017d957fda987a32#2":{"address":"addr_test1vru2drx33ev6dt8gfq245r5k0tmy7ngqe79va69de9dxkrg09c7d3","datum":null,"datumhash":null,"inlineDatum":null,"referenceScript":null,"value":{"lovelace":25195583}}}},"headId":"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c","headSeed":"2230623735363038323638653462303935306239623836626462646266396236303964323462363033393430383065333437663134396565633735383539653263233022","parameters":{"contestationPeriod":10,"parties":[{"vkey":"b37aabd81024c043f53a069c91e51a5b52e4ea399ae17ee1fe3cb9c44db707eb"}]},"tag":"HeadInitialized"}

Starting this version of the node then converts items to the new format:

{"eventId":21128,"stateChanged":{"chainSlot":12118,"tag":"TickObserved"}}
{"eventId":21129,"stateChanged":{"chainSlot":12119,"tag":"TickObserved"}}
{"eventId":21130,"stateChanged":{"chainState":{"recordedAt":{"blockHash":"b45056b7a2b348c82c4dd0b6839e5e767d40927e048782d1cf6cb8bb96503572","slot":12119,"tag":"ChainPoint"},"spendableUTxO":{"13ca9e380fdaebec6c4ea860292a4a61d88ec1e70a6b56eaf479600d125d3b62#0":{"address":"addr_test1wzrtl72m5g8f68gmxjye54kcdwave8ldnxfxpvnaejfdz2q6602qc","datum":null,"inlineDatum":{"constructor":2,"fields":[{"list":[{"bytes":"b37aabd81024c043f53a069c91e51a5b52e4ea399ae17ee1fe3cb9c44db707eb"}]},{"int":3},{"bytes":"35f82870e185b1782fddda45990006f4677a60681e389e60609f47ba2f109978"},{"int":1710357674800},{"constructor":0,"fields":[{"int":10000}]},{"bytes":"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c"},{"list":[]}]},"inlineDatumhash":"32d1d6a1217ed08b1108b25e70387c2ebd4e76e2d89c76c30c85879d84d62147","referenceScript":null,"value":{"c0624320fd0eb6c97621302bacca0e85a76059c88784ef8a5499c70c":{"4879647261486561645631":1,"f8a68cd18e59a6ace848155a0e967af64f4d00cf8acee8adc95a6b0d":1},"lovelace":102896320}},"13ca9e380fdaebec6c4ea860292a4a61d88ec1e70a6b56eaf479600d125d3b62#1":{"address":"addr_test1vru2drx33ev6dt8gfq245r5k0tmy7ngqe79va69de9dxkrg09c7d3","datum":null,"datumhash":null,"inlineDatum":null,"referenceScript":null,"value":{"lovelace":20025208}}}},"contestationDeadline":"2024-03-13T19:21:14.8Z","tag":"HeadClosed"}}

So this means, the current implementation does not override old entries in the persistence. If we want this, the current implementation of eventPairFromPersistenceIncremental would need to change and cannot be done on any PersistenceIncremental.

…entSink.\n\nIt's not clear yet if we can have the statechange ID in the StateChanged type as a substitute for tracking the current stateChangeID in the node\nBut for now it works to get the node's stateChangedID into the disk-persistence eventSink's putEvent', where we can compare it against the last persisted stateChange\nThis works for the particular disk-based eventSink where we *do have* exactly-once, but in ex kinesis poc or redis or something, it can be the key for at-least-once dict

lint
cardenaso11 and others added 22 commits March 18, 2024 18:01
We can define an event source and event sink from the
PersistenceIncremental.
Having a dedicated module allows easier import in forks of hydra and
hopefully serves as a better extension points (with less breakage).
We kept the EventSource/EventSink very abstract to make implementations
not realy on the internas of the actual data type used. However, an
event source / sink will need at least identify individual events to
tell them apart, e.g. to deduplicate them in memory.
Current idea: Keep notes where extensions are possible so we don't
change things by accident breaking external/custom code of users.
Also restore usage of persistenceIncremental in Hydra.Network.Reliability

We do not want to update this module (yet) with a different means for persistence.
By making the step-wise construction of HydraNode monadic (although not
specifically needed) we can simplify call-sites in the NodeSpec.

Also merge loadStateEventSource into hydrate function.
This is unfortunately very "white-boxy" and the testing code needs to
know that the generated events must be using parameters consistent with
the environment.

The alternative would be to move checkHeadState outside of hydrate, but
then 'WetHydraNode's can exist without this check happening.
This is generic dropping is needed on the Node-level (or
InputQueue-level) to handle some cases that are generated in the new
NodeSpec (which uses more property based testing now).
We do not want to change the HeadLogic signature nor add event ids to
the StateChanged values. Instead, keeping track of the next event id
should be orthogonal to the main protocol logic.
…urce/sink

These ensure that the new EventSource/Eventsink interface is also
backward compatible to directly using the PersistenceIncremental with
StateChanged items.
This implementation and its tests will serve as an example for an
EventSource and EventSink.
@ch1bo ch1bo added this to the 0.16.0 milestone Mar 18, 2024
@ch1bo ch1bo force-pushed the SB-1352-persistence-types branch from 5779a5c to 724f6e8 Compare March 18, 2024 17:04
@ch1bo ch1bo removed a link to an issue Mar 18, 2024
3 tasks
@ch1bo ch1bo enabled auto-merge March 18, 2024 17:04
@ch1bo ch1bo merged commit 89f8fb5 into cardano-scaling:master Mar 18, 2024
18 checks passed
@ch1bo ch1bo linked an issue Mar 18, 2024 that may be closed by this pull request
3 tasks
@Quantumplation Quantumplation deleted the SB-1352-persistence-types branch March 18, 2024 17:57
hydra-node/src/Hydra/Environment.hs Show resolved Hide resolved
hydra-node/src/Hydra/Events.hs Show resolved Hide resolved
hydra-node/test/Hydra/BehaviorSpec.hs Show resolved Hide resolved
hydra-node/test/Test/Hydra/Fixture.hs Show resolved Hide resolved

-- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations.
notConnect :: MonadThrow m => DraftHydraNode SimpleTx m -> m (HydraNode SimpleTx m)
notConnect =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mockConnect?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a good name. I do assume though that this is a mere "Could rename" suggestion?

events <- atomically . forM stateChanges $ \stateChanged -> do
eventId <- getNextEventId
pure StateEvent{eventId, stateChanged}
putEventsToSinks eventSinks events
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only part that makes me noise.

do we really want to have the node crashing if a sink is not available?

cant we have a better strategy here?

maybe there is a plan for the future, can we elaborate?

Copy link
Collaborator

@ch1bo ch1bo Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been defined in the ADR

The stepHydraNode main loop does call putEvent on all eventSinks in sequence. Any failure will make the hydra-node process terminate and require a restart.

and does not present a difference to like it was before. If the persistence failed to write to disk, it would bring down the whole node.

do we really want to have the node crashing if a sink is not available?

What's the alternative? We can't continue if a sink failed to process an event.

hydra-node/src/Hydra/Node.hs Show resolved Hide resolved
hydra-node/test/Hydra/Events/FileBasedSpec.hs Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Streaming Plugins
5 participants