Add an example Kubernetes liveness probe to the README
meqif committed Dec 12, 2022
1 parent a0b1cd4 commit febd22e
Showing 1 changed file with 86 additions and 5 deletions.
91 changes: 86 additions & 5 deletions README.md
@@ -528,23 +528,104 @@ Please see [Compression](#compression)

### Consumer rebalance listener

You may also want to set a consumer rebalance listener. A minimal listener that just logs partition assignment and revocation events would look like this:

```ruby
class MyConsumerRebalanceListener
  attr_reader :logger

  def initialize(logger)
    @logger = logger
  end

  def on_partitions_assigned(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    logger.warn("Partitions assigned: #{partitions}")
  end

  def on_partitions_revoked(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    logger.warn("Partitions revoked: #{partitions}")
  end
end

Racecar.config.consumer_rebalance_listener do
  MyConsumerRebalanceListener.new(logger)
end
```

Note that the exact requirements for the consumer rebalance listener interface are set by rdkafka-ruby and may change from version to version.
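
To make the partition extraction used in the listener more concrete, here is a small sketch that runs the same expression against a stand-in hash. `FakePartition` and the topic names are hypothetical; the assumption is that `list.to_h` yields a hash mapping each topic name to an array of objects that respond to `partition`, which is what the listener code relies on.

```ruby
# Hypothetical stand-in for the partition objects; only #partition is needed here.
FakePartition = Struct.new(:partition)

# Assumed shape of `list.to_h`: topic name => array of partition objects.
assignment = {
  'orders'   => [FakePartition.new(0), FakePartition.new(3)],
  'payments' => [FakePartition.new(1)]
}

# Same expression as in the listener: collect the partition numbers of every topic.
partitions = assignment.map { |_key, values| values.map(&:partition) }.flatten

puts "Partitions assigned: #{partitions}"
# prints "Partitions assigned: [0, 3, 1]"
```

Note that the topic names are discarded, so partition numbers coming from different topics cannot be told apart in the result.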

#### Liveness probe

You can also take advantage of the consumer rebalance listener to implement a liveness probe for Kubernetes.

The core idea is to detect a lack of progress in specific partitions by (1) updating the timestamp of a file that represents activity in each partition and (2) checking the timestamps of those files in a Kubernetes probe.
Since partitions can be rebalanced for a variety of reasons, some housekeeping is necessary and the consumer rebalance listener can be used for that purpose. That will avoid false positives due to partitions that have been re-assigned to a different Kafka consumer.

``` ruby
require 'fileutils'

class MyConsumer < Racecar::Consumer
  def perform(message)
    do_stuff(message)
    liveness_probe(message.partition)
  end

  private

  def liveness_probe(partition)
    FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive")
  end
end
```

``` ruby
require 'fileutils'

class RebalanceListener
  def on_partitions_assigned(_consumer, list)
    # Clean up every liveness file that was carried over from the consumer's previous life
    Dir['/app/tmp/kafka_partition_*'].each { |f| File.delete(f) }

    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    Rails.logger.warn("Partitions assigned: #{partitions}")

    # Ensure new files are created to detect cases where the consumer didn't process
    # any messages from a newly-assigned partition
    partitions.each do |partition|
      FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive")
    end
  end

  def on_partitions_revoked(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    Rails.logger.warn("Partitions revoked: #{partitions}")
  end
end

Racecar.config.consumer_rebalance_listener do
  RebalanceListener.new
end
```

Something like the following bash script could be used by Kubernetes to detect (and log) when no progress has been made in specific partitions (tweak `MAX_AGE_MINUTES` to taste):

``` bash
#!/usr/bin/env bash
set -euo pipefail

MAX_AGE_MINUTES=5

STUCK_PARTITIONS=$(
  find /app/tmp/ -name 'kafka_partition_*_alive' -mmin "+$MAX_AGE_MINUTES" | \
    sed -e 's|/app/tmp/||' -e 's|_alive||' -e 's|kafka_partition_||'
)

if [ -n "$STUCK_PARTITIONS" ]; then
  echo -n "Found stuck partitions: " >&2
  echo "$STUCK_PARTITIONS" | paste -sd ',' - >&2
  exit 1
fi
```
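
If you would rather keep the check in Ruby (for example, to unit test it), similar staleness logic can be sketched as below. `stuck_partitions` is a hypothetical helper, not part of Racecar; it assumes the `kafka_partition_<n>_alive` file naming used in the examples above and Ruby 2.7 or later (for `filter_map`).

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper: returns the partition numbers whose liveness file
# has not been touched within the last `max_age_seconds`.
def stuck_partitions(dir, max_age_seconds, now: Time.now)
  Dir.glob(File.join(dir, 'kafka_partition_*_alive')).filter_map do |path|
    # Skip files that were touched recently enough.
    next if now - File.mtime(path) <= max_age_seconds

    # Extract the partition number from the file name; nil (ignored) if it doesn't match.
    File.basename(path)[/\Akafka_partition_(\d+)_alive\z/, 1]&.to_i
  end.sort
end

# Demo in a temporary directory: partition 0 is fresh, partition 1 is 10 minutes old.
Dir.mktmpdir do |dir|
  FileUtils.touch(File.join(dir, 'kafka_partition_0_alive'))
  FileUtils.touch(File.join(dir, 'kafka_partition_1_alive'), mtime: Time.now - 600)

  stuck = stuck_partitions(dir, 300)
  warn "Found stuck partitions: #{stuck.join(', ')}" unless stuck.empty?
end
```

A script used as an actual probe would call the helper on `/app/tmp` and `exit 1` when the result is not empty, mirroring the bash version.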

### Logging
