Skip to content

Commit

Permalink
Propagate HTTP/2 error codes to grpc-dotnet
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Jul 9, 2024
1 parent f548b97 commit 579a3e8
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 27 deletions.
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native/yaha_native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ crate-type = ["staticlib", "cdylib"]
csbindgen = "1.9.1"

[dependencies]
h2 = { version = "0.4.5" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7.10" }
hyper = { version = "1.3.1", features = ["client", "http1", "http2"] }
Expand Down
49 changes: 36 additions & 13 deletions native/yaha_native/src/binding.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::{
cell::RefCell,
num::NonZeroIsize,
ptr::null,
sync::{Arc, Mutex},
time::Duration,
cell::RefCell, error::Error, num::NonZeroIsize, ptr::null, sync::{Arc, Mutex}, time::Duration
};

use http_body_util::{combinators::BoxBody, BodyExt};
Expand Down Expand Up @@ -70,7 +66,7 @@ pub extern "C" fn yaha_init_context(
version: YahaHttpVersion,
),
on_receive: extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8),
on_complete: extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason),
on_complete: extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32),
) -> *mut YahaNativeContext {
let runtime_ctx = YahaNativeRuntimeContextInternal::from_raw_context(runtime_ctx);
let ctx = Box::new(YahaNativeContextInternal::new(
Expand All @@ -89,7 +85,7 @@ pub extern "C" fn yaha_dispose_context(ctx: *mut YahaNativeContext) {
ctx.on_receive = _sentinel_on_receive;
ctx.on_status_code_and_headers_receive = _sentinel_on_status_code_and_headers_receive;
}
extern "C" fn _sentinel_on_complete(_: i32, _: NonZeroIsize, _: CompletionReason) { panic!("The context has already disposed: on_complete"); }
extern "C" fn _sentinel_on_complete(_: i32, _: NonZeroIsize, _: CompletionReason, _: u32) { panic!("The context has already disposed: on_complete"); }
extern "C" fn _sentinel_on_receive(_: i32, _: NonZeroIsize, _: usize, _: *const u8) { panic!("The context has already disposed: on_receive"); }
extern "C" fn _sentinel_on_status_code_and_headers_receive(_: i32, _: NonZeroIsize, _: i32, _: YahaHttpVersion) { panic!("The context has already disposed: on_status_code_and_headers_receive"); }

Expand Down Expand Up @@ -476,22 +472,41 @@ pub extern "C" fn yaha_request_begin(
LAST_ERROR.with(|v| {
*v.borrow_mut() = Some("The client has not been built. You need to build it before sending the request. ".to_string());
});
(ctx.on_complete)(seq, state, CompletionReason::Error);
(ctx.on_complete)(seq, state, CompletionReason::Error, 0);
return;
}

// Send a request and wait for response status and headers.
let res = select! {
_ = cancellation_token.cancelled() => {
(ctx.on_complete)(seq, state, CompletionReason::Aborted);
(ctx.on_complete)(seq, state, CompletionReason::Aborted, 0);
return;
}
res = ctx.client.as_ref().unwrap().request(req) => {
if let Err(err) = res {
LAST_ERROR.with(|v| {
*v.borrow_mut() = Some(err.to_string());
});
(ctx.on_complete)(seq, state, CompletionReason::Error);

// If the inner error is `hyper::Error`, use its error message instead.
let error_inner = err.source()
.and_then(|e| e.downcast_ref::<hyper::Error>());

if let Some(err) = error_inner {
LAST_ERROR.with(|v| {
*v.borrow_mut() = Some(err.to_string());
});
}

// If the `hyper::Error` has `h2::Error` as inner error, the error has HTTP/2 error code.
let reason = error_inner
.and_then(|e| e.source())
.and_then(|e| e.downcast_ref::<h2::Error>())
.and_then(|e| e.reason());

let rc = reason.map(|r| u32::from(r));

(ctx.on_complete)(seq, state, CompletionReason::Error, rc.unwrap_or_default());
return;
} else {
res
Expand Down Expand Up @@ -532,7 +547,7 @@ pub extern "C" fn yaha_request_begin(
while !body.is_end_stream() {
select! {
_ = cancellation_token.cancelled() => {
(ctx.on_complete)(seq, state, CompletionReason::Aborted);
(ctx.on_complete)(seq, state, CompletionReason::Aborted, 0);
return;
}
received = body.frame() => {
Expand Down Expand Up @@ -572,7 +587,15 @@ pub extern "C" fn yaha_request_begin(
LAST_ERROR.with(|v| {
*v.borrow_mut() = Some(err.to_string());
});
(ctx.on_complete)(seq, state, CompletionReason::Error);

// If the `hyper::Error` has `h2::Error` as inner error, the error has HTTP/2 error code.
let reason = err.source()
.and_then(|e| e.downcast_ref::<h2::Error>())
.and_then(|e| e.reason());

let rc = reason.map(|r| u32::from(r));

(ctx.on_complete)(seq, state, CompletionReason::Error, rc.unwrap_or_default());
return;
}
}
Expand All @@ -591,7 +614,7 @@ pub extern "C" fn yaha_request_begin(
req_ctx.try_complete();
}

(ctx.on_complete)(seq, state, CompletionReason::Success);
(ctx.on_complete)(seq, state, CompletionReason::Success, 0);

{
let mut req_ctx = req_ctx.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion native/yaha_native/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::primitives::{YahaHttpVersion, CompletionReason};
type OnStatusCodeAndHeadersReceive =
extern "C" fn(req_seq: i32, state: NonZeroIsize, status_code: i32, version: YahaHttpVersion);
type OnReceive = extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8);
type OnComplete = extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason);
type OnComplete = extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32);

pub struct YahaNativeRuntimeContext;
pub struct YahaNativeRuntimeContextInternal {
Expand Down
8 changes: 4 additions & 4 deletions src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ private static unsafe void OnReceive(int reqSeq, IntPtr state, UIntPtr length, b
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
#endif
[MonoPInvokeCallback(typeof(NativeMethods.yaha_init_context_on_complete_delegate))]
private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason reason)
private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason reason, uint h2ErrorCode)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response completed: Reason={reason}");
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response completed: Reason={reason}; H2ErrorCode=0x{h2ErrorCode:x}");

var requestContext = RequestContext.FromHandle(state);
try
Expand Down Expand Up @@ -459,7 +459,7 @@ private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason
var buf = NativeMethods.yaha_get_last_error();
try
{
requestContext.CompleteAsFailed(UnsafeUtilities.GetStringFromUtf8Bytes(buf->AsSpan()));
requestContext.CompleteAsFailed(UnsafeUtilities.GetStringFromUtf8Bytes(buf->AsSpan()), h2ErrorCode);
}
catch
{
Expand All @@ -468,7 +468,7 @@ private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason
}
else
{
requestContext.CompleteAsFailed("Canceled");
requestContext.CompleteAsFailed("Canceled", 0);
}
}
finally
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal static unsafe partial class NativeMethods
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason);
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code);

[DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, yaha_init_context_on_status_code_and_headers_receive_delegate on_status_code_and_headers_receive, yaha_init_context_on_receive_delegate on_receive, yaha_init_context_on_complete_delegate on_complete);
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeMethods.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal static unsafe partial class NativeMethods
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason);
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code);

[DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, yaha_init_context_on_status_code_and_headers_receive_delegate on_status_code_and_headers_receive, yaha_init_context_on_receive_delegate on_receive, yaha_init_context_on_complete_delegate on_complete);
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeMethodsFuncPtr.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal static unsafe partial class NativeMethodsFuncPtr
public static extern void yaha_dispose_runtime(YahaNativeRuntimeContext* ctx);

[DllImport(__DllName, EntryPoint = "yaha_init_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, delegate* unmanaged[Cdecl]<int, nint, int, YahaHttpVersion, void> on_status_code_and_headers_receive, delegate* unmanaged[Cdecl]<int, nint, nuint, byte*, void> on_receive, delegate* unmanaged[Cdecl]<int, nint, CompletionReason, void> on_complete);
public static extern YahaNativeContext* yaha_init_context(YahaNativeRuntimeContext* runtime_ctx, delegate* unmanaged[Cdecl]<int, nint, int, YahaHttpVersion, void> on_status_code_and_headers_receive, delegate* unmanaged[Cdecl]<int, nint, nuint, byte*, void> on_receive, delegate* unmanaged[Cdecl]<int, nint, CompletionReason, uint, void> on_complete);

[DllImport(__DllName, EntryPoint = "yaha_dispose_context", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_dispose_context(YahaNativeContext* ctx);
Expand Down
4 changes: 2 additions & 2 deletions src/YetAnotherHttpHandler/RequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ public void Complete()
_cancellationTokenSource.Cancel(); // Stop reading the request body.
}

public void CompleteAsFailed(string errorMessage)
public void CompleteAsFailed(string errorMessage, uint h2ErrorCode)
{
Response.CompleteAsFailed(errorMessage);
Response.CompleteAsFailed(errorMessage, h2ErrorCode);
_cancellationTokenSource.Cancel(); // Stop reading the request body.
}

Expand Down
15 changes: 12 additions & 3 deletions src/YetAnotherHttpHandler/ResponseContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,22 @@ public void Complete()
}
}

public void CompleteAsFailed(string errorMessage)
public void CompleteAsFailed(string errorMessage, uint h2ErrorCode)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestContext.RequestSequence}] Response completed with failure ({errorMessage})");
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestContext.RequestSequence}] Response completed with failure ({errorMessage}) (0x{h2ErrorCode:x})");

lock (_writeLock)
{
var ex = new IOException(errorMessage);
Exception ex = new IOException(errorMessage);
if (h2ErrorCode != 0)
{
#if NET7_0_OR_GREATER
ex = new HttpProtocolException(h2ErrorCode, $"The HTTP/2 server reset the stream. HTTP/2 error code (0x{h2ErrorCode:x}).", ex);
#else
ex = new Http2StreamException($"The HTTP/2 server reset the stream. HTTP/2 error code (0x{h2ErrorCode:x}).", ex);
#endif
}

#if NET5_0_OR_GREATER
ExceptionDispatchInfo.SetCurrentStackTrace(ex);
#else
Expand Down
15 changes: 15 additions & 0 deletions src/YetAnotherHttpHandler/Shims/Http2StreamException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

// HACK: grpc-dotnet checks InnerException type names, so matching type name ensures behaviour
// https://github.com/grpc/grpc-dotnet/blob/v2.60.0/src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs#L479

// ReSharper disable once CheckNamespace
namespace System.Net.Http
{
internal class Http2StreamException : Exception
{
public Http2StreamException(string message, Exception innerException) : base(message, innerException) { }
}
}
11 changes: 11 additions & 0 deletions src/YetAnotherHttpHandler/Shims/Http2StreamException.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions test/YetAnotherHttpHandler.Test/Http2TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,39 @@ async Task RunAsync()
}
}

[ConditionalFact]
public async Task Grpc_Error_Status_ErrorCode()
{
// Arrange
using var httpHandler = CreateHandler();
var httpClient = new HttpClient(httpHandler);
await using var server = await LaunchServerAsync<TestServerForHttp2>();
var client = new Greeter.GreeterClient(GrpcChannel.ForAddress(server.BaseUri, new GrpcChannelOptions() { HttpHandler = httpHandler }));

// Act
var ex = await Record.ExceptionAsync(async () => await client.ResetByServerAsync(new ResetRequest { ErrorCode = 0x8 /* CANCELED */ }, deadline: DateTime.UtcNow.AddSeconds(5)));

// Assert
Assert.IsType<RpcException>(ex);
Assert.Equal(StatusCode.Cancelled, ((RpcException)ex).StatusCode);
}

[ConditionalFact]
public async Task Grpc_Error_Status_Unavailable_By_IOException()
{
// Arrange
using var httpHandler = CreateHandler();
var httpClient = new HttpClient(httpHandler);
var client = new Greeter.GreeterClient(GrpcChannel.ForAddress("http://server.does.not.exists", new GrpcChannelOptions() { HttpHandler = httpHandler }));

// Act
var ex = await Record.ExceptionAsync(async () => await client.SayHelloAsync(new HelloRequest() { Name = "Alice" }, deadline: DateTime.UtcNow.AddSeconds(5)));

// Assert
Assert.IsType<RpcException>(ex);
Assert.Equal(StatusCode.Unavailable, ((RpcException)ex).StatusCode);
}

// Content with default value of true for AllowDuplex because AllowDuplex is internal.
class DuplexStreamContent : HttpContent
{
Expand Down
8 changes: 8 additions & 0 deletions test/YetAnotherHttpHandler.Test/Protos/greet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ service Greeter {
rpc SayHelloDuplex (stream HelloRequest) returns (stream HelloReply);
rpc SayHelloDuplexCompleteRandomly (stream HelloRequest) returns (stream HelloReply);
rpc SayHelloDuplexAbortRandomly (stream HelloRequest) returns (stream HelloReply);
rpc ResetByServer (ResetRequest) returns (ResetReply);
rpc EchoDuplex (stream EchoRequest) returns (stream EchoReply);
}

Expand All @@ -31,3 +32,10 @@ message EchoRequest {
message EchoReply {
string message = 1;
}

message ResetRequest {
int32 errorCode = 1;
}

message ResetReply {
}
11 changes: 10 additions & 1 deletion test/YetAnotherHttpHandler.Test/TestServerForHttp2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder)
{
httpContext.Response.Headers["x-request-content-type"] = httpContext.Request.ContentType;

return Results.Bytes(await bodyStream.ToArrayAsync(), "application/octet-stream");
var memStream = new MemoryStream();
await bodyStream.CopyToAsync(memStream);

return Results.Bytes(memStream.ToArray(), "application/octet-stream");
});
app.MapPost("/post-streaming", async (HttpContext httpContext, PipeReader reader) =>
{
Expand Down Expand Up @@ -156,6 +159,12 @@ public override async Task SayHelloDuplexAbortRandomly(IAsyncStreamReader<HelloR
}
}

public override async Task<ResetReply> ResetByServer(ResetRequest request, ServerCallContext context)
{
context.GetHttpContext().Features.GetRequiredFeature<IHttpResetFeature>().Reset(errorCode: request.ErrorCode);
return new ResetReply { };
}

public override async Task EchoDuplex(IAsyncStreamReader<EchoRequest> requestStream, IServerStreamWriter<EchoReply> responseStream, ServerCallContext context)
{
await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken))
Expand Down

0 comments on commit 579a3e8

Please sign in to comment.