Skip to content

Commit

Permalink
Merge pull request #5142 from rmosolgo/direct-write-subscription
Browse files Browse the repository at this point in the history
Support calls to write_subscription within resolve
  • Loading branch information
rmosolgo authored Dec 9, 2024
2 parents fbc901e + 8372ecd commit dd24fce
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 16 deletions.
4 changes: 3 additions & 1 deletion lib/graphql/execution/interpreter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
results = []
queries.each_with_index do |query, idx|
if query.subscription? && !query.subscription_update?
query.context.namespace(:subscriptions)[:events] = []
subs_namespace = query.context.namespace(:subscriptions)
subs_namespace[:events] = []
subs_namespace[:subscriptions] = {}
end
multiplex.dataloader.append_job {
operation = query.selected_operation
Expand Down
54 changes: 50 additions & 4 deletions lib/graphql/schema/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ class Subscription < GraphQL::Schema::Resolver
# propagate null.
null false

# @api private
def initialize(object:, context:, field:)
super
# Figure out whether this is an update or an initial subscription
@mode = context.query.subscription_update? ? :update : :subscribe
@subscription_written = false
@original_arguments = nil
if (subs_ns = context.namespace(:subscriptions)) &&
(sub_insts = subs_ns[:subscriptions])
sub_insts[context.current_path] = self
end
end

# @api private
def resolve_with_support(**args)
@original_arguments = args # before `loads:` have been run
result = nil
unsubscribed = true
unsubscribed_result = catch :graphql_subscription_unsubscribed do
Expand All @@ -46,14 +55,17 @@ def resolve_with_support(**args)
end
end

# Implement the {Resolve} API
# Implement the {Resolve} API.
# You can implement this if you want code to run for _both_ the initial subscription
# and for later updates. Or, implement {#subscribe} and {#update}
def resolve(**args)
# Dispatch based on `@mode`, which will raise a `NoMethodError` if we ever
# have an unexpected `@mode`
public_send("resolve_#{@mode}", **args)
end

# Wrap the user-defined `#subscribe` hook
# @api private
def resolve_subscribe(**args)
ret_val = !args.empty? ? subscribe(**args) : subscribe
if ret_val == :no_response
Expand All @@ -71,6 +83,7 @@ def subscribe(args = {})
end

# Wrap the user-provided `#update` hook
# @api private
def resolve_update(**args)
ret_val = !args.empty? ? update(**args) : update
if ret_val == NO_UPDATE
Expand Down Expand Up @@ -106,14 +119,13 @@ def unsubscribe(update_value = nil)
throw :graphql_subscription_unsubscribed, update_value
end

READING_SCOPE = ::Object.new
# Call this method to provide a new subscription_scope; OR
# call it without an argument to get the subscription_scope
# @param new_scope [Symbol]
# @param optional [Boolean] If true, then don't require `scope:` to be provided to updates to this subscription.
# @return [Symbol]
def self.subscription_scope(new_scope = READING_SCOPE, optional: false)
if new_scope != READING_SCOPE
def self.subscription_scope(new_scope = NOT_CONFIGURED, optional: false)
if new_scope != NOT_CONFIGURED
@subscription_scope = new_scope
@subscription_scope_optional = optional
elsif defined?(@subscription_scope)
Expand Down Expand Up @@ -150,6 +162,40 @@ def self.subscription_scope_optional?
def self.topic_for(arguments:, field:, scope:)
Subscriptions::Serialize.dump_recursive([scope, field.graphql_name, arguments])
end

# Calls through to `schema.subscriptions` to register this subscription with the backend.
# This is automatically called by GraphQL-Ruby after a query finishes successfully,
# but if you need to commit the subscription during `#subscribe`, you can call it there.
# (This method also sets a flag showing that this subscription was already written.)
#
# If you call this method yourself, you may also need to {#unsubscribe}
# or call `subscriptions.delete_subscription` to clean up the database if the query crashes with an error
# later in execution.
# @return [void]
def write_subscription
if subscription_written?
raise GraphQL::Error, "`write_subscription` was called but `#{self.class}#subscription_written?` is already true. Remove a call to `write subscription`."
else
@subscription_written = true
context.schema.subscriptions.write_subscription(context.query, [event])
end
nil
end

# @return [Boolean] `true` if {#write_subscription} was called already
def subscription_written?
@subscription_written
end

# @return [Subscriptions::Event] This object is used as a representation of this subscription for the backend
def event
@event ||= Subscriptions::Event.new(
name: field.name,
arguments: @original_arguments,
context: context,
field: field,
)
end
end
end
end
22 changes: 12 additions & 10 deletions lib/graphql/subscriptions/default_subscription_resolve_extension.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@ def resolve(context:, object:, arguments:)
def after_resolve(value:, context:, object:, arguments:, **rest)
if value.is_a?(GraphQL::ExecutionError)
value
elsif @field.resolver&.method_defined?(:subscription_written?) &&
(subscription_namespace = context.namespace(:subscriptions)) &&
(subscriptions_by_path = subscription_namespace[:subscriptions])
(subscription_instance = subscriptions_by_path[context.current_path])
# If it was already written, don't append this event to be written later
if !subscription_instance.subscription_written?
events = context.namespace(:subscriptions)[:events]
events << subscription_instance.event
end
value
elsif (events = context.namespace(:subscriptions)[:events])
# This is the first execution, so gather an Event
# for the backend to register:
event = Subscriptions::Event.new(
name: field.name,
arguments: arguments_without_field_extras(arguments: arguments),
arguments: arguments,
context: context,
field: field,
)
events << event
value
elsif context.query.subscription_topic == Subscriptions::Event.serialize(
field.name,
arguments_without_field_extras(arguments: arguments),
arguments,
field,
scope: (field.subscription_scope ? context[field.subscription_scope] : nil),
)
Expand All @@ -45,14 +55,6 @@ def after_resolve(value:, context:, object:, arguments:, **rest)
context.skip
end
end

private

def arguments_without_field_extras(arguments:)
arguments.dup.tap do |event_args|
field.extras.each { |k| event_args.delete(k) }
end
end
end
end
end
13 changes: 12 additions & 1 deletion lib/graphql/subscriptions/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Event

def initialize(name:, arguments:, field: nil, context: nil, scope: nil)
@name = name
@arguments = arguments
@arguments = self.class.arguments_without_field_extras(arguments: arguments, field: field)
@context = context
field ||= context.field
scope_key = field.subscription_scope
Expand All @@ -39,6 +39,7 @@ def initialize(name:, arguments:, field: nil, context: nil, scope: nil)
# @return [String] an identifier for this unit of subscription
def self.serialize(_name, arguments, field, scope:, context: GraphQL::Query::NullContext.instance)
subscription = field.resolver || GraphQL::Schema::Subscription
arguments = arguments_without_field_extras(field: field, arguments: arguments)
normalized_args = stringify_args(field, arguments.to_h, context)
subscription.topic_for(arguments: normalized_args, field: field, scope: scope)
end
Expand All @@ -60,6 +61,16 @@ def fingerprint
end

class << self
def arguments_without_field_extras(arguments:, field:)
if !field.extras.empty?
arguments = arguments.dup
field.extras.each do |extra_key|
arguments.delete(extra_key)
end
end
arguments
end

private

# This method does not support cyclic references in the Hash,
Expand Down
71 changes: 71 additions & 0 deletions spec/graphql/schema/subscription_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -696,4 +696,75 @@ class PrivateSubscription < SubscriptionFieldSchema::BaseSubscription
assert_nil PrivateSubscription.subscription_scope
end
end

describe "writing during resolution" do
class DirectWriteSchema < GraphQL::Schema
class WriteCheckSubscriptions
def use(schema)
schema.subscriptions = self
end

def write_subscription(query, events)
query.context[:write_subscription_count] ||= 0
query.context[:write_subscription_count] += 1
evs = query.context[:written_events] ||= []
evs << events
nil
end
end
class ImplicitWrite < GraphQL::Schema::Subscription
type String

def subscribe
"#{context[:written_events]&.size.inspect} / #{context[:write_subscription_count].inspect} / #{subscription_written?}"
end
end

class DirectWrite < ImplicitWrite
type String
def subscribe
write_subscription
super
end
end

class DirectWriteTwice < DirectWrite
type String
def subscribe
write_subscription
super
end
end

class Subscription < GraphQL::Schema::Object
field :direct, subscription: DirectWrite
field :implicit, subscription: ImplicitWrite
field :direct_twice, subscription: DirectWriteTwice
end

use WriteCheckSubscriptions.new
subscription(Subscription)
end

it "only calls write_subscription once" do
res = DirectWriteSchema.execute("subscription { direct }")
assert_equal "1 / 1 / true", res["data"]["direct"]
assert_equal 1, res.context[:write_subscription_count]
assert_equal [1], res.context[:written_events].map(&:size)
assert_equal true, res.context.namespace(:subscriptions)[:subscriptions].values.first.subscription_written?

res = DirectWriteSchema.execute("subscription { implicit }")
assert_equal "nil / nil / false", res["data"]["implicit"]
assert_equal 1, res.context[:write_subscription_count]
assert_equal [1], res.context[:written_events].map(&:size)
assert_equal false, res.context.namespace(:subscriptions)[:subscriptions].values.first.subscription_written?
end

it "raises if write_subscription is called twice" do
err = assert_raises GraphQL::Error do
DirectWriteSchema.execute("subscription { directTwice }")
end
assert_equal "`write_subscription` was called but `DirectWriteSchema::DirectWriteTwice#subscription_written?` is already true. Remove a call to `write subscription`.", err.message
end
end
end

0 comments on commit dd24fce

Please sign in to comment.