From 579a3e838ddeb38e378d0a05eededc5773d6a6ed Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Tue, 9 Jul 2024 12:00:11 +0900 Subject: [PATCH] Propagate HTTP/2 error codes to grpc-dotnet --- native/Cargo.lock | 1 + native/yaha_native/Cargo.toml | 1 + native/yaha_native/src/binding.rs | 49 ++++++++++++++----- native/yaha_native/src/context.rs | 2 +- .../NativeHttpHandlerCore.cs | 8 +-- .../NativeMethods.Uwp.g.cs | 2 +- src/YetAnotherHttpHandler/NativeMethods.g.cs | 2 +- .../NativeMethodsFuncPtr.g.cs | 2 +- src/YetAnotherHttpHandler/RequestContext.cs | 4 +- src/YetAnotherHttpHandler/ResponseContext.cs | 15 ++++-- .../Shims/Http2StreamException.cs | 15 ++++++ .../Shims/Http2StreamException.cs.meta | 11 +++++ .../Http2TestBase.cs | 33 +++++++++++++ .../Protos/greet.proto | 8 +++ .../TestServerForHttp2.cs | 11 ++++- 15 files changed, 137 insertions(+), 27 deletions(-) create mode 100644 src/YetAnotherHttpHandler/Shims/Http2StreamException.cs create mode 100644 src/YetAnotherHttpHandler/Shims/Http2StreamException.cs.meta diff --git a/native/Cargo.lock b/native/Cargo.lock index 1e904dc..e025fac 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1148,6 +1148,7 @@ dependencies = [ "csbindgen", "futures-channel", "futures-util", + "h2", "http-body-util", "hyper", "hyper-rustls", diff --git a/native/yaha_native/Cargo.toml b/native/yaha_native/Cargo.toml index 5de4c32..ed89248 100644 --- a/native/yaha_native/Cargo.toml +++ b/native/yaha_native/Cargo.toml @@ -13,6 +13,7 @@ crate-type = ["staticlib", "cdylib"] csbindgen = "1.9.1" [dependencies] +h2 = { version = "0.4.5" } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7.10" } hyper = { version = "1.3.1", features = ["client", "http1", "http2"] } diff --git a/native/yaha_native/src/binding.rs b/native/yaha_native/src/binding.rs index 34c29a2..eb37dfe 100644 --- a/native/yaha_native/src/binding.rs +++ b/native/yaha_native/src/binding.rs @@ -1,9 +1,5 @@ use std::{ - cell::RefCell, - num::NonZeroIsize, - ptr::null, - sync::{Arc, Mutex}, - time::Duration, + cell::RefCell, error::Error, num::NonZeroIsize, ptr::null, sync::{Arc, Mutex}, time::Duration }; use http_body_util::{combinators::BoxBody, BodyExt}; @@ -70,7 +66,7 @@ pub extern "C" fn yaha_init_context( version: YahaHttpVersion, ), on_receive: extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8), - on_complete: extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason), + on_complete: extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32), ) -> *mut YahaNativeContext { let runtime_ctx = YahaNativeRuntimeContextInternal::from_raw_context(runtime_ctx); let ctx = Box::new(YahaNativeContextInternal::new( @@ -89,7 +85,7 @@ pub extern "C" fn yaha_dispose_context(ctx: *mut YahaNativeContext) { ctx.on_receive = _sentinel_on_receive; ctx.on_status_code_and_headers_receive = _sentinel_on_status_code_and_headers_receive; } -extern "C" fn _sentinel_on_complete(_: i32, _: NonZeroIsize, _: CompletionReason) { panic!("The context has already disposed: on_complete"); } +extern "C" fn _sentinel_on_complete(_: i32, _: NonZeroIsize, _: CompletionReason, _: u32) { panic!("The context has already disposed: on_complete"); } extern "C" fn _sentinel_on_receive(_: i32, _: NonZeroIsize, _: usize, _: *const u8) { panic!("The context has already disposed: on_receive"); } extern "C" fn _sentinel_on_status_code_and_headers_receive(_: i32, _: NonZeroIsize, _: i32, _: YahaHttpVersion) { panic!("The context has already disposed: on_status_code_and_headers_receive"); } @@ -476,14 +472,14 @@ pub extern "C" fn yaha_request_begin( LAST_ERROR.with(|v| { *v.borrow_mut() = Some("The client has not been built. You need to build it before sending the request. ".to_string()); }); - (ctx.on_complete)(seq, state, CompletionReason::Error); + (ctx.on_complete)(seq, state, CompletionReason::Error, 0); return; } // Send a request and wait for response status and headers. let res = select! { _ = cancellation_token.cancelled() => { - (ctx.on_complete)(seq, state, CompletionReason::Aborted); + (ctx.on_complete)(seq, state, CompletionReason::Aborted, 0); return; } res = ctx.client.as_ref().unwrap().request(req) => { @@ -491,7 +487,26 @@ pub extern "C" fn yaha_request_begin( LAST_ERROR.with(|v| { *v.borrow_mut() = Some(err.to_string()); }); - (ctx.on_complete)(seq, state, CompletionReason::Error); + + // If the inner error is `hyper::Error`, use its error message instead. + let error_inner = err.source() + .and_then(|e| e.downcast_ref::()); + + if let Some(err) = error_inner { + LAST_ERROR.with(|v| { + *v.borrow_mut() = Some(err.to_string()); + }); + } + + // If the `hyper::Error` has `h2::Error` as inner error, the error has HTTP/2 error code. + let reason = error_inner + .and_then(|e| e.source()) + .and_then(|e| e.downcast_ref::()) + .and_then(|e| e.reason()); + + let rc = reason.map(|r| u32::from(r)); + + (ctx.on_complete)(seq, state, CompletionReason::Error, rc.unwrap_or_default()); return; } else { res @@ -532,7 +547,7 @@ pub extern "C" fn yaha_request_begin( while !body.is_end_stream() { select! { _ = cancellation_token.cancelled() => { - (ctx.on_complete)(seq, state, CompletionReason::Aborted); + (ctx.on_complete)(seq, state, CompletionReason::Aborted, 0); return; } received = body.frame() => { @@ -572,7 +587,15 @@ pub extern "C" fn yaha_request_begin( LAST_ERROR.with(|v| { *v.borrow_mut() = Some(err.to_string()); }); - (ctx.on_complete)(seq, state, CompletionReason::Error); + + // If the `hyper::Error` has `h2::Error` as inner error, the error has HTTP/2 error code. + let reason = err.source() + .and_then(|e| e.downcast_ref::()) + .and_then(|e| e.reason()); + + let rc = reason.map(|r| u32::from(r)); + + (ctx.on_complete)(seq, state, CompletionReason::Error, rc.unwrap_or_default()); return; } } @@ -591,7 +614,7 @@ pub extern "C" fn yaha_request_begin( req_ctx.try_complete(); } - (ctx.on_complete)(seq, state, CompletionReason::Success); + (ctx.on_complete)(seq, state, CompletionReason::Success, 0); { let mut req_ctx = req_ctx.lock().unwrap(); diff --git a/native/yaha_native/src/context.rs b/native/yaha_native/src/context.rs index da4909f..341097a 100644 --- a/native/yaha_native/src/context.rs +++ b/native/yaha_native/src/context.rs @@ -30,7 +30,7 @@ use crate::primitives::{YahaHttpVersion, CompletionReason}; type OnStatusCodeAndHeadersReceive = extern "C" fn(req_seq: i32, state: NonZeroIsize, status_code: i32, version: YahaHttpVersion); type OnReceive = extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8); -type OnComplete = extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason); +type OnComplete = extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32); pub struct YahaNativeRuntimeContext; pub struct YahaNativeRuntimeContextInternal { diff --git a/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs b/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs index a36b2f2..e693732 100644 --- a/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs +++ b/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs @@ -402,9 +402,9 @@ private static unsafe void OnReceive(int reqSeq, IntPtr state, UIntPtr length, b [UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })] #endif [MonoPInvokeCallback(typeof(NativeMethods.yaha_init_context_on_complete_delegate))] - private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason reason) + private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason reason, uint h2ErrorCode) { - if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response completed: Reason={reason}"); + if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response completed: Reason={reason}; H2ErrorCode=0x{h2ErrorCode:x}"); var requestContext = RequestContext.FromHandle(state); try @@ -459,7 +459,7 @@ private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason var buf = NativeMethods.yaha_get_last_error(); try { - requestContext.CompleteAsFailed(UnsafeUtilities.GetStringFromUtf8Bytes(buf->AsSpan())); + requestContext.CompleteAsFailed(UnsafeUtilities.GetStringFromUtf8Bytes(buf->AsSpan()), h2ErrorCode); } catch { @@ -468,7 +468,7 @@ private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason } else { - requestContext.CompleteAsFailed("Canceled"); + requestContext.CompleteAsFailed("Canceled", 0); } } finally diff --git a/src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs b/src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs index 841ddf7..ca46131 100644 --- a/src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs +++ b/src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs @@ -36,7 +36,7 @@ internal static unsafe partial class NativeMethods public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason); + public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code); [DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, yaha_init_context_on_status_code_and_headers_receive_delegate on_status_code_and_headers_receive, yaha_init_context_on_receive_delegate on_receive, yaha_init_context_on_complete_delegate on_complete); diff --git a/src/YetAnotherHttpHandler/NativeMethods.g.cs b/src/YetAnotherHttpHandler/NativeMethods.g.cs index df242ba..84968bf 100644 --- a/src/YetAnotherHttpHandler/NativeMethods.g.cs +++ b/src/YetAnotherHttpHandler/NativeMethods.g.cs @@ -41,7 +41,7 @@ internal static unsafe partial class NativeMethods public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason); + public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code); [DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, yaha_init_context_on_status_code_and_headers_receive_delegate on_status_code_and_headers_receive, yaha_init_context_on_receive_delegate on_receive, yaha_init_context_on_complete_delegate on_complete); diff --git a/src/YetAnotherHttpHandler/NativeMethodsFuncPtr.g.cs b/src/YetAnotherHttpHandler/NativeMethodsFuncPtr.g.cs index 6f73df0..bedaa4e 100644 --- a/src/YetAnotherHttpHandler/NativeMethodsFuncPtr.g.cs +++ b/src/YetAnotherHttpHandler/NativeMethodsFuncPtr.g.cs @@ -29,7 +29,7 @@ internal static unsafe partial class NativeMethodsFuncPtr public static extern void yaha_dispose_runtime(YahaNativeRuntimeContext* ctx); [DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] - public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, delegate* unmanaged[Cdecl] on_status_code_and_headers_receive, delegate* unmanaged[Cdecl] on_receive, delegate* unmanaged[Cdecl] on_complete); + public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, delegate* unmanaged[Cdecl] on_status_code_and_headers_receive, delegate* unmanaged[Cdecl] on_receive, delegate* unmanaged[Cdecl] on_complete); [DllImport(__DllName, EntryPoint = "yaha_dispose_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern void yaha_dispose_context(YahaNativeContext* ctx); diff --git a/src/YetAnotherHttpHandler/RequestContext.cs b/src/YetAnotherHttpHandler/RequestContext.cs index 30eb02e..c711489 100644 --- a/src/YetAnotherHttpHandler/RequestContext.cs +++ b/src/YetAnotherHttpHandler/RequestContext.cs @@ -286,9 +286,9 @@ public void Complete() _cancellationTokenSource.Cancel(); // Stop reading the request body. } - public void CompleteAsFailed(string errorMessage) + public void CompleteAsFailed(string errorMessage, uint h2ErrorCode) { - Response.CompleteAsFailed(errorMessage); + Response.CompleteAsFailed(errorMessage, h2ErrorCode); _cancellationTokenSource.Cancel(); // Stop reading the request body. } diff --git a/src/YetAnotherHttpHandler/ResponseContext.cs b/src/YetAnotherHttpHandler/ResponseContext.cs index 8d2e7fa..c55c75f 100644 --- a/src/YetAnotherHttpHandler/ResponseContext.cs +++ b/src/YetAnotherHttpHandler/ResponseContext.cs @@ -110,13 +110,22 @@ public void Complete() } } - public void CompleteAsFailed(string errorMessage) + public void CompleteAsFailed(string errorMessage, uint h2ErrorCode) { - if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestContext.RequestSequence}] Response completed with failure ({errorMessage})"); + if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestContext.RequestSequence}] Response completed with failure ({errorMessage}) (0x{h2ErrorCode:x})"); lock (_writeLock) { - var ex = new IOException(errorMessage); + Exception ex = new IOException(errorMessage); + if (h2ErrorCode != 0) + { +#if NET7_0_OR_GREATER + ex = new HttpProtocolException(h2ErrorCode, $"The HTTP/2 server reset the stream. HTTP/2 error code (0x{h2ErrorCode:x}).", ex); +#else + ex = new Http2StreamException($"The HTTP/2 server reset the stream. HTTP/2 error code (0x{h2ErrorCode:x}).", ex); +#endif + } + #if NET5_0_OR_GREATER ExceptionDispatchInfo.SetCurrentStackTrace(ex); #else diff --git a/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs b/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs new file mode 100644 index 0000000..cbb1be2 --- /dev/null +++ b/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +// HACK: grpc-dotnet checks InnerException type names, so matching type name ensures behaviour +// https://github.com/grpc/grpc-dotnet/blob/v2.60.0/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs#L479 + +// ReSharper disable once CheckNamespace +namespace System.Net.Http +{ + internal class Http2StreamException : Exception + { + public Http2StreamException(string message, Exception innerException) : base(message, innerException) { } + } +} diff --git a/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs.meta b/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs.meta new file mode 100644 index 0000000..51fdf6f --- /dev/null +++ b/src/YetAnotherHttpHandler/Shims/Http2StreamException.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: f53aaf0200f86ab4787e221dcc0db5d1 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/test/YetAnotherHttpHandler.Test/Http2TestBase.cs b/test/YetAnotherHttpHandler.Test/Http2TestBase.cs index eb3b74a..f9f2f01 100644 --- a/test/YetAnotherHttpHandler.Test/Http2TestBase.cs +++ b/test/YetAnotherHttpHandler.Test/Http2TestBase.cs @@ -535,6 +535,39 @@ async Task RunAsync() } } + [ConditionalFact] + public async Task Grpc_Error_Status_ErrorCode() + { + // Arrange + using var httpHandler = CreateHandler(); + var httpClient = new HttpClient(httpHandler); + await using var server = await LaunchServerAsync(); + var client = new Greeter.GreeterClient(GrpcChannel.ForAddress(server.BaseUri, new GrpcChannelOptions() { HttpHandler = httpHandler })); + + // Act + var ex = await Record.ExceptionAsync(async () => await client.ResetByServerAsync(new ResetRequest { ErrorCode = 0x8 /* CANCELED */ }, deadline: DateTime.UtcNow.AddSeconds(5))); + + // Assert + Assert.IsType(ex); + Assert.Equal(StatusCode.Cancelled, ((RpcException)ex).StatusCode); + } + + [ConditionalFact] + public async Task Grpc_Error_Status_Unavailable_By_IOException() + { + // Arrange + using var httpHandler = CreateHandler(); + var httpClient = new HttpClient(httpHandler); + var client = new Greeter.GreeterClient(GrpcChannel.ForAddress("http://server.does.not.exists", new GrpcChannelOptions() { HttpHandler = httpHandler })); + + // Act + var ex = await Record.ExceptionAsync(async () => await client.SayHelloAsync(new HelloRequest() { Name = "Alice" }, deadline: DateTime.UtcNow.AddSeconds(5))); + + // Assert + Assert.IsType(ex); + Assert.Equal(StatusCode.Unavailable, ((RpcException)ex).StatusCode); + } + // Content with default value of true for AllowDuplex because AllowDuplex is internal. class DuplexStreamContent : HttpContent { diff --git a/test/YetAnotherHttpHandler.Test/Protos/greet.proto b/test/YetAnotherHttpHandler.Test/Protos/greet.proto index 4ca43d5..29e2e06 100644 --- a/test/YetAnotherHttpHandler.Test/Protos/greet.proto +++ b/test/YetAnotherHttpHandler.Test/Protos/greet.proto @@ -11,6 +11,7 @@ service Greeter { rpc SayHelloDuplex (stream HelloRequest) returns (stream HelloReply); rpc SayHelloDuplexCompleteRandomly (stream HelloRequest) returns (stream HelloReply); rpc SayHelloDuplexAbortRandomly (stream HelloRequest) returns (stream HelloReply); + rpc ResetByServer (ResetRequest) returns (ResetReply); rpc EchoDuplex (stream EchoRequest) returns (stream EchoReply); } @@ -31,3 +32,10 @@ message EchoRequest { message EchoReply { string message = 1; } + +message ResetRequest { + int32 errorCode = 1; +} + +message ResetReply { +} \ No newline at end of file diff --git a/test/YetAnotherHttpHandler.Test/TestServerForHttp2.cs b/test/YetAnotherHttpHandler.Test/TestServerForHttp2.cs index 11c04b1..ee2ce77 100644 --- a/test/YetAnotherHttpHandler.Test/TestServerForHttp2.cs +++ b/test/YetAnotherHttpHandler.Test/TestServerForHttp2.cs @@ -26,7 +26,10 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder) { httpContext.Response.Headers["x-request-content-type"] = httpContext.Request.ContentType; - return Results.Bytes(await bodyStream.ToArrayAsync(), "application/octet-stream"); + var memStream = new MemoryStream(); + await bodyStream.CopyToAsync(memStream); + + return Results.Bytes(memStream.ToArray(), "application/octet-stream"); }); app.MapPost("/post-streaming", async (HttpContext httpContext, PipeReader reader) => { @@ -156,6 +159,12 @@ public override async Task SayHelloDuplexAbortRandomly(IAsyncStreamReader ResetByServer(ResetRequest request, ServerCallContext context) + { + context.GetHttpContext().Features.GetRequiredFeature().Reset(errorCode: request.ErrorCode); + return new ResetReply { }; + } + public override async Task EchoDuplex(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken))