-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlbbroker.rb
153 lines (128 loc) · 3.56 KB
/
lbbroker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env ruby
require 'rbczmq'
require_relative 'zhelpers'
NUM_CLIENTS = 10
NUM_WORKERS = 3
context = ZMQ::Context.new
class Client
def initialize(context)
@socket = context.socket(ZMQ::REQ)
@socket.set_random_identity
@socket.connect("ipc://frontend.ipc")
end
def run
# Send request, get reply
@socket.send("HELLO")
reply = @socket.recv
puts "Client: #{reply}"
end
end
class Worker
def initialize(context)
@socket = context.socket(ZMQ::REQ)
@socket.set_random_identity
@socket.connect("ipc://backend.ipc")
end
def run
# Tell broker we're ready for work
@socket.send("READY")
loop do
# read and save all frames until we get an empty frame
# In this example there is only 1, but there could be more.
#message = @socket.recv_message
identity = @socket.recv
empty = @socket.recv
# get request, send reply
request = @socket.recv
puts("Worker: #{request}")
reply = ZMQ::Message.new
reply.addstr(identity)
reply.addstr("")
reply.addstr("OK")
@socket.send_message(reply)
# seems czmq doesn't like sendm with a REQ socket. `recv` is fine...
# Constructing a multi-frame message works fine though.
#@socket.sendm(identity)
#@socket.sendm("")
#@socket.send("OK")
end
end
end
def main(context)
frontend = context.socket(ZMQ::ROUTER)
backend = context.socket(ZMQ::ROUTER)
frontend.bind("ipc://frontend.ipc")
backend.bind("ipc://backend.ipc")
num_clients = 0
NUM_CLIENTS.times do
Thread.new do
begin
num_clients += 1
client = Client.new(context)
client.run
rescue Exception => ex
puts "Client thread exit with exception: #{ex}"
puts ex.backtrace
end
end
end
NUM_WORKERS.times do
Thread.new do
begin
worker = Worker.new(context)
worker.run
rescue Exception => ex
puts "Worker thread exit with exception: #{ex}"
puts ex.backtrace
end
end
end
worker_queue = []
poller = ZMQ::Poller.new
front_poll_item = ZMQ::Pollitem.new(frontend, ZMQ::POLLIN)
back_poll_item = ZMQ::Pollitem.new(backend, ZMQ::POLLIN)
poller.register(back_poll_item)
loop do
poller.poll(-1)
if poller.readables.include?(backend)
# Queue worker identity for load-balancing
worker_id = backend.recv
worker_queue << worker_id
# Second frame is empty
backend.recv
# Third frame is READY or else a client reply identity
client_id = backend.recv
unless client_id == "READY"
backend.recv # empty delimiter
reply = backend.recv
# send to front end
frontend.sendm(client_id)
frontend.sendm("")
frontend.send(reply)
num_clients -= 1
break if num_clients == 0
end
# if we have our first worker, we can start polling front end socket
if worker_queue.length == 1
poller.register(front_poll_item)
end
end
if poller.readables.include?(frontend)
# Now get next client request, route to last-used worker
# Client request is [identity][empty][request]
client_id = frontend.recv
frontend.recv # empty
request = frontend.recv
backend.sendm(worker_queue.shift)
backend.sendm("")
backend.sendm(client_id)
backend.sendm("")
backend.send(request)
# if there are no workers left, stop polling front end socket
if worker_queue.length == 0
poller.remove(front_poll_item)
end
end
end
end
main(context)