-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathrpc.py
98 lines (90 loc) · 3.76 KB
/
rpc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import time
import json
class RPC(object):
def __init__(self, locator, rcpId, msg):
self.locator = locator
self.id = rcpId
self.msg = msg
self.reply = None
self.temp = None
self.status = "pending"
self.time = time.time()
def __str__(self):
s = "<" , self.locator, ", "
s = s, self.id, ", ",
s = s, self.msg, ", ",
s = s, self.reply, ", ",
s = s, self.status, ">"
return s
class RPCManager(object):
def __init__(self, sessionManager, inQ):
self.inQ = inQ
self.sessionManager = sessionManager
self.inRPC = {}
self.outRPC = {}
self.counter = 0
def poll(self):
# Get incomming RPC
while not self.inQ.empty():
locator, datastr = self.inQ.get()
rpcId, kind, data = json.loads(datastr)
# 0 is sender
if kind == "msg":
if (locator, rpcId) not in self.inRPC.keys():
self.inRPC[(locator, rpcId)] = RPC(locator, rpcId, data)
rpc = self.inRPC[(locator, rpcId)]
if rpc.status == "complete":
# resend reply
self.sessionManager.send(rpc.locator,
json.dumps((rpc.id,
"reply",
rpc.reply)))
print "RPC Replied"
else:
# send ack
self.sessionManager.send(locator,
json.dumps((rpcId, "ack", None)))
elif kind == "reply":
if (locator, rpcId) in self.outRPC.keys():
self.outRPC[(locator, rpcId)].reply = data
self.outRPC[(locator, rpcId)].status = "complete"
print "RPC Complete"
elif kind == "ack":
if (locator, rpcId) in self.outRPC.keys():
rpc = self.outRPC[(locator, rpcId)]
if rpc.status == "pending":
rpc.status = "acked"
if (locator, rpcId) in self.inRPC.keys():
del self.inRPC[(locator, rpcId)]
# send out rpc reply
for rpc in self.inRPC.values():
if rpc.status == "send":
self.sessionManager.send(rpc.locator,
json.dumps((rpc.id,
"reply",
rpc.reply)))
rpc.status = "complete"
rpc.time = time.time()
print "RPC Replied"
# check on outstanding RPCs
for key in self.outRPC.keys():
rpc = self.outRPC[key]
# mark rpc failed if server dies
if rpc.locator not in self.sessionManager.serverList():
if rpc.status != "failed":
rpc.status = "failed"
print "RPC Failed"
# resend RPC is no ack is recived in time
if rpc.status == "pending" and (time.time() - rpc.time) > 0.25:
self.send(rpc)
def send(self, rpc):
rpc.id = "{0}:{1}:{2}".format(self.sessionManager.locator,
rpc.locator,
self.counter)
self.counter += 1
self.outRPC[(rpc.locator, rpc.id)] = rpc
self.sessionManager.send(rpc.locator, json.dumps((rpc.id,
"msg",
rpc.msg)))
rpc.time = time.time()
print "RPC Sent"