From bf5ce9cb9807f4de530c597b4297eae8d331a173 Mon Sep 17 00:00:00 2001 From: csteppe-kx <85675088+csteppe-kx@users.noreply.github.com> Date: Tue, 15 Jun 2021 12:37:20 +0100 Subject: [PATCH] Added async methods for read/write operations for the underlying stream . Closes #53. (#54) --- .../Connection/ConnectionAsyncWriteTests.cs | 85 ++++++++++ kx/c.cs | 147 ++++++++++++++++++ 2 files changed, 232 insertions(+) diff --git a/kx.Test/Connection/ConnectionAsyncWriteTests.cs b/kx.Test/Connection/ConnectionAsyncWriteTests.cs index c92b29d..a7b4df3 100644 --- a/kx.Test/Connection/ConnectionAsyncWriteTests.cs +++ b/kx.Test/Connection/ConnectionAsyncWriteTests.cs @@ -1,6 +1,8 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Text; +using System.Threading.Tasks; using Moq; using NUnit.Framework; @@ -28,6 +30,24 @@ public void ConnectionWritesExpectedObjectParameterToClientStreamAsynchronously( } } + [Test] + public async Task ConnectionWritesExpectedObjectParameterToClientStreamAsync() + { + object expected = "param1"; + + using (MemoryStream s = new MemoryStream()) + { + using (var connection = new c(s)) + { + await connection.ksAsync(expected); + + object result = connection.Deserialize(s.GetBuffer()); + + Assert.AreEqual(expected, result); + } + } + } + [Test] public void ConnectionWritesExpectedStringExpressionToClientStreamAsynchronously() { @@ -47,6 +67,24 @@ public void ConnectionWritesExpectedStringExpressionToClientStreamAsynchronously } } + [Test] + public async Task ConnectionWritesExpectedStringExpressionToClientStreamAsync() + { + const string expected = "test_expression"; + + using (MemoryStream s = new MemoryStream()) + { + using (var connection = new c(s)) + { + await connection.ksAsync(expected); + + object result = connection.Deserialize(s.GetBuffer()); + + Assert.AreEqual(expected, result); + } + } + } + [Test] public void ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAsynchronously() { @@ -70,6 +108,28 @@ public void ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAs } } + [Test] + public async Task ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAsync() + { + const string expression = "test_expression"; + object parameter1 = 1; + + using (MemoryStream s = new MemoryStream()) + { + using (var connection = new c(s)) + { + await connection.ksAsync(expression, parameter1); + + object[] result = connection.Deserialize(s.GetBuffer()) as object[]; + + Assert.IsNotNull(result); + Assert.AreEqual(2, result.Length); + Assert.AreEqual(expression, new string(result[0] as char[])); + Assert.AreEqual(parameter1, result[1]); + } + } + } + [Test] public void ConnectionWritesExpectedStringExpressionAndParametersToClientStreamAsynchronously() { @@ -95,6 +155,30 @@ public void ConnectionWritesExpectedStringExpressionAndParametersToClientStreamA } } + [Test] + public async Task ConnectionWritesExpectedStringExpressionAndParametersToClientStreamAsync() + { + const string expression = "test_expression"; + object parameter1 = 1; + object parameter2 = 2; + + using (MemoryStream s = new MemoryStream()) + { + using (var connection = new c(s)) + { + await connection.ksAsync(expression, parameter1, parameter2); + + object[] result = connection.Deserialize(s.GetBuffer()) as object[]; + + Assert.IsNotNull(result); + Assert.AreEqual(3, result.Length); + Assert.AreEqual(expression, new string(result[0] as char[])); + Assert.AreEqual(parameter1, result[1]); + Assert.AreEqual(parameter2, result[2]); + } + } + } + private Mock CreateTestStream(List bytesWritten) { Mock testStream = new Mock(); @@ -111,6 +195,7 @@ private Mock CreateTestStream(List bytesWritten) bytesWritten.AddRange(content); }); + return testStream; } } diff --git a/kx/c.cs b/kx/c.cs index 9d7e77d..34d2d2f 100644 --- a/kx/c.cs +++ b/kx/c.cs @@ -4,6 +4,7 @@ using System.Net.Security; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace kx { @@ -367,6 +368,18 @@ public object k() return r(); } + /// + /// Reads an incoming message from the remote KDB+ process async. + /// + /// + /// Deserialised response to request. + /// + public async Task kAsync() + { + await k0Async().ConfigureAwait(false); + return r(); + } + /// /// Sends a sync message request to the remote KDB+ process. /// @@ -483,6 +496,31 @@ public void k0() ParseException(); } + /// + /// Waits for an async message and read header. + /// + public async Task k0Async() + { + _readBuffer = new byte[8]; + await ReadAsync(_readBuffer).ConfigureAwait(false); + + ParseHeader(); + _readPosition = 4; + _readBuffer = new byte[ri() - 8]; + + await ReadAsync(_readBuffer).ConfigureAwait(false); + + if (IsCompressed) + { + UnCompress(); + } + else + { + _readPosition = 0; + } + ParseException(); + } + /// /// Sends an async message to the remote KDB+ process with a specified object parameter. /// @@ -492,6 +530,14 @@ public void ks(object x) w(0, x); } + /// + /// Sends an async message to the remote KDB+ process with a specified object parameter. + /// + /// The object parameter. + public async Task ksAsync(object x) + { + await wAsync(0, x).ConfigureAwait(false); + } /// /// Sends an async message to the remote KDB+ process with a specified expression. /// @@ -501,6 +547,14 @@ public void ks(string s) w(0, s.ToCharArray()); } + /// + /// Sends an async message to the remote KDB+ process with a specified expression. + /// + /// The expression to send. + public async Task ksAsync(string s) + { + await wAsync(0, s.ToCharArray()).ConfigureAwait(false); + } /// /// Sends an async message to the remote KDB+ process with a specified expression /// and object parameter. @@ -518,6 +572,23 @@ public void ks(string s, object x) w(0, array); } + /// + /// Sends an async message to the remote KDB+ process with a specified expression + /// and object parameter. + /// + /// The expression to send. + /// The object parameter to send. + public async Task ksAsync(string s, object x) + { + object[] array = new object[] + { + s.ToCharArray(), + x + }; + + await wAsync(0, array).ConfigureAwait(false); + } + /// /// Sends an async message to the remote KDB+ process with a specified expression /// and object parameters. @@ -537,6 +608,25 @@ public void ks(string s, object x, object y) w(0, array); } + /// + /// Sends an async message to the remote KDB+ process with a specified expression + /// and object parameters. + /// + /// The expression to send. + /// The first object parameter to send. + /// The second object parameter to send. + public async Task ksAsync(string s, object x, object y) + { + object[] array = new object[] + { + s.ToCharArray(), + x, + y + }; + + await wAsync(0, array).ConfigureAwait(false); + } + /// /// Sends an async message to the remote KDB+ process with a specified object parameter. /// @@ -546,6 +636,14 @@ public void kn(object x) w(1, x); } + /// + /// Sends an async message to the remote KDB+ process with a specified object parameter. + /// + /// The object parameter. + public async Task knAsync(object x) + { + await wAsync(1, x).ConfigureAwait(false); + } /// /// Sends a response message to the remote KDB+ process. /// @@ -557,6 +655,18 @@ public void kr(object x) { w(2, x); } + + /// + /// Sends a response message to the remote KDB+ process. + /// + /// The response message to send. + /// + /// This should be called only during processing of an incoming sync message. + /// + public async Task krAsync(object x) + { + await wAsync(2, x).ConfigureAwait(false); + } /// /// Serialises a specified object as a byte-array @@ -682,6 +792,16 @@ protected void Write(byte[] bytes, int number) _clientStream.Write(bytes, 0, number); } + /// + /// Writes a specified byte array directly to the underlying client stream asynchronous. + /// + /// The byte array to be writtern to the client stream. + /// The number of bytes to be written to the client stream. + protected async Task WriteAsync(byte[] bytes, int number) + { + await _clientStream.WriteAsync(bytes, 0, number).ConfigureAwait(false); + } + /// /// Gets the null object for the specified . /// @@ -1707,6 +1827,12 @@ private void w(int i, object x) _clientStream.Write(buffer, 0, buffer.Length); } + private async Task wAsync(int i, object x) + { + byte[] buffer = Serialize(i, x); + await _clientStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait(false); + } + private void read(byte[] b) { int k = 0; @@ -1728,6 +1854,27 @@ private void read(byte[] b) throw new KException("read"); } + private async Task ReadAsync(byte[] b) + { + int k = 0; + int j = b.Length; + while (true) + { + if (k < j) + { + int i; + if ((i = await _clientStream.ReadAsync(b, k, Math.Min(_maxBufferSize, j - k)).ConfigureAwait(false)) == 0) + { + break; + } + k += i; + continue; + } + return; + } + throw new KException("read"); + } + private static int ns(string s) { int j = s.IndexOf('\0');