Skip to content

Commit 0384429

Browse files
authored
fix redis balance server, test=develop (#139)
Signed-off-by: WangXi <[email protected]>
1 parent ed9fb22 commit 0384429

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

python/edl/distill/redis/balance_server.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@
2323
import time
2424

2525

26+
def set_keepalive_linux(sock, after_idle_sec=30, interval_sec=10, max_fails=6):
27+
"""Set TCP keepalive on an open socket.
28+
29+
It activates after 30 second (after_idle_sec) of idleness,
30+
then sends a keepalive ping once every 10 seconds (interval_sec),
31+
and closes the connection after 6 failed ping (max_fails), or 60 seconds
32+
"""
33+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
34+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
35+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
36+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
37+
38+
2639
class Server(object):
2740
_READ = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
2841
_WRITE = select.EPOLLOUT | select.EPOLLHUP | select.EPOLLERR
@@ -116,6 +129,7 @@ def _init_conn(self):
116129
# client.getpeername()
117130

118131
client.setblocking(False)
132+
set_keepalive_linux(client)
119133
fd = client.fileno()
120134
self._epoll.register(fd, self._READ)
121135
self._clients[fd] = client

python/edl/distill/redis/service_table.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ def is_servers_update(self, fd, version):
5959
return new_version, None
6060

6161
def get_servers(self, fd, num):
62-
if fd not in self._fd_to_service_name:
62+
service_name = self._fd_to_service_name.get(fd)
63+
if service_name is None:
6364
# not register
6465
return []
65-
service_name = self._fd_to_service_name[fd]
6666

6767
if service_name not in self._service_name_to_servers or \
68-
self._service_name_to_update[service_name] is True:
68+
self._service_name_to_update.get(service_name, False):
6969
self._refresh_service(service_name)
7070

7171
return list(self._fd_to_servers[fd])
@@ -85,12 +85,11 @@ def add_service_name(self, fd, service_name, num):
8585
self._service_name_to_update[service_name] = True
8686

8787
def rm_service_name(self, fd):
88-
# client maybe exit before register
89-
if fd not in self._fd_to_service_name:
88+
service_name = self._fd_to_service_name.get(fd)
89+
if service_name is None:
90+
# client maybe exit before register
9091
return
9192

92-
service_name = self._fd_to_service_name[fd]
93-
9493
with self._mutex:
9594
if service_name in self._service_name_to_fds:
9695
try:
@@ -141,7 +140,7 @@ def _refresh_service(self, service_name):
141140

142141
# no change
143142
if len(rm_servers) == 0 and len(add_servers) == 0 and \
144-
self._service_name_to_update[service_name] is False:
143+
not self._service_name_to_update.get(service_name, False):
145144
return
146145
self._service_name_to_update[service_name] = False
147146
update_fd = set()
@@ -182,7 +181,7 @@ def _refresh_service(self, service_name):
182181
# assign: {fd0:32, fd1:32, fd2:32}
183182
server_max_connect = int((fd_num + server_num - 1) / server_num)
184183
fd_max_connect = max(1, int(server_num / fd_num))
185-
#fd_max_connect = int((server_num + fd_num - 1) / fd_num)
184+
# fd_max_connect = int((server_num + fd_num - 1) / fd_num)
186185
print('fd_num={}, server_num={}, smax={}, mcon={}'.format(
187186
fd_num, server_num, server_max_connect, fd_max_connect))
188187

@@ -253,9 +252,17 @@ def _refresh(self):
253252
except KeyError:
254253
pass
255254

256-
time.sleep(2)
255+
time.sleep(3)
256+
257+
def refresh(self):
258+
while True:
259+
try:
260+
self._refresh()
261+
except Exception as e:
262+
sys.stderr.write(str(e) + '\n')
263+
time.sleep(6)
257264

258265
def start(self):
259-
self._thread = threading.Thread(target=self._refresh)
266+
self._thread = threading.Thread(target=self.refresh)
260267
self._thread.daemon = True
261268
self._thread.start()

0 commit comments

Comments
 (0)