RaftCore - mpsc -> StateMachine #1336

Open
drmingdrmer opened this issue Mar 6, 2025 · 11 comments
@drmingdrmer
Member

drmingdrmer commented Mar 6, 2025

When using the Stream approach to eliminate memory allocation, the Stream requires a 'static lifetime as it must traverse from the RaftCore task to StateMachine task. This necessitates that the stream maintains a lock on RaftCore.client_resp_channel to get client response senders for the SM. Although the stream can batch-retrieve client response senders, this design may still increase task switching overhead as RaftCore and StateMachine tasks alternately acquire the lock. Is this trade-off acceptable for your implementation?

I think we need to first prepare a better design for streaming. It should in fact reduce switching, since you can stream the entries directly from the Raft core instead of from here. Here you'd just set up the task (or future) accepting the stream (and potentially cancel it, as needed).

I don't quite get the point about the 'static lifetime. The stream is just an object somewhere on the heap whose two endpoints reside in two different tasks. You could also just use a regular mpsc channel, where a pair of (entry, reply_sender) is pushed and the state machine consumer picks it up and executes it.

There should be a sending end for the mpsc sender to push new (entry, reply_sender) pairs into the channel.
I assume the pseudocode for this part looks like:

impl RaftCore {
  async fn apply(&mut self, start: u64, end: u64) {
    let handle = self.read_reply_sender_handle(); // (1)
    let (tx, rx) = mpsc::channel();
    spawn(async move {
      loop {
        let (entry, reply_sender) = handle.read().await;
        tx.send((entry, reply_sender)).await;
      }
    });
    self.sm_handle.send(SMCommand::Apply(new_stream(rx)));
  }
}

impl SMWorker {
  async fn run(mut self) {
    while let Some(sm_cmd) = self.rx.recv().await {
      let SMCommand::Apply(mut stream) = sm_cmd else { /* ... */ };
      while let Some((entry, reply_sender)) = stream.next().await {
        // apply the entry and send back the reply...
      }
    }
  }
}

If I understand it correctly, at position (1), the handle has to hold a lock on RaftCore.client_resp_channels to get the reply sender in another task, right?

We just need a mechanism to track the applied ID, since applying entries can complete out of order. OTOH, except for snapshot handling, does the Raft core need to know the applied ID?

RaftCore needs the applied ID to:

  • update Metrics;
  • inform linearizable reads when it is safe to read from the state machine.

Originally posted by @schreter in #1334 (review)
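The (entry, reply_sender) channel idea discussed above can be demonstrated with plain std threads and channels. This is a minimal toy sketch, not openraft code: Entry and Reply are stand-in u64 aliases, and the per-entry reply sender plays the role of the client responder.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-ins for illustration; not openraft's real types.
type Entry = u64;
type Reply = u64;

// State-machine consumer: pull (entry, reply_sender) pairs from a single
// channel and answer each client through its own reply sender.
fn run_sm_worker(rx: mpsc::Receiver<(Entry, mpsc::Sender<Reply>)>) {
    for (entry, reply_tx) in rx {
        // "Apply" the entry; here we just echo it back as the reply.
        let _ = reply_tx.send(entry);
    }
}

// Drive the producer side (the RaftCore role) and collect the replies.
fn demo() -> Vec<Reply> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_sm_worker(rx));

    let mut reply_rxs = Vec::new();
    for entry in 1..=3 {
        let (reply_tx, reply_rx) = mpsc::channel();
        tx.send((entry, reply_tx)).unwrap();
        reply_rxs.push(reply_rx);
    }
    drop(tx); // closing the channel ends the worker loop

    let replies = reply_rxs.iter().map(|rx| rx.recv().unwrap()).collect();
    worker.join().unwrap();
    replies
}

fn main() {
    assert_eq!(demo(), vec![1, 2, 3]);
}
```

Each entry carries its own reply sender, so the consumer needs no lock on a shared responder table to answer clients.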

@drmingdrmer
Member Author

@schreter Let's talk about it here?

@schreter
Collaborator

Actually, we already have an issue talking about changing replication and applying to streams here: #1184.

The current sm::Command::Apply command is batch-oriented, not stream-oriented. It gets an array of reply receivers to satisfy after applying log entries, which were pushed into the system when the client writes data. In principle, a new Apply command is pushed to the worker as soon as there is data available, right? However, the SM worker awaits the execution of SM::apply() before getting further requests, which limits parallelism.

Maybe the minimal change would be the following:

  • add an interface to start a user-specific stream processor on the SM as a separate command, returning the sending port to which the requests to apply to the state machine are sent (basically sm::Command::Apply, which won't go to the SM worker, but rather to the new stream)
  • when starting, give it a handle to something where applied log ID can be announced (in principle, this is not bound to the Apply command, we can announce it at any time and reuse it, so no need to post it with each command)
  • this will translate to sending sm::Response::Apply with the appropriate log ID to announce to RaftCore
  • i.e., something like fn start_streaming(&mut self, apply_callback: impl Fn(/* applied */ LogId) + 'static) -> Option<Sender<AsyncApplyCommand>> { None }
  • if a stream could be created, instead of sending the Apply command to the SM worker, send it directly to the async SM worker started via the new method, through the second stream (we could also wrap it in a newtype, so there is just one type of stream from the RaftCore PoV, and it will just post either to the regular SM stream or to the user-specific worker's stream)
  • the reply won't be 1:1 associated with the Apply command, but that's actually irrelevant - we just need to update the applied index; i.e., calling the Fn above will basically inform the RaftCore about applied state
  • some other commands like BuildSnapshot, GetSnapshot, etc. can also be handled via the apply stream in the future (probably not right now)
  • before executing other commands in the SM worker (e.g., BeginReceivingSnapshot), close the async SM worker by closing its input stream and await completion of work (likely we need an additional sync primitive to be passed to or returned from start_streaming)
  • with that, we are "in sync" and know that there is no parallel work going on in the (user-provided) async SM worker
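The opt-in hook from the first bullets could be sketched roughly as follows. All names here (StateMachine, AsyncApplyCommand, LogId as u64) are placeholders, not the actual openraft API; the point is only that the default implementation opts out by returning None, keeping today's batch path.

```rust
use std::sync::mpsc::Sender;

// Hypothetical names for illustration only; not the actual openraft API.
type LogId = u64;
struct AsyncApplyCommand; // would carry an entry plus its client responder

trait StateMachine {
    // Proposed opt-in hook: a streaming-capable state machine returns a
    // sender for apply commands; the default keeps the batch path.
    fn start_streaming(
        &mut self,
        _apply_callback: impl Fn(LogId) + 'static,
    ) -> Option<Sender<AsyncApplyCommand>> {
        None
    }
}

// A state machine that does not opt in simply inherits the default.
struct BatchOnlySm;
impl StateMachine for BatchOnlySm {}

fn main() {
    let mut sm = BatchOnlySm;
    // The default implementation opts out of streaming.
    assert!(sm.start_streaming(|_applied| {}).is_none());
}
```

The apply_callback would be the channel through which the applied log ID is announced back to RaftCore, decoupled from individual Apply commands.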

Opinions?

@drmingdrmer
Member Author

You're right about the way it works currently, but I don't fully understand the proposal you've described for the new approach to working with the state machine worker and streaming.

In my opinion, we can't have more than one stream connected to the state machine worker, because there is only one task inside it, and subscribing to two streams doesn't provide any guarantees about the order of received events. This lack of ordering guarantees could be problematic.

I believe the best approach would be to have only one stream connected to the state machine worker at any given time. The approach should work like this: RaftCore sends a stream to the state machine worker, and when the state machine worker receives this stream, it pauses the main stream and switches to the apply stream until that stream is closed.

This way, if the apply stream is in use, the main stream is suspended. When another command (such as GetSnapshot or something else) needs to be sent to the state machine worker, the apply stream should be shut down first, and then the state machine worker goes back to draining the main stream after processing all events in the apply stream.
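This pause-and-switch behavior can be illustrated with std channels standing in for the streams. The Command enum and the worker below are illustrative assumptions, not openraft types: on Apply, the worker suspends the main stream and drains the apply stream until it closes, then resumes.

```rust
use std::sync::mpsc::{channel, Receiver};

// Illustrative stand-ins; not openraft's actual command types.
enum Command {
    Apply(Receiver<u64>), // switch to this stream until it is closed
    GetSnapshot,
}

// Drain the main stream; on Apply, suspend it and drain the apply stream
// to completion before returning to the main stream.
fn run_worker(main_rx: Receiver<Command>) -> Vec<String> {
    let mut log = Vec::new();
    for cmd in main_rx {
        match cmd {
            Command::Apply(apply_rx) => {
                for entry in apply_rx {
                    log.push(format!("apply {entry}"));
                }
                log.push("apply stream closed".to_string());
            }
            Command::GetSnapshot => log.push("snapshot".to_string()),
        }
    }
    log
}

fn demo() -> Vec<String> {
    let (main_tx, main_rx) = channel();
    let (apply_tx, apply_rx) = channel();
    apply_tx.send(1).unwrap();
    apply_tx.send(2).unwrap();
    drop(apply_tx); // closing the apply stream lets the worker resume
    main_tx.send(Command::Apply(apply_rx)).unwrap();
    main_tx.send(Command::GetSnapshot).unwrap();
    drop(main_tx);
    run_worker(main_rx)
}

fn main() {
    assert_eq!(demo(), ["apply 1", "apply 2", "apply stream closed", "snapshot"]);
}
```

Because GetSnapshot sits behind the Apply command in the single main stream, it is only handled after the apply stream has been fully drained, which is exactly the ordering guarantee described above.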

I'm also not sure how, in your design, the state machine worker and the RaftStateMachine implementation work together. There is still only one async task, right? If so, how does the SM worker cooperate with RaftStateMachine? In particular, what does SM::apply() look like in your design?

@schreter
Collaborator

schreter commented Apr 2, 2025

I finally got to do some experiments on how the streaming API could look. Unfortunately, none of them were to my satisfaction.

So let's look at the root issues we have and how we can solve them more easily:

The current apply task alternates between reading the log and then applying entries to the state machine and awaits the respective operation. This means that these two operations basically block each other, so the apply cannot progress while reading the log and vice versa.

For example, if we have an operation which potentially takes longer (needs to load a larger object into memory to be modified and written back), we cannot start further operations past this batch in an overlapped way and let that operation complete after the batch is done.

For write-only operations, we cannot even return before awaiting the write of all operations in the batch, otherwise we might report a wrong last-applied index to the Raft core, which might purge logs that may still be needed after a crash.

Again, the proper solution would be a stream-oriented interface for reading the log and writing data to the state machine and connecting these streams somehow, so the state machine can execute requests overlapped and report on operation completion via some callback.

However, initially, it would be possible to modify the RaftStateMachine::apply() interface similarly to what we have in RaftLogStorage::append(): pass a callback to notify about the completion of the batch (or about an error). With this, the state machine doesn't need to await the completion of the asynchronous processing (which involves I/O); instead, it can immediately return and continue processing in the background. In the meantime, further log batches can be read while the state machine is busy doing its I/O.

There are two complications here compared to log append:

  • The state machine needs to send a reply to the client. I.e., we must pass a wrapper around ResponderOf<C> alongside each entry to be able to send the response to the client. But that should not be a problem, right?
  • We also have other operations which require persistence of the data, especially build snapshot, install snapshot, and begin receiving snapshot (get snapshot just reads the last snapshot state, so no issue). These operations need all previous apply() commands to be completed before executing. Again, this should be fairly unproblematic to implement, since we can count the number of not-yet-confirmed apply() calls and await the notifications via the completion callback which was passed to apply(). Only then can the snapshot-related operation proceed. Since these operations are relatively infrequent, it would be a good compromise between optimal usage of the I/O resources and keeping the implementation of the SM worker simple.

With this, it is also possible to write a default implementation that delegates to the legacy method and then sends results and calls the callback.
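The in-flight bookkeeping described above (count not-yet-confirmed apply() calls and proceed with snapshot work only when the count is zero) might be sketched like this. SmWorker and the immediate completion are assumptions that keep the toy self-contained; a real completion callback would fire after asynchronous I/O finishes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative sketch: apply() receives a completion callback instead of
// being awaited; the worker counts in-flight applies and only starts a
// snapshot-related command once the count drops back to zero.
struct SmWorker {
    in_flight: Arc<AtomicUsize>,
}

impl SmWorker {
    fn apply(&self, _entry: u64) {
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        let counter = Arc::clone(&self.in_flight);
        // The completion callback the state machine would invoke after I/O.
        let on_done = move || {
            counter.fetch_sub(1, Ordering::SeqCst);
        };
        // In a real implementation this runs after asynchronous I/O
        // completes; here we complete immediately to keep the sketch small.
        on_done();
    }

    // Snapshot-related commands may proceed only when no apply is pending.
    fn ready_for_snapshot(&self) -> bool {
        self.in_flight.load(Ordering::SeqCst) == 0
    }
}

fn main() {
    let worker = SmWorker { in_flight: Arc::new(AtomicUsize::new(0)) };
    worker.apply(1);
    worker.apply(2);
    assert!(worker.ready_for_snapshot());
}
```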

In the next step, we can take a look at reading the log, since even with this concept, it would still be awaiting batches instead of overlapping I/O for multiple batches.

What do you think?

@drmingdrmer
Member Author

It seems the state machine needs a unified stream API: all requests (including applying log entries or building snapshots) should flow through a single stream/channel. This approach satisfies the fundamental requirement of serializing all requests.

enum Request {
  Apply { entry: LogEntry, response_tx: Sender<_> },
  BuildSnapshot { tx: Sender<Snapshot> },
  // ...
}

trait RaftStateMachine {
  async fn handle_request(&mut self, req: Request);
}

What about this?
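For concreteness, here is a runnable toy version of that enum, with stand-in types assumed for illustration (u64 entries, Vec<u64> as the whole state, std channels): because every request flows through one channel, applies and snapshot builds are serialized in arrival order by construction.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Toy stand-ins for illustration; not openraft's real types.
enum Request {
    Apply { entry: u64, response_tx: Sender<u64> },
    BuildSnapshot { tx: Sender<Vec<u64>> },
}

// Single consumer loop: one channel serializes all state machine requests.
fn handle_requests(rx: Receiver<Request>) {
    let mut state = Vec::new();
    for req in rx {
        match req {
            Request::Apply { entry, response_tx } => {
                state.push(entry);
                let _ = response_tx.send(entry); // reply to the client
            }
            Request::BuildSnapshot { tx } => {
                // The snapshot sees every previously applied entry.
                let _ = tx.send(state.clone());
            }
        }
    }
}

fn demo() -> Vec<u64> {
    let (tx, rx) = channel();
    let (resp_tx, _resp_rx) = channel();
    tx.send(Request::Apply { entry: 7, response_tx: resp_tx }).unwrap();
    let (snap_tx, snap_rx) = channel();
    tx.send(Request::BuildSnapshot { tx: snap_tx }).unwrap();
    drop(tx);
    handle_requests(rx);
    snap_rx.recv().unwrap()
}

fn main() {
    assert_eq!(demo(), vec![7]);
}
```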

@schreter
Collaborator

schreter commented Apr 3, 2025

Yes, from an API perspective, that's also a good option. Basically, it boils down to the user implementing the SM worker instead of it living directly in openraft...

@drmingdrmer
Member Author

Right. Otherwise users would still need to manage the ordering between applying log entries from one stream and building snapshots that arrive through other channels. This creates essentially the same complexity as handling all requests in a single stream...

@schreter
Collaborator

schreter commented Apr 4, 2025

OK, so I assume you first want to experiment a bit with how such an API could be implemented? I'll also try to do some more experiments next week (hopefully).

Basically, it means remapping the stream coming from the Raft core into a stream of commands for the SM and, instead of looping internally, passing the mapped requests to the SM directly. I.e., move the implementation of the SM worker, adapted to work on top of the mapped requests, into a default-implemented run_state_machine() method on the SM, right?

The log reader would be then also directly accessed in the new worker method, i.e., the implementation can then overlap log reading as well.
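That remapping could be sketched as a default-implemented worker loop on the trait. The names and shapes below (Request, run_state_machine over a std Receiver) are assumptions for illustration only, not the real openraft API.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical mapped-request type; a real one would carry entries,
// responders, snapshot commands, etc.
enum Request {
    Apply(u64),
    Shutdown,
}

trait StateMachine {
    fn apply(&mut self, entry: u64);

    // Default worker loop: consume the mapped request stream directly.
    // An implementation can override this to overlap log reads and I/O.
    fn run_state_machine(&mut self, rx: Receiver<Request>) {
        for req in rx {
            match req {
                Request::Apply(entry) => self.apply(entry),
                Request::Shutdown => break,
            }
        }
    }
}

// A trivial state machine using the default worker loop.
struct CountingSm { last_applied: u64 }

impl StateMachine for CountingSm {
    fn apply(&mut self, entry: u64) { self.last_applied = entry; }
}

fn demo() -> u64 {
    let (tx, rx) = channel();
    tx.send(Request::Apply(5)).unwrap();
    tx.send(Request::Shutdown).unwrap();
    let mut sm = CountingSm { last_applied: 0 };
    sm.run_state_machine(rx);
    sm.last_applied
}

fn main() {
    assert_eq!(demo(), 5);
}
```

Users who keep the default get today's sequential behavior; users who override run_state_machine() can pipeline applies, which is the parallelism goal discussed above.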

@drmingdrmer
Member Author

move the implementation of the SM worker, adapted to work on top of the mapped requests, into a default-implemented run_state_machine() method on the SM, right?

Exactly, there would be a new version of the RaftStateMachine trait to do this 🤔

@schreter
Collaborator

I pushed a proposal with (working) basis for discussion in #1355.
