Skip to content

Commit

Permalink
client_async: Allow throwing an exception upon socket error during
Browse files Browse the repository at this point in the history
wakeup

When wakeup() is called, we sometime notice that we get
an endless prints:
"Unable to send to wakeup socket!".

Those prints are spamming the logs.
This commit aims to address it by allowing restating the
application via an intentional exception raise.
This behavior is configurable and its default is backward compatible.

Signed-off-by: shimon-armis <[email protected]>
  • Loading branch information
shimonturjeman committed Jun 26, 2023
1 parent 7ac6c6e commit ef4721c
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class KafkaClient(object):
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
upon socket error during wakeup(). Default: False
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -192,7 +194,8 @@ class KafkaClient(object):
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
'sasl_oauth_token_provider': None,
'raise_upon_socket_err_during_wakeup': False
}

def __init__(self, **configs):
Expand Down Expand Up @@ -243,6 +246,8 @@ def __init__(self, **configs):
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)

self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup']

def _can_bootstrap(self):
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
backoff_factor = 2 ** effective_failures
Expand Down Expand Up @@ -933,8 +938,10 @@ def wakeup(self):
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
raise Errors.KafkaTimeoutError()
except socket.error:
except socket.error as e:
log.warning('Unable to send to wakeup socket!')
if self._raise_upon_socket_err_during_wakeup:
raise e

def _clear_wake_fd(self):
# reading from wake socket should only happen in a single thread
Expand Down

0 comments on commit ef4721c

Please sign in to comment.