Skip to content

Commit

Permalink
Merge pull request #863 from Cysharp/feature/ReduceClientHeartbeatSize
Browse files Browse the repository at this point in the history
Reduce the message size of ClientHeartbeat
  • Loading branch information
mayuki authored Oct 23, 2024
2 parents 4cc257a + 556c193 commit 0d43fd6
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,14 @@ public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> buffe
/// Writes a client heartbeat message for sending from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter, short sequence, DateTimeOffset clientSentAt)
public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter, short sequence, long clientSentAtElapsedFromOriginMs)
{
// Array(4)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), <Extra>]
// Array(4)[0x7e(126), Sequence(int8), ClientSentAtElapsedFromOrigin(long; Ms), <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7e); // 0:Type = 0x7e / 126 (ClientHeartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt.ToUnixTimeMilliseconds()); // 2:ClientSentAt
writer.Write(clientSentAtElapsedFromOriginMs); // 2:ClientSentAtElapsedFromOrigin
writer.WriteNil(); // 3:Reserved
writer.Flush();
}
Expand All @@ -254,12 +254,12 @@ public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter,
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence, long clientSentAt)
{
// Array(5)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), Nil, <Extra>]
// Array(5)[0x7e(126), Sequence(int8), ClientSentAtElapsedFromOrigin(long; Ms), Nil, <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7e); // 0:Type = 0x7e / 126 (Heartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt); // 2:ClientSentAt
writer.Write(clientSentAt); // 2:ClientSentAtElapsedFromOrigin
writer.WriteNil(); // 3:Reserved
writer.WriteNil(); // 4:Reserved
writer.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class StreamingHubClientHeartbeatManager : IAsyncDisposable
readonly SynchronizationContext? synchronizationContext;
readonly ChannelWriter<StreamingHubPayload> writer;
readonly TimeProvider timeProvider;
readonly long timestampOrigin;

SendOrPostCallback? processServerHeartbeatCoreCache;
SendOrPostCallback? processClientHeartbeatResponseCoreCache;
Expand All @@ -29,6 +30,8 @@ internal class StreamingHubClientHeartbeatManager : IAsyncDisposable
bool isTimeoutTimerRunning;
bool disposed;

long ElapsedMillisecondsFromOrigin => (long)timeProvider.GetElapsedTime(timestampOrigin).TotalMilliseconds;

public CancellationToken TimeoutToken => timeoutTokenSource.Token;

bool IsDisposed => Volatile.Read(ref disposed);
Expand Down Expand Up @@ -56,6 +59,7 @@ TimeProvider timeProvider
this.synchronizationContext = synchronizationContext;
this.shutdownTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token);
this.timeProvider = timeProvider;
this.timestampOrigin = timeProvider.GetTimestamp();
}

public void StartClientHeartbeatLoop()
Expand Down Expand Up @@ -99,7 +103,6 @@ void SendClientHeartbeat()
}
}


public void ProcessClientHeartbeatResponse(StreamingHubPayload payload)
{
if (IsDisposed) return;
Expand Down Expand Up @@ -141,7 +144,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (sentSequence, clientSentAt) = reader.ReadClientHeartbeatResponse();
var (sentSequence, clientSentAtElapsedMsFromOrigin) = reader.ReadClientHeartbeatResponse();

if (sentSequence == sequence - 1/* NOTE: Sequence already 1 advanced.*/)
{
Expand All @@ -150,8 +153,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
isTimeoutTimerRunning = false;
}

var now = timeProvider.GetUtcNow();
var elapsed = now.ToUnixTimeMilliseconds() - clientSentAt;
var elapsed = ElapsedMillisecondsFromOrigin - clientSentAtElapsedMsFromOrigin;

clientHeartbeatReceivedAction?.Invoke(new ClientHeartbeatEvent(elapsed));
StreamingHubPayloadPool.Shared.Return(payload);
Expand Down Expand Up @@ -189,9 +191,7 @@ StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();

var now = timeProvider.GetUtcNow();

StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, now);
StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, ElapsedMillisecondsFromOrigin);
return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class StreamingHubClientHeartbeatManager : IAsyncDisposable
readonly SynchronizationContext? synchronizationContext;
readonly ChannelWriter<StreamingHubPayload> writer;
readonly TimeProvider timeProvider;
readonly long timestampOrigin;

SendOrPostCallback? processServerHeartbeatCoreCache;
SendOrPostCallback? processClientHeartbeatResponseCoreCache;
Expand All @@ -29,6 +30,8 @@ internal class StreamingHubClientHeartbeatManager : IAsyncDisposable
bool isTimeoutTimerRunning;
bool disposed;

long ElapsedMillisecondsFromOrigin => (long)timeProvider.GetElapsedTime(timestampOrigin).TotalMilliseconds;

public CancellationToken TimeoutToken => timeoutTokenSource.Token;

bool IsDisposed => Volatile.Read(ref disposed);
Expand Down Expand Up @@ -56,6 +59,7 @@ TimeProvider timeProvider
this.synchronizationContext = synchronizationContext;
this.shutdownTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token);
this.timeProvider = timeProvider;
this.timestampOrigin = timeProvider.GetTimestamp();
}

public void StartClientHeartbeatLoop()
Expand Down Expand Up @@ -99,7 +103,6 @@ void SendClientHeartbeat()
}
}


public void ProcessClientHeartbeatResponse(StreamingHubPayload payload)
{
if (IsDisposed) return;
Expand Down Expand Up @@ -141,7 +144,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (sentSequence, clientSentAt) = reader.ReadClientHeartbeatResponse();
var (sentSequence, clientSentAtElapsedMsFromOrigin) = reader.ReadClientHeartbeatResponse();

if (sentSequence == sequence - 1/* NOTE: Sequence already 1 advanced.*/)
{
Expand All @@ -150,8 +153,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
isTimeoutTimerRunning = false;
}

var now = timeProvider.GetUtcNow();
var elapsed = now.ToUnixTimeMilliseconds() - clientSentAt;
var elapsed = ElapsedMillisecondsFromOrigin - clientSentAtElapsedMsFromOrigin;

clientHeartbeatReceivedAction?.Invoke(new ClientHeartbeatEvent(elapsed));
StreamingHubPayloadPool.Shared.Return(payload);
Expand Down Expand Up @@ -189,9 +191,7 @@ StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();

var now = timeProvider.GetUtcNow();

StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, now);
StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, ElapsedMillisecondsFromOrigin);
return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan);
}

Expand Down
10 changes: 5 additions & 5 deletions src/MagicOnion.Internal/StreamingHubMessageWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,14 @@ public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> buffe
/// Writes a client heartbeat message for sending from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter, short sequence, DateTimeOffset clientSentAt)
public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter, short sequence, long clientSentAtElapsedFromOriginMs)
{
// Array(4)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), <Extra>]
// Array(4)[0x7e(126), Sequence(int8), ClientSentAtElapsedFromOrigin(long; Ms), <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7e); // 0:Type = 0x7e / 126 (ClientHeartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt.ToUnixTimeMilliseconds()); // 2:ClientSentAt
writer.Write(clientSentAtElapsedFromOriginMs); // 2:ClientSentAtElapsedFromOrigin
writer.WriteNil(); // 3:Reserved
writer.Flush();
}
Expand All @@ -254,12 +254,12 @@ public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter,
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence, long clientSentAt)
{
// Array(5)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), Nil, <Extra>]
// Array(5)[0x7e(126), Sequence(int8), ClientSentAtElapsedFromOrigin(long; Ms), Nil, <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7e); // 0:Type = 0x7e / 126 (Heartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt); // 2:ClientSentAt
writer.Write(clientSentAt); // 2:ClientSentAtElapsedFromOrigin
writer.WriteNil(); // 3:Reserved
writer.WriteNil(); // 4:Reserved
writer.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public async Task Interval_TimeoutDisabled()

// Assert
Assert.True(channel.Reader.TryRead(out var heartbeat1));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(TimeSpan.FromSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.True(channel.Reader.TryRead(out var heartbeat2));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddSeconds(2)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(TimeSpan.FromSeconds(2)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray());
Assert.True(channel.Reader.TryRead(out var heartbeat3));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddSeconds(3)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(TimeSpan.FromSeconds(3)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray());

Assert.False(manager.TimeoutToken.IsCancellationRequested);
}
Expand Down Expand Up @@ -77,32 +77,32 @@ public async Task Elapsed_RoundTripTime()
await Task.Delay(10);
timeProvider.Advance(TimeSpan.FromMilliseconds(100));
await Task.Delay(10);
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(1000)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));

timeProvider.Advance(TimeSpan.FromMilliseconds(900)); // Send
await Task.Delay(10);
timeProvider.Advance(TimeSpan.FromMilliseconds(100));
await Task.Delay(10);
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(2000)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));

timeProvider.Advance(TimeSpan.FromMilliseconds(900)); // Send
await Task.Delay(10);
timeProvider.Advance(TimeSpan.FromMilliseconds(100));
await Task.Delay(10);
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));
manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(3000)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */]));

await Task.Delay(10);

// Assert
Assert.Equal(3, clientHeartbeatResponseReceived.Count);
Assert.True(channel.Reader.TryRead(out var heartbeat1));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(1000)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[0].RoundTripTime);
Assert.True(channel.Reader.TryRead(out var heartbeat2));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddSeconds(2)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(2000)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray());
Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[1].RoundTripTime);
Assert.True(channel.Reader.TryRead(out var heartbeat3));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddSeconds(3)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(3000)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray());
Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[2].RoundTripTime);

Assert.False(manager.TimeoutToken.IsCancellationRequested);
Expand Down Expand Up @@ -138,7 +138,7 @@ public async Task Timeout_Not_Responding()

// Assert
Assert.True(channel.Reader.TryRead(out var heartbeat1));
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(TimeSpan.FromSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray());
Assert.False(channel.Reader.TryRead(out var heartbeat2));

Assert.True(manager.TimeoutToken.IsCancellationRequested);
Expand Down Expand Up @@ -300,4 +300,15 @@ static byte[] ToMessagePackBytes(DateTimeOffset dt)
writer.Flush();
return arrayBufferWriter.WrittenMemory.ToArray();
}

static byte[] ToMessagePackBytes(TimeSpan ts)
{
var ms = (long)ts.TotalMilliseconds;

var arrayBufferWriter = new ArrayBufferWriter<byte>();
var writer = new MessagePackWriter(arrayBufferWriter);
writer.Write(ms);
writer.Flush();
return arrayBufferWriter.WrittenMemory.ToArray();
}
}
10 changes: 5 additions & 5 deletions tests/MagicOnion.Client.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -462,13 +462,13 @@ public async Task ClientHeartbeat_Interval()
var request1 = await helper.ReadRequestRawAsync();
var request2 = await helper.ReadRequestRawAsync();
var request3 = await helper.ReadRequestRawAsync();
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x00 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddMilliseconds(100)) /* ServerSentAt */, 0xc0 /* Nil */], request1.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x01 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddMilliseconds(200)) /* ServerSentAt */, 0xc0 /* Nil */], request2.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x02 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddMilliseconds(300)) /* ServerSentAt */, 0xc0 /* Nil */], request3.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x00 /* Sequence(0) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(100)) /* ClientSentAt */, 0xc0 /* Nil */], request1.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x01 /* Sequence(1) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(200)) /* CliSentAt */, 0xc0 /* Nil */], request2.ToArray());
Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x02 /* Sequence(2) */, .. ToMessagePackBytes(TimeSpan.FromMilliseconds(300)) /* CliSentAt */, 0xc0 /* Nil */], request3.ToArray());

static byte[] ToMessagePackBytes(DateTimeOffset dt)
static byte[] ToMessagePackBytes(TimeSpan ts)
{
var ms = dt.ToUnixTimeMilliseconds();
var ms = (long)ts.TotalMilliseconds;

var arrayBufferWriter = new ArrayBufferWriter<byte>();
var writer = new MessagePackWriter(arrayBufferWriter);
Expand Down

0 comments on commit 0d43fd6

Please sign in to comment.