Skip to content

Commit 0ba9be8

Browse files
authored
feat(quic): update h3 to 0.0.8, h3-datagram to 0.0.2 (#407)
1 parent 8b849ef commit 0ba9be8

File tree

6 files changed

+113
-99
lines changed

6 files changed

+113
-99
lines changed

compio-quic/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ rustls = { workspace = true }
2727
rustls-platform-verifier = { version = "0.5.0", optional = true }
2828
rustls-native-certs = { workspace = true, optional = true }
2929
webpki-roots = { version = "0.26.3", optional = true }
30-
h3 = { version = "0.0.7", optional = true }
31-
h3-datagram = { version = "0.0.1", optional = true }
30+
h3 = { version = "0.0.8", optional = true }
31+
h3-datagram = { version = "0.0.2", optional = true }
3232

3333
# Utils
3434
flume = { workspace = true }

compio-quic/examples/http3-client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn main() {
7777

7878
drop(send_req);
7979

80-
handle.await.unwrap().unwrap();
80+
handle.await.unwrap();
8181
}
8282

8383
endpoint.shutdown().await.unwrap();

compio-quic/examples/http3-server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ async fn main() {
3232
.await
3333
.unwrap();
3434

35-
while let Ok(Some((req, mut stream))) = conn.accept().await {
35+
while let Ok(Some(resolver)) = conn.accept().await {
36+
let (req, mut stream) = resolver.resolve_request().await.unwrap();
3637
println!("Received request: {req:?}");
3738
stream
3839
.send_response(

compio-quic/src/connection.rs

+70-59
Original file line numberDiff line numberDiff line change
@@ -983,73 +983,90 @@ pub enum OpenStreamError {
983983

984984
#[cfg(feature = "h3")]
985985
pub(crate) mod h3_impl {
986-
use compio_buf::bytes::{Buf, BytesMut};
986+
use compio_buf::bytes::Buf;
987987
use futures_util::ready;
988988
use h3::{
989989
error::Code,
990-
quic::{self, Error, WriteBuf},
990+
quic::{self, ConnectionErrorIncoming, StreamErrorIncoming, WriteBuf},
991991
};
992992
use h3_datagram::{
993-
datagram::Datagram,
994-
quic_traits::{RecvDatagramExt, SendDatagramExt},
993+
datagram::EncodedDatagram,
994+
quic_traits::{
995+
DatagramConnectionExt, RecvDatagram, SendDatagram, SendDatagramErrorIncoming,
996+
},
995997
};
996998

997999
use super::*;
998-
use crate::{ReadError, WriteError, send_stream::h3_impl::SendStream};
999-
1000-
impl Error for ConnectionError {
1001-
fn is_timeout(&self) -> bool {
1002-
matches!(self, ConnectionError::TimedOut)
1003-
}
1000+
use crate::send_stream::h3_impl::SendStream;
1001+
1002+
impl From<ConnectionError> for ConnectionErrorIncoming {
1003+
fn from(e: ConnectionError) -> Self {
1004+
use ConnectionError::*;
1005+
match e {
1006+
ApplicationClosed(e) => Self::ApplicationClose {
1007+
error_code: e.error_code.into_inner(),
1008+
},
1009+
TimedOut => Self::Timeout,
10041010

1005-
fn err_code(&self) -> Option<u64> {
1006-
match &self {
1007-
ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose {
1008-
error_code,
1009-
..
1010-
}) => Some(error_code.into_inner()),
1011-
_ => None,
1011+
e => Self::Undefined(Arc::new(e)),
10121012
}
10131013
}
10141014
}
10151015

1016-
impl Error for SendDatagramError {
1017-
fn is_timeout(&self) -> bool {
1018-
false
1016+
impl From<ConnectionError> for StreamErrorIncoming {
1017+
fn from(e: ConnectionError) -> Self {
1018+
Self::ConnectionErrorIncoming {
1019+
connection_error: e.into(),
1020+
}
10191021
}
1022+
}
10201023

1021-
fn err_code(&self) -> Option<u64> {
1022-
match self {
1023-
SendDatagramError::ConnectionLost(ConnectionError::ApplicationClosed(
1024-
quinn_proto::ApplicationClose { error_code, .. },
1025-
)) => Some(error_code.into_inner()),
1026-
_ => None,
1024+
impl From<SendDatagramError> for SendDatagramErrorIncoming {
1025+
fn from(e: SendDatagramError) -> Self {
1026+
use SendDatagramError::*;
1027+
match e {
1028+
UnsupportedByPeer | Disabled => Self::NotAvailable,
1029+
TooLarge => Self::TooLarge,
1030+
ConnectionLost(e) => Self::ConnectionError(e.into()),
10271031
}
10281032
}
10291033
}
10301034

1031-
impl<B> SendDatagramExt<B> for Connection
1035+
impl<B> SendDatagram<B> for Connection
10321036
where
10331037
B: Buf,
10341038
{
1035-
type Error = SendDatagramError;
1036-
1037-
fn send_datagram(&mut self, data: Datagram<B>) -> Result<(), Self::Error> {
1038-
let mut buf = BytesMut::new();
1039-
data.encode(&mut buf);
1040-
Connection::send_datagram(self, buf.freeze())
1039+
fn send_datagram<T: Into<EncodedDatagram<B>>>(
1040+
&mut self,
1041+
data: T,
1042+
) -> Result<(), SendDatagramErrorIncoming> {
1043+
let mut buf: EncodedDatagram<B> = data.into();
1044+
let buf = buf.copy_to_bytes(buf.remaining());
1045+
Ok(Connection::send_datagram(self, buf)?)
10411046
}
10421047
}
10431048

1044-
impl RecvDatagramExt for Connection {
1045-
type Buf = Bytes;
1046-
type Error = ConnectionError;
1049+
impl RecvDatagram for Connection {
1050+
type Buffer = Bytes;
10471051

1048-
fn poll_accept_datagram(
1052+
fn poll_incoming_datagram(
10491053
&mut self,
1050-
cx: &mut Context<'_>,
1051-
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
1052-
Poll::Ready(Ok(Some(ready!(self.poll_recv_datagram(cx))?)))
1054+
cx: &mut core::task::Context<'_>,
1055+
) -> Poll<Result<Self::Buffer, ConnectionErrorIncoming>> {
1056+
Poll::Ready(Ok(ready!(self.poll_recv_datagram(cx))?))
1057+
}
1058+
}
1059+
1060+
impl<B: Buf> DatagramConnectionExt<B> for Connection {
1061+
type RecvDatagramHandler = Self;
1062+
type SendDatagramHandler = Self;
1063+
1064+
fn send_datagram_handler(&self) -> Self::SendDatagramHandler {
1065+
self.clone()
1066+
}
1067+
1068+
fn recv_datagram_handler(&self) -> Self::RecvDatagramHandler {
1069+
self.clone()
10531070
}
10541071
}
10551072

@@ -1085,12 +1102,11 @@ pub(crate) mod h3_impl {
10851102
B: Buf,
10861103
{
10871104
type Buf = Bytes;
1088-
type Error = ReadError;
10891105

10901106
fn poll_data(
10911107
&mut self,
10921108
cx: &mut Context<'_>,
1093-
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
1109+
) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
10941110
self.recv.poll_data(cx)
10951111
}
10961112

@@ -1107,17 +1123,15 @@ pub(crate) mod h3_impl {
11071123
where
11081124
B: Buf,
11091125
{
1110-
type Error = WriteError;
1111-
1112-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1126+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
11131127
self.send.poll_ready(cx)
11141128
}
11151129

1116-
fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
1130+
fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> {
11171131
self.send.send_data(data)
11181132
}
11191133

1120-
fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1134+
fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
11211135
self.send.poll_finish(cx)
11221136
}
11231137

@@ -1138,7 +1152,7 @@ pub(crate) mod h3_impl {
11381152
&mut self,
11391153
cx: &mut Context<'_>,
11401154
buf: &mut D,
1141-
) -> Poll<Result<usize, Self::Error>> {
1155+
) -> Poll<Result<usize, StreamErrorIncoming>> {
11421156
self.send.poll_send(cx, buf)
11431157
}
11441158
}
@@ -1152,21 +1166,20 @@ pub(crate) mod h3_impl {
11521166
B: Buf,
11531167
{
11541168
type BidiStream = BidiStream<B>;
1155-
type OpenError = ConnectionError;
11561169
type SendStream = SendStream<B>;
11571170

11581171
fn poll_open_bidi(
11591172
&mut self,
11601173
cx: &mut Context<'_>,
1161-
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
1174+
) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
11621175
let (stream, is_0rtt) = ready!(self.0.poll_open_stream(Some(cx), Dir::Bi))?;
11631176
Poll::Ready(Ok(BidiStream::new(self.0.0.clone(), stream, is_0rtt)))
11641177
}
11651178

11661179
fn poll_open_send(
11671180
&mut self,
11681181
cx: &mut Context<'_>,
1169-
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
1182+
) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
11701183
let (stream, is_0rtt) = ready!(self.0.poll_open_stream(Some(cx), Dir::Uni))?;
11711184
Poll::Ready(Ok(SendStream::new(self.0.0.clone(), stream, is_0rtt)))
11721185
}
@@ -1182,21 +1195,20 @@ pub(crate) mod h3_impl {
11821195
B: Buf,
11831196
{
11841197
type BidiStream = BidiStream<B>;
1185-
type OpenError = ConnectionError;
11861198
type SendStream = SendStream<B>;
11871199

11881200
fn poll_open_bidi(
11891201
&mut self,
11901202
cx: &mut Context<'_>,
1191-
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
1203+
) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
11921204
let (stream, is_0rtt) = ready!(self.poll_open_stream(Some(cx), Dir::Bi))?;
11931205
Poll::Ready(Ok(BidiStream::new(self.0.clone(), stream, is_0rtt)))
11941206
}
11951207

11961208
fn poll_open_send(
11971209
&mut self,
11981210
cx: &mut Context<'_>,
1199-
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
1211+
) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
12001212
let (stream, is_0rtt) = ready!(self.poll_open_stream(Some(cx), Dir::Uni))?;
12011213
Poll::Ready(Ok(SendStream::new(self.0.clone(), stream, is_0rtt)))
12021214
}
@@ -1210,24 +1222,23 @@ pub(crate) mod h3_impl {
12101222
where
12111223
B: Buf,
12121224
{
1213-
type AcceptError = ConnectionError;
12141225
type OpenStreams = OpenStreams;
12151226
type RecvStream = RecvStream;
12161227

12171228
fn poll_accept_recv(
12181229
&mut self,
12191230
cx: &mut std::task::Context<'_>,
1220-
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
1231+
) -> Poll<Result<Self::RecvStream, ConnectionErrorIncoming>> {
12211232
let (stream, is_0rtt) = ready!(self.poll_accept_stream(cx, Dir::Uni))?;
1222-
Poll::Ready(Ok(Some(RecvStream::new(self.0.clone(), stream, is_0rtt))))
1233+
Poll::Ready(Ok(RecvStream::new(self.0.clone(), stream, is_0rtt)))
12231234
}
12241235

12251236
fn poll_accept_bidi(
12261237
&mut self,
12271238
cx: &mut std::task::Context<'_>,
1228-
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
1239+
) -> Poll<Result<Self::BidiStream, ConnectionErrorIncoming>> {
12291240
let (stream, is_0rtt) = ready!(self.poll_accept_stream(cx, Dir::Bi))?;
1230-
Poll::Ready(Ok(Some(BidiStream::new(self.0.clone(), stream, is_0rtt))))
1241+
Poll::Ready(Ok(BidiStream::new(self.0.clone(), stream, is_0rtt)))
12311242
}
12321243

12331244
fn opener(&self) -> Self::OpenStreams {

compio-quic/src/recv_stream.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -529,38 +529,38 @@ impl futures_util::AsyncRead for RecvStream {
529529

530530
#[cfg(feature = "h3")]
531531
pub(crate) mod h3_impl {
532-
use h3::quic::{self, Error};
532+
use h3::quic::{self, StreamErrorIncoming};
533533

534534
use super::*;
535535

536-
impl Error for ReadError {
537-
fn is_timeout(&self) -> bool {
538-
matches!(self, Self::ConnectionLost(ConnectionError::TimedOut))
539-
}
540-
541-
fn err_code(&self) -> Option<u64> {
542-
match self {
543-
Self::ConnectionLost(ConnectionError::ApplicationClosed(
544-
quinn_proto::ApplicationClose { error_code, .. },
545-
))
546-
| Self::Reset(error_code) => Some(error_code.into_inner()),
547-
_ => None,
536+
impl From<ReadError> for StreamErrorIncoming {
537+
fn from(e: ReadError) -> Self {
538+
use ReadError::*;
539+
match e {
540+
Reset(code) => Self::StreamTerminated {
541+
error_code: code.into_inner(),
542+
},
543+
ConnectionLost(e) => Self::ConnectionErrorIncoming {
544+
connection_error: e.into(),
545+
},
546+
IllegalOrderedRead => unreachable!("illegal ordered read"),
547+
e => Self::Unknown(Box::new(e)),
548548
}
549549
}
550550
}
551551

552552
impl quic::RecvStream for RecvStream {
553553
type Buf = Bytes;
554-
type Error = ReadError;
555554

556555
fn poll_data(
557556
&mut self,
558557
cx: &mut Context<'_>,
559-
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
558+
) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
560559
self.execute_poll_read(cx, true, |chunks| match chunks.next(usize::MAX) {
561560
Ok(Some(chunk)) => ReadStatus::Readable(chunk.bytes),
562561
res => (None, res.err()).into(),
563562
})
563+
.map_err(Into::into)
564564
}
565565

566566
fn stop_sending(&mut self, error_code: u64) {

0 commit comments

Comments
 (0)