Skip to content

Commit

Permalink
Add :message_label support (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariari authored Mar 28, 2024
1 parent 6425e2f commit 6e12f05
Showing 1 changed file with 67 additions and 20 deletions.
87 changes: 67 additions & 20 deletions lib/kino/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Kino.Process do

@type supervisor :: pid() | atom()
@type trace_target :: :all | pid() | [pid()]
@type label_response :: {:ok, String.t()} | :continue

@doc """
Generates a visualization of an application tree.
Expand Down Expand Up @@ -192,8 +193,8 @@ defmodule Kino.Process do
from the provided trace function.
"""
@spec render_seq_trace(trace_target(), (-> any())) :: any()
def render_seq_trace(trace_target \\ :all, trace_function) do
{func_result, sequence_diagram} = seq_trace(trace_target, trace_function)
def render_seq_trace(trace_target \\ :all, trace_function, opts \\ []) do
{func_result, sequence_diagram} = seq_trace(trace_target, trace_function, opts)
Kino.render(sequence_diagram)
func_result
end
Expand Down Expand Up @@ -224,6 +225,13 @@ defmodule Kino.Process do
target argument can either be a single PID, a list of PIDs, or the atom `:all`
depending on what messages you would like to retain in your trace.
## Options
* `:message_label` - A function to help label message events. If
the given function returns `:continue`, then the default label
is used. However, if the function returns a `String.t()`, then
that will be used for the label.
## Examples
To generate a trace of all the messages occurring during the execution of the
Expand Down Expand Up @@ -266,15 +274,43 @@ defmodule Kino.Process do
Agent.stop(agent_pid)
end)
Further if you are interested in custom labeling between messages
sent between processes, you can specify custom labels for the
messages you are interested in:
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)
Kino.Process.seq_trace(agent_pid, fn ->
1..2
|> Task.async_stream(
fn value ->
Agent.get(agent_pid, fn value -> value end)
100 * value
end,
max_concurrency: 3
)
|> Stream.run()
Agent.stop(agent_pid)
end,
message_label: fn(msg) ->
case msg do
{:"$gen_call", _ref, {:get, _}} -> {:ok, "GET: value"}
_ -> :continue
end
end)
"""
@spec seq_trace(trace_target(), (-> any())) :: {any(), Mermaid.t()}
def seq_trace(trace_target \\ :all, trace_function)
@spec seq_trace(trace_target(), (-> any()), keyword()) :: {any(), Mermaid.t()}
def seq_trace(trace_target \\ :all, trace_function, opts \\ [])

def seq_trace(pid, trace_function) when is_pid(pid) do
seq_trace([pid], trace_function)
def seq_trace(pid, trace_function, opts) when is_pid(pid) do
seq_trace([pid], trace_function, opts)
end

def seq_trace(trace_pids, trace_function) when is_list(trace_pids) or trace_pids == :all do
def seq_trace(trace_pids, trace_function, opts)
when is_list(trace_pids) or trace_pids == :all do
# Set up the process message tracer and the Erlang seq_trace_module
calling_pid = self()
{:ok, tracer_pid} = Tracer.start_link()
Expand Down Expand Up @@ -349,7 +385,7 @@ defmodule Kino.Process do
trace_events
|> Enum.reduce({[], MapSet.new()}, fn %{from: from, to: to, message: message},
{events, started_processes} ->
events = [normalize_message(from, to, message, participants_lookup) | events]
events = [normalize_message(from, to, message, participants_lookup, opts) | events]

from_idx = Map.get(participants_lookup, from, :not_found)
to_idx = Map.get(participants_lookup, to, :not_found)
Expand Down Expand Up @@ -408,25 +444,31 @@ defmodule Kino.Process do
defp deactivate?(idx, {:EXIT, _, _}) when idx != :not_found, do: true
defp deactivate?(_idx, _), do: false

defp normalize_message(from, to, message, participants_lookup)
defp normalize_message(from, to, message, participants_lookup, opts)
when is_map_key(participants_lookup, from) and is_map_key(participants_lookup, to) do
formatted_message = label_from_message(message)
formatted_message = label_from_message(message, label_from_options(opts))
from_idx = participants_lookup[from]
to_idx = participants_lookup[to]

"#{from_idx}->>#{to_idx}: #{formatted_message}"
end

defp normalize_message(_, _, _, _), do: ""

defp label_from_message(message) do
case message do
{:EXIT, _, reason} -> "EXIT: #{label_from_reason(reason)}"
{:spawn_request, _, _, _, _, _, _, _} -> "SPAWN"
{:DOWN, _, :process, _, reason} -> "DOWN: #{label_from_reason(reason)}"
{:"$gen_call", _ref, value} -> "CALL: #{label_from_value(value)}"
{:"$gen_cast", value} -> "CAST: #{label_from_value(value)}"
value -> "INFO: #{label_from_value(value)}"
defp normalize_message(_, _, _, _, _), do: ""

defp label_from_message(message, custom_label) do
case custom_label.(message) do
{:ok, response} ->
response

:continue ->
case message do
{:EXIT, _, reason} -> "EXIT: #{label_from_reason(reason)}"
{:spawn_request, _, _, _, _, _, _, _} -> "SPAWN"
{:DOWN, _, :process, _, reason} -> "DOWN: #{label_from_reason(reason)}"
{:"$gen_call", _ref, value} -> "CALL: #{label_from_value(value)}"
{:"$gen_cast", value} -> "CAST: #{label_from_value(value)}"
value -> "INFO: #{label_from_value(value)}"
end
end
end

Expand All @@ -444,6 +486,11 @@ defmodule Kino.Process do
defp label_from_value(tuple) when is_tuple(tuple), do: "tuple"
defp label_from_value(_), do: "term"

defp label_from_options(opts) do
opts
|> Keyword.get(:message_label, fn _message -> :continue end)
end

defp direction_from_opts(opts) do
opts
|> Keyword.get(:direction, :top_down)
Expand Down

0 comments on commit 6e12f05

Please sign in to comment.