9
9
import io .jpower .kcp .netty .internal .ReusableListIterator ;
10
10
import io .netty .buffer .ByteBuf ;
11
11
import io .netty .buffer .ByteBufAllocator ;
12
- import io .netty .util .Recycler ;
12
+ import io .netty .util .internal .ObjectPool ;
13
+ import io .netty .util .internal .ObjectPool .Handle ;
13
14
import io .netty .util .internal .logging .InternalLogger ;
14
15
import io .netty .util .internal .logging .InternalLoggerFactory ;
15
16
@@ -68,7 +69,10 @@ public class Kcp {
68
69
69
70
public static final int IKCP_WND_SND = 32 ;
70
71
71
- public static final int IKCP_WND_RCV = 32 ;
72
+ /**
73
+ * must >= max fragment size
74
+ */
75
+ public static final int IKCP_WND_RCV = 128 ;
72
76
73
77
public static final int IKCP_MTU_DEF = 1400 ;
74
78
@@ -113,9 +117,9 @@ public class Kcp {
113
117
114
118
private long rcvNxt ;
115
119
116
- private long tsRecent ;
120
+ private int tsRecent ;
117
121
118
- private long tsLastack ;
122
+ private int tsLastack ;
119
123
120
124
private int ssthresh = IKCP_THRESH_INIT ;
121
125
@@ -137,11 +141,11 @@ public class Kcp {
137
141
138
142
private int probe ;
139
143
140
- private long current ;
144
+ private int current ;
141
145
142
146
private int interval = IKCP_INTERVAL ;
143
147
144
- private long tsFlush = IKCP_INTERVAL ;
148
+ private int tsFlush = IKCP_INTERVAL ;
145
149
146
150
private int xmit ;
147
151
@@ -151,7 +155,7 @@ public class Kcp {
151
155
152
156
private boolean updated ;
153
157
154
- private long tsProbe ;
158
+ private int tsProbe ;
155
159
156
160
private int probeWait ;
157
161
@@ -198,10 +202,6 @@ public class Kcp {
198
202
199
203
private KcpMetric metric = new KcpMetric (this );
200
204
201
- private static long long2Uint (long n ) {
202
- return n & 0x00000000FFFFFFFFL ;
203
- }
204
-
205
205
private static long int2Uint (int i ) {
206
206
return i & 0xFFFFFFFFL ;
207
207
}
@@ -210,10 +210,6 @@ private static int ibound(int lower, int middle, int upper) {
210
210
return Math .min (Math .max (lower , middle ), upper );
211
211
}
212
212
213
- private static long ibound (long lower , long middle , long upper ) {
214
- return Math .min (Math .max (lower , middle ), upper );
215
- }
216
-
217
213
private static int itimediff (int later , int earlier ) {
218
214
return later - earlier ;
219
215
}
@@ -239,7 +235,7 @@ private static int encodeSeg(ByteBuf buf, Segment seg) {
239
235
buf .writeByte (seg .cmd );
240
236
buf .writeByte (seg .frg );
241
237
buf .writeShortLE (seg .wnd );
242
- buf .writeIntLE (( int ) seg .ts );
238
+ buf .writeIntLE (seg .ts );
243
239
buf .writeIntLE ((int ) seg .sn );
244
240
buf .writeIntLE ((int ) seg .una );
245
241
buf .writeIntLE (seg .data .readableBytes ());
@@ -249,7 +245,7 @@ private static int encodeSeg(ByteBuf buf, Segment seg) {
249
245
250
246
private static class Segment {
251
247
252
- private final Recycler . Handle <Segment > recyclerHandle ;
248
+ private final Handle <Segment > recyclerHandle ;
253
249
254
250
private int conv ;
255
251
@@ -259,13 +255,13 @@ private static class Segment {
259
255
260
256
private int wnd ;
261
257
262
- private long ts ;
258
+ private int ts ;
263
259
264
260
private long sn ;
265
261
266
262
private long una ;
267
263
268
- private long resendts ;
264
+ private int resendts ;
269
265
270
266
private int rto ;
271
267
@@ -275,16 +271,9 @@ private static class Segment {
275
271
276
272
private ByteBuf data ;
277
273
278
- private static final Recycler <Segment > RECYCLER = new Recycler <Segment >() {
279
-
280
- @ Override
281
- protected Segment newObject (Handle <Segment > handle ) {
282
- return new Segment (handle );
283
- }
284
-
285
- };
274
+ private static final ObjectPool <Segment > RECYCLER = ObjectPool .newPool (Segment ::new );
286
275
287
- private Segment (Recycler . Handle <Segment > recyclerHandle ) {
276
+ private Segment (Handle <Segment > recyclerHandle ) {
288
277
this .recyclerHandle = recyclerHandle ;
289
278
}
290
279
@@ -344,7 +333,7 @@ private void release(List<Segment> segQueue) {
344
333
}
345
334
}
346
335
347
- private ByteBuf tryCreateAndOutput (ByteBuf buffer , int need ) {
336
+ private ByteBuf tryCreateOrOutput (ByteBuf buffer , int need ) {
348
337
if (buffer == null ) {
349
338
buffer = createFlushByteBuf ();
350
339
} else if (buffer .readableBytes () + need > mtu ) {
@@ -555,7 +544,7 @@ public int send(ByteBuf buf) {
555
544
count = (len + mss - 1 ) / mss ;
556
545
}
557
546
558
- if (count > 255 ) { // Maybe don't need the conditon in stream mode
547
+ if (count >= IKCP_WND_RCV ) { // Maybe don't need the conditon in stream mode
559
548
return -2 ;
560
549
}
561
550
@@ -648,7 +637,7 @@ private void parseFastack(long sn) {
648
637
}
649
638
}
650
639
651
- private void ackPush (long sn , long ts ) {
640
+ private void ackPush (long sn , int ts ) {
652
641
int newSize = 2 * (ackcount + 1 );
653
642
654
643
if (newSize > acklist .length ) {
@@ -664,7 +653,7 @@ private void ackPush(long sn, long ts) {
664
653
}
665
654
666
655
acklist [2 * ackcount ] = (int ) sn ;
667
- acklist [2 * ackcount + 1 ] = ( int ) ts ;
656
+ acklist [2 * ackcount + 1 ] = ts ;
668
657
ackcount ++;
669
658
}
670
659
@@ -736,8 +725,8 @@ public int input(ByteBuf data) {
736
725
}
737
726
738
727
while (true ) {
739
- int conv , len , wnd ;
740
- long ts , sn , una ;
728
+ int conv , len , wnd , ts ;
729
+ long sn , una ;
741
730
byte cmd ;
742
731
short frg ;
743
732
Segment seg ;
@@ -754,7 +743,7 @@ public int input(ByteBuf data) {
754
743
cmd = data .readByte ();
755
744
frg = data .readUnsignedByte ();
756
745
wnd = data .readUnsignedShortLE ();
757
- ts = data .readUnsignedIntLE ();
746
+ ts = data .readIntLE ();
758
747
sn = data .readUnsignedIntLE ();
759
748
una = data .readUnsignedIntLE ();
760
749
len = data .readIntLE ();
@@ -776,10 +765,10 @@ public int input(ByteBuf data) {
776
765
shrinkBuf ();
777
766
778
767
boolean readed = false ;
779
- long uintCurrent = long2Uint ( current ) ;
768
+ int current = this . current ;
780
769
switch (cmd ) {
781
770
case IKCP_CMD_ACK : {
782
- int rtt = itimediff (uintCurrent , ts );
771
+ int rtt = itimediff (current , ts );
783
772
if (rtt >= 0 ) {
784
773
updateAck (rtt );
785
774
}
@@ -889,8 +878,7 @@ private int wndUnused() {
889
878
* ikcp_flush
890
879
*/
891
880
private void flush () {
892
- long current = this .current ;
893
- long uintCurrent = long2Uint (current );
881
+ int current = this .current ;
894
882
895
883
// 'ikcp_update' haven't been called.
896
884
if (!updated ) {
@@ -911,9 +899,9 @@ private void flush() {
911
899
// flush acknowledges
912
900
int count = ackcount ;
913
901
for (int i = 0 ; i < count ; i ++) {
914
- buffer = tryCreateAndOutput (buffer , IKCP_OVERHEAD );
902
+ buffer = tryCreateOrOutput (buffer , IKCP_OVERHEAD );
915
903
seg .sn = int2Uint (acklist [i * 2 ]);
916
- seg .ts = int2Uint ( acklist [i * 2 + 1 ]) ;
904
+ seg .ts = acklist [i * 2 + 1 ];
917
905
encodeSeg (buffer , seg );
918
906
if (log .isDebugEnabled ()) {
919
907
log .debug ("{} flush ack: sn={}, ts={}" , this , seg .sn , seg .ts );
@@ -948,7 +936,7 @@ private void flush() {
948
936
// flush window probing commands
949
937
if ((probe & IKCP_ASK_SEND ) != 0 ) {
950
938
seg .cmd = IKCP_CMD_WASK ;
951
- buffer = tryCreateAndOutput (buffer , IKCP_OVERHEAD );
939
+ buffer = tryCreateOrOutput (buffer , IKCP_OVERHEAD );
952
940
encodeSeg (buffer , seg );
953
941
if (log .isDebugEnabled ()) {
954
942
log .debug ("{} flush ask" , this );
@@ -958,7 +946,7 @@ private void flush() {
958
946
// flush window probing commands
959
947
if ((probe & IKCP_ASK_TELL ) != 0 ) {
960
948
seg .cmd = IKCP_CMD_WINS ;
961
- buffer = tryCreateAndOutput (buffer , IKCP_OVERHEAD );
949
+ buffer = tryCreateOrOutput (buffer , IKCP_OVERHEAD );
962
950
encodeSeg (buffer , seg );
963
951
if (log .isDebugEnabled ()) {
964
952
log .debug ("{} flush tell: wnd={}" , this , seg .wnd );
@@ -985,7 +973,7 @@ private void flush() {
985
973
newSeg .conv = conv ;
986
974
newSeg .cmd = IKCP_CMD_PUSH ;
987
975
newSeg .wnd = seg .wnd ;
988
- newSeg .ts = uintCurrent ;
976
+ newSeg .ts = current ;
989
977
newSeg .sn = sndNxt ++;
990
978
newSeg .una = rcvNxt ;
991
979
newSeg .resendts = current ;
@@ -1044,15 +1032,15 @@ private void flush() {
1044
1032
}
1045
1033
1046
1034
if (needsend ) {
1047
- segment .ts = uintCurrent ;
1035
+ segment .ts = current ;
1048
1036
segment .wnd = seg .wnd ;
1049
1037
segment .una = rcvNxt ;
1050
1038
1051
1039
ByteBuf segData = segment .data ;
1052
1040
int segLen = segData .readableBytes ();
1053
1041
int need = IKCP_OVERHEAD + segLen ;
1054
1042
1055
- buffer = tryCreateAndOutput (buffer , need );
1043
+ buffer = tryCreateOrOutput (buffer , need );
1056
1044
encodeSeg (buffer , segment );
1057
1045
1058
1046
if (segLen > 0 ) {
@@ -1110,7 +1098,7 @@ private void flush() {
1110
1098
*
1111
1099
* @param current
1112
1100
*/
1113
- public void update (long current ) {
1101
+ public void update (int current ) {
1114
1102
this .current = current ;
1115
1103
1116
1104
if (!updated ) {
@@ -1156,12 +1144,12 @@ public void update(long current) {
1156
1144
* @param current
1157
1145
* @return
1158
1146
*/
1159
- public long check (long current ) {
1147
+ public int check (int current ) {
1160
1148
if (!updated ) {
1161
1149
return current ;
1162
1150
}
1163
1151
1164
- long tsFlush = this .tsFlush ;
1152
+ int tsFlush = this .tsFlush ;
1165
1153
int slap = itimediff (current , tsFlush );
1166
1154
if (slap >= 10000 || slap < -10000 ) {
1167
1155
tsFlush = current ;
0 commit comments