Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass connection's cancellation token to package handling #717

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/SuperSocket.Connection/ConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
namespace SuperSocket.Connection
{
public abstract class ConnectionBase : IConnection
{
{
public abstract IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter);

public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default);

public bool IsClosed { get; private set; }
Expand All @@ -28,6 +28,8 @@ public abstract class ConnectionBase : IConnection

public DateTimeOffset LastActiveTime { get; protected set; } = DateTimeOffset.Now;

public CancellationToken ConnectionToken { get; protected set; }

protected virtual void OnClosed()
{
IsClosed = true;
Expand Down
2 changes: 2 additions & 0 deletions src/SuperSocket.Connection/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ public interface IConnection
ValueTask DetachAsync();

CloseReason? CloseReason { get; }

CancellationToken ConnectionToken { get; }
}
}
19 changes: 10 additions & 9 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PipeReader IPipeConnection.InputReader
{
get { return InputReader; }
}

IPipelineFilter IPipeConnection.PipelineFilter
{
get { return _pipelineFilter; }
Expand All @@ -52,6 +52,7 @@ protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, Co
Logger = options.Logger;
InputReader = inputReader;
OutputWriter = outputWriter;
ConnectionToken = _cts.Token;
}

protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe)
Expand All @@ -72,7 +73,7 @@ public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPip

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;

_pipeTask = StartTask(packagePipe);

_ = HandleClosing();
Expand Down Expand Up @@ -118,7 +119,7 @@ private async ValueTask HandleClosing()
{
if (!IsIgnorableException(exc))
OnError("Unhandled exception in the method PipeChannel.Close.", exc);
}
}
}
}
}
Expand Down Expand Up @@ -172,7 +173,7 @@ public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Cancellat
finally
{
SendLock.Release();
}
}
}

private void WriteBuffer(PipeWriter writer, ReadOnlyMemory<byte> buffer)
Expand Down Expand Up @@ -236,7 +237,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
if (!IsIgnorableException(e) && !(e is OperationCanceledException))
OnError("Failed to read from the pipe", e);

break;
}

Expand Down Expand Up @@ -267,7 +268,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
completed = true;
break;
}
}
}

if (completed)
Expand Down Expand Up @@ -344,7 +345,7 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
Close();
return false;
}

if (packageInfo == null)
{
// the current pipeline filter needs more data to process
Expand All @@ -370,12 +371,12 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
examined = consumed = buffer.End;
return true;
}

if (bytesConsumed > 0)
seqReader = new SequenceReader<byte>(seqReader.Sequence.Slice(bytesConsumed));
}
}

public override async ValueTask DetachAsync()
{
_isDetaching = true;
Expand Down
23 changes: 14 additions & 9 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
: base(context.Transport.Input, context.Transport.Output, options)
{
_context = context;
context.ConnectionClosed.Register(() => OnClosed());
context.ConnectionClosed.Register(() => OnConnectionClosed());
LocalEndPoint = context.LocalEndPoint;
RemoteEndPoint = context.RemoteEndPoint;
}

protected override void OnClosed()
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;

base.OnClosed();
}

public override ValueTask DetachAsync()
{
throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}.");
Expand All @@ -39,14 +47,6 @@ protected override async void Close()
}
}

protected override void OnClosed()
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;

base.OnClosed();
}

protected override void OnInputPipeRead(ReadResult result)
{
if (!result.IsCanceled && !result.IsCompleted)
Expand All @@ -72,4 +72,9 @@ public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> pa
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
}

private void OnConnectionClosed()
{
Cancel();
}
}
42 changes: 19 additions & 23 deletions src/SuperSocket.Server/SuperSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ protected virtual ValueTask OnSessionClosedAsync(IAppSession session, CloseEvent
if (closedHandler != null)
return closedHandler.Invoke(session, e);

#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual async ValueTask FireSessionConnectedEvent(AppSession session)
Expand Down Expand Up @@ -350,7 +350,7 @@ protected virtual async ValueTask FireSessionClosedEvent(AppSession session, Clo
if (!handshakeSession.Handshaked)
return;
}

await UnRegisterSessionFromMiddlewares(session);

_logger.LogInformation($"The session disconnected: {session.SessionID} ({reason})");
Expand Down Expand Up @@ -396,18 +396,18 @@ private async ValueTask HandleSession(AppSession session, IConnection connection
var packageHandlingScheduler = _packageHandlingScheduler;

#if NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif

await foreach (var p in packageStream)
{
if(_packageHandlingContextAccessor != null)
if (_packageHandlingContextAccessor != null)
{
_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
}

#if !NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif
await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token);

Expand All @@ -424,13 +424,9 @@ private async ValueTask HandleSession(AppSession session, IConnection connection

protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
return CancellationTokenSourcePool.Shared.Rent(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
#else
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
return cancellationTokenSource;
#endif
}

protected virtual ValueTask<bool> OnSessionErrorAsync(IAppSession session, PackageHandlingException<TReceivePackageInfo> exception)
Expand Down Expand Up @@ -471,20 +467,20 @@ public async Task StartAsync(CancellationToken cancellationToken)

protected virtual ValueTask OnStartedAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual ValueTask OnStopAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

private async Task StopListener(IConnectionListener listener)
Expand Down
Loading