From 352f9b7d525b1926cb033812d05d820417df30fa Mon Sep 17 00:00:00 2001 From: gribunin Date: Fri, 12 Aug 2016 23:07:54 +0400 Subject: [PATCH 1/4] _readSemaphore has been replaced with _readEvent, because an event can be safely Set multiple times in a row, while semaphore can be Release'ed beyond its maximum level. When server sends messages faster than the client reads, and OnChannelMessageReceived was called while ReceiveAsync is still executing, the _readSemaphore was Release'ed twice -- first in on OnChannelMessageReceived, then second time at the end of ReceiveAsync (because readItems.Count > 0) which lead to unhandled SemaphoreFullException in ReceiveAsync --- .../Griffin.Core/Net/ChannelTcpClient.cs | 805 +++++++++--------- 1 file changed, 405 insertions(+), 400 deletions(-) diff --git a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs index 5fffd51..ddc9993 100644 --- a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs +++ b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs @@ -1,401 +1,406 @@ -using System; -using System.Collections.Concurrent; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using Griffin.Net.Authentication; -using Griffin.Net.Buffers; -using Griffin.Net.Channels; - -namespace Griffin.Net -{ - /// - /// Can talk with messaging servers (i.e. servers based on ). - /// - /// - /// - /// You can use the property if you want to have a callback for incoming messages. - /// - /// - public class ChannelTcpClient : IDisposable - { - private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs(); - private readonly SemaphoreSlim _connectSemaphore = new SemaphoreSlim(0, 1); - private readonly IMessageDecoder _decoder; - private readonly IMessageEncoder _encoder; - private readonly IBufferSlice _readBuffer; - private readonly ConcurrentQueue _readItems = new ConcurrentQueue(); - private readonly SemaphoreSlim _readSemaphore = new SemaphoreSlim(0, 1); - private readonly SemaphoreSlim _sendCompletedSemaphore = new SemaphoreSlim(0, 1); - private readonly SemaphoreSlim _sendQueueSemaphore = new SemaphoreSlim(1, 1); - private ITcpChannel _channel; - private Exception _connectException; - private FilterMessageHandler _filterHandler; - private Exception _sendException; - private Socket _socket; - - /// - /// Initializes a new instance of the class. - /// - /// Used to encode outbound messages. - /// Used to decode inbound messages. - public ChannelTcpClient(IMessageEncoder encoder, IMessageDecoder decoder) - : this(encoder, decoder, new BufferSlice(new byte[65535], 0, 65535)) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// Used to encode outbound messages. - /// Used to decode inbound messages. - /// Buffer used to receive bytes.. - /// - /// encoder - /// or - /// decoder - /// or - /// readBuffer - /// - public ChannelTcpClient(IMessageEncoder encoder, IMessageDecoder decoder, IBufferSlice readBuffer) - { - if (encoder == null) throw new ArgumentNullException("encoder"); - if (decoder == null) throw new ArgumentNullException("decoder"); - if (readBuffer == null) throw new ArgumentNullException("readBuffer"); - - _encoder = encoder; - _decoder = decoder; - _readBuffer = readBuffer; - - _args.Completed += OnConnect; - } - - /// - /// Set certificate if you want to use secure connections. - /// - public ISslStreamBuilder Certificate { get; set; } - - /// - /// Set if you want to authenticate against a server. - /// - public IClientAuthenticator Authenticator { get; set; } - - /// - /// Gets if channel is connected - /// - public bool IsConnected - { - get { return _channel != null && _channel.IsConnected; } - } - - /// - /// Delegate which can be used instead of or to inspect all incoming messages before they - /// are passed to . - /// - public FilterMessageHandler Filter - { - get { return _filterHandler; } - set - { - if (value == null) - throw new ArgumentNullException("value"); - - _filterHandler = value; - } - } - - /// - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// - public void Dispose() - { - if (_channel == null) - return; - - _channel.Close(); - _channel = null; - } - - /// - /// Wait for all messages to be sent and close the connection - /// - /// Async task - public async Task CloseAsync() - { - await _channel.CloseAsync(); - _channel = null; - } - - /// - /// Connects to remote end point. - /// - /// Address to connect to. - /// Remote port. - /// Async task - /// Socket is already connected - public async Task ConnectAsync(IPAddress address, int port) - { - EnsureChannel(); - - if (_socket != null) - throw new InvalidOperationException("Socket is already connected"); - - _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - _args.RemoteEndPoint = new IPEndPoint(address, port); - var isPending = _socket.ConnectAsync(_args); - - if (!isPending) - return; - - await _connectSemaphore.WaitAsync(); - if (_connectException != null) - throw _connectException; - } - - private void EnsureChannel() - { - if (_channel == null) - { - if (Certificate != null) - _channel = new SecureTcpChannel(_readBuffer, _encoder, _decoder, Certificate); - else - _channel = new TcpChannel(_readBuffer, _encoder, _decoder); - - _channel.Disconnected = OnDisconnect; - _channel.MessageSent = OnSendCompleted; - _channel.MessageReceived = OnChannelMessageReceived; - } - } - - /// - /// Connects to remote end point. - /// - /// Address to connect to. - /// Remote port. - /// Maximum amount of time to wait for a connection. - /// Async task - /// Socket is already connected - public async Task ConnectAsync(IPAddress address, int port, TimeSpan timeout) - { - if (_socket != null) - throw new InvalidOperationException("Socket is already connected"); - - EnsureChannel(); - - _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - _args.RemoteEndPoint = new IPEndPoint(address, port); - var isPending = _socket.ConnectAsync(_args); - - if (!isPending) - return; - - await _connectSemaphore.WaitAsync(timeout); - } - - - /// - /// Receive a message - /// - /// Decoded message - public Task ReceiveAsync() - { - return ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); - } - - /// - /// Receive a message - /// - /// Decoded message - public async Task ReceiveAsync() where T : class - { - var item = await ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); - if (item == null) - return null; - var casted = item as T; - if (casted == null) - throw new InvalidCastException(string.Format("Failed to cast '{0}' as '{1}'.", item.GetType().FullName, - typeof (T).FullName)); - - return casted; - } - - /// - /// Receive a message - /// - /// Token used to cancel the pending read operation. - /// - /// Decoded message if successful; default(T) if cancellation is requested. - /// - public Task ReceiveAsync(CancellationToken cancellation) - { - return ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); - } - - /// - /// Receives the asynchronous. - /// - /// Maximum amount of time to wait on a message - /// Decoded message - /// - /// Was signalled that something have been received, but found nothing in - /// the in queue - /// - public Task ReceiveAsync(TimeSpan timeout) - { - return ReceiveAsync(timeout, CancellationToken.None); - } - - /// - /// Receive a message - /// - /// Maximum amount of time to wait on a message - /// Token used to cancel the pending read operation. - /// - /// Decoded message if successful; default(T) if cancellation is requested. - /// - /// - /// Was signalled that something have been received, but found nothing in - /// the in queue - /// - public async Task ReceiveAsync(TimeSpan timeout, CancellationToken cancellation) - { - await _readSemaphore.WaitAsync(timeout, cancellation); - if (cancellation.IsCancellationRequested) - return null; - - object item; - var gotItem = _readItems.TryDequeue(out item); - - if (!gotItem) - throw new ChannelException( - "Was signalled that something have been received, but found nothing in the in queue"); - - if (item is ChannelException) - throw (ChannelException) item; - - // signal so that more items can be read directly - if (_readItems.Count > 0) - _readSemaphore.Release(); - - return item; - } - - /// - /// Send message to the remote end point. - /// - /// message to send. - /// - /// message - /// - /// - /// All messages are being enqueued and sent in order. This method will return when the current message have been - /// sent. It - /// - /// - /// The method is thread safe and can be executed from multiple threads. - /// - /// - public async Task SendAsync(object message) - { - if (message == null) throw new ArgumentNullException("message"); - - if (_sendException != null) - { - var ex = _sendException; - _sendException = null; - throw new AggregateException(ex); - } - if (Authenticator != null && Authenticator.AuthenticationFailed) - throw new AuthenticationDeniedException("Failed to authenticate"); - - await _sendQueueSemaphore.WaitAsync(); - - _channel.Send(message); - - - await _sendCompletedSemaphore.WaitAsync(); - _sendQueueSemaphore.Release(); - } - - private void OnChannelMessageReceived(ITcpChannel channel, object message) - { - if (_filterHandler != null) - { - var result = _filterHandler(channel, message); - if (result == ClientFilterResult.Revoke) - return; - } - - - _readItems.Enqueue(message); - if (_readSemaphore.CurrentCount == 0) - _readSemaphore.Release(); - } - - /// - /// Pre processes incoming bytes before they are passed to the message builder. - /// - /// - /// - /// Can be used if you for instance use a custom authentication mechanism which needs to process incoming - /// bytes instead of deserialized messages. - /// - /// - public BufferPreProcessorHandler BufferPreProcessor - { - get { return _channel.BufferPreProcessor; } - set { _channel.BufferPreProcessor = value; } - } - - - private void OnConnect(object sender, SocketAsyncEventArgs e) - { - if (e.SocketError != SocketError.Success) - { - _connectException = new SocketException((int) e.SocketError); - _socket = null; - } - else - _channel.Assign(_socket); - - _connectSemaphore.Release(); - } - - private void OnDisconnect(ITcpChannel arg1, Exception arg2) - { - _socket = null; - - if (_sendCompletedSemaphore.CurrentCount == 0) - { - _sendException = arg2; - _sendCompletedSemaphore.Release(); - } - - if (_readSemaphore.CurrentCount == 0) - { - _readItems.Enqueue(new ChannelException("Socket got disconnected", arg2)); - _readSemaphore.Release(); - } - } - - //private void OnMessageReceived(object obj) - //{ - // if (_filterHandler != null) - // { - // var result = _filterHandler(_channel, obj); - // if (result == ClientFilterResult.Revoke) - // return; - // } - - // _readItems.Enqueue(obj); - // if (_readSemaphore.CurrentCount == 0) - // _readSemaphore.Release(); - //} - - private void OnSendCompleted(ITcpChannel channel, object sentMessage) - { - _sendCompletedSemaphore.Release(); - } - } +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Griffin.Net.Authentication; +using Griffin.Net.Buffers; +using Griffin.Net.Channels; + +namespace Griffin.Net +{ + /// + /// Can talk with messaging servers (i.e. servers based on ). + /// + /// + /// + /// You can use the property if you want to have a callback for incoming messages. + /// + /// + public class ChannelTcpClient : IDisposable + { + private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs(); + private readonly SemaphoreSlim _connectSemaphore = new SemaphoreSlim(0, 1); + private readonly IMessageDecoder _decoder; + private readonly IMessageEncoder _encoder; + private readonly IBufferSlice _readBuffer; + private readonly ConcurrentQueue _readItems = new ConcurrentQueue(); + private readonly AutoResetEvent _readEvent = new AutoResetEvent(false); + private readonly SemaphoreSlim _sendCompletedSemaphore = new SemaphoreSlim(0, 1); + private readonly SemaphoreSlim _sendQueueSemaphore = new SemaphoreSlim(1, 1); + private ITcpChannel _channel; + private Exception _connectException; + private FilterMessageHandler _filterHandler; + private Exception _sendException; + private Socket _socket; + + /// + /// Initializes a new instance of the class. + /// + /// Used to encode outbound messages. + /// Used to decode inbound messages. + public ChannelTcpClient(IMessageEncoder encoder, IMessageDecoder decoder) + : this(encoder, decoder, new BufferSlice(new byte[65535], 0, 65535)) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Used to encode outbound messages. + /// Used to decode inbound messages. + /// Buffer used to receive bytes.. + /// + /// encoder + /// or + /// decoder + /// or + /// readBuffer + /// + public ChannelTcpClient(IMessageEncoder encoder, IMessageDecoder decoder, IBufferSlice readBuffer) + { + if (encoder == null) throw new ArgumentNullException("encoder"); + if (decoder == null) throw new ArgumentNullException("decoder"); + if (readBuffer == null) throw new ArgumentNullException("readBuffer"); + + _encoder = encoder; + _decoder = decoder; + _readBuffer = readBuffer; + + _args.Completed += OnConnect; + } + + /// + /// Set certificate if you want to use secure connections. + /// + public ISslStreamBuilder Certificate { get; set; } + + /// + /// Set if you want to authenticate against a server. + /// + public IClientAuthenticator Authenticator { get; set; } + + /// + /// Gets if channel is connected + /// + public bool IsConnected + { + get { return _channel != null && _channel.IsConnected; } + } + + /// + /// Delegate which can be used instead of or to inspect all incoming messages before they + /// are passed to . + /// + public FilterMessageHandler Filter + { + get { return _filterHandler; } + set + { + if (value == null) + throw new ArgumentNullException("value"); + + _filterHandler = value; + } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + if (_channel == null) + return; + + _channel.Close(); + _channel = null; + } + + /// + /// Wait for all messages to be sent and close the connection + /// + /// Async task + public async Task CloseAsync() + { + await _channel.CloseAsync(); + _channel = null; + } + + /// + /// Connects to remote end point. + /// + /// Address to connect to. + /// Remote port. + /// Async task + /// Socket is already connected + public async Task ConnectAsync(IPAddress address, int port) + { + EnsureChannel(); + + if (_socket != null) + throw new InvalidOperationException("Socket is already connected"); + + _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _args.RemoteEndPoint = new IPEndPoint(address, port); + var isPending = _socket.ConnectAsync(_args); + + if (!isPending) + return; + + await _connectSemaphore.WaitAsync(); + if (_connectException != null) + throw _connectException; + } + + private void EnsureChannel() + { + if (_channel == null) + { + if (Certificate != null) + _channel = new SecureTcpChannel(_readBuffer, _encoder, _decoder, Certificate); + else + _channel = new TcpChannel(_readBuffer, _encoder, _decoder); + + _channel.Disconnected = OnDisconnect; + _channel.MessageSent = OnSendCompleted; + _channel.MessageReceived = OnChannelMessageReceived; + } + } + + /// + /// Connects to remote end point. + /// + /// Address to connect to. + /// Remote port. + /// Maximum amount of time to wait for a connection. + /// Async task + /// Socket is already connected + public async Task ConnectAsync(IPAddress address, int port, TimeSpan timeout) + { + if (_socket != null) + throw new InvalidOperationException("Socket is already connected"); + + EnsureChannel(); + + _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _args.RemoteEndPoint = new IPEndPoint(address, port); + var isPending = _socket.ConnectAsync(_args); + + if (!isPending) + return; + + await _connectSemaphore.WaitAsync(timeout); + } + + + /// + /// Receive a message + /// + /// Decoded message + public Task ReceiveAsync() + { + return ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); + } + + /// + /// Receive a message + /// + /// Decoded message + public async Task ReceiveAsync() where T : class + { + var item = await ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); + if (item == null) + return null; + var casted = item as T; + if (casted == null) + throw new InvalidCastException(string.Format("Failed to cast '{0}' as '{1}'.", item.GetType().FullName, + typeof (T).FullName)); + + return casted; + } + + /// + /// Receive a message + /// + /// Token used to cancel the pending read operation. + /// + /// Decoded message if successful; default(T) if cancellation is requested. + /// + public Task ReceiveAsync(CancellationToken cancellation) + { + return ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); + } + + /// + /// Receives the asynchronous. + /// + /// Maximum amount of time to wait on a message + /// Decoded message + /// + /// Was signalled that something have been received, but found nothing in + /// the in queue + /// + public Task ReceiveAsync(TimeSpan timeout) + { + return ReceiveAsync(timeout, CancellationToken.None); + } + + /// + /// Receive a message + /// + /// Maximum amount of time to wait on a message + /// Token used to cancel the pending read operation. + /// + /// Decoded message if successful; default(T) if cancellation is requested. + /// + /// + /// Was signalled that something have been received, but found nothing in + /// the in queue + /// + public async Task ReceiveAsync(TimeSpan timeout, CancellationToken cancellation) + { + return await Task.Run(() => + { + var handles = new WaitHandle[] { _readEvent, cancellation.WaitHandle }; + + WaitHandle.WaitAny(handles); + + if (cancellation.IsCancellationRequested) + return null; + + object item; + var gotItem = _readItems.TryDequeue(out item); + + if (!gotItem) + throw new ChannelException( + "Was signalled that something have been received, but found nothing in the in queue"); + + if (item is ChannelException) + throw (ChannelException)item; + + // signal so that more items can be read directly + if (_readItems.Count > 0) + _readEvent.Set(); + + return item; + }); + } + + /// + /// Send message to the remote end point. + /// + /// message to send. + /// + /// message + /// + /// + /// All messages are being enqueued and sent in order. This method will return when the current message have been + /// sent. It + /// + /// + /// The method is thread safe and can be executed from multiple threads. + /// + /// + public async Task SendAsync(object message) + { + if (message == null) throw new ArgumentNullException("message"); + + if (_sendException != null) + { + var ex = _sendException; + _sendException = null; + throw new AggregateException(ex); + } + if (Authenticator != null && Authenticator.AuthenticationFailed) + throw new AuthenticationDeniedException("Failed to authenticate"); + + await _sendQueueSemaphore.WaitAsync(); + + _channel.Send(message); + + + await _sendCompletedSemaphore.WaitAsync(); + _sendQueueSemaphore.Release(); + } + + private void OnChannelMessageReceived(ITcpChannel channel, object message) + { + if (_filterHandler != null) + { + var result = _filterHandler(channel, message); + if (result == ClientFilterResult.Revoke) + return; + } + + + _readItems.Enqueue(message); + _readEvent.Set(); + } + + /// + /// Pre processes incoming bytes before they are passed to the message builder. + /// + /// + /// + /// Can be used if you for instance use a custom authentication mechanism which needs to process incoming + /// bytes instead of deserialized messages. + /// + /// + public BufferPreProcessorHandler BufferPreProcessor + { + get { return _channel.BufferPreProcessor; } + set { _channel.BufferPreProcessor = value; } + } + + + private void OnConnect(object sender, SocketAsyncEventArgs e) + { + if (e.SocketError != SocketError.Success) + { + _connectException = new SocketException((int) e.SocketError); + _socket = null; + } + else + _channel.Assign(_socket); + + _connectSemaphore.Release(); + } + + private void OnDisconnect(ITcpChannel arg1, Exception arg2) + { + _socket = null; + + if (_sendCompletedSemaphore.CurrentCount == 0) + { + _sendException = arg2; + _sendCompletedSemaphore.Release(); + } + + if (!_readEvent.WaitOne(0)) + { + _readItems.Enqueue(new ChannelException("Socket got disconnected", arg2)); + _readEvent.Set(); + } + } + + //private void OnMessageReceived(object obj) + //{ + // if (_filterHandler != null) + // { + // var result = _filterHandler(_channel, obj); + // if (result == ClientFilterResult.Revoke) + // return; + // } + + // _readItems.Enqueue(obj); + // if (_readSemaphore.CurrentCount == 0) + // _readSemaphore.Release(); + //} + + private void OnSendCompleted(ITcpChannel channel, object sentMessage) + { + _sendCompletedSemaphore.Release(); + } + } } \ No newline at end of file From 6cfed5cec719d6fd150ed654be7cb13aebd17653 Mon Sep 17 00:00:00 2001 From: gribunin Date: Fri, 12 Aug 2016 23:09:27 +0400 Subject: [PATCH 2/4] Untabified source code --- .../Griffin.Core/Net/ChannelTcpClient.cs | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs index ddc9993..dd74556 100644 --- a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs +++ b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs @@ -26,7 +26,7 @@ public class ChannelTcpClient : IDisposable private readonly IMessageEncoder _encoder; private readonly IBufferSlice _readBuffer; private readonly ConcurrentQueue _readItems = new ConcurrentQueue(); - private readonly AutoResetEvent _readEvent = new AutoResetEvent(false); + private readonly AutoResetEvent _readEvent = new AutoResetEvent(false); private readonly SemaphoreSlim _sendCompletedSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _sendQueueSemaphore = new SemaphoreSlim(1, 1); private ITcpChannel _channel; @@ -260,31 +260,31 @@ public Task ReceiveAsync(TimeSpan timeout) /// public async Task ReceiveAsync(TimeSpan timeout, CancellationToken cancellation) { - return await Task.Run(() => - { - var handles = new WaitHandle[] { _readEvent, cancellation.WaitHandle }; + return await Task.Run(() => + { + var handles = new WaitHandle[] { _readEvent, cancellation.WaitHandle }; - WaitHandle.WaitAny(handles); + WaitHandle.WaitAny(handles); - if (cancellation.IsCancellationRequested) - return null; + if (cancellation.IsCancellationRequested) + return null; - object item; - var gotItem = _readItems.TryDequeue(out item); + object item; + var gotItem = _readItems.TryDequeue(out item); - if (!gotItem) - throw new ChannelException( - "Was signalled that something have been received, but found nothing in the in queue"); + if (!gotItem) + throw new ChannelException( + "Was signalled that something have been received, but found nothing in the in queue"); - if (item is ChannelException) - throw (ChannelException)item; + if (item is ChannelException) + throw (ChannelException)item; - // signal so that more items can be read directly - if (_readItems.Count > 0) - _readEvent.Set(); + // signal so that more items can be read directly + if (_readItems.Count > 0) + _readEvent.Set(); - return item; - }); + return item; + }); } /// @@ -335,7 +335,7 @@ private void OnChannelMessageReceived(ITcpChannel channel, object message) _readItems.Enqueue(message); - _readEvent.Set(); + _readEvent.Set(); } /// @@ -380,7 +380,7 @@ private void OnDisconnect(ITcpChannel arg1, Exception arg2) if (!_readEvent.WaitOne(0)) { _readItems.Enqueue(new ChannelException("Socket got disconnected", arg2)); - _readEvent.Set(); + _readEvent.Set(); } } From 81865b070a1094feacbb8823d456ea260dfe25be Mon Sep 17 00:00:00 2001 From: gribunin Date: Sun, 14 Aug 2016 19:34:48 +0400 Subject: [PATCH 3/4] Fixed ReceiveAsync function with cancellation token (the given parameter was not used). More solid pattern handling of CancellationToken in ReceiveAsync --- src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs index dd74556..dff2503 100644 --- a/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs +++ b/src/Griffin.Framework/Griffin.Core/Net/ChannelTcpClient.cs @@ -229,7 +229,7 @@ public async Task ReceiveAsync() where T : class /// public Task ReceiveAsync(CancellationToken cancellation) { - return ReceiveAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None); + return ReceiveAsync(TimeSpan.FromMilliseconds(-1), cancellation); } /// From 904adbe11b0a476f5068debbb550496077523419 Mon Sep 17 00:00:00 2001 From: gribunin Date: Thu, 18 Aug 2016 18:42:29 +0400 Subject: [PATCH 4/4] Fixed bug in MicroMessageEncoder while encoding object with size bigger than 65535 bytes --- .../Protocols/MicroMsg/MicroMessageEncoder.cs | 460 +++++++++--------- 1 file changed, 230 insertions(+), 230 deletions(-) diff --git a/src/Griffin.Framework/Griffin.Core/Net/Protocols/MicroMsg/MicroMessageEncoder.cs b/src/Griffin.Framework/Griffin.Core/Net/Protocols/MicroMsg/MicroMessageEncoder.cs index 654722c..3a32981 100644 --- a/src/Griffin.Framework/Griffin.Core/Net/Protocols/MicroMsg/MicroMessageEncoder.cs +++ b/src/Griffin.Framework/Griffin.Core/Net/Protocols/MicroMsg/MicroMessageEncoder.cs @@ -1,231 +1,231 @@ -using System; -using System.IO; -using System.Text; -using Griffin.Net.Buffers; -using Griffin.Net.Channels; -using Griffin.Net.Protocols.Serializers; - -namespace Griffin.Net.Protocols.MicroMsg -{ - /// - /// Takes any object that the serializer supports and transfers it over the wire. - /// - /// - /// The encoder also natively supports byte[] arrays and Stream derived objects (as long as the stream - /// have a size specified). These objects - /// will be transferred without invoking the serializer. - /// - public class MicroMessageEncoder : IMessageEncoder - { - /// - /// PROTOCOL version - /// - public const byte Version = MicroMessageDecoder.Version; - - /// - /// Size of the fixed header: version (1), content length (4), type name length (1) = 8 - /// - /// - /// The header size field is not included in the actual header count as it always have to be read to - /// get the actual header size. - /// - public const int FixedHeaderLength = MicroMessageDecoder.FixedHeaderLength; - - private readonly IBufferSlice _bufferSlice; - - private readonly MemoryStream _internalStream = new MemoryStream(); - private readonly IMessageSerializer _serializer; - private Stream _bodyStream; - private int _bytesEnqueued; - private int _bytesLeftToSend; - private int _bytesTransferred; - private bool _headerIsSent; - private int _headerSize; - private object _message; - - /// - /// Initializes a new instance of the class. - /// - /// - /// Serializer used to serialize the messages that should be sent. You might want to pick a - /// serializer which is reasonable fast. - /// - public MicroMessageEncoder(IMessageSerializer serializer) - { - if (serializer == null) throw new ArgumentNullException("serializer"); - - _serializer = serializer; - _bufferSlice = new BufferSlice(new byte[65535], 0, 65535); - } - - /// - /// Initializes a new instance of the class. - /// - /// - /// Serializer used to serialize the messages that should be sent. You might want to pick a - /// serializer which is reasonable fast. - /// - /// Used when sending information. - /// - /// bufferSlice; At least the header should fit in the buffer, and the header - /// can be up to 520 bytes in the current version. - /// - public MicroMessageEncoder(IMessageSerializer serializer, IBufferSlice bufferSlice) - { - if (serializer == null) throw new ArgumentNullException("serializer"); - if (bufferSlice == null) throw new ArgumentNullException("bufferSlice"); - if (bufferSlice.Capacity < 520) - throw new ArgumentOutOfRangeException("bufferSlice", bufferSlice.Capacity, - "At least the header should fit in the buffer, and the header can be up to 520 bytes in the current version"); - - - _serializer = serializer; - _bufferSlice = bufferSlice; - } - - - /// - /// Are about to send a new message - /// - /// Message to send - /// - /// Can be used to prepare the next message. for instance serialize it etc. - /// - /// Message is of a type that the encoder cannot handle. - public void Prepare(object message) - { - if (message == null) throw new ArgumentNullException("message"); - _message = message; - _headerIsSent = false; - } - - /// - /// Serialize message and sent it add it to the buffer - /// - /// Socket buffer - public void Send(ISocketBuffer args) - { - if (_bytesTransferred < _bytesEnqueued) - { - //TODO: Is this faster than moving the bytes to the beginning of the buffer and append more bytes? - args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset + _bytesTransferred, - _bytesEnqueued - _bytesTransferred); - return; - } - - if (!_headerIsSent) - { - var headerLength = CreateHeader(); - var bytesToWrite = (int) Math.Min(_bufferSlice.Capacity - headerLength, _bodyStream.Length); - _bodyStream.Read(_bufferSlice.Buffer, _bufferSlice.Offset + headerLength, bytesToWrite); - args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset, bytesToWrite + headerLength); - _bytesEnqueued = headerLength + bytesToWrite; - _bytesLeftToSend = headerLength + (int) _bodyStream.Length; - } - else - { - _bytesEnqueued = Math.Min(_bufferSlice.Capacity, _bytesLeftToSend); - _bodyStream.Write(_bufferSlice.Buffer, _bufferSlice.Offset, _bytesEnqueued); - args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset, _bytesEnqueued); - } - } - - /// - /// The previous has just completed. - /// - /// - /// - /// true if the message have been sent successfully; otherwise false. - /// - public bool OnSendCompleted(int bytesTransferred) - { - // Make sure that the header is sent - // required so that the Send() method can switch to the body state. - if (!_headerIsSent) - { - _headerSize -= bytesTransferred; - if (_headerSize <= 0) - { - _headerIsSent = true; - _headerSize = 0; - } - } - - _bytesTransferred = bytesTransferred; - _bytesLeftToSend -= bytesTransferred; - if (_bytesLeftToSend == 0) - Clear(); - - return _bytesLeftToSend == 0; - } - - /// - /// Remove everything used for the last message - /// - public void Clear() - { - _bytesEnqueued = 0; - _bytesTransferred = 0; - _bytesLeftToSend = 0; - - if (!ReferenceEquals(_bodyStream, _internalStream)) - { - //bodyStream is null for channels that connected - //but never sent a message. - if (_bodyStream != null) - _bodyStream.Close(); - _bodyStream = null; - } - else - _internalStream.SetLength(0); - - _headerIsSent = false; - _headerSize = 0; - _message = null; - } - - private int CreateHeader() - { - string contentType; - - if (_message is Stream) - { - _bodyStream = (Stream) _message; - contentType = "stream"; - } - else if (_message is byte[]) - { - var buffer = (byte[]) _message; - _bodyStream = new MemoryStream(buffer); - _bodyStream.SetLength(buffer.Length); - contentType = "byte[]"; - } - else - { - _bodyStream = _internalStream; - _serializer.Serialize(_message, _bodyStream, out contentType); - if (contentType == null) - contentType = _message.GetType().AssemblyQualifiedName; - if (contentType.Length > byte.MaxValue) - throw new InvalidOperationException( - "The AssemblyQualifiedName (type name) may not be larger than 255 characters. Your type: " + - _message.GetType().AssemblyQualifiedName); - } - - var sliceOffset = _bufferSlice.Offset; - var sliceBuffer = _bufferSlice.Buffer; - _bodyStream.Position = 0; - _headerSize = FixedHeaderLength + contentType.Length; - - BitConverter2.GetBytes((ushort) _headerSize, sliceBuffer, sliceOffset); - _bufferSlice.Buffer[sliceOffset + 2] = Version; - BitConverter2.GetBytes((int) _bodyStream.Length, sliceBuffer, sliceOffset + 2 + 1); - BitConverter2.GetBytes((byte) contentType.Length, sliceBuffer, sliceOffset + 2 + 1 + 4); - Encoding.UTF8.GetBytes(contentType, 0, contentType.Length, sliceBuffer, sliceOffset + 2 + 1 + 4 + 1); - - // the header length field is not included in _headerSize as it's a header prefix. - // hence the +2 - return _headerSize + 2; - } - } +using System; +using System.IO; +using System.Text; +using Griffin.Net.Buffers; +using Griffin.Net.Channels; +using Griffin.Net.Protocols.Serializers; + +namespace Griffin.Net.Protocols.MicroMsg +{ + /// + /// Takes any object that the serializer supports and transfers it over the wire. + /// + /// + /// The encoder also natively supports byte[] arrays and Stream derived objects (as long as the stream + /// have a size specified). These objects + /// will be transferred without invoking the serializer. + /// + public class MicroMessageEncoder : IMessageEncoder + { + /// + /// PROTOCOL version + /// + public const byte Version = MicroMessageDecoder.Version; + + /// + /// Size of the fixed header: version (1), content length (4), type name length (1) = 8 + /// + /// + /// The header size field is not included in the actual header count as it always have to be read to + /// get the actual header size. + /// + public const int FixedHeaderLength = MicroMessageDecoder.FixedHeaderLength; + + private readonly IBufferSlice _bufferSlice; + + private readonly MemoryStream _internalStream = new MemoryStream(); + private readonly IMessageSerializer _serializer; + private Stream _bodyStream; + private int _bytesEnqueued; + private int _bytesLeftToSend; + private int _bytesTransferred; + private bool _headerIsSent; + private int _headerSize; + private object _message; + + /// + /// Initializes a new instance of the class. + /// + /// + /// Serializer used to serialize the messages that should be sent. You might want to pick a + /// serializer which is reasonable fast. + /// + public MicroMessageEncoder(IMessageSerializer serializer) + { + if (serializer == null) throw new ArgumentNullException("serializer"); + + _serializer = serializer; + _bufferSlice = new BufferSlice(new byte[65535], 0, 65535); + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// Serializer used to serialize the messages that should be sent. You might want to pick a + /// serializer which is reasonable fast. + /// + /// Used when sending information. + /// + /// bufferSlice; At least the header should fit in the buffer, and the header + /// can be up to 520 bytes in the current version. + /// + public MicroMessageEncoder(IMessageSerializer serializer, IBufferSlice bufferSlice) + { + if (serializer == null) throw new ArgumentNullException("serializer"); + if (bufferSlice == null) throw new ArgumentNullException("bufferSlice"); + if (bufferSlice.Capacity < 520) + throw new ArgumentOutOfRangeException("bufferSlice", bufferSlice.Capacity, + "At least the header should fit in the buffer, and the header can be up to 520 bytes in the current version"); + + + _serializer = serializer; + _bufferSlice = bufferSlice; + } + + + /// + /// Are about to send a new message + /// + /// Message to send + /// + /// Can be used to prepare the next message. for instance serialize it etc. + /// + /// Message is of a type that the encoder cannot handle. + public void Prepare(object message) + { + if (message == null) throw new ArgumentNullException("message"); + _message = message; + _headerIsSent = false; + } + + /// + /// Serialize message and sent it add it to the buffer + /// + /// Socket buffer + public void Send(ISocketBuffer args) + { + if (_bytesTransferred < _bytesEnqueued) + { + //TODO: Is this faster than moving the bytes to the beginning of the buffer and append more bytes? + args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset + _bytesTransferred, + _bytesEnqueued - _bytesTransferred); + return; + } + + if (!_headerIsSent) + { + var headerLength = CreateHeader(); + var bytesToWrite = (int) Math.Min(_bufferSlice.Capacity - headerLength, _bodyStream.Length); + _bodyStream.Read(_bufferSlice.Buffer, _bufferSlice.Offset + headerLength, bytesToWrite); + args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset, bytesToWrite + headerLength); + _bytesEnqueued = headerLength + bytesToWrite; + _bytesLeftToSend = headerLength + (int) _bodyStream.Length; + } + else + { + _bytesEnqueued = Math.Min(_bufferSlice.Capacity, _bytesLeftToSend); + _bodyStream.Read(_bufferSlice.Buffer, _bufferSlice.Offset, _bytesEnqueued); + args.SetBuffer(_bufferSlice.Buffer, _bufferSlice.Offset, _bytesEnqueued); + } + } + + /// + /// The previous has just completed. + /// + /// + /// + /// true if the message have been sent successfully; otherwise false. + /// + public bool OnSendCompleted(int bytesTransferred) + { + // Make sure that the header is sent + // required so that the Send() method can switch to the body state. + if (!_headerIsSent) + { + _headerSize -= bytesTransferred; + if (_headerSize <= 0) + { + _headerIsSent = true; + _headerSize = 0; + } + } + + _bytesTransferred = bytesTransferred; + _bytesLeftToSend -= bytesTransferred; + if (_bytesLeftToSend == 0) + Clear(); + + return _bytesLeftToSend == 0; + } + + /// + /// Remove everything used for the last message + /// + public void Clear() + { + _bytesEnqueued = 0; + _bytesTransferred = 0; + _bytesLeftToSend = 0; + + if (!ReferenceEquals(_bodyStream, _internalStream)) + { + //bodyStream is null for channels that connected + //but never sent a message. + if (_bodyStream != null) + _bodyStream.Close(); + _bodyStream = null; + } + else + _internalStream.SetLength(0); + + _headerIsSent = false; + _headerSize = 0; + _message = null; + } + + private int CreateHeader() + { + string contentType; + + if (_message is Stream) + { + _bodyStream = (Stream) _message; + contentType = "stream"; + } + else if (_message is byte[]) + { + var buffer = (byte[]) _message; + _bodyStream = new MemoryStream(buffer); + _bodyStream.SetLength(buffer.Length); + contentType = "byte[]"; + } + else + { + _bodyStream = _internalStream; + _serializer.Serialize(_message, _bodyStream, out contentType); + if (contentType == null) + contentType = _message.GetType().AssemblyQualifiedName; + if (contentType.Length > byte.MaxValue) + throw new InvalidOperationException( + "The AssemblyQualifiedName (type name) may not be larger than 255 characters. Your type: " + + _message.GetType().AssemblyQualifiedName); + } + + var sliceOffset = _bufferSlice.Offset; + var sliceBuffer = _bufferSlice.Buffer; + _bodyStream.Position = 0; + _headerSize = FixedHeaderLength + contentType.Length; + + BitConverter2.GetBytes((ushort) _headerSize, sliceBuffer, sliceOffset); + _bufferSlice.Buffer[sliceOffset + 2] = Version; + BitConverter2.GetBytes((int) _bodyStream.Length, sliceBuffer, sliceOffset + 2 + 1); + BitConverter2.GetBytes((byte) contentType.Length, sliceBuffer, sliceOffset + 2 + 1 + 4); + Encoding.UTF8.GetBytes(contentType, 0, contentType.Length, sliceBuffer, sliceOffset + 2 + 1 + 4 + 1); + + // the header length field is not included in _headerSize as it's a header prefix. + // hence the +2 + return _headerSize + 2; + } + } } \ No newline at end of file