Skip to content

Commit

Permalink
Remove StreamingHubContextPool
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Oct 3, 2024
1 parent 0b0765b commit 2a52dbf
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 31 deletions.
11 changes: 6 additions & 5 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,16 @@ async Task HandleMessageAsync()

async ValueTask ConsumeRequestQueueAsync()
{
// Create and reuse a single StreamingHubContext for each hub connection.
var context = new StreamingHubContext();

// We need to process client requests sequentially.
// NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop.
await foreach (var request in requests.Reader.ReadAllAsync(default))
{
try
{
await ProcessRequestAsync(request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse);
await ProcessRequestAsync(context, request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse);
}
finally
{
Expand Down Expand Up @@ -266,12 +269,10 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType)
=> throw new InvalidOperationException($"Unknown MessageType: {messageType}");
}

ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> handlers, int methodId, int messageId, ReadOnlyMemory<byte> body, bool hasResponse)
ValueTask ProcessRequestAsync(StreamingHubContext context, UniqueHashDictionary<StreamingHubHandler> handlers, int methodId, int messageId, ReadOnlyMemory<byte> body, bool hasResponse)
{
var handler = GetOrThrowHandler(handlers, methodId);

// Create a context for each call to the hub method.
var context = StreamingHubContextPool.Shared.Get();
context.Initialize(
handler: handler,
streamingServiceContext: (IStreamingServiceContext<StreamingHubPayload, StreamingHubPayload>)Context,
Expand Down Expand Up @@ -366,7 +367,7 @@ void CleanupRequest(StreamingHubContext context, StreamingHubHandler handler, lo
var elapsed = timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp);
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted);
Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted);
StreamingHubContextPool.Shared.Return(context);
context.Uninitialize();
}

// Interface methods for Client
Expand Down
38 changes: 12 additions & 26 deletions src/MagicOnion.Server/Hubs/StreamingHubContext.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,11 @@
using MagicOnion.Internal.Buffers;
using MessagePack;
using System.Collections.Concurrent;
using System.Diagnostics;
using MagicOnion.Internal;
using Microsoft.Extensions.ObjectPool;

namespace MagicOnion.Server.Hubs;

internal class StreamingHubContextPool
{
const int MaxRetainedCount = 16;
readonly ObjectPool<StreamingHubContext> pool = new DefaultObjectPool<StreamingHubContext>(new Policy(), MaxRetainedCount);

public static StreamingHubContextPool Shared { get; } = new();

public StreamingHubContext Get() => pool.Get();
public void Return(StreamingHubContext ctx) => pool.Return(ctx);

class Policy : IPooledObjectPolicy<StreamingHubContext>
{
public StreamingHubContext Create()
{
return new StreamingHubContext();
}

public bool Return(StreamingHubContext obj)
{
obj.Uninitialize();
return true;
}
}
}

public class StreamingHubContext
{
IStreamingServiceContext<StreamingHubPayload, StreamingHubPayload> streamingServiceContext = default!;
Expand Down Expand Up @@ -70,6 +45,11 @@ public ConcurrentDictionary<string, object> Items

internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext<StreamingHubPayload, StreamingHubPayload> streamingServiceContext, object hubInstance, ReadOnlyMemory<byte> request, DateTime timestamp, int messageId)
{
#if DEBUG
Debug.Assert(this.handler is null);
Debug.Assert(this.streamingServiceContext is null);
Debug.Assert(this.HubInstance is null);
#endif
this.handler = handler;
this.streamingServiceContext = streamingServiceContext;
HubInstance = hubInstance;
Expand All @@ -80,6 +60,12 @@ internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext<S

internal void Uninitialize()
{
#if DEBUG
Debug.Assert(this.handler is not null);
Debug.Assert(this.streamingServiceContext is not null);
Debug.Assert(this.HubInstance is not null);
#endif

handler = default!;
streamingServiceContext = default!;
HubInstance = default!;
Expand Down

0 comments on commit 2a52dbf

Please sign in to comment.