Skip to content

Commit

Permalink
Merge pull request #243 from eigr/feat/checkpoint-restore
Browse files Browse the repository at this point in the history
Feat. Checkpoint Restore
  • Loading branch information
sleipnir authored Aug 26, 2023
2 parents 5f2f44e + 906971c commit 5423a3c
Show file tree
Hide file tree
Showing 15 changed files with 459 additions and 200 deletions.
1 change: 1 addition & 0 deletions compile-pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ set -o pipefail
protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/any.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/actor.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/protocol.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/state.proto

#protoc --elixir_out=gen_descriptors=true:./lib/spawn/cloudevents --proto_path=priv/protos/io/cloudevents/v1 priv/protos/io/cloudevents/v1/spec.proto
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import Config

config :do_it, DoIt.Commfig,
dirname: System.tmp_dir(),
filename: "spawn_cli.json"

# config :spawn_statestores, Statestores.Vault,
# json_library: Jason,
# ciphers: [
Expand Down
4 changes: 4 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
import Config

config :do_it, DoIt.Commfig,
dirname: System.tmp_dir(),
filename: "spawn_cli.json"
106 changes: 96 additions & 10 deletions lib/actors/actor/entity/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ defmodule Actors.Actor.Entity do
ActorState
}

alias Eigr.Functions.Protocol.State.Checkpoint
alias Eigr.Functions.Protocol.State.Revision

import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]

@default_call_timeout :infinity
@fullsweep_after 10

Expand Down Expand Up @@ -49,7 +54,8 @@ defmodule Actors.Actor.Entity do
defp do_handle_continue(action, state) do
Logger.warning("Unhandled handle_continue for action #{action}")

{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()
end

@impl true
Expand All @@ -62,29 +68,107 @@ defmodule Actors.Actor.Entity do
Invocation.invoke({invocation, opts}, state)

action ->
do_handle_call(action, from, state)
do_handle_defaults(action, from, state)
end
|> parse_packed_response()
end

defp do_handle_call(
defp do_handle_defaults(action, from, state) do
case action do
:get_state ->
do_handle_get_state(action, from, state)

:checkpoint ->
do_handle_checkpoint(action, from, state)

{:restore, checkpoint} ->
do_handle_restore(checkpoint, from, state)
end
end

defp do_handle_checkpoint(
_action,
_from,
%EntityState{
revision: revision,
actor: %Actor{state: actor_state} = _actor
} = state
)
when is_nil(actor_state) do
{:reply, {:ok, %Checkpoint{revision: %Revision{value: revision}}}, state}
|> return_and_maybe_hibernate()
end

defp do_handle_checkpoint(
_action,
_from,
%EntityState{
revision: revision,
actor: %Actor{} = _actor
} = state
) do
revision = revision + 1

case Lifecycle.checkpoint(revision, state) do
{:ok, actor_state, _hash} ->
checkpoint = %Checkpoint{revision: %Revision{value: revision}, state: actor_state}

{:reply, {:ok, checkpoint}, state}
|> return_and_maybe_hibernate()

_ ->
{:reply, :error, state}
|> return_and_maybe_hibernate()
end
end

defp do_handle_restore(
%Checkpoint{revision: %Revision{value: revision}},
_from,
%EntityState{
actor: %Actor{id: %ActorId{} = id} = _actor
} = state
) do
case Lifecycle.get_state(id, revision) do
{:ok, current_state, current_revision, _status, _node} ->
checkpoint = %Checkpoint{
revision: %Revision{value: current_revision},
state: current_state
}

{:reply, {:ok, checkpoint}, current_state}
|> return_and_maybe_hibernate()

_ ->
{:reply, :error, state}
|> return_and_maybe_hibernate()
end

{:reply, {:ok, :not_found}, state}
end

defp do_handle_get_state(
:get_state,
_from,
%EntityState{
actor: %Actor{state: actor_state} = _actor
} = state
)
when is_nil(actor_state),
do: {:reply, {:error, :not_found}, state, :hibernate}
when is_nil(actor_state) do
{:reply, {:error, :not_found}, state}
|> return_and_maybe_hibernate()
end

defp do_handle_call(
defp do_handle_get_state(
:get_state,
_from,
%EntityState{
actor: %Actor{state: %ActorState{} = actor_state} = _actor
} = state
),
do: {:reply, {:ok, actor_state}, state, :hibernate}
) do
{:reply, {:ok, actor_state}, state}
|> return_and_maybe_hibernate()
end

@impl true
def handle_cast(action, state) do
Expand All @@ -104,7 +188,8 @@ defmodule Actors.Actor.Entity do
defp do_handle_cast(action, state) do
Logger.warning("Unhandled handle_cast for action #{action}")

{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()
end

@impl true
Expand Down Expand Up @@ -180,7 +265,8 @@ defmodule Actors.Actor.Entity do
if not is_nil(actor_state),
do: StateManager.save(id, actor_state, revision: revision, status: "UNKNOWN")

{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()
end

@impl true
Expand Down
47 changes: 35 additions & 12 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Actors.Actor.Entity.Invocation do
require Logger
require OpenTelemetry.Tracer, as: Tracer

alias Actors.Actor.Entity.EntityState
alias Actors.Actor.Entity.{EntityState, Lifecycle}
alias Actors.Exceptions.NotAuthorizedException

alias Eigr.Functions.Protocol.Actors.{
Expand Down Expand Up @@ -35,6 +35,8 @@ defmodule Actors.Actor.Entity.Invocation do

alias Spawn.Utils.AnySerializer

import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]

@default_actions [
"get",
"Get",
Expand Down Expand Up @@ -150,15 +152,18 @@ defmodule Actors.Actor.Entity.Invocation do
) do
if length(actions) <= 0 do
Logger.warning("Actor [#{actor_name}] has not registered any Actions")
{:noreply, state, :hibernate}

{:noreply, state}
|> return_and_maybe_hibernate()
else
init_action =
Enum.filter(actions, fn cmd -> Enum.member?(@default_init_actions, cmd.name) end)
|> Enum.at(0)

case init_action do
nil ->
{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()

_ ->
interface = get_interface(actor_opts)
Expand Down Expand Up @@ -186,7 +191,8 @@ defmodule Actors.Actor.Entity.Invocation do
{:noreply, new_state}

{:error, _reason, new_state} ->
{:noreply, new_state, :hibernate}
{:noreply, new_state}
|> return_and_maybe_hibernate()
end
end
end
Expand Down Expand Up @@ -242,14 +248,15 @@ defmodule Actors.Actor.Entity.Invocation do
{:ok, "#{inspect(response.updated_context.metadata)}"}
])

build_response(request, response, new_state, opts)
handle_response(request, response, new_state, opts)

{:error, reason, new_state} ->
Tracer.add_event("failure-invocation", [
{:error, "#{inspect(reason)}"}
])

{:reply, {:error, reason}, new_state, :hibernate}
{:reply, {:error, reason}, new_state}
|> return_and_maybe_hibernate()
end
end

Expand Down Expand Up @@ -298,14 +305,30 @@ defmodule Actors.Actor.Entity.Invocation do
}
end

defp build_response(request, response, state, opts) do
case do_response(request, response, state, opts) do
:noreply ->
{:noreply, state}
defp handle_response(
request,
%ActorInvocationResponse{checkpoint: checkpoint} = response,
%EntityState{
revision: revision
} = state,
opts
) do
response =
case do_response(request, response, state, opts) do
:noreply ->
{:noreply, state}
|> return_and_maybe_hibernate()

response ->
{:reply, {:ok, response}, state}
|> return_and_maybe_hibernate()
end

response ->
{:reply, {:ok, response}, state}
if checkpoint do
Lifecycle.checkpoint(revision, state)
end

response
end

defp do_response(
Expand Down
33 changes: 28 additions & 5 deletions lib/actors/actor/entity/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule Actors.Actor.Entity.Lifecycle do

alias Sidecar.Measurements

import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]

@deactivated_status "DEACTIVATED"
@default_deactivate_timeout 10_000
@default_snapshot_timeout 2_000
Expand Down Expand Up @@ -162,6 +164,25 @@ defmodule Actors.Actor.Entity.Lifecycle do

def load_state(state), do: {:noreply, state, {:continue, :call_init_action}}

def checkpoint(revision, %EntityState{
actor:
%Actor{
id: %ActorId{name: name} = id,
state: actor_state
} = actor
}) do
response =
if is_actor_valid?(actor) do
Logger.debug("Doing Actor checkpoint to Actor [#{name}]")

StateManager.save(id, actor_state, revision: revision)
else
{:error, :nothing}
end

response
end

def terminate(reason, %EntityState{
revision: revision,
actor:
Expand All @@ -174,7 +195,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
StateManager.save(id, actor_state, revision: revision, status: @deactivated_status)
end

Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
Logger.debug("Terminating Actor [#{name}] with reason #{inspect(reason)}")
end

def snapshot(
Expand Down Expand Up @@ -207,7 +228,8 @@ defmodule Actors.Actor.Entity.Lifecycle do
state
end

{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()
end

def snapshot(
Expand Down Expand Up @@ -265,7 +287,8 @@ defmodule Actors.Actor.Entity.Lifecycle do
new_state
end

{:noreply, state, :hibernate}
{:noreply, state}
|> return_and_maybe_hibernate()
end

def snapshot(state), do: {:noreply, state, :hibernate}
Expand Down Expand Up @@ -295,13 +318,13 @@ defmodule Actors.Actor.Entity.Lifecycle do

_ ->
schedule_deactivate(deactivation_strategy)
{:noreply, state, :hibernate}
{:noreply, state}
end
end

def deactivate(state), do: {:noreply, state, :hibernate}

defp get_state(id, revision) do
def get_state(id, revision) do
initial = StateManager.load(id)

if revision <= 0 do
Expand Down
4 changes: 2 additions & 2 deletions lib/actors/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ defmodule Actors do
actor_system:
%ActorSystem{name: _name, registry: %Registry{actors: actors} = _registry} =
actor_system
} = registration,
} = _registration,
opts
) do
actors
Expand Down Expand Up @@ -139,7 +139,7 @@ defmodule Actors do
@spec spawn_actor(SpawnRequest.t(), any()) :: {:ok, SpawnResponse.t()}
def spawn_actor(spawn, opts \\ [])

def spawn_actor(%SpawnRequest{actors: actors} = spawn, opts) do
def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do
hosts =
Enum.map(actors, fn %ActorId{} = id ->
case ActorRegistry.get_hosts_by_actor(id, parent: true) do
Expand Down
Loading

0 comments on commit 5423a3c

Please sign in to comment.