Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework error handling. #989

Merged
merged 4 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl Config {
"disable" => SslMode::Disable,
"prefer" => SslMode::Prefer,
"require" => SslMode::Require,
_ => return Err(Error::ToDo),
_ => return Err(Error::todo()),
};
self.ssl_mode(mode);
}
Expand All @@ -259,7 +259,7 @@ impl Config {
let port = if port.is_empty() {
5432
} else {
port.parse().map_err(|_| Error::ToDo)?
port.parse().map_err(|_| Error::todo())?
};
self.port(port);
}
Expand All @@ -269,13 +269,13 @@ impl Config {
"any" => TargetSessionAttrs::Any,
"read-write" => TargetSessionAttrs::ReadWrite,
_ => {
return Err(Error::ToDo);
return Err(Error::todo());
}
};
self.target_session_attrs(target_session_attrs);
}
_ => {
return Err(Error::ToDo);
return Err(Error::todo());
}
}

Expand Down Expand Up @@ -375,9 +375,9 @@ impl<'a> Parser<'a> {
Some((_, c)) if c == target => Ok(()),
Some((i, c)) => {
let _m = format!("unexpected character at byte {i}: expected `{target}` but got `{c}`");
Err(Error::ToDo)
Err(Error::todo())
}
None => Err(Error::ToDo),
None => Err(Error::todo()),
}
}

Expand Down Expand Up @@ -436,7 +436,7 @@ impl<'a> Parser<'a> {
}

if value.is_empty() {
return Err(Error::ToDo);
return Err(Error::todo());
}

Ok(value)
Expand All @@ -460,7 +460,7 @@ impl<'a> Parser<'a> {
}
}

Err(Error::ToDo)
Err(Error::todo())
}

fn parameter(&mut self) -> Result<Option<(&'a str, String)>, Error> {
Expand Down Expand Up @@ -566,7 +566,7 @@ impl<'a> UrlParser<'a> {
let (host, port) = if chunk.starts_with('[') {
let idx = match chunk.find(']') {
Some(idx) => idx,
None => return Err(Error::ToDo),
None => return Err(Error::todo()),
};

let host = &chunk[1..idx];
Expand All @@ -576,7 +576,7 @@ impl<'a> UrlParser<'a> {
} else if remaining.is_empty() {
None
} else {
return Err(Error::ToDo);
return Err(Error::todo());
};

(host, port)
Expand Down Expand Up @@ -620,7 +620,7 @@ impl<'a> UrlParser<'a> {
while !self.s.is_empty() {
let key = match self.take_until(&['=']) {
Some(key) => self.decode(key)?,
None => return Err(Error::ToDo),
None => return Err(Error::todo()),
};
self.eat_byte();

Expand Down Expand Up @@ -651,6 +651,6 @@ impl<'a> UrlParser<'a> {
fn decode(&self, s: &'a str) -> Result<Cow<'a, str>, Error> {
percent_encoding::percent_decode(s.as_bytes())
.decode_utf8()
.map_err(|_| Error::ToDo)
.map_err(|_| Error::todo())
}
}
19 changes: 6 additions & 13 deletions postgres/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub(super) async fn connect(cfg: &mut Config) -> Result<(Client, Driver), Error>
}

/// async driver of [Client](crate::Client).
/// it handles IO and emit server sent message that do not belong to any query with [AsyncIterator]
/// it handles IO and emit server sent message that do not belong to any query with [AsyncLendingIterator]
/// trait impl.
///
/// # Examples:
Expand All @@ -80,8 +80,6 @@ pub(super) async fn connect(cfg: &mut Config) -> Result<(Client, Driver), Error>
/// ```
pub struct Driver {
inner: _Driver,
#[allow(dead_code)]
config: Config,
}

impl Driver {
Expand Down Expand Up @@ -111,44 +109,39 @@ impl Driver {

#[cfg(not(feature = "quic"))]
impl Driver {
pub(super) fn tcp(drv: GenericDriver<TcpStream>, config: Config) -> Self {
pub(super) fn tcp(drv: GenericDriver<TcpStream>) -> Self {
Self {
inner: _Driver::Tcp(drv),
config,
}
}

#[cfg(feature = "tls")]
pub(super) fn tls(drv: GenericDriver<TlsStream<ClientConnection, TcpStream>>, config: Config) -> Self {
pub(super) fn tls(drv: GenericDriver<TlsStream<ClientConnection, TcpStream>>) -> Self {
Self {
inner: _Driver::Tls(drv),
config,
}
}

#[cfg(unix)]
pub(super) fn unix(drv: GenericDriver<UnixStream>, config: Config) -> Self {
pub(super) fn unix(drv: GenericDriver<UnixStream>) -> Self {
Self {
inner: _Driver::Unix(drv),
config,
}
}

#[cfg(all(unix, feature = "tls"))]
pub(super) fn unix_tls(drv: GenericDriver<TlsStream<ClientConnection, UnixStream>>, config: Config) -> Self {
pub(super) fn unix_tls(drv: GenericDriver<TlsStream<ClientConnection, UnixStream>>) -> Self {
Self {
inner: _Driver::UnixTls(drv),
config,
}
}
}

#[cfg(feature = "quic")]
impl Driver {
pub(super) fn quic(drv: QuicDriver, config: Config) -> Self {
pub(super) fn quic(drv: QuicDriver) -> Self {
Self {
inner: _Driver::Quic(drv),
config,
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions postgres/src/driver/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(super) async fn _connect(host: Host, cfg: &Config) -> Result<(ClientTx, Driv
let mut drv = QuicDriver::new(streams);
prepare_session(&mut drv, cfg).await?;
drv.close_tx().await;
Ok((tx, Driver::quic(drv, cfg.clone())))
Ok((tx, Driver::quic(drv)))
}
_ => unreachable!(),
}
Expand Down Expand Up @@ -150,9 +150,9 @@ async fn connect_quic(host: &str, ports: &[u16]) -> Result<ClientTx, Error> {
match endpoint.connect(addr, host) {
Ok(conn) => match conn.await {
Ok(inner) => return Ok(ClientTx::new(inner)),
Err(_) => err = Some(Error::ToDo),
Err(_) => err = Some(Error::todo()),
},
Err(_) => err = Some(Error::ToDo),
Err(_) => err = Some(Error::todo()),
}
}

Expand Down Expand Up @@ -186,7 +186,7 @@ impl QuicDriver {
.read_chunk(4096, true)
.await
.map(|c| c.map(|c| c.bytes))
.map_err(|_| Error::ToDo)
.map_err(|_| Error::todo())
.transpose()
}

Expand All @@ -201,7 +201,7 @@ impl QuicDriver {
Ok(None)
| Err(ReadError::ConnectionLost(ConnectionError::ApplicationClosed(_)))
| Err(ReadError::ConnectionLost(ConnectionError::LocallyClosed)) => return Ok(None),
Err(_) => return Err(Error::ToDo),
Err(_) => return Err(Error::todo()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/driver/quic/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Response {
loop {
match backend::Message::parse(&mut self.buf)? {
// TODO: error response.
Some(backend::Message::ErrorResponse(_body)) => return Err(Error::ToDo),
Some(backend::Message::ErrorResponse(_body)) => return Err(Error::todo()),
Some(msg) => return Ok(msg),
None => {
let chunk = self
Expand Down
10 changes: 5 additions & 5 deletions postgres/src/driver/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(super) async fn _connect(host: Host, cfg: &mut Config) -> Result<(ClientTx,
let io = tls::connect(io, host, cfg).await?;
let (mut drv, tx) = GenericDriver::new(io);
prepare_session(&mut drv, cfg).await?;
Ok((ClientTx(tx), Driver::tls(drv, cfg.clone())))
Ok((ClientTx(tx), Driver::tls(drv)))
}
#[cfg(not(feature = "tls"))]
{
Expand All @@ -77,7 +77,7 @@ pub(super) async fn _connect(host: Host, cfg: &mut Config) -> Result<(ClientTx,
} else {
let (mut drv, tx) = GenericDriver::new(io);
prepare_session(&mut drv, cfg).await?;
Ok((ClientTx(tx), Driver::tcp(drv, cfg.clone())))
Ok((ClientTx(tx), Driver::tcp(drv)))
}
}
#[cfg(unix)]
Expand All @@ -90,7 +90,7 @@ pub(super) async fn _connect(host: Host, cfg: &mut Config) -> Result<(ClientTx,
let io = tls::connect(io, host.as_ref(), cfg).await?;
let (mut drv, tx) = GenericDriver::new(io);
prepare_session(&mut drv, cfg).await?;
Ok((ClientTx(tx), Driver::unix_tls(drv, cfg.clone())))
Ok((ClientTx(tx), Driver::unix_tls(drv)))
}
#[cfg(not(feature = "tls"))]
{
Expand All @@ -99,7 +99,7 @@ pub(super) async fn _connect(host: Host, cfg: &mut Config) -> Result<(ClientTx,
} else {
let (mut drv, tx) = GenericDriver::new(io);
prepare_session(&mut drv, cfg).await?;
Ok((ClientTx(tx), Driver::unix(drv, cfg.clone())))
Ok((ClientTx(tx), Driver::unix(drv)))
}
}
_ => unreachable!(),
Expand Down Expand Up @@ -131,7 +131,7 @@ where
match cfg.get_ssl_mode() {
SslMode::Disable => Ok(false),
mode => match (_should_connect_tls(io).await?, mode) {
(false, SslMode::Require) => Err(Error::ToDo),
(false, SslMode::Require) => Err(Error::todo()),
(bool, _) => Ok(bool),
},
}
Expand Down
9 changes: 6 additions & 3 deletions postgres/src/driver/raw/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use core::{
use postgres_protocol::message::backend;
use xitca_io::bytes::BytesMut;

use crate::{driver::codec::ResponseReceiver, error::Error};
use crate::{
driver::codec::ResponseReceiver,
error::{DriverDown, Error},
};

pub struct Response {
rx: ResponseReceiver,
Expand All @@ -24,13 +27,13 @@ impl Response {
pub(crate) fn recv(&mut self) -> impl Future<Output = Result<backend::Message, Error>> + '_ {
poll_fn(|cx| {
if self.buf.is_empty() {
self.buf = ready!(self.rx.poll_recv(cx)).ok_or_else(|| Error::DriverDown(BytesMut::new()))?;
self.buf = ready!(self.rx.poll_recv(cx)).ok_or_else(|| DriverDown(BytesMut::new()))?;
}

let res = match backend::Message::parse(&mut self.buf)?.expect("must not parse message from empty buffer.")
{
// TODO: error response.
backend::Message::ErrorResponse(_body) => Err(Error::ToDo),
backend::Message::ErrorResponse(_body) => Err(Error::todo()),
msg => Ok(msg),
};

Expand Down
4 changes: 2 additions & 2 deletions postgres/src/driver/raw/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ pub(super) async fn connect<Io>(io: Io, host: &str, cfg: &mut Config) -> Result<
where
Io: AsyncIo,
{
let name = ServerName::try_from(host).map_err(|_| Error::ToDo)?.to_owned();
let name = ServerName::try_from(host).map_err(|_| Error::todo())?.to_owned();
let config = dangerous_config(Vec::new());
let session = ClientConnection::new(config, name).map_err(|_| Error::ToDo)?;
let session = ClientConnection::new(config, name).map_err(|_| Error::todo())?;

let stream = TlsStream::handshake(io, session).await?;

Expand Down
Loading
Loading