Skip to content

Commit 34dc9dd

Browse files
authored
Use socket timeout of request_timeout_ms to prevent blocking forever on send (dpkp#1281)
1 parent d2001e4 commit 34dc9dd

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

kafka/conn.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from __future__ import absolute_import
1+
from __future__ import absolute_import, division
22

33
import collections
44
import copy
@@ -491,7 +491,7 @@ def _handle_sasl_handshake_response(self, future, response):
491491
self.config['sasl_mechanism']))
492492

493493
def _send_bytes_blocking(self, data):
494-
self._sock.setblocking(True)
494+
self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
495495
total_sent = 0
496496
try:
497497
while total_sent < len(data):
@@ -501,10 +501,10 @@ def _send_bytes_blocking(self, data):
501501
raise ConnectionError('Buffer overrun during socket send')
502502
return total_sent
503503
finally:
504-
self._sock.setblocking(False)
504+
self._sock.settimeout(0.0)
505505

506506
def _recv_bytes_blocking(self, n):
507-
self._sock.setblocking(True)
507+
self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
508508
try:
509509
data = b''
510510
while len(data) < n:
@@ -514,7 +514,7 @@ def _recv_bytes_blocking(self, n):
514514
data += fragment
515515
return data
516516
finally:
517-
self._sock.setblocking(False)
517+
self._sock.settimeout(0.0)
518518

519519
def _try_authenticate_plain(self, future):
520520
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
@@ -696,6 +696,7 @@ def _send(self, request):
696696
# In the future we might manage an internal write buffer
697697
# and send bytes asynchronously. For now, just block
698698
# sending each request payload
699+
sent_time = time.time()
699700
total_bytes = self._send_bytes_blocking(data)
700701
if self._sensors:
701702
self._sensors.bytes_sent.record(total_bytes)
@@ -707,7 +708,7 @@ def _send(self, request):
707708
log.debug('%s Request %d: %s', self, correlation_id, request)
708709

709710
if request.expect_response():
710-
ifr = (correlation_id, future, time.time())
711+
ifr = (correlation_id, future, sent_time)
711712
self.in_flight_requests.append(ifr)
712713
else:
713714
future.success(None)

0 commit comments

Comments
 (0)