diff --git a/lib/_generated/spawn/actors/healthcheck.pb.ex b/lib/_generated/spawn/actors/healthcheck.pb.ex index 4fdd2942..1c5f5a3a 100644 --- a/lib/_generated/spawn/actors/healthcheck.pb.ex +++ b/lib/_generated/spawn/actors/healthcheck.pb.ex @@ -107,6 +107,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckReply do end defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do + @moduledoc false + use GRPC.Service, name: "spawn.actors.healthcheck.HealthCheckActor", protoc_gen_elixir_version: "0.14.0" @@ -717,6 +719,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do end defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Stub do + @moduledoc false + use GRPC.Stub, service: Spawn.Actors.Healthcheck.HealthCheckActor.Service end diff --git a/lib/_generated/spawn/actors/protocol.pb.ex b/lib/_generated/spawn/actors/protocol.pb.ex index 6e0a62ac..f61b2375 100644 --- a/lib/_generated/spawn/actors/protocol.pb.ex +++ b/lib/_generated/spawn/actors/protocol.pb.ex @@ -989,6 +989,20 @@ defmodule Spawn.Pipe do json_name: "actionName", proto3_optional: nil, __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "register_ref", + extendee: nil, + number: 3, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "registerRef", + proto3_optional: nil, + __unknown_fields__: [] } ], nested_type: [], @@ -1005,6 +1019,7 @@ defmodule Spawn.Pipe do field(:actor, 1, type: :string) field(:action_name, 2, type: :string, json_name: "actionName") + field(:register_ref, 3, type: :string, json_name: "registerRef") end defmodule Spawn.Forward do @@ -1043,6 +1058,20 @@ defmodule Spawn.Forward do json_name: "actionName", proto3_optional: nil, __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "register_ref", + extendee: nil, + number: 3, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "registerRef", + proto3_optional: nil, + __unknown_fields__: [] } ], nested_type: [], @@ -1059,6 +1088,7 @@ defmodule Spawn.Forward do field(:actor, 1, type: :string) field(:action_name, 2, type: :string, json_name: "actionName") + field(:register_ref, 3, type: :string, json_name: "registerRef") end defmodule Spawn.Fact.MetadataEntry do diff --git a/lib/actors/actor/caller_consumer.ex b/lib/actors/actor/caller_consumer.ex index ce4dc8e7..8936b11c 100644 --- a/lib/actors/actor/caller_consumer.ex +++ b/lib/actors/actor/caller_consumer.ex @@ -25,7 +25,6 @@ defmodule Actors.Actor.CallerConsumer do ActorSettings, ActorSystem, Registry, - ActorOpts, TimeoutStrategy, ProjectionSettings, ActorDeactivationStrategy, @@ -565,6 +564,16 @@ defmodule Actors.Actor.CallerConsumer do timeout = case metadata["request-timeout"] do nil -> 60_000 + value -> String.to_integer(value) + end + + # when a invoke errors or throws an exception + # we can backoff or not + fail_backoff = + case metadata["fail_backoff"] do + nil -> false + "false" -> false + "true" -> true value -> value end @@ -635,7 +644,9 @@ defmodule Actors.Actor.CallerConsumer do {:halt, result} {:error, :actor_invoke, error} -> - {:halt, {:error, error}} + keep_retrying_action = if fail_backoff, do: :cont, else: :halt + + {keep_retrying_action, {:error, error}} {:error, _msg} = result -> {:cont, result} diff --git a/lib/actors/actor/entity/invocation.ex b/lib/actors/actor/entity/invocation.ex index 06a2934b..896d6401 100644 --- a/lib/actors/actor/entity/invocation.ex +++ b/lib/actors/actor/entity/invocation.ex @@ -73,6 +73,12 @@ defmodule Actors.Actor.Entity.Invocation do name = Map.get(message.metadata, "actor-name") source_action = Map.get(message.metadata, "actor-action") + action_metadata = + case Map.get(message.metadata, "action-metadata") do + nil -> %{} + metadata -> Jason.decode!(metadata) + end + action = actor.settings.projection_settings.subjects |> Enum.find(fn subject -> subject.source_action == source_action end) @@ -82,7 +88,7 @@ defmodule Actors.Actor.Entity.Invocation do async: true, system: %ActorSystem{name: system_name}, actor: %Actor{id: actor.id}, - metadata: message.metadata, + metadata: action_metadata, action_name: action, payload: {:value, Google.Protobuf.Any.decode(message.state)}, caller: %ActorId{name: name, system: system_name, parent: parent} @@ -404,7 +410,7 @@ defmodule Actors.Actor.Entity.Invocation do response_checkpoint(response, checkpoint, revision, state) end - defp is_authorized?(invocation, actions, timers) do + defp is_authorized?(invocation, _actions, _timers) do acl_manager = get_acl_manager() acl_manager.get_policies!() @@ -580,7 +586,7 @@ defmodule Actors.Actor.Entity.Invocation do } = _params ) when is_nil(workflow) or workflow == %{} do - :ok = do_handle_projection(id, request.action_name, settings, state, response) + :ok = do_handle_projection(id, request, settings, state, response) response end @@ -595,12 +601,14 @@ defmodule Actors.Actor.Entity.Invocation do opts: opts } = _params ) do - :ok = do_handle_projection(id, request.action_name, settings, state, response) + :ok = do_handle_projection(id, request, settings, state, response) do_run_workflow(request, response, state, opts) end - defp do_handle_projection(id, action, %{sourceable: true} = _settings, _state, response) do + defp do_handle_projection(id, request, %{sourceable: true} = _settings, _state, response) do + action = request.action_name + stream_name = StreamInitiator.stream_name(id) id_name = String.replace(id.name, ".", "-") @@ -615,19 +623,20 @@ defmodule Actors.Actor.Entity.Invocation do {"Spawn-System", "#{id.system}"}, {"Actor-Parent", "#{id.parent}"}, {"Actor-Name", "#{id.name}"}, - {"Actor-Action", "#{action}"} + {"Actor-Action", "#{action}"}, + {"Action-Metadata", Jason.encode!(request.current_context.metadata)} ] ) end defp do_handle_projection( id, - action, + request, _settings, %EntityState{actor: %Actor{settings: %ActorSettings{kind: :PROJECTION}}} = state, response ) do - if :persistent_term.get("view-#{id.name}-#{action}", false) do + if :persistent_term.get("view-#{id.name}-#{request.action_name}", false) do # no need to persist any state since this is a view only action :ok else @@ -650,7 +659,7 @@ defmodule Actors.Actor.Entity.Invocation do end end - defp do_handle_projection(_id, _action, _settings, _state, _response), do: :ok + defp do_handle_projection(_id, _request, _settings, _state, _response), do: :ok defp do_run_workflow( _request, @@ -671,7 +680,7 @@ defmodule Actors.Actor.Entity.Invocation do opts ) do Tracer.with_span "run-workflow" do - do_side_effects(effects, opts) + do_side_effects(request, effects, opts) do_broadcast(request, broadcast, opts) do_handle_routing(request, response, opts) end @@ -689,7 +698,8 @@ defmodule Actors.Actor.Entity.Invocation do defp do_handle_routing( %ActorInvocation{ - actor: %ActorId{name: caller_actor_name, system: system_name} + actor: %ActorId{name: caller_actor_name, system: system_name}, + current_context: %Context{metadata: metadata} }, %ActorInvocationResponse{ payload: payload, @@ -708,6 +718,7 @@ defmodule Actors.Actor.Entity.Invocation do system: %ActorSystem{name: system_name}, actor: %Actor{id: %ActorId{name: actor_name, system: system_name}}, action_name: cmd, + metadata: metadata, payload: payload, caller: %ActorId{name: caller_actor_name, system: system_name} } @@ -737,7 +748,8 @@ defmodule Actors.Actor.Entity.Invocation do defp do_handle_routing( %ActorInvocation{ actor: %ActorId{name: caller_actor_name, system: system_name}, - payload: payload + payload: payload, + current_context: %Context{metadata: metadata} } = _request, %ActorInvocationResponse{ workflow: @@ -756,6 +768,7 @@ defmodule Actors.Actor.Entity.Invocation do system: %ActorSystem{name: system_name}, actor: %Actor{id: %ActorId{name: actor_name, system: system_name}}, action_name: cmd, + metadata: metadata, payload: payload, caller: %ActorId{name: caller_actor_name, system: system_name} } @@ -810,13 +823,13 @@ defmodule Actors.Actor.Entity.Invocation do :noreply end - def do_side_effects(effects, opts \\ []) + def do_side_effects(request, effects, opts \\ []) - def do_side_effects(effects, _opts) when effects == [] do + def do_side_effects(_request, effects, _opts) when effects == [] do :ok end - def do_side_effects(effects, _opts) when is_list(effects) do + def do_side_effects(request, effects, _opts) when is_list(effects) do Tracer.with_span "handle-side-effects" do try do spawn(fn -> @@ -830,6 +843,9 @@ defmodule Actors.Actor.Entity.Invocation do } = invocation } -> try do + metadata = Map.merge(request.current_context.metadata, invocation.metadata) + invocation = %InvocationRequest{invocation | metadata: metadata} + Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx()) catch error -> diff --git a/lib/spawn/cluster/cluster_resolver.ex b/lib/spawn/cluster/cluster_resolver.ex index 9b6fcc7b..9648b652 100644 --- a/lib/spawn/cluster/cluster_resolver.ex +++ b/lib/spawn/cluster/cluster_resolver.ex @@ -87,20 +87,12 @@ defmodule Spawn.Cluster.ClusterResolver do service = Keyword.fetch!(config, :service) resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a)) - IO.inspect(app_name, label: "Using application name ---------------------") - IO.inspect(service, label: "Using service ---------------------") - IO.inspect(resolver, label: "Using resolver ---------------------") - IO.inspect(Node.get_cookie(), label: "Using node cookie ---------------------") - cond do app_name != nil and service != nil -> headless_service = to_charlist(service) - IO.inspect(headless_service, label: "Using headless service ---------------------") - case resolver.(headless_service) do {:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} -> - IO.inspect(addresses, label: "Using addresses ---------------------") parse_response(addresses, app_name) {:error, reason} -> @@ -135,6 +127,5 @@ defmodule Spawn.Cluster.ClusterResolver do |> Enum.map(&:inet_parse.ntoa(&1)) |> Enum.map(&"#{app_name}@#{&1}") |> Enum.map(&String.to_atom(&1)) - |> IO.inspect(label: "Parsed addresses ---------------------") end end diff --git a/priv/protos/spawn/actors/protocol.proto b/priv/protos/spawn/actors/protocol.proto index fcd9edde..e6b208f2 100644 --- a/priv/protos/spawn/actors/protocol.proto +++ b/priv/protos/spawn/actors/protocol.proto @@ -280,6 +280,9 @@ message Pipe { // Action. string action_name = 2; + + // Register ref + string register_ref = 3; } // Sends the input of a action of an Actor to the input of another action of an @@ -291,6 +294,9 @@ message Forward { // Action. string action_name = 2; + + // Register ref + string register_ref = 3; } // Facts are emitted by actions and represent the internal state of the moment diff --git a/spawn_sdk/spawn_sdk/lib/defact.ex b/spawn_sdk/spawn_sdk/lib/defact.ex index 30ed6a22..17686375 100644 --- a/spawn_sdk/spawn_sdk/lib/defact.ex +++ b/spawn_sdk/spawn_sdk/lib/defact.ex @@ -48,37 +48,27 @@ defmodule SpawnSdk.Defact do ) def handle_action({unquote(action_name), payload}, context) do - try do - case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do - {{:arity, 1}, _name} -> - unquote(block_fn).(context) - - {{:arity, 2}, action} when action not in ~w(init Init Setup setup) -> - unquote(block_fn).(context, payload) - - {{:arity, arity}, _} -> - raise SpawnSdk.Actor.MalformedActor, - "Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}" - end - |> case do - %SpawnSdk.Value{} = value -> - value - - {:reply, %SpawnSdk.Value{} = value} -> - value - - _ -> - raise SpawnSdk.Actor.MalformedActor, - "Return value for action=#{unquote(action_name)} must be a %Value{} struct" - end - rescue - e -> - reraise SpawnSdk.Actor.MalformedActor, - [ - message: "Error in action=#{unquote(action_name)} error=#{inspect(e)}", - exception: e - ], - __STACKTRACE__ + case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do + {{:arity, 1}, _name} -> + unquote(block_fn).(context) + + {{:arity, 2}, action} when action not in ~w(init Init Setup setup) -> + unquote(block_fn).(context, payload) + + {{:arity, arity}, _} -> + raise SpawnSdk.Actor.MalformedActor, + "Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}" + end + |> case do + %SpawnSdk.Value{} = value -> + value + + {:reply, %SpawnSdk.Value{} = value} -> + value + + _ -> + raise SpawnSdk.Actor.MalformedActor, + "Return value for action=#{unquote(action_name)} must be a %Value{} struct" end end end diff --git a/spawn_sdk/spawn_sdk/lib/flow.ex b/spawn_sdk/spawn_sdk/lib/flow.ex index 4a04cbf4..5af49114 100644 --- a/spawn_sdk/spawn_sdk/lib/flow.ex +++ b/spawn_sdk/spawn_sdk/lib/flow.ex @@ -90,24 +90,29 @@ defmodule SpawnSdk.Flow do the actor that performs the Pipe is free to process another message and the actor that is receiving the Pipe is the one who will respond to the original caller. """ - defstruct actor_name: nil, action: nil + defstruct actor_name: nil, action: nil, register_ref: nil @type t :: %__MODULE__{ actor_name: String.t(), - action: String.t() | atom() + action: String.t() | atom(), + register_ref: String.t() | nil } @type actor_name :: String.t() + @type register_ref :: String.t() | nil + @type action :: String.t() | atom() - @spec to(actor_name(), action()) :: Pipe.t() - def to(actor_name, action) do + @spec to(actor_name :: actor_name(), action :: action(), register_ref :: register_ref()) :: + Forward.t() + def to(actor_name, action, register_ref \\ nil) do action_name = if is_atom(action), do: Atom.to_string(action), else: action %__MODULE__{ actor_name: actor_name, - action: action_name + action: action_name, + register_ref: register_ref } end end @@ -121,24 +126,29 @@ defmodule SpawnSdk.Flow do to process another message and the actor that is receiving the forwarding will respond to the original caller. """ - defstruct actor_name: nil, action: nil + defstruct actor_name: nil, action: nil, register_ref: nil @type t :: %__MODULE__{ actor_name: String.t(), - action: String.t() | atom() + action: String.t() | atom(), + register_ref: String.t() | nil } @type actor_name :: String.t() + @type register_ref :: String.t() | nil + @type action :: String.t() | atom() - @spec to(actor_name(), action()) :: Forward.t() - def to(actor_name, action) do + @spec to(actor_name :: actor_name(), action :: action(), register_ref :: register_ref()) :: + Forward.t() + def to(actor_name, action, register_ref \\ nil) do action_name = if is_atom(action), do: Atom.to_string(action), else: action %__MODULE__{ actor_name: actor_name, - action: action_name + action: action_name, + register_ref: register_ref } end end @@ -150,13 +160,19 @@ defmodule SpawnSdk.Flow do They will "always" be processed asynchronously and any response sent back from the Actor receiving the effect will be ignored by the effector. """ - defstruct actor_name: nil, action: nil, payload: nil, scheduled_to: nil, register_ref: nil + defstruct actor_name: nil, + action: nil, + payload: nil, + scheduled_to: nil, + register_ref: nil, + metadata: nil @type t :: %__MODULE__{ actor_name: String.t(), action: String.t() | atom(), payload: module(), register_ref: String.t() | nil, + metadata: map() | nil, scheduled_to: integer() | nil } @@ -166,6 +182,8 @@ defmodule SpawnSdk.Flow do @type payload :: term() | nil + @type metadata :: map() | nil + @spec of() :: list(SideEffect.t()) def of(), do: [] @@ -184,6 +202,7 @@ defmodule SpawnSdk.Flow do action: action_name, payload: payload, register_ref: register_ref, + metadata: opts[:metadata] || %{}, scheduled_to: parse_scheduled_to(opts[:delay], opts[:scheduled_to]) } end diff --git a/spawn_sdk/spawn_sdk/lib/system/spawn_system.ex b/spawn_sdk/spawn_sdk/lib/system/spawn_system.ex index 8696c0f2..aa1d7930 100644 --- a/spawn_sdk/spawn_sdk/lib/system/spawn_system.ex +++ b/spawn_sdk/spawn_sdk/lib/system/spawn_system.ex @@ -370,14 +370,15 @@ defmodule SpawnSdk.System.SpawnSystem do defp handle_pipe( %SpawnSdk.Value{ - pipe: %SpawnSdk.Flow.Pipe{actor_name: actor_name, action: action} = _pipe + pipe: %SpawnSdk.Flow.Pipe{} = pipe } = _value ) do - cmd = if is_atom(action), do: Atom.to_string(action), else: action + cmd = if is_atom(pipe.action), do: Atom.to_string(pipe.action), else: pipe.action pipe = %Spawn.Pipe{ - actor: actor_name, - action_name: cmd + actor: pipe.actor_name, + action_name: cmd, + register_ref: pipe.register_ref } {:pipe, pipe} @@ -393,14 +394,15 @@ defmodule SpawnSdk.System.SpawnSystem do defp handle_forward( %SpawnSdk.Value{ - forward: %SpawnSdk.Flow.Forward{actor_name: actor_name, action: action} = _forward + forward: %SpawnSdk.Flow.Forward{} = forward } = _value ) do - cmd = if is_atom(action), do: Atom.to_string(action), else: action + cmd = if is_atom(forward.action), do: Atom.to_string(forward.action), else: forward.action forward = %Spawn.Forward{ - actor: actor_name, - action_name: cmd + actor: forward.actor_name, + action_name: cmd, + register_ref: forward.register_ref } {:forward, forward} @@ -438,6 +440,7 @@ defmodule SpawnSdk.System.SpawnSystem do register_ref: effect.register_ref, async: true, caller: %ActorId{name: caller_name, system: system}, + metadata: effect.metadata, scheduled_to: effect.scheduled_to } } diff --git a/spawn_sdk/spawn_sdk/test/actor/actor_test.exs b/spawn_sdk/spawn_sdk/test/actor/actor_test.exs index fa3fbe53..b6e42a9b 100644 --- a/spawn_sdk/spawn_sdk/test/actor/actor_test.exs +++ b/spawn_sdk/spawn_sdk/test/actor/actor_test.exs @@ -87,11 +87,12 @@ defmodule Actor.ActorTest do SideEffect.to( "second_actor", "forward_caller_name", - %MyMessageResponse{data: "first_actor_value"} + %MyMessageResponse{data: "first_actor_value"}, + nil, + metadata: %{"some_meta" => "meta_present"} ) ]) - |> Value.value(%MyMessageResponse{data: "worked_with_effects"}) - |> Value.void() + |> Value.response(%MyMessageResponse{data: "worked_with_effects"}) end defact wrong_state(_ctx) do @@ -206,7 +207,9 @@ defmodule Actor.ActorTest do caller_name = "#{Map.get(ctx.caller || %{}, :name)} as caller to third_actor" Value.of() - |> Value.response(%MyMessageResponse{data: caller_name}) + |> Value.response(%MyMessageResponse{ + data: caller_name <> " " <> Map.get(ctx.metadata, "origin_meta") + }) end defact forward_caller_name(value, %Context{} = _ctx) do @@ -343,7 +346,7 @@ defmodule Actor.ActorTest do delay: 5 ) - Process.sleep(10) + Process.sleep(100) assert SpawnSdk.invoke(dynamic_actor_name, ref: "json_actor_ref", @@ -364,7 +367,7 @@ defmodule Actor.ActorTest do scheduled_at: DateTime.utc_now() |> DateTime.add(5, :second) ) - Process.sleep(10) + Process.sleep(100) assert SpawnSdk.invoke(dynamic_actor_name, ref: "json_actor_ref", @@ -440,10 +443,11 @@ defmodule Actor.ActorTest do ref: "my_actor_ref", system: system, action: "pipe_caller", - payload: payload + payload: payload, + metadata: %{"origin_meta" => "meta_present"} ) - assert %{data: "second_actor as caller to third_actor"} = response + assert %{data: "second_actor as caller to third_actor meta_present"} = response end test "calling a function that sets wrong state type", ctx do