Skip to content

Commit

Permalink
pass action metadata to projection as well
Browse files Browse the repository at this point in the history
  • Loading branch information
eliasdarruda committed Jan 24, 2025
1 parent c072a8b commit 6b98c27
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 13 deletions.
4 changes: 4 additions & 0 deletions lib/_generated/spawn/actors/healthcheck.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckReply do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
@moduledoc false

use GRPC.Service,
name: "spawn.actors.healthcheck.HealthCheckActor",
protoc_gen_elixir_version: "0.14.0"
Expand Down Expand Up @@ -717,6 +719,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Stub do
@moduledoc false

use GRPC.Stub, service: Spawn.Actors.Healthcheck.HealthCheckActor.Service
end

Expand Down
30 changes: 30 additions & 0 deletions lib/_generated/spawn/actors/protocol.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,20 @@ defmodule Spawn.Pipe do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1005,6 +1019,7 @@ defmodule Spawn.Pipe do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Forward do
Expand Down Expand Up @@ -1043,6 +1058,20 @@ defmodule Spawn.Forward do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1059,6 +1088,7 @@ defmodule Spawn.Forward do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Fact.MetadataEntry do
Expand Down
19 changes: 14 additions & 5 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ defmodule Actors.Actor.Entity.Invocation do
name = Map.get(message.metadata, "actor-name")
source_action = Map.get(message.metadata, "actor-action")

action_metadata =
case Map.get(message.metadata, "action-metadata") do
nil -> %{}
metadata -> Jason.decode!(metadata)
end

action =
actor.settings.projection_settings.subjects
|> Enum.find(fn subject -> subject.source_action == source_action end)
Expand All @@ -82,7 +88,7 @@ defmodule Actors.Actor.Entity.Invocation do
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
metadata: action_metadata,
action_name: action,
payload: {:value, Google.Protobuf.Any.decode(message.state)},
caller: %ActorId{name: name, system: system_name, parent: parent}
Expand Down Expand Up @@ -580,7 +586,7 @@ defmodule Actors.Actor.Entity.Invocation do
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

response
end
Expand All @@ -595,12 +601,14 @@ defmodule Actors.Actor.Entity.Invocation do
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, _state, response) do
defp do_handle_projection(id, request, %{sourceable: true} = _settings, _state, response) do
action = request.action_name

stream_name = StreamInitiator.stream_name(id)
id_name = String.replace(id.name, ".", "-")

Expand All @@ -615,7 +623,8 @@ defmodule Actors.Actor.Entity.Invocation do
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
{"Actor-Action", "#{action}"},
{"Action-Metadata", Jason.encode!(request.metadata)}
]
)
end
Expand Down
6 changes: 6 additions & 0 deletions priv/protos/spawn/actors/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ message Pipe {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Sends the input of a action of an Actor to the input of another action of an
Expand All @@ -291,6 +294,9 @@ message Forward {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Facts are emitted by actions and represent the internal state of the moment
Expand Down
18 changes: 10 additions & 8 deletions spawn_sdk/spawn_sdk/lib/system/spawn_system.ex
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,15 @@ defmodule SpawnSdk.System.SpawnSystem do

defp handle_pipe(
%SpawnSdk.Value{
pipe: %SpawnSdk.Flow.Pipe{actor_name: actor_name, action: action} = _pipe
pipe: %SpawnSdk.Flow.Pipe{} = pipe
} = _value
) do
cmd = if is_atom(action), do: Atom.to_string(action), else: action
cmd = if is_atom(pipe.action), do: Atom.to_string(pipe.action), else: pipe.action

pipe = %Spawn.Pipe{
actor: actor_name,
action_name: cmd
actor: pipe.actor_name,
action_name: cmd,
register_ref: pipe.register_ref
}

{:pipe, pipe}
Expand All @@ -393,14 +394,15 @@ defmodule SpawnSdk.System.SpawnSystem do

defp handle_forward(
%SpawnSdk.Value{
forward: %SpawnSdk.Flow.Forward{actor_name: actor_name, action: action} = _forward
forward: %SpawnSdk.Flow.Forward{} = forward
} = _value
) do
cmd = if is_atom(action), do: Atom.to_string(action), else: action
cmd = if is_atom(forward.action), do: Atom.to_string(forward.action), else: forward.action

forward = %Spawn.Forward{
actor: actor_name,
action_name: cmd
actor: forward.actor_name,
action_name: cmd,
register_ref: forward.register_ref
}

{:forward, forward}
Expand Down

0 comments on commit 6b98c27

Please sign in to comment.