diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8eb3b06 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +/.bundle/ +/.yardoc +/Gemfile.lock +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ + +# rspec failure tracking +.rspec_status diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..16d03ca --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +sudo: false +language: ruby + +rvm: + - 2.4.1 + +before_install: + - curl -sL https://dl.google.com/dl/cloudsdk/channels/rapid/google-cloud-sdk.tar.gz | tar xz + - google-cloud-sdk/install.sh --override-components beta pubsub-emulator --quiet + - export PATH=$PATH:$PWD/google-cloud-sdk/bin diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..8a44fe7 --- /dev/null +++ b/Gemfile @@ -0,0 +1,5 @@ +source 'https://rubygems.org' + +gemspec + +gem 'concurrent-ruby-ext' diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..2d94455 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2017 Keita Urashima + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c593883 --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +# ActiveJob::GoogleCloudPubsub + +Google Cloud Pub/Sub adapter and worker for ActiveJob + +## Installation + +```ruby +gem 'activejob-google_cloud_pubsub' +``` + +## Usage + +First, change the ActiveJob backend. + +``` ruby +Rails.application.config.active_job.queue_adapter = :google_cloud_pubsub +``` + +Write the Job class and code to use it. + +``` ruby +class HelloJob < ApplicationJob + def perform(name) + puts "hello, #{name}!" + end +end +``` + +``` ruby +class HelloController < ApplicationController + def say + HelloJob.perform_later params[:name] + end +end +``` + +In order to test the worker in your local environment, it is a good idea to use the Pub/Sub emulator provided by `gcloud` command. Refer to [this document](https://cloud.google.com/pubsub/docs/emulator) for the installation procedure. + +When the installation is completed, execute the following command to start up the worker. + +``` sh +$ gcloud beta emulators pubsub start + +(Switch to another terminal) + +$ eval `gcloud beta emulators pubsub env-init` +$ cd path/to/your-app +$ bundle exec activejob-google_cloud_pubsub-worker +``` + +If you hit the previous action, the job will be executed. +(Both the emulator and the worker stop with Ctrl+C) + +## Configuration + +### Adapter + +When passing options to the adapter, you need to create the object instead of a symbol. + +``` ruby +Rails.application.config.active_job.queue_adapter = ActiveJob::QueueAdapters::GoogleCloudPubsubAdapter.new(options) +``` + +All options are passed to [`Google::Cloud::Pubsub.new`](http://googlecloudplatform.github.io/google-cloud-ruby/#/docs/google-cloud-pubsub/v0.23.2/google/cloud/pubsub?method=new-class). + +### Worker + +The following command line flags can be specified. All flags are optional. + +#### `--require=PATH` + +The path of the file to load before the worker starts up. + +Default: `./config/environment` + +#### `--queue=NAME` + +The name of the queue the worker handles. + +Note: One worker can handle only one queue. If you use multiple queues, you need to launch multiple worker processes. + +Default: `default` + +#### `--min_threads=N` + +Minimum number of worker threads. + +Default: `0` + +#### `--max_threads=N` + +Maximum number of worker threads. + +Default: number of logical cores + +#### `--project=PROJECT_ID`, `--keyfile=PATH` + +Credentials of Google Cloud Platform. Please refer to [the document](https://github.com/GoogleCloudPlatform/google-cloud-ruby/blob/master/AUTHENTICATION.md) for details. + +``` sh +$ bundle exec rake spec +``` + +## Contributing + +Bug reports and pull requests are welcome on GitHub at https://github.com/ursm/activejob-google_cloud_pubsub. + +## License + +The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..93cb943 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +require 'bundler/gem_tasks' +require 'rspec/core/rake_task' + +RSpec::Core::RakeTask.new(:spec) + +task :default => :spec diff --git a/activejob-google_cloud_pubsub.gemspec b/activejob-google_cloud_pubsub.gemspec new file mode 100644 index 0000000..971aa3a --- /dev/null +++ b/activejob-google_cloud_pubsub.gemspec @@ -0,0 +1,31 @@ +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'active_job/google_cloud_pubsub/version' + +Gem::Specification.new do |spec| + spec.name = 'activejob-google_cloud_pubsub' + spec.version = ActiveJob::GoogleCloudPubsub::VERSION + spec.authors = ['Keita Urashima'] + spec.email = ['ursm@ursm.jp'] + + spec.summary = 'Google Cloud Pub/Sub adapter and worker for ActiveJob' + spec.homepage = 'https://github.com/ursm/activejob-google_cloud_pubsub' + spec.license = 'MIT' + + spec.files = `git ls-files -z`.split("\x0").reject do |f| + f.match(%r{^(test|spec|features)/}) + end + + spec.bindir = 'exe' + spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } + spec.require_paths = ['lib'] + + spec.add_runtime_dependency 'activejob' + spec.add_runtime_dependency 'activesupport' + spec.add_runtime_dependency 'concurrent-ruby' + spec.add_runtime_dependency 'google-cloud-pubsub' + + spec.add_development_dependency 'bundler' + spec.add_development_dependency 'rake' + spec.add_development_dependency 'rspec' +end diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..86a404c --- /dev/null +++ b/bin/console @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby + +require 'bundler/setup' +require 'activejob-google_cloud_pubsub' + +# You can add fixtures and/or initialization code here to make experimenting +# with your gem easier. You can also use a different console, if you like. + +# (If you use this, don't forget to add pry to your Gemfile!) +# require "pry" +# Pry.start + +require 'irb' +IRB.start(__FILE__) diff --git a/bin/setup b/bin/setup new file mode 100755 index 0000000..dce67d8 --- /dev/null +++ b/bin/setup @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -euo pipefail +IFS=$'\n\t' +set -vx + +bundle install + +# Do any other automated setup that you need to do here diff --git a/exe/activejob-google_cloud_pubsub-worker b/exe/activejob-google_cloud_pubsub-worker new file mode 100755 index 0000000..4a8e001 --- /dev/null +++ b/exe/activejob-google_cloud_pubsub-worker @@ -0,0 +1,41 @@ +#!/usr/bin/env ruby + +require 'activejob-google_cloud_pubsub' +require 'google/cloud/pubsub' +require 'optparse/kwargs' + +Version = ActiveJob::GoogleCloudPubsub::VERSION + +args = { + require: './config/environment' +} + +opt = OptionParser.new + +opt.on '--[no-]require=PATH' do |v| + args[:require] = v +end + +worker_args = opt.define_by_keywords( + {}, + ActiveJob::GoogleCloudPubsub::Worker.instance_method(:initialize), + { + min_threads: Integer, + max_threads: Integer + } +) + +pubsub_args = opt.define_by_keywords( + {}, + Google::Cloud::Pubsub.method(:new), + { + scope: Array, + timeout: Integer + } +) + +opt.parse ARGV + +require args[:require] if args[:require] + +ActiveJob::GoogleCloudPubsub::Worker.new(**worker_args, **pubsub_args).run diff --git a/lib/active_job/google_cloud_pubsub/adapter.rb b/lib/active_job/google_cloud_pubsub/adapter.rb new file mode 100644 index 0000000..3ae5210 --- /dev/null +++ b/lib/active_job/google_cloud_pubsub/adapter.rb @@ -0,0 +1,29 @@ +require 'active_job/google_cloud_pubsub/naming' +require 'google/cloud/pubsub' +require 'json' + +module ActiveJob + module GoogleCloudPubsub + class Adapter + include Naming + + def initialize(**pubsub_args) + @pubsub = Google::Cloud::Pubsub.new(pubsub_args) + end + + def enqueue(job, attributes = {}) + topic = @pubsub.topic(topic_name(job.queue_name), autocreate: true) + + topic.publish JSON.dump(job.serialize), attributes + end + + def enqueue_at(job, timestamp) + enqueue job, timestamp: timestamp + end + end + end +end + +require 'active_job' + +ActiveJob::QueueAdapters::GoogleCloudPubsubAdapter = ActiveJob::GoogleCloudPubsub::Adapter diff --git a/lib/active_job/google_cloud_pubsub/naming.rb b/lib/active_job/google_cloud_pubsub/naming.rb new file mode 100644 index 0000000..fcf23b6 --- /dev/null +++ b/lib/active_job/google_cloud_pubsub/naming.rb @@ -0,0 +1,13 @@ +module ActiveJob + module GoogleCloudPubsub + module Naming + def topic_name(queue_name) + "activejob-queue-#{queue_name}" + end + + def subscription_name(queue_name) + "activejob-worker-#{queue_name}" + end + end + end +end diff --git a/lib/active_job/google_cloud_pubsub/version.rb b/lib/active_job/google_cloud_pubsub/version.rb new file mode 100644 index 0000000..7183ff6 --- /dev/null +++ b/lib/active_job/google_cloud_pubsub/version.rb @@ -0,0 +1,5 @@ +module ActiveJob + module GoogleCloudPubsub + VERSION = '0.1.0' + end +end diff --git a/lib/active_job/google_cloud_pubsub/worker.rb b/lib/active_job/google_cloud_pubsub/worker.rb new file mode 100644 index 0000000..d469de6 --- /dev/null +++ b/lib/active_job/google_cloud_pubsub/worker.rb @@ -0,0 +1,98 @@ +require 'active_job/base' +require 'active_job/google_cloud_pubsub/naming' +require 'active_support/core_ext/numeric/time' +require 'concurrent' +require 'google/cloud/pubsub' +require 'json' +require 'logger' + +module ActiveJob + module GoogleCloudPubsub + class Worker + include Naming + + MAX_DEADLINE = 10.minutes + + cattr_accessor(:logger) { Logger.new($stdout) } + + def initialize(queue: 'default', min_threads: 0, max_threads: Concurrent.processor_count, **pubsub_args) + @queue_name, @min_threads, @max_threads = queue, min_threads, max_threads + + @pubsub = Google::Cloud::Pubsub.new(**pubsub_args) + end + + def subscription + topic = @pubsub.topic(topic_name(@queue_name), autocreate: true) + + topic.subscription(subscription_name(@queue_name)) || topic.subscribe(subscription_name(@queue_name)) + end + alias ensure_subscription subscription + + def run + pool = Concurrent::ThreadPoolExecutor.new(min_threads: @min_threads, max_threads: @max_threads, max_queue: -1) + + subscription.listen do |message| + begin + Concurrent::Promise.execute(args: message, executor: pool) {|msg| + process msg + }.rescue {|e| + logger.error e + } + rescue Concurrent::RejectedExecutionError + message.delay! 10.seconds.to_i + end + end + end + + private + + def process(message) + if timestamp = message.attributes['timestamp'] + ts = Time.at(timestamp.to_f) + + if ts >= Time.now + _process message + else + message.delay! [ts - Time.now, MAX_DEADLINE.to_i].min + end + else + _process message + end + end + + def _process(message) + timer_opts = { + execution_interval: MAX_DEADLINE - 10.seconds, + timeout_interval: 5.seconds, + run_now: true + } + + delay_timer = Concurrent::TimerTask.execute(timer_opts) { + message.delay! MAX_DEADLINE.to_i + } + + begin + succeeded = false + failed = false + + ActiveJob::Base.execute JSON.parse(message.data) + + succeeded = true + rescue Exception + failed = true + + raise + ensure + delay_timer.shutdown + + if succeeded || failed + message.acknowledge! + else + # terminated from outside + message.delay! 0 + end + end + end + end + end +end diff --git a/lib/activejob-google_cloud_pubsub.rb b/lib/activejob-google_cloud_pubsub.rb new file mode 100644 index 0000000..3627aac --- /dev/null +++ b/lib/activejob-google_cloud_pubsub.rb @@ -0,0 +1,12 @@ +require 'active_job' + +module ActiveJob + module GoogleCloudPubsub + autoload :VERSION, 'active_job/google_cloud_pubsub/version' + autoload :Worker, 'active_job/google_cloud_pubsub/worker' + end + + module QueueAdapters + autoload :GoogleCloudPubsubAdapter, 'active_job/google_cloud_pubsub/adapter' + end +end diff --git a/spec/integration_spec.rb b/spec/integration_spec.rb new file mode 100644 index 0000000..25edaf6 --- /dev/null +++ b/spec/integration_spec.rb @@ -0,0 +1,64 @@ +require 'spec_helper' + +require 'active_job' +require 'json' +require 'thread' +require 'timeout' + +RSpec.describe ActiveJob::GoogleCloudPubsub, :emulator do + class GreetingJob < ActiveJob::Base + def perform(name) + $queue.push "hello, #{name}!" + end + end + + around :all do |example| + orig, ActiveJob::Base.logger = ActiveJob::Base.logger, nil + + begin + example.run + ensure + ActiveJob::Base.logger = orig + end + end + + around :each do |example| + $queue = Thread::Queue.new + + ActiveJob::Base.queue_adapter = ActiveJob::QueueAdapters::GoogleCloudPubsubAdapter.new(emulator_host: @emulator_host) + + run_worker emulator_host: @emulator_host, &example + end + + example do + GreetingJob.perform_later 'alice' + GreetingJob.set(wait: 0.1).perform_later 'bob' + GreetingJob.set(wait_until: Time.now + 0.2).perform_later 'charlie' + + Timeout.timeout 1 do + expect(3.times.map { $queue.pop }).to contain_exactly( + 'hello, alice!', + 'hello, bob!', + 'hello, charlie!' + ) + end + end + + private + + def run_worker(**args, &block) + worker = ActiveJob::GoogleCloudPubsub::Worker.new(**args) + + worker.ensure_subscription + + thread = Thread.new { + worker.run + } + + thread.abort_on_exception = true + + block.call + ensure + thread.kill if thread + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..c29d826 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,48 @@ +require 'bundler/setup' + +require 'activejob-google_cloud_pubsub' +require 'timeout' + +RSpec.configure do |config| + # Enable flags like --only-failures and --next-failure + config.example_status_persistence_file_path = '.rspec_status' + + config.expect_with :rspec do |rspec| + rspec.syntax = :expect + end + + config.around :each, emulator: true do |example| + run_pubsub_emulator do |emulator_host| + @emulator_host = emulator_host + + example.run + end + end + + private + + def run_pubsub_emulator(&block) + pipe = IO.popen('gcloud beta emulators pubsub start', err: %i(child out), pgroup: true) + + Timeout.timeout 10 do + pipe.each do |line| + break if line.include?('INFO: Server started') + + raise line if line.include?('Exception in thread') + end + end + + host = `gcloud beta emulators pubsub env-init`.match(/^export PUBSUB_EMULATOR_HOST=(\S+)$/).values_at(1).first + + block.call host + ensure + return unless pipe + + begin + Process.kill :TERM, -Process.getpgid(pipe.pid) + Process.wait pipe.pid + rescue Errno::ESRCH, Errno::ECHILD + # already terminated + end + end +end