Skip to content

Commit

Permalink
Added async methods for read/write operations for the underlying stre…
Browse files Browse the repository at this point in the history
…am . Closes #53. (#54)
  • Loading branch information
csteppe-kx authored Jun 15, 2021
1 parent efe6aff commit bf5ce9c
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 0 deletions.
85 changes: 85 additions & 0 deletions kx.Test/Connection/ConnectionAsyncWriteTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;

Expand Down Expand Up @@ -28,6 +30,24 @@ public void ConnectionWritesExpectedObjectParameterToClientStreamAsynchronously(
}
}

[Test]
public async Task ConnectionWritesExpectedObjectParameterToClientStreamAsync()
{
object expected = "param1";

using (MemoryStream s = new MemoryStream())
{
using (var connection = new c(s))
{
await connection.ksAsync(expected);

object result = connection.Deserialize(s.GetBuffer());

Assert.AreEqual(expected, result);
}
}
}

[Test]
public void ConnectionWritesExpectedStringExpressionToClientStreamAsynchronously()
{
Expand All @@ -47,6 +67,24 @@ public void ConnectionWritesExpectedStringExpressionToClientStreamAsynchronously
}
}

[Test]
public async Task ConnectionWritesExpectedStringExpressionToClientStreamAsync()
{
const string expected = "test_expression";

using (MemoryStream s = new MemoryStream())
{
using (var connection = new c(s))
{
await connection.ksAsync(expected);

object result = connection.Deserialize(s.GetBuffer());

Assert.AreEqual(expected, result);
}
}
}

[Test]
public void ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAsynchronously()
{
Expand All @@ -70,6 +108,28 @@ public void ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAs
}
}

[Test]
public async Task ConnectionWritesExpectedStringExpressionAndParameterToClientStreamAsync()
{
const string expression = "test_expression";
object parameter1 = 1;

using (MemoryStream s = new MemoryStream())
{
using (var connection = new c(s))
{
await connection.ksAsync(expression, parameter1);

object[] result = connection.Deserialize(s.GetBuffer()) as object[];

Assert.IsNotNull(result);
Assert.AreEqual(2, result.Length);
Assert.AreEqual(expression, new string(result[0] as char[]));
Assert.AreEqual(parameter1, result[1]);
}
}
}

[Test]
public void ConnectionWritesExpectedStringExpressionAndParametersToClientStreamAsynchronously()
{
Expand All @@ -95,6 +155,30 @@ public void ConnectionWritesExpectedStringExpressionAndParametersToClientStreamA
}
}

[Test]
public async Task ConnectionWritesExpectedStringExpressionAndParametersToClientStreamAsync()
{
const string expression = "test_expression";
object parameter1 = 1;
object parameter2 = 2;

using (MemoryStream s = new MemoryStream())
{
using (var connection = new c(s))
{
await connection.ksAsync(expression, parameter1, parameter2);

object[] result = connection.Deserialize(s.GetBuffer()) as object[];

Assert.IsNotNull(result);
Assert.AreEqual(3, result.Length);
Assert.AreEqual(expression, new string(result[0] as char[]));
Assert.AreEqual(parameter1, result[1]);
Assert.AreEqual(parameter2, result[2]);
}
}
}

private Mock<Stream> CreateTestStream(List<byte> bytesWritten)
{
Mock<Stream> testStream = new Mock<Stream>();
Expand All @@ -111,6 +195,7 @@ private Mock<Stream> CreateTestStream(List<byte> bytesWritten)
bytesWritten.AddRange(content);
});


return testStream;
}
}
Expand Down
147 changes: 147 additions & 0 deletions kx/c.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace kx
{
Expand Down Expand Up @@ -367,6 +368,18 @@ public object k()
return r();
}

/// <summary>
/// Reads an incoming message from the remote KDB+ process async.
/// </summary>
/// <returns>
/// Deserialised response to request.
/// </returns>
public async Task<object> kAsync()
{
await k0Async().ConfigureAwait(false);
return r();
}

/// <summary>
/// Sends a sync message request to the remote KDB+ process.
/// </summary>
Expand Down Expand Up @@ -483,6 +496,31 @@ public void k0()
ParseException();
}

/// <summary>
/// Waits for an async message and read header.
/// </summary>
public async Task k0Async()
{
_readBuffer = new byte[8];
await ReadAsync(_readBuffer).ConfigureAwait(false);

ParseHeader();
_readPosition = 4;
_readBuffer = new byte[ri() - 8];

await ReadAsync(_readBuffer).ConfigureAwait(false);

if (IsCompressed)
{
UnCompress();
}
else
{
_readPosition = 0;
}
ParseException();
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified object parameter.
/// </summary>
Expand All @@ -492,6 +530,14 @@ public void ks(object x)
w(0, x);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified object parameter.
/// </summary>
/// <param name="x">The object parameter.</param>
public async Task ksAsync(object x)
{
await wAsync(0, x).ConfigureAwait(false);
}
/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression.
/// </summary>
Expand All @@ -501,6 +547,14 @@ public void ks(string s)
w(0, s.ToCharArray());
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression.
/// </summary>
/// <param name="s">The expression to send.</param>
public async Task ksAsync(string s)
{
await wAsync(0, s.ToCharArray()).ConfigureAwait(false);
}
/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression
/// and object parameter.
Expand All @@ -518,6 +572,23 @@ public void ks(string s, object x)
w(0, array);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression
/// and object parameter.
/// </summary>
/// <param name="s">The expression to send.</param>
/// <param name="x">The object parameter to send.</param>
public async Task ksAsync(string s, object x)
{
object[] array = new object[]
{
s.ToCharArray(),
x
};

await wAsync(0, array).ConfigureAwait(false);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression
/// and object parameters.
Expand All @@ -537,6 +608,25 @@ public void ks(string s, object x, object y)
w(0, array);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified expression
/// and object parameters.
/// </summary>
/// <param name="s">The expression to send.</param>
/// <param name="x">The first object parameter to send.</param>
/// <param name="y">The second object parameter to send.</param>
public async Task ksAsync(string s, object x, object y)
{
object[] array = new object[]
{
s.ToCharArray(),
x,
y
};

await wAsync(0, array).ConfigureAwait(false);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified object parameter.
/// </summary>
Expand All @@ -546,6 +636,14 @@ public void kn(object x)
w(1, x);
}

/// <summary>
/// Sends an async message to the remote KDB+ process with a specified object parameter.
/// </summary>
/// <param name="x">The object parameter.</param>
public async Task knAsync(object x)
{
await wAsync(1, x).ConfigureAwait(false);
}
/// <summary>
/// Sends a response message to the remote KDB+ process.
/// </summary>
Expand All @@ -557,6 +655,18 @@ public void kr(object x)
{
w(2, x);
}

/// <summary>
/// Sends a response message to the remote KDB+ process.
/// </summary>
/// <param name="x">The response message to send.</param>
/// <remarks>
/// This should be called only during processing of an incoming sync message.
/// </remarks>
public async Task krAsync(object x)
{
await wAsync(2, x).ConfigureAwait(false);
}

/// <summary>
/// Serialises a specified object as a byte-array
Expand Down Expand Up @@ -682,6 +792,16 @@ protected void Write(byte[] bytes, int number)
_clientStream.Write(bytes, 0, number);
}

/// <summary>
/// Writes a specified byte array directly to the underlying client stream asynchronous.
/// </summary>
/// <param name="bytes">The byte array to be writtern to the client stream.</param>
/// <param name="number">The number of bytes to be written to the client stream.</param>
protected async Task WriteAsync(byte[] bytes, int number)
{
await _clientStream.WriteAsync(bytes, 0, number).ConfigureAwait(false);
}

/// <summary>
/// Gets the null object for the specified <see cref="Type"/>.
/// </summary>
Expand Down Expand Up @@ -1707,6 +1827,12 @@ private void w(int i, object x)
_clientStream.Write(buffer, 0, buffer.Length);
}

private async Task wAsync(int i, object x)
{
byte[] buffer = Serialize(i, x);
await _clientStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
}

private void read(byte[] b)
{
int k = 0;
Expand All @@ -1728,6 +1854,27 @@ private void read(byte[] b)
throw new KException("read");
}

private async Task ReadAsync(byte[] b)
{
int k = 0;
int j = b.Length;
while (true)
{
if (k < j)
{
int i;
if ((i = await _clientStream.ReadAsync(b, k, Math.Min(_maxBufferSize, j - k)).ConfigureAwait(false)) == 0)
{
break;
}
k += i;
continue;
}
return;
}
throw new KException("read");
}

private static int ns(string s)
{
int j = s.IndexOf('\0');
Expand Down

0 comments on commit bf5ce9c

Please sign in to comment.