Skip to content

Commit

Permalink
Retry connect to kafka broker
Browse files Browse the repository at this point in the history
  • Loading branch information
nailixing committed Jun 10, 2020
1 parent 6c834fd commit 3ccf695
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions singa_auto/kafka/inference_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#

import os
import time
import traceback

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
Expand Down Expand Up @@ -97,10 +100,21 @@ def pop_queries_for_worker(self, worker_id: str,
batch_size: int) -> List[Query]:
name = f'workers_{worker_id}_queries'

query_consumer = KafkaConsumer(name,
bootstrap_servers=self.connection_url,
auto_offset_reset='earliest',
group_id=QUERIES_QUEUE)
RETRY_TIMES = 4
while True:
try:
query_consumer = KafkaConsumer(name,
bootstrap_servers=self.connection_url,
auto_offset_reset='earliest',
group_id=QUERIES_QUEUE)
break
except Exception as e:
logger.error('Kafka conn Error, retry: {}'.format(RETRY_TIMES))
logger.error(traceback.format_exc())
RETRY_TIMES -= 1
time.sleep(1)
if RETRY_TIMES <= 0:
raise

partition = TopicPartition(name, 0)
partitiondic = query_consumer.end_offsets([partition])
Expand Down

0 comments on commit 3ccf695

Please sign in to comment.