Skip to content

Commit 460f078

Browse files
committed
Instrument metrics in BrokerConnection
1 parent af08b54 commit 460f078

File tree

4 files changed

+127
-17
lines changed

4 files changed

+127
-17
lines changed

kafka/client_async.py

+2
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ def _bootstrap(self, hosts):
222222
cb = functools.partial(self._conn_state_change, 'bootstrap')
223223
bootstrap = BrokerConnection(host, port, afi,
224224
state_change_callback=cb,
225+
node_id='bootstrap',
225226
**self.config)
226227
bootstrap.connect()
227228
while bootstrap.connecting():
@@ -313,6 +314,7 @@ def _maybe_connect(self, node_id):
313314
cb = functools.partial(self._conn_state_change, node_id)
314315
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
315316
state_change_callback=cb,
317+
node_id=node_id,
316318
**self.config)
317319
conn = self._conns[node_id]
318320
if conn.connected():

kafka/conn.py

+123
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import kafka.errors as Errors
1616
from kafka.future import Future
17+
from kafka.metrics.stats import Avg, Count, Max, Rate
1718
from kafka.protocol.api import RequestHeader
1819
from kafka.protocol.admin import SaslHandShakeRequest
1920
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -58,6 +59,7 @@ class ConnectionStates(object):
5859
class BrokerConnection(object):
5960
DEFAULT_CONFIG = {
6061
'client_id': 'kafka-python-' + __version__,
62+
'node_id': 0,
6163
'request_timeout_ms': 40000,
6264
'reconnect_backoff_ms': 50,
6365
'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@ class BrokerConnection(object):
7476
'ssl_password': None,
7577
'api_version': (0, 8, 2), # default to most restrictive
7678
'state_change_callback': lambda conn: True,
79+
'metrics': None,
80+
'metric_group_prefix': '',
7781
'sasl_mechanism': 'PLAIN',
7882
'sasl_plain_username': None,
7983
'sasl_plain_password': None
@@ -138,6 +142,9 @@ def __init__(self, host, port, afi, **configs):
138142
api version. Only applies if api_version is None
139143
state_change_callback (callable): function to be called when the
140144
connection state changes from CONNECTING to CONNECTED etc.
145+
metrics (kafka.metrics.Metrics): Optionally provide a metrics
146+
instance for capturing network IO stats. Default: None.
147+
metric_group_prefix (str): Prefix for metric names. Default: ''
141148
sasl_mechanism (str): string picking sasl mechanism when security_protocol
142149
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
143150
Default: PLAIN
@@ -188,6 +195,11 @@ def __init__(self, host, port, afi, **configs):
188195
self._correlation_id = 0
189196
self._gai = None
190197
self._gai_index = 0
198+
self._sensors = None
199+
if self.config['metrics']:
200+
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
201+
self.config['metric_group_prefix'],
202+
self.config['node_id'])
191203

192204
def connect(self):
193205
"""Attempt to connect and return ConnectionState"""
@@ -518,6 +530,8 @@ def _send(self, request, expect_response=True):
518530
sent_bytes = self._sock.send(data[total_sent:])
519531
total_sent += sent_bytes
520532
assert total_sent == len(data)
533+
if self._sensors:
534+
self._sensors.bytes_sent.record(total_sent)
521535
self._sock.setblocking(False)
522536
except (AssertionError, ConnectionError) as e:
523537
log.exception("Error sending %s to %s", request, self)
@@ -648,6 +662,8 @@ def _recv(self):
648662

649663
self._receiving = False
650664
self._next_payload_bytes = 0
665+
if self._sensors:
666+
self._sensors.bytes_received.record(4 + self._rbuffer.tell())
651667
self._rbuffer.seek(0)
652668
response = self._process_response(self._rbuffer)
653669
self._rbuffer.seek(0)
@@ -658,6 +674,8 @@ def _process_response(self, read_buffer):
658674
assert not self._processing, 'Recursion not supported'
659675
self._processing = True
660676
ifr = self.in_flight_requests.popleft()
677+
if self._sensors:
678+
self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
661679

662680
# verify send/recv correlation ids match
663681
recv_correlation_id = Int32.decode(read_buffer)
@@ -827,6 +845,111 @@ def __repr__(self):
827845
self.port)
828846

829847

848+
class BrokerConnectionMetrics(object):
    """Network IO sensors for a single broker connection.

    On construction this registers, if they are not already present in
    ``metrics``, a set of summary sensors shared by all connections
    ('bytes-sent-received', 'bytes-sent', 'bytes-received',
    'request-latency') and a set of per-node sensors ('node-<id>.bytes-sent',
    'node-<id>.bytes-received', 'node-<id>.latency') whose parents are the
    corresponding summary sensors.

    The per-node sensors are exposed as the attributes ``bytes_sent``,
    ``bytes_received`` and ``request_time``. They are bound on every
    instance, including when the sensors were registered earlier by another
    instance sharing the same ``metrics`` registry and ``node_id``.

    Arguments:
        metrics: registry providing ``sensor()``, ``get_sensor()`` and
            ``metric_name()`` (presumably a kafka.metrics.Metrics instance
            -- confirm against callers).
        metric_group_prefix (str): prefix prepended to the metric group
            names ('<prefix>-metrics', '<prefix>-node-metrics.node-<id>').
        node_id: broker identifier, formatted into the 'node-<id>' sensor
            name prefix.
    """

    def __init__(self, metrics, metric_group_prefix, node_id):
        self.metrics = metrics

        # Any broker may have registered summary metrics already
        # but if not, we need to create them so we can set as parents below
        if not metrics.get_sensor('bytes-sent-received'):
            self._register_summary_sensors(metrics, metric_group_prefix)

        # if one sensor of the metrics has been registered for the connection,
        # then all other sensors should have been registered; and vice versa
        node_str = 'node-{0}'.format(node_id)
        if not metrics.get_sensor(node_str + '.bytes-sent'):
            self._register_node_sensors(metrics, metric_group_prefix, node_str)

        # Bind the node-level sensors unconditionally. Previously they were
        # only assigned when this instance created them, so a second
        # connection to the same node (e.g. after a reconnect) was left
        # without these attributes and failed on the first record() call.
        self.bytes_sent = metrics.get_sensor(node_str + '.bytes-sent')
        self.bytes_received = metrics.get_sensor(node_str + '.bytes-received')
        self.request_time = metrics.get_sensor(node_str + '.latency')

    @staticmethod
    def _register_summary_sensors(metrics, metric_group_prefix):
        """Create the summary sensors aggregated across all connections."""
        metric_group_name = metric_group_prefix + '-metrics'

        bytes_transferred = metrics.sensor('bytes-sent-received')
        bytes_transferred.add(metrics.metric_name(
            'network-io-rate', metric_group_name,
            'The average number of network operations (reads or writes) on all'
            ' connections per second.'), Rate(sampled_stat=Count()))

        bytes_sent = metrics.sensor('bytes-sent',
                                    parents=[bytes_transferred])
        bytes_sent.add(metrics.metric_name(
            'outgoing-byte-rate', metric_group_name,
            'The average number of outgoing bytes sent per second to all'
            ' servers.'), Rate())
        bytes_sent.add(metrics.metric_name(
            'request-rate', metric_group_name,
            'The average number of requests sent per second.'),
            Rate(sampled_stat=Count()))
        bytes_sent.add(metrics.metric_name(
            'request-size-avg', metric_group_name,
            'The average size of all requests in the window.'), Avg())
        bytes_sent.add(metrics.metric_name(
            'request-size-max', metric_group_name,
            'The maximum size of any request sent in the window.'), Max())

        bytes_received = metrics.sensor('bytes-received',
                                        parents=[bytes_transferred])
        bytes_received.add(metrics.metric_name(
            'incoming-byte-rate', metric_group_name,
            'Bytes/second read off all sockets'), Rate())
        bytes_received.add(metrics.metric_name(
            'response-rate', metric_group_name,
            'Responses received sent per second.'),
            Rate(sampled_stat=Count()))

        request_latency = metrics.sensor('request-latency')
        request_latency.add(metrics.metric_name(
            'request-latency-avg', metric_group_name,
            'The average request latency in ms.'),
            Avg())
        request_latency.add(metrics.metric_name(
            'request-latency-max', metric_group_name,
            'The maximum request latency in ms.'),
            Max())

    @staticmethod
    def _register_node_sensors(metrics, metric_group_prefix, node_str):
        """Create the per-node sensors, parented to the summary sensors."""
        metric_group_name = metric_group_prefix + '-node-metrics.' + node_str

        bytes_sent = metrics.sensor(
            node_str + '.bytes-sent',
            parents=[metrics.get_sensor('bytes-sent')])
        bytes_sent.add(metrics.metric_name(
            'outgoing-byte-rate', metric_group_name,
            'The average number of outgoing bytes sent per second.'),
            Rate())
        bytes_sent.add(metrics.metric_name(
            'request-rate', metric_group_name,
            'The average number of requests sent per second.'),
            Rate(sampled_stat=Count()))
        bytes_sent.add(metrics.metric_name(
            'request-size-avg', metric_group_name,
            'The average size of all requests in the window.'),
            Avg())
        bytes_sent.add(metrics.metric_name(
            'request-size-max', metric_group_name,
            'The maximum size of any request sent in the window.'),
            Max())

        bytes_received = metrics.sensor(
            node_str + '.bytes-received',
            parents=[metrics.get_sensor('bytes-received')])
        bytes_received.add(metrics.metric_name(
            'incoming-byte-rate', metric_group_name,
            'Bytes/second read off node-connection socket'),
            Rate())
        bytes_received.add(metrics.metric_name(
            'response-rate', metric_group_name,
            'The average number of responses received per second.'),
            Rate(sampled_stat=Count()))

        request_time = metrics.sensor(
            node_str + '.latency',
            parents=[metrics.get_sensor('request-latency')])
        request_time.add(metrics.metric_name(
            'request-latency-avg', metric_group_name,
            'The average request latency in ms.'),
            Avg())
        request_time.add(metrics.metric_name(
            'request-latency-max', metric_group_name,
            'The maximum request latency in ms.'),
            Max())
951+
952+
830953
def _address_family(address):
831954
"""
832955
Attempt to determine the family of an address (or hostname)

kafka/producer/sender.py

-17
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
204204
batch = batches_by_partition[tp]
205205
self._complete_batch(batch, error, offset, ts)
206206

207-
self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
208207
if response.API_VERSION > 0:
209208
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
210209

@@ -343,15 +342,6 @@ def __init__(self, metrics, client, metadata):
343342
sensor_name=sensor_name,
344343
description='The maximum time in ms record batches spent in the record accumulator.')
345344

346-
sensor_name = 'request-time'
347-
self.request_time_sensor = self.metrics.sensor(sensor_name)
348-
self.add_metric('request-latency-avg', Avg(),
349-
sensor_name=sensor_name,
350-
description='The average request latency in ms')
351-
self.add_metric('request-latency-max', Max(),
352-
sensor_name=sensor_name,
353-
description='The maximum request latency in ms')
354-
355345
sensor_name = 'produce-throttle-time'
356346
self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
357347
self.add_metric('produce-throttle-time-avg', Avg(),
@@ -498,12 +488,5 @@ def record_errors(self, topic, count):
498488
if sensor:
499489
sensor.record(count)
500490

501-
def record_latency(self, latency, node=None):
502-
self.request_time_sensor.record(latency)
503-
if node is not None:
504-
sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
505-
if sensor:
506-
sensor.record(latency)
507-
508491
def record_throttle_time(self, throttle_time_ms, node=None):
509492
self.produce_throttle_time_sensor.record(throttle_time_ms)

test/test_client_async.py

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def test_bootstrap_success(conn):
4949
args, kwargs = conn.call_args
5050
assert args == ('localhost', 9092, socket.AF_UNSPEC)
5151
kwargs.pop('state_change_callback')
52+
kwargs.pop('node_id')
5253
assert kwargs == cli.config
5354
conn.connect.assert_called_with()
5455
conn.send.assert_called_once_with(MetadataRequest[0]([]))
@@ -62,6 +63,7 @@ def test_bootstrap_failure(conn):
6263
args, kwargs = conn.call_args
6364
assert args == ('localhost', 9092, socket.AF_UNSPEC)
6465
kwargs.pop('state_change_callback')
66+
kwargs.pop('node_id')
6567
assert kwargs == cli.config
6668
conn.connect.assert_called_with()
6769
conn.close.assert_called_with()

0 commit comments

Comments
 (0)