forked from rabbitmq/rabbitmq-tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc_client.rb
72 lines (53 loc) · 1.6 KB
/
rpc_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env ruby
require 'bunny'
require 'securerandom'
require 'thread'
class FibonacciClient
attr_accessor :call_id, :response, :lock, :condition, :connection,
:channel, :server_queue_name, :reply_queue, :exchange
def initialize(server_queue_name)
@connection = Bunny.new(automatically_recover: false)
@connection.start
@channel = connection.create_channel
@exchange = channel.default_exchange
@server_queue_name = server_queue_name
setup_reply_queue
end
def call(n)
@call_id = generate_uuid
exchange.publish(n.to_s,
routing_key: server_queue_name,
correlation_id: call_id,
reply_to: reply_queue.name)
# wait for the signal to continue the execution
lock.synchronize { condition.wait(lock) }
response
end
def stop
channel.close
connection.close
end
private
def setup_reply_queue
@lock = Mutex.new
@condition = ConditionVariable.new
that = self
@reply_queue = channel.queue('', exclusive: true)
reply_queue.subscribe do |_delivery_info, properties, payload|
if properties[:correlation_id] == that.call_id
that.response = payload.to_i
# sends the signal to continue the execution of #call
that.lock.synchronize { that.condition.signal }
end
end
end
def generate_uuid
# very naive but good enough for code examples
"#{rand}#{rand}#{rand}"
end
end
client = FibonacciClient.new('rpc_queue')
n = (ARGV[0] || 30).to_i
puts " [x] Requesting fib(#{n})"
response = client.call(n)
puts " [.] Got #{response}"
client.stop