For ra to do anything useful you need to provide it with a state machine implementation that solves a particular problem.
To implement a state machine that will be replicated using Raft and ra, implement the ra_machine behaviour. There are two mandatory callbacks that need to be implemented:
-callback init(Conf :: machine_init_args()) -> state().
-callback 'apply'(command_meta_data(), command(), State) ->
    {State, reply(), effects() | effect()} | {State, reply()}.
init/1
returns the initial state when a new instance of the state machine
is created. It takes an arbitrary map of configuration parameters.
apply/3 is the primary function that is called for every command in the raft log. It takes a meta data map containing the raft index and term (more on that later), a command and the current state. It returns the new state, a reply that can be returned to the caller if they issued a synchronous call (see ra:process_command/2) and, optionally, effects.
There are also some optional callbacks that advanced state machines may choose to implement.
This example builds a simple key-value store that supports
write
and read
(or put and get) operations.
Create a new erlang module named ra_kv
using the ra_machine
behaviour and
export the init/1
and apply/3
functions:
-module(ra_kv).
-behaviour(ra_machine).
-export([init/1, apply/3]).
First we are going to define a type spec for the state and commands that we will use. The state is simply a map of arbitrary keys and values. We can store anything.
-opaque state() :: #{term() => term()}.
-type ra_kv_command() :: {write, Key :: term(), Value :: term()} |
                         {read, Key :: term()}.
To implement init/1
simply return an empty map as the initial state of our kv store.
init(_Config) -> #{}.
To implement the apply/3
function we need to handle each of the commands
we support.
apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, _Effects = []};
apply(_Meta, {read, Key}, State) ->
    Reply = maps:get(Key, State, undefined),
    {State, Reply, _Effects = []}.
For the {write, Key, Value} command we simply put the key and value into the map and return the new state, an ok reply and an empty list of effects.
For {read, Key} we leave the state unchanged and reply with the value of the key, or undefined if it does not exist, so that a waiting caller can obtain the value.
And that is it! The state machine is finished.
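Because apply/3 only maps a command and a state to a new state, a reply and effects, it can be exercised by hand before any cluster is started. The following is a minimal sketch under stated assumptions: the module name ra_kv_check and the run/0 helper are made up for illustration, and the meta data map only carries the index and term keys mentioned above.

```erlang
-module(ra_kv_check).
-export([init/1, apply/3, run/0]).

%% Same logic as the ra_kv machine above, repeated so that this
%% module compiles on its own.
init(_Config) -> #{}.

apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, []};
apply(_Meta, {read, Key}, State) ->
    {State, maps:get(Key, State, undefined), []}.

%% Drive the machine by hand. The meta data map is an assumption:
%% only the index and term keys mentioned in the text are filled in.
%% apply/3 is called module-qualified because apply/3 is also an
%% auto-imported BIF.
run() ->
    Meta = #{index => 1, term => 1},
    S0 = init(#{}),
    {S1, ok, []} = ?MODULE:apply(Meta, {write, k, v}, S0),
    %% Reads must leave the state untouched.
    {S1, v, []} = ?MODULE:apply(Meta, {read, k}, S1),
    {S1, undefined, []} = ?MODULE:apply(Meta, {read, missing}, S1),
    S1.
```

Calling ra_kv_check:run() returns the final map if every pattern match succeeds and crashes with a badmatch otherwise.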
To actually run this we need to configure a ra cluster to use the ra_kv
state machine and start it. The simplest way is to use the ra:start_cluster/3
function. It takes a ClusterName that can be a binary, string or atom,
a machine configuration and a list of servers that define the initial
set of members.
start() ->
    %% the initial cluster members
    Members = [{ra_kv1, node()}, {ra_kv2, node()}, {ra_kv3, node()}],
    %% an arbitrary cluster name
    ClusterName = <<"ra_kv">>,
    %% the config passed to `init/1`, must be a `map`
    Config = #{},
    %% the machine configuration
    Machine = {module, ?MODULE, Config},
    %% ensure ra is started
    application:ensure_all_started(ra),
    %% start a cluster instance running the `ra_kv` machine
    ra:start_cluster(ClusterName, Machine, Members).
If you then start an Erlang shell with make shell or similar and call ra_kv:start/0, you should see a result similar to:
{ok,[{ra_kv3,nonode@nohost},
{ra_kv2,nonode@nohost},
{ra_kv1,nonode@nohost}],
[]}
This indicates that all servers in the ra cluster were successfully started. The last element of the tuple contains the servers that were not successfully started. If a quorum of servers could not be started the function would return an error.
Now you can write your first value into the cluster.
2> ra:process_command(ra_kv1, {write, k, v}).
{ok, ok, {ra_kv1,nonode@nohost}}
3> ra:process_command(ra_kv1, {read, k}).
{ok, v, {ra_kv1,nonode@nohost}}
4> ra:process_command(ra_kv1, {write, k, v2}).
{ok, ok, {ra_kv1,nonode@nohost}}
5> ra:process_command(ra_kv1, {read, k}).
{ok, v2, {ra_kv1,nonode@nohost}}
ra:process_command/2
blocks until the command has achieved consensus
and has been applied to the state machine on the leader server. It is the simplest
way to interact with ra
but also the one with the highest latency.
To read values consistently we have no choice other than to use it.
The returned tuple contains either the raft index and term at which the entry was added to the raft log or the reply returned by the state machine. For the {read, Key} command the reply is the current value of the key.
We have already added the start/0
function to start a local ra cluster. It would
make sense to abstract interactions with the kv store behind a nicer interface
than calling ra:process_command/2
directly.
write(Key, Value) ->
    %% it would make sense to cache this to avoid redirection costs when this
    %% server happens not to be the current leader
    Server = ra_kv1,
    case ra:process_command(Server, {write, Key, Value}) of
        {ok, _, _} ->
            ok;
        Err ->
            Err
    end.
read(Key) ->
    Server = ra_kv1,
    case ra:process_command(Server, {read, Key}) of
        {ok, Value, _} ->
            {ok, Value};
        Err ->
            Err
    end.
Effects are used to separate the state machine logic from the side effects it wants to take inside its environment. Each call to the apply/3 function can return a list of effects for the leader to realise. This includes sending messages, setting up server and process monitors and calling arbitrary functions.
Effects should be a list sorted by execution order, i.e. the effect to be actioned first should be at the head of the list.
Only the leader that first applies an entry will attempt the effect. Followers process the same set of commands but simply throw away any effects returned by the state machine unless a specific effect provides the local option.
Spec | Executed on |
---|---|
{send_msg, pid(), Msg :: term()} | leader |
{send_msg, pid(), Msg :: term(), [local]} | on member local to pid() else leader |
{monitor \| demonitor, process \| node, pid() \| node()} | leader |
{mod_call, mfa()} | leader |
{timer, Name :: term(), Time :: non_neg_integer() \| infinity} | leader |
{append, term()} | leader |
{append, term(), ra_server:command_reply_mode()} | leader |
{log, [ra_index()], fun(([user_command()]) -> effects())} | leader |
{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | on member local to node() else leader |
{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}} | on member local to node() else leader |
{release_cursor \| checkpoint, ra_index(), term()} | all members |
{aux, term()} | every member |
The {send_msg, pid(), Msg :: term()} effect asynchronously sends a message to the specified pid. Note that ra uses erlang:send/3 with the noconnect and nosuspend options, which are the least reliable message sending options. It does this so that a state machine send_msg effect will never block the main ra process.
If reliable delivery is needed, an Automatic Repeat Query (ARQ) like protocol should be implemented between the state machine and the receiver.
The {send_msg, pid(), Msg :: term(), Options :: list()} effect can be used to further control how the effect is executed via options. The options are:

- ra_event: the message will be wrapped in a ra_event structure like {ra_event, LeaderId, Msg}. The ra_event format can be changed by providing a ra_event_formatter server configuration.
- cast: the message will be wrapped in a standard {'$gen_cast', Msg} wrapper to conform to OTP gen conventions.
- local: if the target pid() is local to any member of the Ra cluster, the message will be delivered from the local member even if this is a follower. This way a network hop can be avoided. If the target pid() is on a node without a Ra member it will be sent from the leader.
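As an illustration of the send_msg effect and its options, the sketch below extends the key-value write with a notification. The module name ra_kv_notify, the four-element write command and the {written, Key, Value} message are all made up for this example; only the shape of the effect tuple is taken from the text above.

```erlang
-module(ra_kv_notify).
-export([init/1, apply/3]).

init(_Config) -> #{}.

%% {write, Key, Value, Subscriber} is a hypothetical command used only
%% for this sketch: it stores the value and notifies Subscriber.
apply(_Meta, {write, Key, Value, Subscriber}, State)
  when is_pid(Subscriber) ->
    Msg = {written, Key, Value},
    %% ra_event asks for the message to be wrapped in a ra_event
    %% structure; local lets a member on the subscriber's node
    %% deliver it, as described above.
    Effects = [{send_msg, Subscriber, Msg, [ra_event, local]}],
    {maps:put(Key, Value, State), ok, Effects}.
```

Since delivery is not guaranteed, a subscriber that must not miss updates would still need to acknowledge or re-request them.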
Use {monitor, process | node, pid() | node()}
to ask the ra
leader to
monitor a process or node. If ra
receives a DOWN
for a process it
is monitoring it will commit a {down, pid(), term()}
command to the log that
the state machine needs to handle. If it detects a monitored node as down or up
it will commit a {nodeup | nodedown, node()}
command.
Use {demonitor, process | node, pid() | node()}
to stop monitoring a process
or a node.
All monitors are invalidated when the leader changes. State machines should
re-issue monitor effects when becoming leader using the state_enter/2
callback.
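A machine that tracks client processes could combine the monitor effect, the down command and state_enter/2 roughly as follows. This is a sketch, not Ra's own code: the module name ra_kv_sessions and the {register, Pid} command are hypothetical, and the state is simply a map keyed by pid.

```erlang
-module(ra_kv_sessions).
-export([init/1, apply/3, state_enter/2]).

%% The state maps each registered client pid to the atom registered.
init(_Config) -> #{}.

%% {register, Pid} is a hypothetical command for this sketch.
apply(_Meta, {register, Pid}, State) when is_pid(Pid) ->
    {State#{Pid => registered}, ok, [{monitor, process, Pid}]};
%% Committed to the log by ra when a monitored process goes down.
apply(_Meta, {down, Pid, _Info}, State) ->
    {maps:remove(Pid, State), ok, []}.

%% Monitors do not survive a leader change, so re-issue them
%% whenever this member becomes leader.
state_enter(leader, State) ->
    [{monitor, process, Pid} || Pid <- maps:keys(State)];
state_enter(_RaState, _State) ->
    [].
```

Keeping the monitored pids in the replicated state is what makes it possible for a newly elected leader to rebuild the monitors.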
Use the {mod_call, module(), function(), Args :: [term()]} effect to call an arbitrary function. Care needs to be taken not to block the ra process whilst doing so. It is recommended that expensive operations are done in another process.
The mod_call effect is useful for e.g. updating an ETS table of committed entries or similar.
The {timer, Name :: term(), Time :: non_neg_integer() | infinity} effect asks the Ra leader to maintain a timer on behalf of the state machine and commit a timeout command when the timer triggers. If the time is set to infinity, the timer will not be started and any running timer with the same name will be cancelled.
The timer is relative: setting another timer with the same name before the current timer runs out resets the current timer.
All timers are invalidated when the leader changes. State machines should re-issue timer effects when becoming leader using the state_enter/2 callback.
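The timer effect could be used to expire keys, as in the sketch below. Several things here are assumptions rather than facts from this document: the module name ra_kv_ttl, the {write, Key, Value, TtlMs} and {persist, Key} commands, and in particular the {timeout, Name} shape of the command committed when the timer fires, which should be checked against the ra_machine documentation.

```erlang
-module(ra_kv_ttl).
-export([init/1, apply/3]).

init(_Config) -> #{}.

%% {write, Key, Value, TtlMs} is a hypothetical command: the key is
%% stored and a timer named after the key is started (or reset).
apply(_Meta, {write, Key, Value, TtlMs}, State) ->
    {maps:put(Key, Value, State), ok, [{timer, {expire, Key}, TtlMs}]};
%% ASSUMPTION: the committed timeout command is {timeout, Name}.
apply(_Meta, {timeout, {expire, Key}}, State) ->
    {maps:remove(Key, State), ok, []};
%% {persist, Key} is hypothetical: cancel a pending expiry by
%% setting the timer with the same name to infinity.
apply(_Meta, {persist, Key}, State) ->
    {State, ok, [{timer, {expire, Key}, infinity}]}.
```

This sketch does not re-issue timers after a leader change; a complete machine would have to keep the deadlines in its state and restart the timers from state_enter/2.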
Use {log, Indexes :: [ra_index()], fun(([user_command()]) -> effects())} to read commands from the log at the specified indexes and return a list of effects.
Effectively this effect transforms log entries into effects.
Potential use cases could be when a command contains large binary data and you don't want to keep this in memory but load it on demand when needed for a side-effect.
This is an advanced feature and will only work as long as the command is still
in the log. If a release_cursor
has been emitted with an index higher than this,
the command may no longer be in the log and the function will not be called.
There is currently no facility for reading partial data from a snapshot.
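The large-value use case described above might look like the following sketch, in which the state only remembers the raft index of each write and the value is read back from the log on demand. The module name ra_kv_blobs, the {fetch, Key, Pid} command and the {value, Key, Value} message are hypothetical; the index key of the meta data map is the raft index mentioned earlier.

```erlang
-module(ra_kv_blobs).
-export([init/1, apply/3]).

%% The state maps a key to the raft index of the command that wrote
%% it, so the (possibly large) value itself is not kept in memory.
init(_Config) -> #{}.

apply(#{index := Idx}, {write, Key, _Value}, State) ->
    {State#{Key => Idx}, ok, []};
%% {fetch, Key, Pid} is a hypothetical command: read the original
%% write command back from the log and send its value to Pid.
apply(_Meta, {fetch, Key, Pid}, State) ->
    case State of
        #{Key := Idx} ->
            %% The fun receives the commands stored at the requested
            %% indexes and turns them into effects.
            Fun = fun([{write, _Key, Value}]) ->
                          [{send_msg, Pid, {value, Key, Value}}]
                  end,
            {State, ok, [{log, [Idx], Fun}]};
        _ ->
            {State, {error, not_found}, []}
    end.
```

A machine written this way must not release the log past the lowest index it still refers to, otherwise the fun will not be called.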
The {release_cursor, RaftIndex, MachineState}
effect can be used to give Ra cluster members a hint to trigger a snapshot.
This effect, when emitted, is evaluated on all nodes and not just the leader.
It is not guaranteed that a snapshot will be taken. A decision to take a snapshot or to delay it is taken using a number of internal Ra state factors. The goal is to minimise disk I/O activity when possible.
Checkpoints are nearly the same concept as snapshots. Snapshotting truncates
the log up to the snapshot's index, which might be undesirable for machines
which read from the log with the {log, Indexes, Fun}
effect mentioned above.
The {checkpoint, RaftIndex, MachineState}
effect can be used as a hint to
trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes
and when a checkpoint is taken, the machine state is saved to disk and can be
used for recovery when the machine restarts. A checkpoint being written does
not trigger any log truncation though.
The {release_cursor, RaftIndex}
effect can then be used to promote any
existing checkpoint older than or equal to RaftIndex
into a proper snapshot,
and any log entries older than the checkpoint's index are then truncated.
These two effects are intended for machines that use the {log, Indexes, Fun}
effect and can substantially improve machine recovery time compared to
snapshotting alone, especially when the machine needs to keep old log entries
around for a long time.
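One simple policy is to emit a snapshot hint at a fixed interval of raft indexes. The sketch below assumes a hypothetical module name ra_kv_snap and an arbitrary interval of 1000 entries; neither value comes from Ra itself.

```erlang
-module(ra_kv_snap).
-export([init/1, apply/3]).

%% Hypothetical interval: hint at a snapshot every 1000 log entries.
-define(SNAPSHOT_EVERY, 1000).

init(_Config) -> #{}.

apply(#{index := Idx}, {write, Key, Value}, State0) ->
    State = maps:put(Key, Value, State0),
    {State, ok, snapshot_hint(Idx, State)};
apply(_Meta, {read, Key}, State) ->
    {State, maps:get(Key, State, undefined), []}.

%% The machine state passed in the effect must be the state as of
%% the given index, i.e. the state returned by this apply/3 call.
snapshot_hint(Idx, State) when Idx rem ?SNAPSHOT_EVERY =:= 0 ->
    [{release_cursor, Idx, State}];
snapshot_hint(_Idx, _State) ->
    [].
```

A machine that reads from the log could return {checkpoint, Idx, State} from snapshot_hint/2 instead and emit {release_cursor, Idx} later, once the older entries are no longer needed.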
Apart from the mandatory callbacks, Ra supports some optional callbacks:

- state_enter(ra_server:ra_state() | eol, state()) -> effects().

  This callback allows the ra machine implementation to react to state changes in the ra server. The most common use of this callback is to re-issue monitor effects when entering the leader state, as these are transient and need to be issued again when the leader changes. This callback is called on all members in a cluster whenever they change their local state.

- tick(TimeMs :: milliseconds(), state()) -> effects().

  This callback is called periodically, at an interval controlled by the tick_interval server configuration. It can be used to trigger periodic actions.

- init_aux/1
- handle_aux/5

      -callback init_aux(Name :: atom()) -> term().
      -callback handle_aux(ra_server:ra_state(),
                           {call, From :: from()} | cast,
                           Command :: term(),
                           AuxState,
                           IntState) ->
          {reply, Reply :: term(), AuxState, IntState} |
          {reply, Reply :: term(), AuxState, IntState, effects()} |
          {no_reply, AuxState, IntState} |
          {no_reply, AuxState, IntState, effects()}
          when AuxState :: term(),
               IntState :: ra_aux:internal_state().

  These two callbacks allow each server to maintain a local state machine in addition to the replicated and consistent state machine. They can be interacted with in two ways:

  - Using the ra:aux_* APIs to cast or call into the aux machine.
  - By emitting aux effects from the state machine.

  Aux machines allow implementors a lot of flexibility that isn't otherwise offered by the more restrictive and deterministic state machine itself.

  The ra_aux module can be used to access internal Ra server state information such as the current members, the state machine state itself or even reading from the log.

- overview(state()) -> map()

  Use this to return an overview map of the state machine, which is returned when using e.g. sys:get_status/1.

- version/0, which_module/1

  See the next section on State Machine Versioning.
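For the key-value store, two of the simpler optional callbacks listed above could be sketched as follows. The module name ra_kv_optional is made up, and my_metrics:report/1 is a hypothetical function that is not part of Ra; it only stands in for whatever periodic action a real machine would trigger.

```erlang
-module(ra_kv_optional).
-export([init/1, apply/3, tick/2, overview/1]).

init(_Config) -> #{}.

apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, []}.

%% Called periodically; returns a list of effects.
%% my_metrics:report/1 is a hypothetical function, not part of ra.
tick(_TimeMs, State) ->
    [{mod_call, my_metrics, report, [maps:size(State)]}].

%% A small summary map instead of the full (possibly large) state.
overview(State) ->
    #{type => ?MODULE, num_keys => maps:size(State)}.
```

Returning a summary from overview/1 keeps status output readable when the state grows.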
It is eventually necessary to make changes to the state machine code. Any changes to a state machine that would result in a different end state when the state is re-calculated from the log of entries (as is done when restarting a ra server) should be considered breaking.
As Ra state machines need to be deterministic, any changes to the logic inside the apply/3 function need to be enabled at the same index on all members of a Ra cluster.
Ra considers all state machines versioned starting with version 0. State machines
that need to be updated with breaking changes need to implement the optional
versioning parts of the ra_machine
behaviour:
-type version() :: non_neg_integer().
-callback version() -> pos_integer().
-callback which_module(version()) -> module().
version/0
returns the current version which is an integer that is
higher than any previously used version number. Whenever a breaking change is
made this should be incremented.
which_module/1
maps a version to the module implementing it. This allows
developers to optionally keep entire modules for old versions instead of trying
to handle multiple versions in the same module.
E.g. when moving from version 0 of my_machine to version 1:

- Copy and rename the my_machine module to my_machine_v0.
- Implement the breaking changes in the original module and bump the version.

version() -> 1.
which_module(1) -> my_machine;
which_module(0) -> my_machine_v0.
This would ensure that any entries added to the log are applied against the active machine version at the time they were added, leading to a deterministic outcome.
For smaller (but still breaking) changes that can be handled in the original module it is also possible to switch based on the machine_version key included in the meta data passed to apply/3.
NB: when using the latter approach combined with state upgrades, the init/1 callback needs to return the v0 version of the state to ensure the state upgrade code works as expected. If a large change to the state record needs to be made, the multiple modules approach is recommended.
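Switching on the machine_version key inside a single module could look like the sketch below. The behaviour change is invented for illustration: from version 1 on, writing the value undefined removes the key instead of storing it. The module name ra_kv_meta_version is also hypothetical, and the default of 0 for a missing key is a defensive assumption.

```erlang
-module(ra_kv_meta_version).
-export([init/1, apply/3, version/0, which_module/1]).

version() -> 1.

%% All versions are handled by this one module.
which_module(_Version) -> ?MODULE.

init(_Config) -> #{}.

%% Hypothetical breaking change: from version 1 on, writing the value
%% undefined removes the key; version 0 stored it like any value.
apply(Meta, {write, Key, undefined}, State) ->
    case maps:get(machine_version, Meta, 0) of
        0 -> {maps:put(Key, undefined, State), ok, []};
        _ -> {maps:remove(Key, State), ok, []}
    end;
apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, []};
%% The version bump itself arrives as a command; the state shape
%% does not change in this sketch.
apply(_Meta, {machine_version, _Old, _New}, State) ->
    {State, ok, []}.
```

Entries that were committed while version 0 was active are replayed with the old behaviour, so recovery from the log yields the same state as before the upgrade.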
New versions are enabled whenever there is a quorum of members with a higher version and one of them is elected leader. The leader will commit the new version to the log and each follower will move to the new version when this log entry is applied. Followers that do not yet have the new version available will receive log entries from the leader and update their logs but will not apply log entries. When they are upgraded and have the new version, all outstanding log entries will be applied. In practical terms this means that Ra nodes can be upgraded one by one.
In order to be upgradeable, the state machine implementation will need to handle the version bump in the form of a command that is passed to the apply/3 callback: {machine_version, OldVersion, NewVersion}. This provides an opportunity to transform the state data into a new form, if needed. Note that the version bump may be for several versions so it may be necessary to handle multiple state transformations.
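A bump that spans several versions can be handled by applying one transformation per version step. In the sketch below the module name ra_kv_upgrade and the three state shapes are hypothetical: version 0 is a plain map, version 1 is {kv, Map} and version 2 is {kv, Map, WriteCount}. Clauses for ordinary commands are omitted to keep the example short.

```erlang
-module(ra_kv_upgrade).
-export([init/1, apply/3]).

%% init/1 returns the version 0 shape of the state: a plain map.
init(_Config) -> #{}.

%% Only the version bump command is shown in this sketch.
apply(_Meta, {machine_version, Old, New}, State) ->
    {upgrade(Old, New, State), ok, []}.

%% Hypothetical state shapes:
%%   v0: Map    v1: {kv, Map}    v2: {kv, Map, WriteCount}
%% Each clause moves the state forward by exactly one version until
%% the target version is reached.
upgrade(Version, Version, State) ->
    State;
upgrade(0, New, Map) when is_map(Map) ->
    upgrade(1, New, {kv, Map});
upgrade(1, New, {kv, Map}) ->
    upgrade(2, New, {kv, Map, 0}).
```

Stepping one version at a time means a member that jumps from version 0 straight to version 2 runs the same transformations as one that was upgraded twice.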