Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JS API deserializer #709

Merged
merged 5 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Runtime.CompilerServices;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;

internal readonly struct NatsJSApiResult<T>
{
private readonly T? _value;
private readonly ApiError? _error;
private readonly Exception? _exception;

public NatsJSApiResult(T value)
{
_value = value;
_error = null;
_exception = null;
}

public NatsJSApiResult(ApiError error)
{
_value = default;
_error = error;
_exception = null;
}

public NatsJSApiResult(Exception exception)
{
_value = default;
_error = null;
_exception = exception;
}

public T Value => _value ?? ThrowValueIsNotSetException();

public ApiError Error => _error ?? ThrowErrorIsNotSetException();

public Exception Exception => _exception ?? ThrowExceptionIsNotSetException();

public bool Success => _error == null && _exception == null;

public bool HasError => _error != null;

public bool HasException => _exception != null;

public static implicit operator NatsJSApiResult<T>(T value) => new(value);

public static implicit operator NatsJSApiResult<T>(ApiError error) => new(error);

public static implicit operator NatsJSApiResult<T>(Exception exception) => new(exception);

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static ApiError ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

private static Exception ThrowExceptionIsNotSetException() => throw CreateInvalidOperationException("Result exception is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}
35 changes: 32 additions & 3 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,38 @@

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize<JsonDocument>
internal sealed class NatsJSJsonDocumentSerializer<T> : INatsDeserialize<NatsJSApiResult<T>>
{
public static readonly NatsJSJsonDocumentSerializer Default = new();
public static readonly NatsJSJsonDocumentSerializer<T> Default = new();

public JsonDocument? Deserialize(in ReadOnlySequence<byte> buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer);
public NatsJSApiResult<T> Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return new NatsJSException("Buffer is empty");
}

using var jsonDocument = JsonDocument.Parse(buffer);

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return error;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(T));
if (jsonTypeInfo == null)
{
return new NatsJSException($"Unknown response type {typeof(T)}");
}

var result = (T?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (result == null)
{
return new NatsJSException("Null result");
}

return result;
}
}
33 changes: 9 additions & 24 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,13 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, JsonDocument>(
await using var sub = await Connection.CreateRequestSubAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: default,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand All @@ -326,37 +326,22 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
return new NatsNoRespondersException();
}

if (msg.Data == null)
{
return new NatsJSException("No response data received");
}

// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
using var jsonDocument = msg.Data;

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
if (msg.Error is { } messageError)
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return new NatsJSResponse<TResponse>(default, error);
return messageError;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse));
if (jsonTypeInfo == null)
if (msg.Data.HasException)
{
return new NatsJSException($"Unknown response type {typeof(TResponse)}");
return msg.Data.Exception;
}

var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (msg.Error is { } messageError)
if (msg.Data.HasError)
{
return messageError;
return new NatsJSResponse<TResponse>(null, msg.Data.Error);
}

return new NatsJSResponse<TResponse>(response, default);
return new NatsJSResponse<TResponse>(msg.Data.Value, null);
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
Expand Down
128 changes: 128 additions & 0 deletions tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using System.Buffers;
using System.Text;
using NATS.Client.Core2.Tests;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;
using JsonSerializer = System.Text.Json.JsonSerializer;

namespace NATS.Client.JetStream.Tests;

[Collection("nats-server")]
public class JetStreamApiSerializerTest
{
private readonly ITestOutputHelper _output;
private readonly NatsServerFixture _server;

public JetStreamApiSerializerTest(ITestOutputHelper output, NatsServerFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task Should_respect_buffers_lifecycle()
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
var apiSubject = $"{prefix}.js.fake.api";
var dataSubject = $"{prefix}.data";

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var ctsDone = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);

List<Task> tasks = new();

// Keep reader buffers busy with lots of data which should not be
// kept around and used by the JsonDocument deserializer.
// Data reader
tasks.Add(Task.Run(
async () =>
{
await foreach (var unused in nats.SubscribeAsync<string>(dataSubject, cancellationToken: ctsDone.Token))
{
}
},
cts.Token));

// Data writer
tasks.Add(Task.Run(
async () =>
{
var data = new string('x', 1024);
while (ctsDone.IsCancellationRequested == false)
{
await nats.PublishAsync(dataSubject, data, cancellationToken: ctsDone.Token);
}
},
cts.Token));

// Fake JS API responder
tasks.Add(Task.Run(
async () =>
{
var json = JsonSerializer.Serialize(new AccountInfoResponse { Consumers = 1234 });
await foreach (var msg in nats.SubscribeAsync<object>(apiSubject, cancellationToken: ctsDone.Token))
{
await msg.ReplyAsync(json, cancellationToken: cts.Token);
}
},
cts.Token));

// Fake JS API requester
tasks.Add(Task.Run(
async () =>
{
for (var i = 0; i < 100; i++)
{
if (ctsDone.IsCancellationRequested)
return;

try
{
var result = await js.TryJSRequestAsync<object, AccountInfoResponse>(apiSubject, null, ctsDone.Token);
}
catch
{
ctsDone.Cancel();
throw;
}
}

ctsDone.Cancel();
},
cts.Token));

try
{
await Task.WhenAll(tasks);
}
catch (TaskCanceledException)
{
}
}

[Fact]
public void Deserialize_value()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"memory":1}""")));
result.Value.Memory.Should().Be(1);
}

[Fact]
public void Deserialize_empty_buffer()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(ReadOnlySequence<byte>.Empty);
result.Exception.Message.Should().Be("Buffer is empty");
}

[Fact]
public void Deserialize_error()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"error":{"code":2}}""")));
result.Error.Code.Should().Be(2);
}
}
Loading