From 6bf76e222457e958d04a29f72096f22c963b51a0 Mon Sep 17 00:00:00 2001 From: mtmk Date: Mon, 13 Jan 2025 14:46:18 +0000 Subject: [PATCH] Fix JS API deserializer (#709) * Fix JS API deserializer * Format * Fix JetStream API serialization with typed results Replace JsonDocument-based responses with strongly-typed `NatsJSApiResult` for improved type safety and error handling. Added new deserialization logic and tests to cover valid responses, errors, and edge cases like empty buffers. * Format --- .../Internal/NatsJSApiResult.cs | 59 ++++++++ .../Internal/NatsJSJsonDocumentSerializer.cs | 35 ++++- src/NATS.Client.JetStream/NatsJSContext.cs | 33 ++--- .../JetStreamApiSerializerTest.cs | 128 ++++++++++++++++++ 4 files changed, 228 insertions(+), 27 deletions(-) create mode 100644 src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs create mode 100644 tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs diff --git a/src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs b/src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs new file mode 100644 index 000000000..0281d61a0 --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs @@ -0,0 +1,59 @@ +using System.Runtime.CompilerServices; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Internal; + +internal readonly struct NatsJSApiResult +{ + private readonly T? _value; + private readonly ApiError? _error; + private readonly Exception? _exception; + + public NatsJSApiResult(T value) + { + _value = value; + _error = null; + _exception = null; + } + + public NatsJSApiResult(ApiError error) + { + _value = default; + _error = error; + _exception = null; + } + + public NatsJSApiResult(Exception exception) + { + _value = default; + _error = null; + _exception = exception; + } + + public T Value => _value ?? ThrowValueIsNotSetException(); + + public ApiError Error => _error ?? ThrowErrorIsNotSetException(); + + public Exception Exception => _exception ?? ThrowExceptionIsNotSetException(); + + public bool Success => _error == null && _exception == null; + + public bool HasError => _error != null; + + public bool HasException => _exception != null; + + public static implicit operator NatsJSApiResult(T value) => new(value); + + public static implicit operator NatsJSApiResult(ApiError error) => new(error); + + public static implicit operator NatsJSApiResult(Exception exception) => new(exception); + + private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set"); + + private static ApiError ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); + + private static Exception ThrowExceptionIsNotSetException() => throw CreateInvalidOperationException("Result exception is not set"); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message); +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs index 5f930d114..c022b47a0 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs @@ -4,9 +4,38 @@ namespace NATS.Client.JetStream.Internal; -internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize +internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize> { - public static readonly NatsJSJsonDocumentSerializer Default = new(); + public static readonly NatsJSJsonDocumentSerializer Default = new(); - public JsonDocument? Deserialize(in ReadOnlySequence buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer); + public NatsJSApiResult Deserialize(in ReadOnlySequence buffer) + { + if (buffer.Length == 0) + { + return new NatsJSException("Buffer is empty"); + } + + using var jsonDocument = JsonDocument.Parse(buffer); + + if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) + { + var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); + return error; + } + + var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(T)); + if (jsonTypeInfo == null) + { + return new NatsJSException($"Unknown response type {typeof(T)}"); + } + + var result = (T?)jsonDocument.RootElement.Deserialize(jsonTypeInfo); + + if (result == null) + { + return new NatsJSException("Null result"); + } + + return result; + } } diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 8876a200c..343991647 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -309,13 +309,13 @@ internal async ValueTask>> TryJSRequestAsyn // Validator.ValidateObject(request, new ValidationContext(request)); } - await using var sub = await Connection.CreateRequestSubAsync( + await using var sub = await Connection.CreateRequestSubAsync>( subject: subject, data: request, headers: default, replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSJsonDocumentSerializer.Default, + replySerializer: NatsJSJsonDocumentSerializer.Default, cancellationToken: cancellationToken) .ConfigureAwait(false); @@ -326,37 +326,22 @@ internal async ValueTask>> TryJSRequestAsyn return new NatsNoRespondersException(); } - if (msg.Data == null) - { - return new NatsJSException("No response data received"); - } - - // We need to determine what type we're deserializing into - // .NET 6 new APIs to the rescue: we can read the buffer once - // by deserializing into a document, inspect and using the new - // API deserialize to the final type from the document. - using var jsonDocument = msg.Data; - - if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) + if (msg.Error is { } messageError) { - var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); - return new NatsJSResponse(default, error); + return messageError; } - var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse)); - if (jsonTypeInfo == null) + if (msg.Data.HasException) { - return new NatsJSException($"Unknown response type {typeof(TResponse)}"); + return msg.Data.Exception; } - var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo); - - if (msg.Error is { } messageError) + if (msg.Data.HasError) { - return messageError; + return new NatsJSResponse(null, msg.Data.Error); } - return new NatsJSResponse(response, default); + return new NatsJSResponse(msg.Data.Value, null); } if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs new file mode 100644 index 000000000..06d86cb6a --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs @@ -0,0 +1,128 @@ +using System.Buffers; +using System.Text; +using NATS.Client.Core2.Tests; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; +using JsonSerializer = System.Text.Json.JsonSerializer; + +namespace NATS.Client.JetStream.Tests; + +[Collection("nats-server")] +public class JetStreamApiSerializerTest +{ + private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; + + public JetStreamApiSerializerTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } + + [Fact] + public async Task Should_respect_buffers_lifecycle() + { + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); + var apiSubject = $"{prefix}.js.fake.api"; + var dataSubject = $"{prefix}.data"; + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var ctsDone = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + + List tasks = new(); + + // Keep reader buffers busy with lots of data which should not be + // kept around and used by the JsonDocument deserializer. + // Data reader + tasks.Add(Task.Run( + async () => + { + await foreach (var unused in nats.SubscribeAsync(dataSubject, cancellationToken: ctsDone.Token)) + { + } + }, + cts.Token)); + + // Data writer + tasks.Add(Task.Run( + async () => + { + var data = new string('x', 1024); + while (ctsDone.IsCancellationRequested == false) + { + await nats.PublishAsync(dataSubject, data, cancellationToken: ctsDone.Token); + } + }, + cts.Token)); + + // Fake JS API responder + tasks.Add(Task.Run( + async () => + { + var json = JsonSerializer.Serialize(new AccountInfoResponse { Consumers = 1234 }); + await foreach (var msg in nats.SubscribeAsync(apiSubject, cancellationToken: ctsDone.Token)) + { + await msg.ReplyAsync(json, cancellationToken: cts.Token); + } + }, + cts.Token)); + + // Fake JS API requester + tasks.Add(Task.Run( + async () => + { + for (var i = 0; i < 100; i++) + { + if (ctsDone.IsCancellationRequested) + return; + + try + { + var result = await js.TryJSRequestAsync(apiSubject, null, ctsDone.Token); + } + catch + { + ctsDone.Cancel(); + throw; + } + } + + ctsDone.Cancel(); + }, + cts.Token)); + + try + { + await Task.WhenAll(tasks); + } + catch (TaskCanceledException) + { + } + } + + [Fact] + public void Deserialize_value() + { + var serializer = NatsJSJsonDocumentSerializer.Default; + var result = serializer.Deserialize(new ReadOnlySequence(Encoding.UTF8.GetBytes("""{"memory":1}"""))); + result.Value.Memory.Should().Be(1); + } + + [Fact] + public void Deserialize_empty_buffer() + { + var serializer = NatsJSJsonDocumentSerializer.Default; + var result = serializer.Deserialize(ReadOnlySequence.Empty); + result.Exception.Message.Should().Be("Buffer is empty"); + } + + [Fact] + public void Deserialize_error() + { + var serializer = NatsJSJsonDocumentSerializer.Default; + var result = serializer.Deserialize(new ReadOnlySequence(Encoding.UTF8.GetBytes("""{"error":{"code":2}}"""))); + result.Error.Code.Should().Be(2); + } +}