Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add error type for driver down when receiving response. #1020

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions http/src/h2/proto/hpack/table.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{
use core::{
cmp,
collections::VecDeque,
hash::{Hash, Hasher},
mem, usize,
mem,
};

use std::collections::VecDeque;

use fnv::FnvHasher;

use crate::http::{header, method::Method};
Expand Down
28 changes: 17 additions & 11 deletions postgres/src/driver/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use postgres_protocol::message::backend;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use xitca_io::bytes::BytesMut;

use crate::error::{DriverDown, Error};
use crate::error::{DriverDownReceiving, Error};

pub struct Response {
rx: ResponseReceiver,
Expand All @@ -22,15 +22,14 @@ impl Response {
}
}

pub(crate) fn recv(&mut self) -> impl Future<Output = Result<backend::Message, Error>> + '_ {
pub(crate) fn recv(&mut self) -> impl Future<Output = Result<backend::Message, Error>> + Send + '_ {
poll_fn(|cx| {
if self.buf.is_empty() {
self.buf = ready!(self.rx.poll_recv(cx)).ok_or_else(|| DriverDown(BytesMut::new()))?;
self.buf = ready!(self.rx.poll_recv(cx)).ok_or_else(|| DriverDownReceiving)?;
}

let res = match backend::Message::parse(&mut self.buf)?.expect("must not parse message from empty buffer.")
{
// TODO: error response.
backend::Message::ErrorResponse(_body) => Err(Error::todo()),
msg => Ok(msg),
};
Expand All @@ -46,23 +45,30 @@ pub(crate) struct ResponseSender {
msg_count: usize,
}

pub(super) enum SenderState {
Continue,
Finish,
}

impl ResponseSender {
fn new(tx: UnboundedSender<BytesMut>, msg_count: usize) -> Self {
Self { tx, msg_count }
}

pub(super) fn send(&mut self, msg: BytesMut) {
let _ = self.tx.send(msg);
}
pub(super) fn send(&mut self, msg: BytesMut, complete: bool) -> SenderState {
debug_assert!(self.msg_count > 0);

pub(super) fn complete(&mut self, current_msg_complete: bool) -> bool {
assert!(self.msg_count > 0);
let _ = self.tx.send(msg);

if current_msg_complete {
if complete {
self.msg_count -= 1;
}

self.msg_count == 0
if self.msg_count == 0 {
SenderState::Finish
} else {
SenderState::Continue
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions postgres/src/driver/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use xitca_unsafe_collection::futures::{Select as _, SelectOutput};
use crate::{error::Error, iter::AsyncLendingIterator};

use super::{
codec::{Request, Response, ResponseMessage, ResponseSender},
codec::{Request, Response, ResponseMessage, ResponseSender, SenderState},
Drive,
};

Expand Down Expand Up @@ -183,11 +183,12 @@ where
while let Some(res) = ResponseMessage::try_from_buf(self.read_buf.get_mut())? {
match res {
ResponseMessage::Normal { buf, complete } => {
if let Some(front) = self.res.front_mut() {
front.send(buf);
if front.complete(complete) {
let front = self.res.front_mut().expect("server respond out of bound");
match front.send(buf, complete) {
SenderState::Finish => {
self.res.pop_front();
}
SenderState::Continue => {}
}
}
ResponseMessage::Async(msg) => return Ok(Some(msg)),
Expand Down
12 changes: 7 additions & 5 deletions postgres/src/driver/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use xitca_unsafe_collection::futures::{ReusableLocalBoxFuture, Select, SelectOut
use crate::error::Error;

use super::{
codec::{ResponseMessage, ResponseSender},
codec::{ResponseMessage, ResponseSender, SenderState},
generic::GenericDriverRx,
};

Expand Down Expand Up @@ -97,10 +97,12 @@ where
while let Some(res) = ResponseMessage::try_from_buf(buf)? {
match res {
ResponseMessage::Normal { buf, complete } => {
let front = self.res.front_mut().expect("out of bound must not happen");
front.send(buf);
if front.complete(complete) {
self.res.pop_front();
let front = self.res.front_mut().expect("server respond out of bound");
match front.send(buf, complete) {
SenderState::Finish => {
self.res.pop_front();
}
SenderState::Continue => {}
}
}
ResponseMessage::Async(msg) => return Ok(Some(msg)),
Expand Down
22 changes: 21 additions & 1 deletion postgres/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ impl error::Error for Error {
}
}

/// error indicate [Client]'s [Driver] is dropped and can't be accessed anymore.
/// error indicate [Client]'s [Driver] is dropped and can't be accessed anymore when sending
/// request to driver.
///
/// the field inside error contains the raw bytes buffer of query message that are ready to be
/// sent to the [Driver] for transporting. It's possible to construct a new [Client] and [Driver]
Expand Down Expand Up @@ -107,6 +108,25 @@ impl From<DriverDown> for Error {
}
}

/// error indicate [Client]'s [Driver] is dropped and can't be accessed anymore when receiving response
/// from server. any mid flight response and unfinished response data are lost and can't be recovered.
#[derive(Debug)]
pub struct DriverDownReceiving;

impl fmt::Display for DriverDownReceiving {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Driver is dropped and unaccessible. Response data is lost and unrecoverable.")
}
}

impl error::Error for DriverDownReceiving {}

impl From<DriverDownReceiving> for Error {
fn from(e: DriverDownReceiving) -> Self {
Self(Box::new(e))
}
}

pub struct InvalidColumnIndex(pub String);

impl fmt::Debug for InvalidColumnIndex {
Expand Down
Loading