@@ -125,7 +125,7 @@ def check_config_files_path(self):
125
125
if message .errno == errno .EACCES :
126
126
raise DevoSenderException (
127
127
ERROR_MSGS .CANT_READ_CONFIG_FILE % (
128
- file , str (message ))) \
128
+ file , str (message ))) \
129
129
from message
130
130
else :
131
131
raise DevoSenderException (
@@ -303,11 +303,12 @@ class Sender(logging.Handler):
303
303
:param con_type: TCP or SSL, default SSL, you can pass it in
304
304
config object too
305
305
:param timeout: timeout for socket
306
+ :param inactivity_timeout: inactivity timeout for Ingestion balancer, so connection is restarted before reaching
306
307
:param debug: For more info in console/logger output
307
308
:param logger: logger. Default sys.console
308
309
"""
309
310
310
- def __init__ (self , config = None , con_type = None ,
311
+ def __init__ (self , config = None , con_type = None , inactivity_timeout = 30 ,
311
312
timeout = 30 , debug = False , logger = None ):
312
313
if config is None :
313
314
raise DevoSenderException (ERROR_MSGS .PROBLEMS_WITH_SENDER_ARGS )
@@ -316,7 +317,9 @@ def __init__(self, config=None, con_type=None,
316
317
self .reconnection = 0
317
318
self .debug = debug
318
319
self .socket_timeout = timeout
320
+ self .inactivity_timeout = inactivity_timeout
319
321
self .socket_max_connection = 3600 * 1000
322
+ self .last_message = int (time .time ())
320
323
self .buffer = SenderBuffer ()
321
324
self .logging = {}
322
325
@@ -362,6 +365,7 @@ def __connect_tcp_socket(self):
362
365
self .socket .settimeout (self .socket_timeout )
363
366
try :
364
367
self .socket .connect (self ._sender_config .address )
368
+ self .last_message = int (time .time ())
365
369
except socket .error as error :
366
370
self .close ()
367
371
raise DevoSenderException (
@@ -392,38 +396,36 @@ def __connect_ssl(self):
392
396
raise DevoSenderException (
393
397
ERROR_MSGS .PFX_CERTIFICATE_READ_FAILED % str (error )) from error
394
398
try :
395
- try :
396
- if self ._sender_config .key is not None \
397
- and self ._sender_config .chain is not None \
398
- and self ._sender_config .cert is not None :
399
-
400
- context = ssl .create_default_context (
401
- cafile = self ._sender_config .chain )
402
-
403
- if self ._sender_config .sec_level is not None :
404
- context .set_ciphers (
405
- "DEFAULT@SECLEVEL={!s}"
406
- .format (self ._sender_config .sec_level ))
407
-
408
- context .check_hostname = self ._sender_config .check_hostname
409
-
410
- if self ._sender_config .verify_mode is not None :
411
- context .verify_mode = self ._sender_config .verify_mode
412
-
413
- context .load_cert_chain (keyfile = self ._sender_config .key ,
414
- certfile = self ._sender_config .cert )
415
- self .socket = \
416
- context .wrap_socket (
417
- self .socket ,
418
- server_hostname = self ._sender_config .address [0 ]
419
- )
420
- else :
421
- self .socket = ssl .wrap_socket (self .socket ,
422
- cert_reqs = ssl .CERT_NONE )
423
- except ssl .SSLError :
424
- raise ssl .SSLError
399
+ if self ._sender_config .key is not None \
400
+ and self ._sender_config .chain is not None \
401
+ and self ._sender_config .cert is not None :
402
+
403
+ context = ssl .create_default_context (
404
+ cafile = self ._sender_config .chain )
405
+
406
+ if self ._sender_config .sec_level is not None :
407
+ context .set_ciphers (
408
+ "DEFAULT@SECLEVEL={!s}"
409
+ .format (self ._sender_config .sec_level ))
410
+
411
+ context .check_hostname = self ._sender_config .check_hostname
412
+
413
+ if self ._sender_config .verify_mode is not None :
414
+ context .verify_mode = self ._sender_config .verify_mode
415
+
416
+ context .load_cert_chain (keyfile = self ._sender_config .key ,
417
+ certfile = self ._sender_config .cert )
418
+ self .socket = \
419
+ context .wrap_socket (
420
+ self .socket ,
421
+ server_hostname = self ._sender_config .address [0 ]
422
+ )
423
+ else :
424
+ self .socket = ssl .wrap_socket (self .socket ,
425
+ cert_reqs = ssl .CERT_NONE )
425
426
426
427
self .socket .connect (self ._sender_config .address )
428
+ self .last_message = int (time .time ())
427
429
self .reconnection += 1
428
430
if self .debug :
429
431
self .logger .debug ('Conected to %s|%s'
@@ -527,6 +529,13 @@ def __status(self):
527
529
if self .socket_max_connection < timeit :
528
530
self .close ()
529
531
return False
532
+
533
+ # If there is no activity (connection or message sent) for an amount of time bigger then the inactivity
534
+ # timeout, the balancer may have already close the connection. Close it and reconnect.
535
+ if int (time .time ()) - self .last_message > self .inactivity_timeout :
536
+ self .close ()
537
+ return False
538
+
530
539
return True
531
540
532
541
def close (self ):
@@ -569,6 +578,7 @@ def __send_oc(self, record):
569
578
for iteration in range (0 , total ):
570
579
part = record [int (iteration * 4096 ):
571
580
int ((iteration + 1 ) * 4096 )]
581
+ self .last_message = int (time .time ())
572
582
if self .socket .sendall (part ) is not None :
573
583
raise DevoSenderException (ERROR_MSGS .SEND_ERROR )
574
584
sent += len (part )
@@ -593,6 +603,7 @@ def send_raw(self, record, multiline=False, zip=False):
593
603
if not multiline and not zip :
594
604
msg = self .__encode_record (record )
595
605
sent = len (msg )
606
+ self .last_message = int (time .time ())
596
607
if self .socket .sendall (msg ) is not None :
597
608
raise DevoSenderException (ERROR_MSGS .SEND_ERROR )
598
609
return 1
0 commit comments