From 3ccf695296371a9c0ee91d82e2fcb480788e751e Mon Sep 17 00:00:00 2001 From: nailixing Date: Wed, 10 Jun 2020 22:37:23 +0800 Subject: [PATCH] Retry connect to kafka broker --- singa_auto/kafka/inference_cache.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/singa_auto/kafka/inference_cache.py b/singa_auto/kafka/inference_cache.py index c9db2905..440db43a 100644 --- a/singa_auto/kafka/inference_cache.py +++ b/singa_auto/kafka/inference_cache.py @@ -18,6 +18,9 @@ # import os +import time +import traceback + from kafka import KafkaConsumer from kafka import KafkaProducer from kafka.errors import KafkaError @@ -97,10 +100,21 @@ def pop_queries_for_worker(self, worker_id: str, batch_size: int) -> List[Query]: name = f'workers_{worker_id}_queries' - query_consumer = KafkaConsumer(name, - bootstrap_servers=self.connection_url, - auto_offset_reset='earliest', - group_id=QUERIES_QUEUE) + RETRY_TIMES = 4 + while True: + try: + query_consumer = KafkaConsumer(name, + bootstrap_servers=self.connection_url, + auto_offset_reset='earliest', + group_id=QUERIES_QUEUE) + break + except Exception as e: + logger.error('Kafka conn Error, retry: {}'.format(RETRY_TIMES)) + logger.error(traceback.format_exc()) + RETRY_TIMES -= 1 + time.sleep(1) + if RETRY_TIMES <= 0: + raise partition = TopicPartition(name, 0) partitiondic = query_consumer.end_offsets([partition])