Skip to content

Commit

Permalink
export transaction isolation level.
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow committed Feb 7, 2025
1 parent 103d324 commit 9ad2acf
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
5 changes: 4 additions & 1 deletion postgres/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# unreleased 0.3.0
## Add
- export `transaction::builder::IsolationLevel` for building transaction with specific level of isolation

## Remove
- remove `ExecuteMut` trait. It's role is replaced by `impl Execute<&mut C>`
- remove `error::AuthenticationError` type. It's error condition is covered by `error::ConfigError`

## Change
- change `pool::Pool`'s dead connection detection lifecycle.
- change `pool::Pool`'s dead connection detection lifecycle

# 0.2.1
## Fix
Expand Down
32 changes: 21 additions & 11 deletions postgres/src/driver/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures_core::task::__internal::AtomicWaker;
use postgres_protocol::message::{backend, frontend};
use xitca_io::{
bytes::{Buf, BufRead, BytesMut},
io::{AsyncIo, Interest},
io::{AsyncIo, Interest, Ready},
};
use xitca_unsafe_collection::futures::{Select as _, SelectOutput};

Expand Down Expand Up @@ -194,51 +194,61 @@ where
return Ok(Some(msg));
}

let ready = match (&mut self.read_state, &mut self.write_state) {
enum InterestOrReady {
Interest(Interest),
Ready(Ready),
}

let state = match (&mut self.read_state, &mut self.write_state) {
(ReadState::WantRead, WriteState::Waiting) => {
match self.shared_state.wait().select(self.io.ready(Interest::READABLE)).await {
SelectOutput::A(WaitState::WantWrite) => {
self.write_state = WriteState::WantWrite;
self.io.ready(INTEREST_READ_WRITE).await?
InterestOrReady::Interest(INTEREST_READ_WRITE)
}
SelectOutput::A(WaitState::WantClose) => {
self.write_state = WriteState::Closed(None);
continue;
}
SelectOutput::B(ready) => ready?,
SelectOutput::B(ready) => InterestOrReady::Ready(ready?),
}
}
(ReadState::WantRead, WriteState::WantWrite) => self.io.ready(INTEREST_READ_WRITE).await?,
(ReadState::WantRead, WriteState::WantWrite) => InterestOrReady::Interest(INTEREST_READ_WRITE),
(ReadState::WantRead, WriteState::WantFlush) => {
// before flush io do a quick buffer check and go into write io state if possible.
if !self.shared_state.guarded.lock().unwrap().buf.is_empty() {
self.write_state = WriteState::WantWrite;
}
self.io.ready(INTEREST_READ_WRITE).await?
InterestOrReady::Interest(INTEREST_READ_WRITE)
}
(ReadState::WantRead, WriteState::Closed(_)) => self.io.ready(Interest::READABLE).await?,
(ReadState::WantRead, WriteState::Closed(_)) => InterestOrReady::Interest(Interest::READABLE),
(ReadState::Closed(_), WriteState::WantFlush | WriteState::WantWrite) => {
self.io.ready(Interest::WRITABLE).await?
InterestOrReady::Interest(Interest::WRITABLE)
}
(ReadState::Closed(_), WriteState::Waiting) => match self.shared_state.wait().await {
WaitState::WantWrite => {
self.write_state = WriteState::WantWrite;
self.io.ready(Interest::WRITABLE).await?
InterestOrReady::Interest(Interest::WRITABLE)
}
WaitState::WantClose => {
self.write_state = WriteState::Closed(None);
continue;
}
},
(ReadState::Closed(None), WriteState::Closed(None)) => {
poll_fn(|cx| Pin::new(&mut self.io).poll_shutdown(cx)).await?;
Box::pin(poll_fn(|cx| Pin::new(&mut self.io).poll_shutdown(cx))).await?;
return Ok(None);
}
(ReadState::Closed(read_err), WriteState::Closed(write_err)) => {
return Err(Error::driver_io(read_err.take(), write_err.take()))
return Err(Error::driver_io(read_err.take(), write_err.take()));
}
};

let ready = match state {
InterestOrReady::Ready(ready) => ready,
InterestOrReady::Interest(interest) => self.io.ready(interest).await?,
};

if ready.is_readable() {
if let Err(e) = self.try_read() {
self.on_read_err(e);
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
BoxedFuture,
};

pub use builder::TransactionBuilder;
pub use builder::{IsolationLevel, TransactionBuilder};
pub use portal::Portal;

pub struct Transaction<'a, C>
Expand Down

0 comments on commit 9ad2acf

Please sign in to comment.