diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 177a6ee7b..1c915cdb7 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -189,13 +189,16 @@ async Task HandleMessageAsync() async ValueTask ConsumeRequestQueueAsync() { + // Create and reuse a single StreamingHubContext for each hub connection. + var context = new StreamingHubContext(); + // We need to process client requests sequentially. // NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop. await foreach (var request in requests.Reader.ReadAllAsync(default)) { try { - await ProcessRequestAsync(request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse); + await ProcessRequestAsync(context, request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse); } finally { @@ -266,12 +269,10 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType) => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - ValueTask ProcessRequestAsync(UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) + ValueTask ProcessRequestAsync(StreamingHubContext context, UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) { var handler = GetOrThrowHandler(handlers, methodId); - // Create a context for each call to the hub method. - var context = StreamingHubContextPool.Shared.Get(); context.Initialize( handler: handler, streamingServiceContext: (IStreamingServiceContext)Context, @@ -366,7 +367,7 @@ void CleanupRequest(StreamingHubContext context, StreamingHubHandler handler, lo var elapsed = timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp); MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted); Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); - StreamingHubContextPool.Shared.Return(context); + context.Uninitialize(); } // Interface methods for Client diff --git a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs index 9465fb3ad..1716033f7 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs @@ -1,36 +1,11 @@ using MagicOnion.Internal.Buffers; using MessagePack; using System.Collections.Concurrent; +using System.Diagnostics; using MagicOnion.Internal; -using Microsoft.Extensions.ObjectPool; namespace MagicOnion.Server.Hubs; -internal class StreamingHubContextPool -{ - const int MaxRetainedCount = 16; - readonly ObjectPool pool = new DefaultObjectPool(new Policy(), MaxRetainedCount); - - public static StreamingHubContextPool Shared { get; } = new(); - - public StreamingHubContext Get() => pool.Get(); - public void Return(StreamingHubContext ctx) => pool.Return(ctx); - - class Policy : IPooledObjectPolicy - { - public StreamingHubContext Create() - { - return new StreamingHubContext(); - } - - public bool Return(StreamingHubContext obj) - { - obj.Uninitialize(); - return true; - } - } -} - public class StreamingHubContext { IStreamingServiceContext streamingServiceContext = default!; @@ -70,6 +45,11 @@ public ConcurrentDictionary Items internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext streamingServiceContext, object hubInstance, ReadOnlyMemory request, DateTime timestamp, int messageId) { +#if DEBUG + Debug.Assert(this.handler is null); + Debug.Assert(this.streamingServiceContext is null); + Debug.Assert(this.HubInstance is null); +#endif this.handler = handler; this.streamingServiceContext = streamingServiceContext; HubInstance = hubInstance; @@ -80,6 +60,12 @@ internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext