diff --git a/lib/absinthe/subscription/proxy.ex b/lib/absinthe/subscription/proxy.ex index fa70b86831..d9ecf0cac6 100644 --- a/lib/absinthe/subscription/proxy.ex +++ b/lib/absinthe/subscription/proxy.ex @@ -6,10 +6,11 @@ defmodule Absinthe.Subscription.Proxy do defstruct [ :pubsub, :node, - :task_super + :task_super, + :async ] - def child_spec([_, _, shard] = args) do + def child_spec([_task_super, _pubsub, shard, _async] = args) do %{ id: {__MODULE__, shard}, start: {__MODULE__, :start_link, [args]} @@ -26,11 +27,11 @@ defmodule Absinthe.Subscription.Proxy do def topic(shard), do: "__absinthe__:proxy:#{shard}" - def init([task_super, pubsub, shard]) do + def init([task_super, pubsub, shard, async]) do node_name = pubsub.node_name() :ok = pubsub.subscribe(topic(shard)) Process.send_after(self(), :gc, @gc_interval) - {:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}} + {:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super, async: async}} end def handle_info(:gc, state) do @@ -42,13 +43,20 @@ defmodule Absinthe.Subscription.Proxy do def handle_info(payload, state) do # There's no meaningful form of backpressure to have here, and we can't # bottleneck execution inside each proxy process - unless payload.node == state.pubsub.node_name() do - Subscription.Local.publish_mutation( - state.pubsub, - payload.mutation_result, - payload.subscribed_fields - ) + if state.async do + Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [ + state.pubsub, + payload.mutation_result, + payload.subscribed_fields + ]) + else + Subscription.Local.publish_mutation( + state.pubsub, + payload.mutation_result, + payload.subscribed_fields + ) + end end {:noreply, state} diff --git a/lib/absinthe/subscription/proxy_supervisor.ex b/lib/absinthe/subscription/proxy_supervisor.ex index 62821d2b81..7c8d6b02f0 100644 --- a/lib/absinthe/subscription/proxy_supervisor.ex +++ b/lib/absinthe/subscription/proxy_supervisor.ex @@ -3,18 +3,18 @@ defmodule Absinthe.Subscription.ProxySupervisor do use Supervisor - def start_link([pubsub, registry, pool_size]) do - Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size}) + def start_link([pubsub, registry, pool_size, async]) do + Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size, async}) end - def init({pubsub, registry, pool_size}) do + def init({pubsub, registry, pool_size, async}) do task_super_name = Module.concat(registry, TaskSuper) task_super = {Task.Supervisor, name: task_super_name} # Shard numbers are generated by phash2 which is 0-based: proxies = for shard <- 0..(pool_size - 1) do - {Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard]} + {Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard, async]} end Supervisor.init([task_super | proxies], strategy: :one_for_one) diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 7b63ae4ab9..c61a001bf5 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -24,10 +24,17 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) - Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?}) + # Absinthe.Subscription.Proxy listens for subscription messages + # from other nodes and then runs Subscription.Local.publish_mutation to process + # the mutation on the local node. By default it runs in a task superivsor so that + # requests are handled concurrently. However, this may not work for some + # systems. Setting `async` to false makes it so that the requests are processed one at a time. + async? = Keyword.get(opts, :async, true) + + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, async?}) end - def init({pubsub, pool_size, compress_registry?}) do + def init({pubsub, pool_size, compress_registry?, async?}) do registry_name = Absinthe.Subscription.registry_name(pubsub) meta = [pool_size: pool_size] @@ -40,7 +47,7 @@ defmodule Absinthe.Subscription.Supervisor do meta: meta, compressed: compress_registry? ]}, - {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} + {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size, async?]} ] Supervisor.init(children, strategy: :one_for_one)