Skip to content

Fail backoff and fail metadata #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/_generated/spawn/actors/healthcheck.pb.ex
Original file line number Diff line number Diff line change
@@ -107,6 +107,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckReply do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
@moduledoc false

use GRPC.Service,
name: "spawn.actors.healthcheck.HealthCheckActor",
protoc_gen_elixir_version: "0.14.0"
@@ -717,6 +719,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Stub do
@moduledoc false

use GRPC.Stub, service: Spawn.Actors.Healthcheck.HealthCheckActor.Service
end

30 changes: 30 additions & 0 deletions lib/_generated/spawn/actors/protocol.pb.ex
Original file line number Diff line number Diff line change
@@ -989,6 +989,20 @@ defmodule Spawn.Pipe do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
@@ -1005,6 +1019,7 @@ defmodule Spawn.Pipe do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Forward do
@@ -1043,6 +1058,20 @@ defmodule Spawn.Forward do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
@@ -1059,6 +1088,7 @@ defmodule Spawn.Forward do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Fact.MetadataEntry do
15 changes: 13 additions & 2 deletions lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@ defmodule Actors.Actor.CallerConsumer do
ActorSettings,
ActorSystem,
Registry,
ActorOpts,
TimeoutStrategy,
ProjectionSettings,
ActorDeactivationStrategy,
@@ -565,6 +564,16 @@ defmodule Actors.Actor.CallerConsumer do
timeout =
case metadata["request-timeout"] do
nil -> 60_000
value -> String.to_integer(value)
end

# when a invoke errors or throws an exception
# we can backoff or not
fail_backoff =
case metadata["fail_backoff"] do
nil -> false
"false" -> false
"true" -> true
value -> value
end

@@ -635,7 +644,9 @@ defmodule Actors.Actor.CallerConsumer do
{:halt, result}

{:error, :actor_invoke, error} ->
{:halt, {:error, error}}
keep_retrying_action = if fail_backoff, do: :cont, else: :halt

{keep_retrying_action, {:error, error}}

{:error, _msg} = result ->
{:cont, result}
46 changes: 31 additions & 15 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
@@ -73,6 +73,12 @@ defmodule Actors.Actor.Entity.Invocation do
name = Map.get(message.metadata, "actor-name")
source_action = Map.get(message.metadata, "actor-action")

action_metadata =
case Map.get(message.metadata, "action-metadata") do
nil -> %{}
metadata -> Jason.decode!(metadata)
end

action =
actor.settings.projection_settings.subjects
|> Enum.find(fn subject -> subject.source_action == source_action end)
@@ -82,7 +88,7 @@ defmodule Actors.Actor.Entity.Invocation do
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
metadata: action_metadata,
action_name: action,
payload: {:value, Google.Protobuf.Any.decode(message.state)},
caller: %ActorId{name: name, system: system_name, parent: parent}
@@ -404,7 +410,7 @@ defmodule Actors.Actor.Entity.Invocation do
response_checkpoint(response, checkpoint, revision, state)
end

defp is_authorized?(invocation, actions, timers) do
defp is_authorized?(invocation, _actions, _timers) do
acl_manager = get_acl_manager()

acl_manager.get_policies!()
@@ -580,7 +586,7 @@ defmodule Actors.Actor.Entity.Invocation do
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

response
end
@@ -595,12 +601,14 @@ defmodule Actors.Actor.Entity.Invocation do
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, _state, response) do
defp do_handle_projection(id, request, %{sourceable: true} = _settings, _state, response) do
action = request.action_name

stream_name = StreamInitiator.stream_name(id)
id_name = String.replace(id.name, ".", "-")

@@ -615,19 +623,20 @@ defmodule Actors.Actor.Entity.Invocation do
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
{"Actor-Action", "#{action}"},
{"Action-Metadata", Jason.encode!(request.current_context.metadata)}
]
)
end

defp do_handle_projection(
id,
action,
request,
_settings,
%EntityState{actor: %Actor{settings: %ActorSettings{kind: :PROJECTION}}} = state,
response
) do
if :persistent_term.get("view-#{id.name}-#{action}", false) do
if :persistent_term.get("view-#{id.name}-#{request.action_name}", false) do
# no need to persist any state since this is a view only action
:ok
else
@@ -650,7 +659,7 @@ defmodule Actors.Actor.Entity.Invocation do
end
end

defp do_handle_projection(_id, _action, _settings, _state, _response), do: :ok
defp do_handle_projection(_id, _request, _settings, _state, _response), do: :ok

defp do_run_workflow(
_request,
@@ -671,7 +680,7 @@ defmodule Actors.Actor.Entity.Invocation do
opts
) do
Tracer.with_span "run-workflow" do
do_side_effects(effects, opts)
do_side_effects(request, effects, opts)
do_broadcast(request, broadcast, opts)
do_handle_routing(request, response, opts)
end
@@ -689,7 +698,8 @@ defmodule Actors.Actor.Entity.Invocation do

defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name}
actor: %ActorId{name: caller_actor_name, system: system_name},
current_context: %Context{metadata: metadata}
},
%ActorInvocationResponse{
payload: payload,
@@ -708,6 +718,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
@@ -737,7 +748,8 @@ defmodule Actors.Actor.Entity.Invocation do
defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name},
payload: payload
payload: payload,
current_context: %Context{metadata: metadata}
} = _request,
%ActorInvocationResponse{
workflow:
@@ -756,6 +768,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
@@ -810,13 +823,13 @@ defmodule Actors.Actor.Entity.Invocation do
:noreply
end

def do_side_effects(effects, opts \\ [])
def do_side_effects(request, effects, opts \\ [])

def do_side_effects(effects, _opts) when effects == [] do
def do_side_effects(_request, effects, _opts) when effects == [] do
:ok
end

def do_side_effects(effects, _opts) when is_list(effects) do
def do_side_effects(request, effects, _opts) when is_list(effects) do
Tracer.with_span "handle-side-effects" do
try do
spawn(fn ->
@@ -830,6 +843,9 @@ defmodule Actors.Actor.Entity.Invocation do
} = invocation
} ->
try do
metadata = Map.merge(request.current_context.metadata, invocation.metadata)
invocation = %InvocationRequest{invocation | metadata: metadata}

Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx())
catch
error ->
9 changes: 0 additions & 9 deletions lib/spawn/cluster/cluster_resolver.ex
Original file line number Diff line number Diff line change
@@ -87,20 +87,12 @@ defmodule Spawn.Cluster.ClusterResolver do
service = Keyword.fetch!(config, :service)
resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a))

IO.inspect(app_name, label: "Using application name ---------------------")
IO.inspect(service, label: "Using service ---------------------")
IO.inspect(resolver, label: "Using resolver ---------------------")
IO.inspect(Node.get_cookie(), label: "Using node cookie ---------------------")

cond do
app_name != nil and service != nil ->
headless_service = to_charlist(service)

IO.inspect(headless_service, label: "Using headless service ---------------------")

case resolver.(headless_service) do
{:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} ->
IO.inspect(addresses, label: "Using addresses ---------------------")
parse_response(addresses, app_name)

{:error, reason} ->
@@ -135,6 +127,5 @@ defmodule Spawn.Cluster.ClusterResolver do
|> Enum.map(&:inet_parse.ntoa(&1))
|> Enum.map(&"#{app_name}@#{&1}")
|> Enum.map(&String.to_atom(&1))
|> IO.inspect(label: "Parsed addresses ---------------------")
end
end
6 changes: 6 additions & 0 deletions priv/protos/spawn/actors/protocol.proto
Original file line number Diff line number Diff line change
@@ -280,6 +280,9 @@ message Pipe {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Sends the input of a action of an Actor to the input of another action of an
@@ -291,6 +294,9 @@ message Forward {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Facts are emitted by actions and represent the internal state of the moment
52 changes: 21 additions & 31 deletions spawn_sdk/spawn_sdk/lib/defact.ex
Original file line number Diff line number Diff line change
@@ -48,37 +48,27 @@ defmodule SpawnSdk.Defact do
)

def handle_action({unquote(action_name), payload}, context) do
try do
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
rescue
e ->
reraise SpawnSdk.Actor.MalformedActor,
[
message: "Error in action=#{unquote(action_name)} error=#{inspect(e)}",
exception: e
],
__STACKTRACE__
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
end
end
Loading

Unchanged files with check annotations Beta

alias Actors.Config.PersistentTermConfig, as: Config
alias Spawn.Actors.ActorViewOption
alias Protobuf.Protoc.Generator.Util

Check warning on line 18 in lib/sidecar/grpc/code_generator.ex

GitHub Actions / Build and Test OTP 25 / Elixir 1.15

unused alias Util

Check warning on line 18 in lib/sidecar/grpc/code_generator.ex

GitHub Actions / Build and Test OTP 25 / Elixir 1.15

unused alias Util
alias Mix.Tasks.Protobuf.Generate
alias Spawn.Utils.AnySerializer
defp do_invoke_host(payload, state) do
payload
|> ActorInvocation.encode()
|> @http_node_client.invoke_host_actor()

Check warning on line 74 in lib/actors/actor/interface/http.ex

GitHub Actions / Build and Test OTP 25 / Elixir 1.15

NodeClientMock.invoke_host_actor/1 is undefined (module NodeClientMock is not available or is yet to be defined)
|> case do
{:ok, %Finch.Response{body: ""}} ->
Logger.error("User Function Actor response Invocation body is empty")