Jonathan D. Simms edited this page Apr 26, 2012 · 1 revision

NOTE: THIS IS 0.8 RELATED

This information does NOT APPLY to >= 0.9. This page will be refactored soon (sorry for the inconvenience).

The Event Dispatch Thread is NOT YOURS

One problem that seems to bite people (myself included, frequently) is forgetting that the event/watcher delivery thread is managed by ZK and owned by the underlying ZooKeeper connection.

# in ZK >= 0.9.0

zk.register('/path/to/enlightenment') do |event|
  if event.node_created?
    do_some_really_expensive_operation
  end
end


zk.exists?('/path/to/enlightenment', :watch => true)
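Here's a plain-Ruby simulation of what a single delivery thread means in practice. It doesn't use ZK at all. The queue, the :slow/:fast/:stop events, and the sleep standing in for do_some_really_expensive_operation are all made up for illustration. One thread handles events strictly in order, so the second event waits for the first handler to return.

```ruby
require 'thread'

# Hypothetical stand-in for a single event delivery thread (not ZK's code):
# one thread pops events off a queue and runs the handler for each in turn.
events = Queue.new
log = [] # only the delivery thread appends; we read it after join

delivery_thread = Thread.new do
  loop do
    event = events.pop
    break if event == :stop
    log << "start #{event}"
    sleep 0.2 if event == :slow # stands in for do_some_really_expensive_operation
    log << "finish #{event}"
  end
end

events << :slow
events << :fast # queued right away, but has to wait its turn
events << :stop
delivery_thread.join

p log
# prints ["start slow", "finish slow", "start fast", "finish fast"]
```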

Now, it's important to remember that there is only one delivery thread: you will not receive any other events until do_some_really_expensive_operation has completed. If you have other watches you're expecting to fire, their callbacks won't run until it returns.

It gets especially hairy if you perform some ZK-related operation inside that callback. Let's say you were being diligent and, as every program that relies on watches should, you'd registered an on_expired_session callback that recreated the connection to the server and took care of teardown and setup:

# recreate the connection here, as 'reopen' in ZK 0.8.x is kind of buggy
# (reopen should work much more cleanly in ZK >= 0.9.0)

zk.on_expired_session do |event|
  handle_lost_connection!
  @mutex.synchronize do
    @zk.close! # be a good boy and close the connection
    @zk = ZK.new
  end
  handle_session_established!
end

The problem is that, as part of zk.close!, we shut down the event delivery thread, and to do that we have to wait for the current event to finish. This is mainly because there's no clean way to "nuke a thread from orbit", as Charles Nutter would say. So we handle shutdown by putting a "poison pill" in the event delivery queue; when the thread sees it, it knows to exit cleanly.

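In short, with the above code, you will deadlock: close! is being called from the delivery thread itself, so that thread ends up waiting for its own current event (your on_expired_session callback) to finish, which it never will.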
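Here's a stripped-down model of that poison-pill shutdown that makes the self-wait visible. TinyDispatcher, POISON_PILL, deliver and shutdown are hypothetical names, not ZK's API. Where the real thing would simply wait forever, this sketch checks whether shutdown is being called from its own delivery thread and raises ThreadError instead, so the example terminates.

```ruby
require 'thread'

# Hypothetical, simplified model of a poison-pill shutdown (not ZK's actual code).
class TinyDispatcher
  POISON_PILL = Object.new

  def initialize(&handler)
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        event = @queue.pop
        break if event.equal?(POISON_PILL) # the pill tells the thread to exit cleanly
        handler.call(event)
      end
    end
  end

  def deliver(event)
    @queue << event
  end

  # Queue the pill behind any pending events, then wait for the thread to exit.
  def shutdown
    # From inside a handler this would mean the delivery thread waiting on
    # itself. We raise here; a shutdown without this guard would just hang.
    if Thread.current == @thread
      raise ThreadError, 'shutdown called from the delivery thread'
    end
    @queue << POISON_PILL
    @thread.join
  end
end

seen = []
errors = []
dispatcher = nil
dispatcher = TinyDispatcher.new do |event|
  seen << event
  if event == :expired
    begin
      dispatcher.shutdown # the equivalent of calling close! in on_expired_session
    rescue ThreadError => e
      errors << e.message
    end
  end
end

dispatcher.deliver(:created)
dispatcher.deliver(:expired)
dispatcher.shutdown # called from the main thread, so this one works
p seen   # prints [:created, :expired]
p errors # prints ["shutdown called from the delivery thread"]
```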

So what should one do?

Well, in short, ZK can't handle this for you. You're writing the application, doing things that ZK can't anticipate or even begin to understand. The solution is to use your own thread, whose lifecycle you manage, and have your ZK callbacks do nothing but push events onto a queue that your thread consumes.

require 'thread'
require 'monitor'
require 'zk'

# the slightly verbose way

class YourStuff
  # make sure this constant doesn't get re-assigned if this
  # class is reloaded 
  KILL_TOKEN = Object.new unless defined?(KILL_TOKEN)

  def initialize
    @queue = Queue.new
    @mutex = Monitor.new
  end

  def start!
    start_delivery_thread
    @zk = ZK.new
    handle_session_established!
  end

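  def start_delivery_thread
    @delivery_thread ||= Thread.new do
      loop do
        event = @queue.pop
        break if event.equal?(KILL_TOKEN) # our own "poison pill"
        handle_zk_events(event)
      end
    end
  end

  # shut down cleanly: let our thread drain its queue and exit, then close ZK
  def stop!
    @queue.push(KILL_TOKEN)
    @delivery_thread.join if @delivery_thread
    @mutex.synchronize { @zk.close! if @zk }
  end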

  def handle_zk_events(event)
    if event.state_expired_session?
      reconnect_zk!
    end
  end

  def reconnect_zk!
    handle_lost_connection!
    @mutex.synchronize do
      @zk.close!
      @zk = ZK.new
    end
    handle_session_established!
  end

  def handle_lost_connection!
    # this would do application specific shutdown things
    # without exiting
  end

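  def handle_session_established!
    # hand expired-session events off to our own thread instead of
    # doing the work on ZK's delivery thread
    @zk.on_expired_session { |event| @queue.push(event) }

    # this would also register watchers and do any other
    # application-specific setup
  end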
end
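To convince yourself that the hand-off avoids the deadlock without running a ZooKeeper server, here's a self-contained simulation. FakeConnection is a hypothetical test double, not ZK: its close! stops its delivery thread with a poison pill and waits for it, as described above, and it raises if called from its own thread rather than hanging. As in YourStuff, the callback only pushes onto our queue and the reconnect happens on a thread we own.

```ruby
require 'thread'

# Hypothetical test double for a ZK connection (not ZK's API). close! stops
# the delivery thread with a poison pill and waits for it to exit.
class FakeConnection
  PILL = Object.new

  def initialize
    @handlers = []
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        event = @queue.pop
        break if event.equal?(PILL)
        @handlers.each { |handler| handler.call(event) }
      end
    end
  end

  def on_expired_session(&block)
    @handlers << block
  end

  # pretend the server just expired our session
  def simulate_expiry
    @queue << :expired_session
  end

  def close!
    if Thread.current == @thread
      raise ThreadError, 'close! called from the delivery thread'
    end
    @queue << PILL
    @thread.join
  end
end

app_queue = Queue.new
reconnects = 0
conn = FakeConnection.new
# the callback runs on the connection's thread, so it only enqueues
conn.on_expired_session { |event| app_queue << event }

# our own thread does the actual work
app_thread = Thread.new do
  event = app_queue.pop
  if event == :expired_session
    conn.close!               # fine: we're not on conn's delivery thread
    conn = FakeConnection.new # the "reconnect"
    reconnects += 1
  end
end

conn.simulate_expiry
app_thread.join
conn.close! # clean up the replacement connection
p reconnects # prints 1
```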

If you like, ZK provides a Threadpool class (ZK::Threadpool), a simple thread pool that runs the blocks you hand it via defer, which might simplify your implementation.

require 'thread'
require 'monitor'
require 'zk'

class YourStuff
  def initialize
    @threadpool = ZK::Threadpool.new(:size => 1) # adjustable
    @mutex = Monitor.new
  end

  def zk
    @mutex.synchronize { @zk }
  end

  def start!
    @threadpool.start!
    reconnect_zk!
  end

  def reconnect_zk!
    handle_lost_connection! if zk

    @mutex.synchronize do
      @zk.close! if @zk
      @zk = ZK.new
    end

    handle_session_established!
  end

  def handle_session_established!
    zk.on_expired_session do |event|
      @threadpool.defer do            # <- this takes care of the queueing/running
        reconnect_zk!
      end
    end
  end

  def handle_lost_connection!
    # this would do application specific shutdown things
    # without exiting
  end
end
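ZK::Threadpool's internals aren't covered on this page. As a rough mental model only, here's a hypothetical MiniPool with a defer method. It is not ZK's implementation, and its names are made up. defer just queues the block and returns; worker threads owned by the pool pop blocks and run them, which is why the reconnect above doesn't run on ZK's delivery thread.

```ruby
require 'thread'

# Hypothetical, minimal "defer a block" pool: a mental model, not ZK::Threadpool.
class MiniPool
  STOP = Object.new

  def initialize(size = 1)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        loop do
          job = @queue.pop
          break if job.equal?(STOP)
          begin
            job.call
          rescue StandardError => e
            warn "deferred block raised: #{e.message}" # keep the worker alive
          end
        end
      end
    end
  end

  # Queue a block to run on one of the pool's threads; returns immediately.
  def defer(&block)
    @queue << block
    nil
  end

  # Let already-queued jobs finish, then stop every worker.
  def shutdown
    @workers.size.times { @queue << STOP }
    @workers.each(&:join)
  end
end

order = []
ran_on = nil
pool = MiniPool.new(1)
3.times { |i| pool.defer { order << i; ran_on = Thread.current } }
pool.shutdown
p order                    # prints [0, 1, 2]
p ran_on == Thread.current # prints false
```

With a single worker, deferred blocks run one at a time in the order they were queued, which is the same guarantee the hand-rolled delivery thread above gives you.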

Please note that this specific case will be easier to handle in 0.9.0 (I'll update this sample once it's released).

The broader point, however, is that you should not do work on threads you don't own, because doing so exposes you to the peculiarities of the underlying implementation.
