Skip to content

Commit

Permalink
perf: KcpPeer: StopWatch replaced with time parameters just like kcp.c.
Browse files Browse the repository at this point in the history
=> KcpServer doesn't need one StopWatch per connection anymore
=> KcpServer can call .ElapsedTimeMilliseconds once, instead of per-connection
  • Loading branch information
vis2k committed Dec 14, 2022
1 parent 30a789e commit 7ac3f31
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 46 deletions.
14 changes: 11 additions & 3 deletions kcp2k/Assets/kcp2k/highlevel/KcpClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// kcp client logic abstracted into a class.
// for use in Mirror, DOTSNET, testing, etc.
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;

Expand Down Expand Up @@ -36,6 +37,9 @@ public class KcpClient
// state
public bool connected;

// time: we need milliseconds. stopwatch is easiest.
readonly Stopwatch watch = new Stopwatch();

public KcpClient(Action OnConnected,
Action<ArraySegment<byte>, KcpChannel> OnData,
Action OnDisconnected,
Expand All @@ -45,6 +49,7 @@ public KcpClient(Action OnConnected,
this.OnData = OnData;
this.OnDisconnected = OnDisconnected;
this.OnError = OnError;
watch.Start();
}

public void Connect(string address, ushort port, KcpConfig config)
Expand Down Expand Up @@ -189,26 +194,29 @@ public void Disconnect()
// process incoming messages. should be called before updating the world.
public void TickIncoming()
{
uint time = (uint)watch.ElapsedMilliseconds;

// recv on socket first, then process incoming
// (even if we didn't receive anything. need to tick ping etc.)
// (connection is null if not active)
if (peer != null)
{

while (RawReceive(out ArraySegment<byte> segment))
peer.RawInput(segment);
peer.RawInput(time, segment);
}

// RawReceive may have disconnected peer. null check again.
peer?.TickIncoming();
peer?.TickIncoming(time);
}

// process outgoing messages. should be called after updating the world.
public void TickOutgoing()
{
// process outgoing
// (connection is null if not active)
peer?.TickOutgoing();
uint time = (uint)watch.ElapsedMilliseconds;
peer?.TickOutgoing(time);
}

// process incoming and outgoing for convenience
Expand Down
60 changes: 25 additions & 35 deletions kcp2k/Assets/kcp2k/highlevel/KcpPeer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Kcp Peer, similar to UDP Peer but wrapped with reliability, channels,
// timeouts, authentication, state, etc.
//
// still IO agnostic to work with udp, nonalloc, relays, native, etc.
// => IO agnostic to work with udp, nonalloc, relays, native, etc.
// => time as parameter, just like original kcp.
// server doesn't need one stopwatch per connection.
using System;
using System.Diagnostics;
using System.Net.Sockets;

namespace kcp2k
Expand Down Expand Up @@ -36,11 +37,6 @@ public class KcpPeer
public int timeout;
uint lastReceiveTime;

// internal time.
// StopWatch offers ElapsedMilliSeconds and should be more precise than
// Unity's time.deltaTime over long periods.
readonly Stopwatch watch = new Stopwatch();

// we need to subtract the channel byte from every MaxMessageSize
// calculation.
// we also need to tell kcp to use MTU-1 to leave space for the byte.
Expand Down Expand Up @@ -166,15 +162,13 @@ public KcpPeer(Action<ArraySegment<byte>> output, KcpConfig config)
kcpSendBuffer = new byte[1 + ReliableMaxMessageSize(config.ReceiveWindowSize)];

timeout = config.Timeout;

watch.Start();
}

void HandleTimeout(uint time)
void HandleTimeout(uint timeMilliseconds)
{
// note: we are also sending a ping regularly, so timeout should
// only ever happen if the connection is truly gone.
if (time >= lastReceiveTime + timeout)
if (timeMilliseconds >= lastReceiveTime + timeout)
{
// pass error to user callback. no need to log it manually.
// GetType() shows Server/ClientConn instead of just Connection.
Expand All @@ -196,15 +190,15 @@ void HandleDeadLink()
}

// send a ping occasionally in order to not time out on the other end.
void HandlePing(uint time)
void HandlePing(uint timeMilliseconds)
{
// enough time elapsed since last ping?
if (time >= lastPingTime + PING_INTERVAL)
if (timeMilliseconds >= lastPingTime + PING_INTERVAL)
{
// ping again and reset time
//Log.Debug("KCP: sending ping...");
SendPing();
lastPingTime = time;
lastPingTime = timeMilliseconds;
}
}

Expand Down Expand Up @@ -237,7 +231,7 @@ void HandleChoked()

// reads the next reliable message type & content from kcp.
// -> to avoid buffering, unreliable messages call OnData directly.
bool ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message)
bool ReceiveNextReliable(uint timeMilliseconds, out KcpHeader header, out ArraySegment<byte> message)
{
message = default;
header = KcpHeader.Disconnect;
Expand Down Expand Up @@ -272,20 +266,20 @@ bool ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message)
// extract header & content without header
header = (KcpHeader)kcpMessageBuffer[0];
message = new ArraySegment<byte>(kcpMessageBuffer, 1, msgSize - 1);
lastReceiveTime = (uint)watch.ElapsedMilliseconds;
lastReceiveTime = timeMilliseconds;
return true;
}

void TickIncoming_Connected(uint time)
void TickIncoming_Connected(uint timeMilliseconds)
{
// detect common events & ping
HandleTimeout(time);
HandleTimeout(timeMilliseconds);
HandleDeadLink();
HandlePing(time);
HandlePing(timeMilliseconds);
HandleChoked();

// any reliable kcp message received?
if (ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message))
if (ReceiveNextReliable(timeMilliseconds, out KcpHeader header, out ArraySegment<byte> message))
{
// message type FSM. no default so we never miss a case.
switch (header)
Expand Down Expand Up @@ -319,16 +313,16 @@ void TickIncoming_Connected(uint time)
}
}

void TickIncoming_Authenticated(uint time)
void TickIncoming_Authenticated(uint timeMilliseconds)
{
// detect common events & ping
HandleTimeout(time);
HandleTimeout(timeMilliseconds);
HandleDeadLink();
HandlePing(time);
HandlePing(timeMilliseconds);
HandleChoked();

// process all received messages
while (ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message))
while (ReceiveNextReliable(timeMilliseconds, out KcpHeader header, out ArraySegment<byte> message))
{
// message type FSM. no default so we never miss a case.
switch (header)
Expand Down Expand Up @@ -376,22 +370,20 @@ void TickIncoming_Authenticated(uint time)
}
}

public void TickIncoming()
public void TickIncoming(uint timeMilliseconds)
{
uint time = (uint)watch.ElapsedMilliseconds;

try
{
switch (state)
{
case KcpState.Connected:
{
TickIncoming_Connected(time);
TickIncoming_Connected(timeMilliseconds);
break;
}
case KcpState.Authenticated:
{
TickIncoming_Authenticated(time);
TickIncoming_Authenticated(timeMilliseconds);
break;
}
case KcpState.Disconnected:
Expand Down Expand Up @@ -428,10 +420,8 @@ public void TickIncoming()
}
}

public void TickOutgoing()
public void TickOutgoing(uint timeMilliseconds)
{
uint time = (uint)watch.ElapsedMilliseconds;

try
{
switch (state)
Expand All @@ -440,7 +430,7 @@ public void TickOutgoing()
case KcpState.Authenticated:
{
// update flushes out messages
kcp.Update(time);
kcp.Update(timeMilliseconds);
break;
}
case KcpState.Disconnected:
Expand Down Expand Up @@ -480,7 +470,7 @@ public void TickOutgoing()
// insert raw IO. usually from socket.Receive.
// offset is useful for relays, where we may parse a header and then
// feed the rest to kcp.
public void RawInput(ArraySegment<byte> segment)
public void RawInput(uint timeMilliseconds, ArraySegment<byte> segment)
{
// ensure valid size: at least 1 byte for channel
if (segment.Count <= 0) return;
Expand Down Expand Up @@ -537,7 +527,7 @@ public void RawInput(ArraySegment<byte> segment)
// otherwise a connection might time out even
// though unreliable were received, but no
// reliable was received.
lastReceiveTime = (uint)watch.ElapsedMilliseconds;
lastReceiveTime = timeMilliseconds;
}
else
{
Expand Down
28 changes: 20 additions & 8 deletions kcp2k/Assets/kcp2k/highlevel/KcpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// for use in Mirror, DOTSNET, testing, etc.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using UnityEngine;
using Debug = UnityEngine.Debug;

namespace kcp2k
{
Expand Down Expand Up @@ -36,6 +37,11 @@ public class KcpServer
public Dictionary<int, KcpServerConnection> connections =
new Dictionary<int, KcpServerConnection>();

// time: we need milliseconds. stopwatch is easiest.
// one watch per server is enough, we don't need one per connection.
// .ElapsedMilliseconds shows in profiler. don't call it per-connection.
readonly Stopwatch watch = new Stopwatch();

public KcpServer(Action<int> OnConnected,
Action<int, ArraySegment<byte>, KcpChannel> OnData,
Action<int> OnDisconnected,
Expand All @@ -48,6 +54,8 @@ public KcpServer(Action<int> OnConnected,
this.OnError = OnError;
this.config = config;

watch.Start();

// create newClientEP either IPv4 or IPv6
newClientEP = config.DualMode
? new IPEndPoint(IPAddress.IPv6Any, 0)
Expand Down Expand Up @@ -205,7 +213,7 @@ protected virtual KcpServerConnection CreateConnection(int connectionId)

// receive + add + process once.
// best to call this as long as there is more data to receive.
void ProcessMessage(ArraySegment<byte> segment, int connectionId)
void ProcessMessage(uint time, ArraySegment<byte> segment, int connectionId)
{
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");

Expand Down Expand Up @@ -288,8 +296,8 @@ void ProcessMessage(ArraySegment<byte> segment, int connectionId)
// connected event was set up.
// tick will process the first message and adds the
// connection if it was the handshake.
connection.peer.RawInput(segment);
connection.peer.TickIncoming();
connection.peer.RawInput(time, segment);
connection.peer.TickIncoming(time);

// again, do not add to connections.
// if the first message wasn't the kcp handshake then
Expand All @@ -298,7 +306,7 @@ void ProcessMessage(ArraySegment<byte> segment, int connectionId)
// existing connection: simply input the message into kcp
else
{
connection.peer.RawInput(segment);
connection.peer.RawInput(time, segment);
}
}

Expand All @@ -307,17 +315,19 @@ void ProcessMessage(ArraySegment<byte> segment, int connectionId)
readonly HashSet<int> connectionsToRemove = new HashSet<int>();
public virtual void TickIncoming()
{
uint time = (uint)watch.ElapsedMilliseconds;

// input all received messages into kcp
while (RawReceiveFrom(out ArraySegment<byte> segment, out int connectionId))
{
ProcessMessage(segment, connectionId);
ProcessMessage(time, segment, connectionId);
}

// process inputs for all server connections
// (even if we didn't receive anything. need to tick ping etc.)
foreach (KcpServerConnection connection in connections.Values)
{
connection.peer.TickIncoming();
connection.peer.TickIncoming(time);
}

// remove disconnected connections
Expand All @@ -334,10 +344,12 @@ public virtual void TickIncoming()
// virtual because relay may need to inject their own ping or similar.
public virtual void TickOutgoing()
{
uint time = (uint)watch.ElapsedMilliseconds;

// flush all server connections
foreach (KcpServerConnection connection in connections.Values)
{
connection.peer.TickOutgoing();
connection.peer.TickOutgoing(time);
}
}

Expand Down

0 comments on commit 7ac3f31

Please sign in to comment.