Skip to content

Commit

Permalink
optimized the code about input and output pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Mar 18, 2024
1 parent ab7e45d commit ef49457
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 130 deletions.
4 changes: 2 additions & 2 deletions src/SuperSocket.Connection/ConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class ConnectionOptions

public ILogger Logger { get; set; }

public Pipe In { get; set; }
public Pipe Input { get; set; }

public Pipe Out { get; set; }
public Pipe Output { get; set; }

public Dictionary<string, string> Values { get; set; }
}
Expand Down
2 changes: 1 addition & 1 deletion src/SuperSocket.Connection/IObjectPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace SuperSocket.Connection
{
interface IObjectPipe<T>
public interface IObjectPipe<T>
{
/// <summary>
/// Write an object into the pipe
Expand Down
4 changes: 2 additions & 2 deletions src/SuperSocket.Connection/IPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace SuperSocket.Connection
{
public interface IPipeConnection
{
Pipe In { get; }
PipeReader InputReader { get; }

Pipe Out { get; }
PipeWriter OutputWriter { get; }

IPipelineFilter PipelineFilter { get; }
}
Expand Down
116 changes: 114 additions & 2 deletions src/SuperSocket.Connection/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,46 @@ namespace SuperSocket.Connection
{
public abstract class PipeConnection<TPackageInfo> : PipeConnectionBase<TPackageInfo>
{
protected Pipe Input { get; }

protected Pipe Output { get; }

public PipeConnection(IPipelineFilter<TPackageInfo> pipelineFilter, ConnectionOptions options)
: base(pipelineFilter, options)
: this(GetInputPipe(options), GetOutputPipe(options), pipelineFilter, options)
{
}

public PipeConnection(Pipe input, Pipe output, IPipelineFilter<TPackageInfo> pipelineFilter, ConnectionOptions options)
: base(input.Reader, output.Writer, pipelineFilter, options)
{
Input = input;
Output = output;
}

private static Pipe GetInputPipe(ConnectionOptions connectionOptions)
{
return connectionOptions.Input ?? new Pipe();
}

private static Pipe GetOutputPipe(ConnectionOptions connectionOptions)
{
return connectionOptions.Output ?? new Pipe();
}

protected override Task StartTask()
{
var pipeTask = base.StartTask();
return Task.WhenAll(pipeTask, ProcessSends());
}

protected override Task StartInputPipeTask(CancellationToken cancellationToken)
{
return Task.WhenAll(FillPipeAsync(Input.Writer, cancellationToken), base.StartInputPipeTask(cancellationToken));
}

protected virtual async Task ProcessSends()
{
var output = Out.Reader;
var output = Output.Reader;

while (true)
{
Expand All @@ -39,6 +65,92 @@ protected virtual async Task ProcessSends()
output.Complete();
}

protected abstract ValueTask<int> FillPipeWithDataAsync(Memory<byte> memory, CancellationToken cancellationToken);

protected virtual async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
var options = Options;
var supplyController = PackagePipe as ISupplyController;

if (supplyController != null)
{
cancellationToken.Register(() =>
{
supplyController.SupplyEnd();
});
}

while (!cancellationToken.IsCancellationRequested)
{
try
{
if (supplyController != null)
{
await supplyController.SupplyRequired().ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
break;
}

var bufferSize = options.ReceiveBufferSize;
var maxPackageLength = options.MaxPackageLength;

if (bufferSize <= 0)
bufferSize = 1024 * 4; //4k

var memory = writer.GetMemory(bufferSize);

var bytesRead = await FillPipeWithDataAsync(memory, cancellationToken).ConfigureAwait(false);

if (bytesRead == 0)
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;

break;
}

LastActiveTime = DateTimeOffset.Now;

// Tell the PipeWriter how much was read
writer.Advance(bytesRead);
}
catch (Exception e)
{
if (!IsIgnorableException(e))
{
if (!(e is OperationCanceledException))
OnError("Exception happened in ReceiveAsync", e);

if (!CloseReason.HasValue)
{
CloseReason = cancellationToken.IsCancellationRequested
? Connection.CloseReason.LocalClosing : Connection.CloseReason.SocketError;
}
}
else if (!CloseReason.HasValue)
{
CloseReason = Connection.CloseReason.RemoteClosing;
}

break;
}

// Make the data available to the PipeReader
var result = await writer.FlushAsync().ConfigureAwait(false);

if (result.IsCompleted)
{
break;
}
}

// Signal to the reader that we're done writing
await writer.CompleteAsync().ConfigureAwait(false);
// And don't allow writing data to outgoing pipeline
await Output.Writer.CompleteAsync().ConfigureAwait(false);
}

protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)
{
var result = await reader.ReadAsync(CancellationToken.None).ConfigureAwait(false);
Expand Down
Loading

0 comments on commit ef49457

Please sign in to comment.