-
Notifications
You must be signed in to change notification settings - Fork 0
/
Chat.pas
270 lines (252 loc) · 6.59 KB
/
Chat.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
unit Chat;
{
Implement two-way realiable acked lock-step protocol
}
INTERFACE
uses NetAddr,ServerLoop,MemStream;
type tChat=object
remote:tNetAddr;
opcode:byte;
txSeq:Word;
rxSeq:Word;
rxAcked:boolean;
closed:boolean;
RTT:LongWord;{in ms}
Callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
OnTimeout: procedure of object;
OnDispose: procedure of object;
procedure Init(const iremote:tNetAddr);
procedure SetTimeout(acktm,repltm:LongInt);
procedure AddHeaders(var s:tMemoryStream);
procedure StreamInit(var s:tMemoryStream; l:word);
procedure Send(s:tMemoryStream);
{the stream can be invalidated, but the buffer must not be modified or freed}
procedure Ack;
procedure Close;
private
txPk:pointer; txLen:word; {last sent, not acked msg}
txTime:tDateTime;
tmAck,tmReply:LongWord;{ms}
procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
procedure Done;
procedure Resend;
procedure OnReply(msg:tSMsg);
procedure ReplyTimeout;
end;
type tChatHandler=procedure(var nchat:tChat; msg:tSMsg);
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
{ download manager create FileRequest
File Request open chat session to server
upmgr accepts chat and send reply
FileRequest acks, chat is then closed after TimeWait
upmgr starts TC transfer
transfer finished, upmgr send new Chat to FileRequest
FileRequest acks, chat is closed on both ends
FileRequest can open new chat if blocks are missing
=> chat msgs must be created with New, disposed by Chat
=> there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
}
{ Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
IMPLEMENTATION
uses SysUtils;
procedure tChat.Init(const iremote:tNetAddr);
begin
remote:=iremote;
opcode:=128+Random(128); {$warning possible overflow}
while ServerLoop.IsMsgHandled(opcode,remote) do inc(opcode);
InitFrom(remote,opcode);
end;
procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
begin
remote:=iremote;
opcode:=iopcode;
SetMsgHandler(opcode,remote,@OnReply);
txSeq:=0;
rxSeq:=0;
rxAcked:=true; {to not ack pk 0}
closed:=false;
txPk:=nil;
txLen:=0;
Callback:=nil;
OnTimeout:=nil;
OnDispose:=nil;
RTT:=200; {a default for timeouts}
txTime:=0;
tmAck:=0;
tmReply:=0;
end;
{struct
opcode:byte
seq:2
ack_seq:2
data:xx
}
procedure tCHat.AddHeaders(var s:tMemoryStream);
begin s.skip(5) end;
procedure tChat.StreamInit(var s:tMemoryStream; l:word);
begin
s.Init(GetMem(l+5),0,l+5);
AddHeaders(s);
end;
procedure tChat.SetTimeout(acktm,repltm:LongInt);
begin
assert(assigned(OnTimeout));
tmAck:=acktm;
tmReply:=repltm;
end;
procedure tChat.Send(s:tMemoryStream);
begin
if txLen>0 then begin
FreeMem(txPk,txLen);
UnShedule(@Resend);
end;
//assert(assigned(callback));
Inc(txSeq);
s.Seek(0);
s.WriteByte(opcode);
s.WriteWord(txSeq,2);
if not rxAcked then begin
s.WriteWord(rxSeq,2);
rxAcked:=true;
end else s.WriteWord(0,2);
txPk:=s.base;
txLen:=s.Length;
ServerLoop.SendMessage(txPk^,txLen,remote);
ServerLoop.Shedule(RTT*2,@Resend);
txTime:=Now;
end;
procedure tChat.Ack;
var s:tMemoryStream;
begin
if not rxAcked then begin
s.Init(GetMem(5),0,5);
s.WriteByte(opcode);
s.WriteWord(0,2);
s.WriteWord(rxSeq,2);
ServerLoop.SendMessage(s.base^,s.length,remote);
FreeMem(s.base,s.length);
rxAcked:=true;
if assigned(OnTimeout) and (tmReply>0) then Shedule(tmReply,@ReplyTimeout);
end;
end;
procedure tChat.Close;
begin
assert(not closed);
Ack;
closed:=true;
callback:=nil; {avoid calling}
ontimeout:=nil;
UnShedule(@ReplyTimeout); {fuck it}
//writeln('Chat: closing');
if txLen=0 {no packets in flight} then begin
Shedule(5000{todo},@Done); {wait for something lost}
end;
end;
procedure tChat.Done;
begin
if txLen>0 then FreeMem(txPk,txLen);
SetMsgHandler(opcode,remote,nil);
UnShedule(@Resend);
UnShedule(@ReplyTimeout);
if assigned(OnDispose) then OnDispose
else FreeMem(@self,sizeof(self));
//writeln('Chat: closed');
end;
procedure tChat.Resend;
{timeout waiting for ack}
begin
{check for timeout and closed}
if RTT<1 then RTT:=2; RTT:=RTT*2;
if closed and (RTT>5000) then begin
Done;
exit
end;
if (not closed) and (tmAck>0) and (RTT>tmAck) then begin
if assigned(ontimeout) then OnTimeout;
Done;
exit
end;
{resend}
//writeln('Chat: retry');
ServerLoop.SendMessage(txPk^,txLen,remote);
txTime:=Now;
{reshedule}
ServerLoop.Shedule(RTT,@Resend);
end;
procedure tChat.OnReply(msg:tSMsg);
var seq,aseq:Word;
var s:tMemoryStream;
begin
msg.stream.skip(1{opcode});
seq:=msg.stream.ReadWord(2);
aseq:=msg.stream.ReadWord(2);
if aseq>0 then {ack of our msg} begin
if (aseq=txSeq)and(txLen>0) {it is current} then begin
if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay);
FreeMem(txPk,txLen);
UnShedule(@Resend);
if Closed then Shedule(5,@Done);{wtf?}
TxLen:=0;
txPk:=nil;
if assigned(callback) then callback(msg,false);
if assigned(OnTimeout) and (tmReply>0) then Shedule(tmReply,@ReplyTimeout);
end else {write(' old-ack')it is ack of old data, do nothing};
end;
if seq>0 then {some data} begin
if seq<=rxSeq then {remote didnt get our ack} begin
s.Init(GetMem(5),0,5);
s.WriteByte(opcode);
s.WriteWord(0,2);
s.WriteWord(rxSeq,2);
ServerLoop.SendMessage(s.base^,s.length,remote);
FreeMem(s.base,s.length);
if seq=rxSeq then rxacked:=true;
end else begin
{some useful data!}
rxSeq:=seq;
rxAcked:=false;
UnShedule(@ReplyTimeout);
if assigned(callback) then callback(msg,true);
end;
end;
end;
procedure tChat.ReplyTimeout;
begin
assert(assigned(OnTimeout));
OnTimeout;
{...}
end;
var ChatHandlers: array [1..32] of tChatHandler;
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
begin
assert(ChatHandlers[initcode]=nil);
ChatHandlers[initcode]:=handler;
end;
procedure OnHiMsg(msg:tSMsg);
{new chat was received!}
var opcode:byte;
var seq,aseq:word;
var hnd:tChatHandler;
var nchat:^tChat;
var ix:byte;
begin
opcode:=msg.stream.ReadByte;
assert(not IsMsgHandled(opcode,msg.source^));
seq:=msg.stream.ReadWord(2);
aseq:=msg.stream.ReadWord(2);
if (seq<>1)or(aseq>0) then exit; {invalid initial state}
ix:=msg.stream.ReadByte;
if (ix<1)or(ix>high(ChatHandlers)) then exit;
hnd:=ChatHandlers[ix];
if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix));
msg.stream.seek(msg.stream.position-1);{unskip the initcode}
nchat:=GetMem(sizeof(tChat));
nchat^.InitFrom(msg.Source^,opcode);
nchat^.rxacked:=false;
nchat^.rxSeq:=1;
hnd(nchat^,msg);
end;
BEGIN
FillChar(ChatHandlers,sizeof(chathandlers),0);
ServerLoop.SetHiMsgHandler(@OnHiMsg);
END.