Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActiveJob support for subscriptions #4836

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions lib/graphql/subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ class SubscriptionScopeMissingError < GraphQL::Error
end

# @see {Subscriptions#initialize} for options, concrete implementations may add options.
def self.use(defn, options = {})
schema = defn.is_a?(Class) ? defn : defn.target

def self.use(schema, **options)
if schema.subscriptions(inherited: false)
raise ArgumentError, "Can't reinstall subscriptions. #{schema} is using #{schema.subscriptions}, can't also add #{self}"
end
Expand All @@ -37,18 +35,47 @@ def self.use(defn, options = {})

# @param schema [Class] the GraphQL schema this manager belongs to
# @param validate_update [Boolean] If false, then validation is skipped when executing updates
def initialize(schema:, validate_update: true, broadcast: false, default_broadcastable: false, **rest)
def initialize(schema:, validate_update: true, broadcast: false, default_broadcastable: false, trigger_job: NOT_CONFIGURED, trigger_job_queue_as: NOT_CONFIGURED, **rest)
if broadcast
schema.query_analyzer(Subscriptions::BroadcastAnalyzer)
end
@default_broadcastable = default_broadcastable
@schema = schema
@validate_update = validate_update
@trigger_job = if trigger_job == NOT_CONFIGURED
if defined?(ActiveJob::Base)
require "graphql/subscriptions/trigger_job"
trigger_job_class = Class.new(GraphQL::Subscriptions::TriggerJob)
trigger_job_class.subscriptions = self
if trigger_job_queue_as != NOT_CONFIGURED
trigger_job_class.queue_as(trigger_job_queue_as)
end
# ActiveJob will need a constant reference to this class:
schema.const_set(:SubscriptionsTriggerJob, trigger_job_class)
trigger_job_class
else
nil
end
else
trigger_job
end
end

# @return [Boolean] Used when fields don't have `broadcastable:` explicitly set
attr_reader :default_broadcastable

# This class is used to trigger with `.trigger_later`.
# When Rails is loaded, a Job class is automatically created.
# Pass `use ..., trigger_job: ...` to provide a custom class.
# @return [Class<Active::JobBase>, nil]
attr_reader :trigger_job

# Use {trigger_job} (`.perform_later`) to perform this trigger.
# @see trigger for arguments
def trigger_later(event_name, args, object, scope: nil, context: {})
job_class = @trigger_job || raise(ArgumentError, "No `trigger_job` configured. Make sure Rails is loaded or provide a `trigger_job:` option to `use #{self.class}, ...`.")
job_class.perform_later(event_name, args, object, scope: scope, context: {})
end
# Fetch subscriptions matching this field + arguments pair
# And pass them off to the queue.
# @param event_name [String]
Expand Down
15 changes: 15 additions & 0 deletions lib/graphql/subscriptions/trigger_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true
module GraphQL
class Subscriptions
class TriggerJob < ActiveJob::Base
class << self
# @return [GraphQL::Subscriptions]
attr_accessor :subscriptions
end

def perform(*args, **kwargs)
self.class.subscriptions.trigger(*args, **kwargs)
end
end
end
end
78 changes: 78 additions & 0 deletions spec/graphql/subscriptions/trigger_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true
require "spec_helper"

describe "GraphQL::Subscriptions::TriggerJob" do
before do
skip "Requires Rails" unless testing_rails?
TriggerJobSchema.subscriptions.reset
end

if defined?(ActiveJob)
include ActiveJob::TestHelper
ActiveJob::Base.logger = Logger.new(IO::NULL)
end

class TriggerJobSchema < GraphQL::Schema
class InMemorySubscriptions < GraphQL::Subscriptions
attr_reader :write_subscription_events, :execute_all_events

def initialize(...)
super
reset
end

def write_subscription(_query, events)
@write_subscription_events.concat(events)
end

def execute_all(event, _object)
@execute_all_events.push(event)
end

def reset
@write_subscription_events = []
@execute_all_events = []
end
end

class Subscription < GraphQL::Schema::Object
class Update < GraphQL::Schema::Subscription
field :news, String

def resolve
object
{
news: (object && object[:news]) ? object[:news] : "Hello World"
}
end
end

field :update, subscription: Update
end
subscription Subscription
use InMemorySubscriptions
end

it "Creates a custom ActiveJob::Base subclass" do
assert_equal TriggerJobSchema::SubscriptionsTriggerJob, TriggerJobSchema.subscriptions.trigger_job
assert_equal GraphQL::Subscriptions::TriggerJob, TriggerJobSchema::SubscriptionsTriggerJob.superclass
assert_equal ActiveJob::Base, TriggerJobSchema::SubscriptionsTriggerJob.superclass.superclass

custom_class = Class.new(TriggerJobSchema) do
use TriggerJobSchema::InMemorySubscriptions, trigger_job_queue_as: "graphql_subscriptions"
end

assert_equal "graphql_subscriptions", custom_class::SubscriptionsTriggerJob.queue_name
end

it "runs .trigger in the background" do
res = TriggerJobSchema.execute("subscription { update { news } }")
assert_equal 1, TriggerJobSchema.subscriptions.write_subscription_events.size
assert_equal 0, TriggerJobSchema.subscriptions.execute_all_events.size
perform_enqueued_jobs do
TriggerJobSchema.subscriptions.trigger_later(:update, {}, { news: "Expect a week of sunshine" })
end
assert_equal 1, TriggerJobSchema.subscriptions.execute_all_events.size

end
end
24 changes: 24 additions & 0 deletions spec/graphql/subscriptions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,30 @@ def self.parse_error(err, context)
res = schema.subscriptions.trigger("payload", { "id" => "8"}, OpenStruct.new(str: nil, int: nil))
assert res
end

if !testing_rails?
it "raises an error when trigger_job isn't configured" do
assert_nil schema.subscriptions.trigger_job
err = assert_raises ArgumentError do
schema.subscriptions.trigger_later(:nothing, {}, :nothing)
end
assert_equal "No `trigger_job` configured. Make sure Rails is loaded or provide a `trigger_job:` option to `use InMemoryBackend::Subscriptions, ...`.", err.message
end

it "doesn't raise an error when trigger_job is present" do
log = []
trigger_job = Class.new do
define_singleton_method(:perform_later) do |*args, **kwargs|
log << [args, kwargs]
end
end
new_schema = Class.new(schema) do
use InMemoryBackend::Subscriptions, extra: 123, trigger_job: trigger_job
end
assert new_schema.subscriptions.trigger_later(:nothing, :nothing, :nothing)
assert_equal [[[:nothing, :nothing, :nothing], { scope: nil, context: {} }]], log
end
end
end

describe "Triggering with custom enum values" do
Expand Down