EventsPreOneDotOh
NOTE: THIS IS 0.8 RELATED. This information does NOT APPLY to >= 0.9. This page will be refactored soon (sorry for the inconvenience).
One problem that seems to bite people (myself included, frequently) is forgetting that the event/watcher delivery thread is managed by ZK and owned by the underlying ZooKeeper connection.
# in ZK >= 0.9.0
zk.register('/path/to/enlightenment') do |event|
  if event.node_created?
    do_some_really_expensive_operation
  end
end

zk.exists?('/path/to/enlightenment', :watch => true)
Now, it's important to remember that there is only one delivery thread. You will not receive any other events until do_some_really_expensive_operation has completed. If you have other watches you're expecting to fire, they won't. It gets especially hairy if you perform some ZK-related operation inside that callback. Let's say you were being diligent and, as every program that relies on watches should, you'd registered a callback for on_expired_session that recreated the connection to the server and took care of teardown/setup.
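Before looking at that callback, the single-delivery-thread behaviour described above can be simulated without a server. This is only a sketch: the queue and thread below are stand-ins for ZK's delivery thread, not the zk gem's actual implementation, and the 0.2 second sleep plays the part of do_some_really_expensive_operation.

```ruby
require 'thread'

# Stand-in for the library's single delivery thread (NOT the zk gem itself):
# callbacks are popped and run strictly one at a time.
events = Queue.new
delivery_thread = Thread.new do
  while (callback = events.pop) # pushing nil ends the loop
    callback.call
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
fast_ran_after = nil

# The first callback is slow; the second is trivial but has to wait its turn.
events << -> { sleep 0.2 }
events << -> { fast_ran_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started }
events << nil

delivery_thread.join
puts format('second callback ran after %.2fs', fast_ran_after)
```

The second callback does no work of its own, yet it cannot start until the slow one returns. With that in mind, here is the on_expired_session callback: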
# recreate the connection here, as 'reopen' in ZK 0.8.x is kind of buggy
# (reopen should work much more cleanly in ZK >= 0.9.0)
zk.on_expired_session do |event|
  handle_lost_connection!

  @mutex.synchronize do
    @zk.close! # be a good boy and close the connection
    @zk = ZK.new
  end

  handle_session_established!
end
The problem with this is that as part of zk.close! we shut down the event delivery thread, but in doing so we have to wait for the current event to finish. This is mainly because there's no clean way to "nuke a thread from orbit", as Charles Nutter would say. So we handle shutdown by putting a "poison pill" in the event delivery queue, and when the thread sees it, it knows to exit cleanly. Here, though, the current event is the on_expired_session callback itself, which is running on the delivery thread: close! waits for a callback that cannot return until close! does.
In short, with the above code, you will deadlock.
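The poison-pill shutdown and the resulting self-wait can be reproduced in a few lines. This is a minimal sketch under stated assumptions: TinyDispatcher is a hypothetical stand-in, not ZK's code, and its close! takes a timeout (which the scenario above does not have) so that the example reports the stall instead of hanging forever.

```ruby
require 'thread'

# Hypothetical stand-in for a client with one delivery thread.
class TinyDispatcher
  KILL = Object.new # the "poison pill"

  def initialize
    @queue  = Queue.new
    @mutex  = Mutex.new
    @cond   = ConditionVariable.new
    @exited = false
    @thread = Thread.new do
      loop do
        job = @queue.pop
        break if job.equal?(KILL)
        job.call
      end
      @mutex.synchronize { @exited = true; @cond.broadcast }
    end
  end

  # Queue a callback to run on the delivery thread.
  def deliver(&block)
    @queue << block
  end

  # Push the poison pill, then wait for the delivery thread to exit.
  # Returns true if it exited within +timeout+ seconds, false otherwise.
  def close!(timeout)
    @queue << KILL
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @exited
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
    end
    true
  end
end

dispatcher = TinyDispatcher.new
results = Queue.new

# Closing from inside a callback: the thread that must see the pill is the
# one doing the waiting, so the wait can only end by timing out.
dispatcher.deliver { results << dispatcher.close!(0.2) }
closed_from_callback = results.pop

# Closing from a thread we own: the delivery thread is free to exit.
closed_from_outside = dispatcher.close!(1.0)

puts "from callback: #{closed_from_callback}, from outside: #{closed_from_outside}"
```

Without the timeout, the first close! would never return, which is the deadlock described above.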
The underlying problem is that ZK can't handle this for you. You're writing the application and doing things that ZK can't anticipate or even begin to understand. The solution is to use your own thread, whose lifecycle you manage, and have ZK push events onto a queue that is consumed by that thread.
require 'thread'
require 'monitor'
require 'zk'

# the slightly verbose way

class YourStuff
  # make sure this constant doesn't get re-assigned if this
  # class is reloaded
  KILL_TOKEN = Object.new unless defined?(KILL_TOKEN)

  def initialize
    @queue = Queue.new
    @mutex = Monitor.new
  end

  def start!
    start_delivery_thread
    @zk = ZK.new
    handle_session_established!
  end

  # this thread is ours: it consumes events until it sees KILL_TOKEN
  def start_delivery_thread
    @delivery_thread ||= Thread.new do
      while true
        event = @queue.pop
        break if event == KILL_TOKEN
        handle_zk_events(event)
      end
    end
  end

  def handle_zk_events(event)
    if event.state_expired_session?
      reconnect_zk!
    end
  end

  def reconnect_zk!
    handle_lost_connection!

    @mutex.synchronize do
      @zk.close!
      @zk = ZK.new
    end

    handle_session_established!
  end

  def handle_lost_connection!
    # this would do application specific shutdown things
    # without exiting
  end

  def handle_session_established!
    # this would register watchers, call
  end
end
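The class above leaves two pieces implicit: the callback that puts events on the queue, and the shutdown that uses KILL_TOKEN. The following is a minimal, server-free sketch of both. EventWorker, enqueue, and stop! are hypothetical names chosen for illustration, and plain symbols stand in for ZK event objects.

```ruby
require 'thread'

# Hypothetical helper: a worker thread you own, fed by a queue.
class EventWorker
  KILL_TOKEN = Object.new

  def initialize(&handler)
    @queue   = Queue.new
    @handler = handler
    @thread  = nil
  end

  def start!
    @thread ||= Thread.new do
      loop do
        event = @queue.pop
        break if event.equal?(KILL_TOKEN)
        @handler.call(event)
      end
    end
    self
  end

  # Safe to call from the library's delivery thread: it only enqueues,
  # so the callback returns immediately.
  def enqueue(event)
    @queue << event
  end

  # Call this from a thread other than the worker itself. Events queued
  # before the token are still handled, in order, before the thread exits.
  def stop!
    return unless @thread
    @queue << KILL_TOKEN
    @thread.join
    @thread = nil
  end
end

handled = []
worker = EventWorker.new { |event| handled << event }.start!

# In a real client the hand-off would happen inside the callback, e.g.
#   zk.on_expired_session { |event| worker.enqueue(event) }
worker.enqueue(:expired_session)
worker.enqueue(:connected)
worker.stop!

puts handled.inspect
```

Because the callback does nothing but enqueue, the delivery thread is never blocked by your work, and reconnect logic that calls close! runs on a thread that close! does not need to wait for.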
If you like, ZK provides a Threadpool class, a simple threadpool that runs callable blocks, which might simplify your implementation.
require 'thread'
require 'monitor'
require 'zk'

class YourStuff
  def initialize
    @threadpool = ZK::Threadpool.new(:size => 1) # adjustable
    @mutex = Monitor.new
  end

  def zk
    @mutex.synchronize { @zk }
  end

  def start!
    @threadpool.start!
    reconnect_zk!
  end

  def reconnect_zk!
    handle_lost_connection! if zk

    @mutex.synchronize do
      @zk.close! if @zk
      @zk = ZK.new
    end

    handle_session_established!
  end

  def handle_session_established!
    # same hook as in the earlier example: on_expired_session
    zk.on_expired_session do |event|
      @threadpool.defer do # <- this takes care of the queueing/running
        reconnect_zk!
      end
    end
  end

  def handle_lost_connection!
    # this would do application specific shutdown things
    # without exiting
  end
end
Please note that this specific case will be easier to handle in 0.9.0 (and I'll update this sample once it's released).
The point, however, is that you should not do work on threads you don't own, because you then expose yourself to the peculiarities of the underlying implementation.