Skip to content

Commit

Permalink
Add support for Absinthe continuations
Browse files Browse the repository at this point in the history
Implementing subscription priming and ordinals
  • Loading branch information
Bernard Duggan authored and bernardd committed May 1, 2024
1 parent 39bdf98 commit fb6d5b7
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 13 deletions.
148 changes: 137 additions & 11 deletions lib/absinthe/phoenix/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Absinthe.Phoenix.Channel do
end

socket = socket |> assign(:absinthe, absinthe_config)

{:ok, socket}
end

Expand Down Expand Up @@ -94,22 +95,44 @@ defmodule Absinthe.Phoenix.Channel do
{:reply, {:ok, %{subscriptionId: doc_id}}, socket}
end

defp send_msg(msg, socket) do
{_ordinal, msg} = pop_in(msg.payload.result[:ordinal])
encoded_msg = socket.serializer.fastlane!(msg)
send(socket.transport_pid, encoded_msg)
end

defp update_ordinal(socket, topic, ordinal) do
absinthe_assigns = Map.get(socket.assigns, :absinthe, %{})

ordinals =
absinthe_assigns
|> Map.get(:subscription_ordinals, %{})
|> Map.put(topic, ordinal)

Phoenix.Socket.assign(
socket,
:absinthe,
Map.put(absinthe_assigns, :subscription_ordinals, ordinals)
)
end

defp run_doc(socket, query, config, opts) do
case run(query, config[:schema], config[:pipeline], opts) do
{:ok, %{"subscribed" => topic}, context} ->
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} =
socket
pubsub_subscribe(topic, socket)
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)

{{:ok, %{subscriptionId: topic}}, socket}

:ok =
Phoenix.PubSub.subscribe(
pubsub_server,
topic,
metadata: {:fastlane, transport_pid, serializer, []},
link: true
)
{:more, %{"subscribed" => topic}, continuations, context} ->
reply(socket_ref(socket), {:ok, %{subscriptionId: topic}})

pubsub_subscribe(topic, socket)
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
{{:ok, %{subscriptionId: topic}}, socket}

handle_subscription_continuation(continuations, topic, socket)

{:noreply, socket}

{:ok, %{data: _} = reply, context} ->
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
Expand All @@ -119,6 +142,16 @@ defmodule Absinthe.Phoenix.Channel do
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
{{:error, reply}, socket}

{:more, %{data: _} = reply, continuations, context} ->
id = new_query_id()

socket =
socket
|> Absinthe.Phoenix.Socket.put_options(context: context)
|> handle_continuation(continuations, id)

{{:ok, add_query_id(reply, id)}, socket}

{:error, reply} ->
{reply, socket}
end
Expand All @@ -128,6 +161,9 @@ defmodule Absinthe.Phoenix.Channel do
{module, fun} = pipeline

case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
{:ok, %{result: %{continuations: continuations} = result, execution: res}, _phases} ->
{:more, Map.delete(result, :continuations), continuations, res.context}

{:ok, %{result: result, execution: res}, _phases} ->
{:ok, result, res.context}

Expand All @@ -136,6 +172,19 @@ defmodule Absinthe.Phoenix.Channel do
end
end

defp pubsub_subscribe(
topic,
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server}
) do
:ok =
Phoenix.PubSub.subscribe(
pubsub_server,
topic,
metadata: {:fastlane, transport_pid, serializer, ["subscription:data"]},
link: true
)
end

defp extract_variables(payload) do
case Map.get(payload, "variables", %{}) do
nil -> %{}
Expand All @@ -149,14 +198,91 @@ defmodule Absinthe.Phoenix.Channel do
|> Absinthe.Pipeline.for_document(options)
end

def handle_info(
%Phoenix.Socket.Broadcast{payload: %{result: %{ordinal: ordinal}}} = msg,
socket
)
when not is_nil(ordinal) do
absinthe_assigns = Map.get(socket.assigns, :absinthe, %{})
last_ordinal = absinthe_assigns[:subscription_ordinals][msg.topic]

cond do
last_ordinal == nil or last_ordinal < ordinal ->
send_msg(msg, socket)
socket = update_ordinal(socket, msg.topic, ordinal)
{:noreply, socket}

true ->
{:noreply, socket}
end
end

def handle_info(:gc, socket) do
:erlang.garbage_collect()
:erlang.garbage_collect(socket.transport_pid)
Process.send_after(self(), :gc, socket.assigns.absinthe.gc_interval)
{:noreply, socket}
end

def handle_info(_, socket) do
def handle_info(msg, socket) do
send_msg(msg, socket)
{:noreply, socket}
end

defp handle_continuation(socket, continuations, id) do
case Absinthe.Pipeline.continue(continuations) do
{:ok, %{result: %{continuation: next_continuations} = result}, _phases} ->
result =
result
|> Map.delete(:continuations)
|> add_query_id(id)

push(socket, "doc", result)
handle_continuation(socket, next_continuations, id)

{:ok, %{result: result}, _phases} ->
push(socket, "doc", add_query_id(result, id))

{:ok, %{errors: errors}, _phases} ->
push(socket, "doc", add_query_id(%{errors: errors}, id))

{:error, msg, _phases} ->
push(socket, "doc", add_query_id(msg, id))

{:ok, %{result: :no_more_results}, _phases} ->
socket
end
end

defp new_query_id,
do: "absinthe_query:" <> to_string(:erlang.unique_integer([:positive]))

defp add_query_id(result, id), do: Map.put(result, :queryId, id)

defp handle_subscription_continuation(continuations, topic, socket) do
case Absinthe.Pipeline.continue(continuations) do
{:ok, %{result: :no_more_results}, _phases} ->
:ok

{:ok, %{result: result}, _phases} ->
socket = push_subscription_item(result.data, topic, socket)

case result[:continuations] do
nil -> :ok
c -> handle_subscription_continuation(c, topic, socket)
end
end
end

defp push_subscription_item(data, topic, socket) do
msg = %Phoenix.Socket.Broadcast{
topic: topic,
event: "subscription:data",
payload: %{result: %{data: data}, subscriptionId: topic}
}

{:noreply, socket} = handle_info(msg, socket)

socket
end
end
4 changes: 3 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ defmodule Absinthe.Phoenix.Mixfile do
defp deps do
[
{:absinthe_plug, "~> 1.5"},
{:absinthe, "~> 1.5"},
# {:absinthe, "~> 1.5"},
{:absinthe,
github: "circles-learning-labs/absinthe", branch: "subscription-prime", override: true},
{:decimal, "~> 1.0 or ~> 2.0"},
{:phoenix, "~> 1.5"},
{:phoenix_pubsub, "~> 2.0"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%{
"absinthe": {:hex, :absinthe, "1.7.5", "a15054f05738e766f7cc7fd352887dfd5e61cec371fb4741cca37c3359ff74ac", [:mix], [{:dataloader, "~> 1.0.0 or ~> 2.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 1.2.2 or ~> 1.3.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:opentelemetry_process_propagator, "~> 0.2.1", [hex: :opentelemetry_process_propagator, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "22a9a38adca26294ad0ee91226168f5d215b401efd770b8a1b8fd9c9b21ec316"},
"absinthe": {:git, "https://github.com/circles-learning-labs/absinthe.git", "a9c8f7b106243a05538c17a0d82b5a4328744518", [branch: "subscription-prime"]},
"absinthe_plug": {:hex, :absinthe_plug, "1.5.8", "38d230641ba9dca8f72f1fed2dfc8abd53b3907d1996363da32434ab6ee5d6ab", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bbb04176647b735828861e7b2705465e53e2cf54ccf5a73ddd1ebd855f996e5a"},
"castore": {:hex, :castore, "1.0.3", "7130ba6d24c8424014194676d608cb989f62ef8039efd50ff4b3f33286d06db8", [:mix], [], "hexpm", "680ab01ef5d15b161ed6a95449fac5c6b8f60055677a8e79acf01b27baa4390b"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
Expand Down
48 changes: 48 additions & 0 deletions test/absinthe/phoenix_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,54 @@ defmodule Absinthe.PhoenixTest do
assert String.contains?(log, "boom")
end

test "subcription with prime", %{socket: socket} do
ref =
push(socket, "doc", %{
"query" => "subscription { prime }"
})

assert_reply(ref, :ok, %{subscriptionId: subscription_ref})

assert_push("subscription:data", push)

expected = %{
result: %{data: %{"prime" => "prime1"}},
subscriptionId: subscription_ref
}

assert expected == push

assert_push("subscription:data", push)

expected = %{
result: %{data: %{"prime" => "prime2"}},
subscriptionId: subscription_ref
}

assert expected == push
end

test "subscription with ordinal", %{socket: socket} do
ref = push(socket, "doc", %{"query" => "subscription { ordinal }"})

assert_reply(ref, :ok, %{subscriptionId: subscription_ref})

Absinthe.Subscription.publish(Absinthe.Phoenix.TestEndpoint, 1, ordinal: "ordinal_topic")

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinal" => 1}}, subscriptionId: subscription_ref}
assert expected == push

Absinthe.Subscription.publish(Absinthe.Phoenix.TestEndpoint, 0, ordinal: "ordinal_topic")
# This message should not generate a notification because it has a lower ordinal

Absinthe.Subscription.publish(Absinthe.Phoenix.TestEndpoint, 2, ordinal: "ordinal_topic")

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinal" => 2}}, subscriptionId: subscription_ref}
assert expected == push
end

test "context changes are persisted across documents", %{socket: socket} do
ref =
push(socket, "doc", %{
Expand Down
16 changes: 16 additions & 0 deletions test/support/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,21 @@ defmodule Schema do
{:error, "unauthorized"}
end
end

field :prime, :string do
config fn _, _ ->
{:ok,
topic: "prime_topic",
prime: fn _ ->
{:ok, ["prime1", "prime2"]}
end}
end
end

field :ordinal, :integer do
config fn _, _ ->
{:ok, topic: "ordinal_topic", ordinal: fn value -> value end}
end
end
end
end

0 comments on commit fb6d5b7

Please sign in to comment.