From 384ba0c07f630fb6a65c2eace847de6623066a76 Mon Sep 17 00:00:00 2001 From: archfish Date: Tue, 31 Dec 2019 16:16:37 +0800 Subject: [PATCH] support init subscription before enqueue message --- lib/mqjob/version.rb | 2 +- lib/mqjob/worker.rb | 3 ++- lib/plugin/pulsar.rb | 3 +++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/mqjob/version.rb b/lib/mqjob/version.rb index 591f046..4e097be 100644 --- a/lib/mqjob/version.rb +++ b/lib/mqjob/version.rb @@ -1,3 +1,3 @@ module Mqjob - VERSION = "0.3.9" + VERSION = "0.3.10" end diff --git a/lib/mqjob/worker.rb b/lib/mqjob/worker.rb index 2e51549..639d4f5 100644 --- a/lib/mqjob/worker.rb +++ b/lib/mqjob/worker.rb @@ -87,10 +87,11 @@ def from_topic(name, opts={}) # opts # in publish message in X seconds # at publish message at specific time + # init_subscription Boolean 是否先初始化一个订阅 def enqueue(msg, opts={}) @mq ||= Plugin.client(topic_opts[:client]) - @mq.publish(topic, msg, topic_opts) + @mq.publish(topic, msg, topic_opts.merge(opts)) true end end diff --git a/lib/plugin/pulsar.rb b/lib/plugin/pulsar.rb index 96b07c3..51889b9 100644 --- a/lib/plugin/pulsar.rb +++ b/lib/plugin/pulsar.rb @@ -12,7 +12,10 @@ def listen(topic, worker, opts = {}) # opts # in publish message in X seconds # at publish message at specific time + # init_subscription Boolean def publish(topic, msg, opts = {}) + create_consumer(topic, opts) if opts[:init_subscription] + base_cmd = ::Pulsar::Proto::BaseCommand.new( type: ::Pulsar::Proto::BaseCommand::Type::SEND, send: ::Pulsar::Proto::CommandSend.new(