Skip to content

Commit 025b69e

Browse files
committed
Instrument bufferpool-wait-ratio metric in KafkaProducer
1 parent 460f078 commit 025b69e

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

kafka/producer/buffer.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
gzip_encode, snappy_encode,
1010
lz4_encode, lz4_encode_old_kafka)
1111
from .. import errors as Errors
12+
from ..metrics.stats import Rate
1213
from ..protocol.types import Int32, Int64
1314
from ..protocol.message import MessageSet, Message
1415

@@ -135,7 +136,7 @@ def buffer(self):
135136

136137
class SimpleBufferPool(object):
137138
"""A simple pool of BytesIO objects with a weak memory ceiling."""
138-
def __init__(self, memory, poolable_size):
139+
def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
139140
"""Create a new buffer pool.
140141
141142
Arguments:
@@ -150,10 +151,13 @@ def __init__(self, memory, poolable_size):
150151
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
151152

152153
self._waiters = collections.deque()
153-
#self.metrics = metrics;
154-
#self.waitTime = this.metrics.sensor("bufferpool-wait-time");
155-
#MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
156-
#this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
154+
self.wait_time = None
155+
if metrics:
156+
self.wait_time = metrics.sensor('bufferpool-wait-time')
157+
self.wait_time.add(metrics.metric_name(
158+
'bufferpool-wait-ratio', metric_group_prefix,
159+
'The fraction of time an appender waits for space allocation.'),
160+
Rate())
157161

158162
def allocate(self, size, max_time_to_block_ms):
159163
"""
@@ -187,7 +191,8 @@ def allocate(self, size, max_time_to_block_ms):
187191
start_wait = time.time()
188192
more_memory.wait(max_time_to_block_ms / 1000.0)
189193
end_wait = time.time()
190-
#this.waitTime.record(endWait - startWait, time.milliseconds());
194+
if self.wait_time:
195+
self.wait_time.record(end_wait - start_wait)
191196

192197
if self._free:
193198
buf = self._free.popleft()

kafka/producer/kafka.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ def __init__(self, **configs):
335335
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
336336

337337
message_version = 1 if self.config['api_version'] >= (0, 10) else 0
338-
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
338+
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
339339
self._metadata = client.cluster
340340
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
341341
self._sender = Sender(client, self._metadata,

kafka/producer/record_accumulator.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ class RecordAccumulator(object):
162162
'linger_ms': 0,
163163
'retry_backoff_ms': 100,
164164
'message_version': 0,
165+
'metrics': None,
166+
'metric_group_prefix': 'producer-metrics',
165167
}
166168

167169
def __init__(self, **configs):
@@ -176,7 +178,9 @@ def __init__(self, **configs):
176178
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
177179
self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
178180
self._free = SimpleBufferPool(self.config['buffer_memory'],
179-
self.config['batch_size'])
181+
self.config['batch_size'],
182+
metrics=self.config['metrics'],
183+
metric_group_prefix=self.config['metric_group_prefix'])
180184
self._incomplete = IncompleteRecordBatches()
181185
# The following variables should only be accessed by the sender thread,
182186
# so we don't need to protect them w/ locking.

0 commit comments

Comments (0)