Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize shared connection lock. #977

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions postgres/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,20 @@ fn _assert_pipe_send() {
crate::_assert_send2::<Pipeline<'_>>();
}

impl Default for Pipeline<'_, true> {
fn default() -> Self {
Self::new()
}
}

impl Pipeline<'_, true> {
/// start a new pipeline.
///
/// pipeline is sync by default. which means every query inside is considered separate binding
/// and the pipeline is transparent to database server. the pipeline only happen on socket
/// transport where minimal amount of syscall is needed.
///
/// for more relaxed [Pipeline Mode][libpq_link] see [Client::pipeline_unsync] api.
/// for more relaxed [Pipeline Mode][libpq_link] see [Pipeline::unsync] api.
///
/// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html
pub fn new() -> Self {
Expand All @@ -79,7 +85,7 @@ impl Pipeline<'_, false> {
/// in un-sync mode pipeline treat all queries inside as one single binding and database server
/// can see them as no sync point in between which can result in potential performance gain.
///
/// it behaves the same on transportation level as [Client::pipeline] where minimal amount
/// it behaves the same on transportation level as [Pipeline::new] where minimal amount
/// of socket syscall is needed.
#[inline]
pub fn unsync() -> Self {
Expand Down Expand Up @@ -142,7 +148,14 @@ impl Client {
frontend::sync(&mut pipe.buf);
}

self.pipeline_buf(pipe.sync_count, pipe.buf, pipe.columns).await
self.tx
.send_multi(pipe.sync_count, pipe.buf)
.await
.map(|res| PipelineStream {
res,
columns: pipe.columns,
ranges: Vec::new(),
})
}

pub(crate) async fn _pipeline<'a, const SYNC_MODE: bool>(
Expand All @@ -159,19 +172,6 @@ impl Client {

self.tx.send_multi(*sync_count, buf).await
}

pub(crate) async fn pipeline_buf<'a>(
&self,
sync_count: usize,
buf: BytesMut,
columns: VecDeque<&'a [Column]>,
) -> Result<PipelineStream<'a>, Error> {
self.tx.send_multi(sync_count, buf).await.map(|res| PipelineStream {
res,
columns: columns,
ranges: Vec::new(),
})
}
}

/// streaming response of pipeline.
Expand Down
109 changes: 76 additions & 33 deletions postgres/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ impl Spawner {
}
}
}

#[cold]
#[inline(never)]
async fn wait_for_spawn(&self) {
let notify = self.notify.lock().clone();
if let Some(notify) = notify {
notify.notified().await;
}
}
}

impl SharedClient {
Expand Down Expand Up @@ -81,7 +90,7 @@ impl SharedClient {
I::IntoIter: ExactSizeIterator,
I::Item: BorrowToSql,
{
let cli = self.inner.read().await;
let cli = self.read().await;
match cli.query_raw(stmt, params).await {
Err(Error::DriverDown(msg)) => {
drop(cli);
Expand All @@ -96,7 +105,7 @@ impl SharedClient {
}

pub async fn query_simple(&self, stmt: &str) -> Result<RowSimpleStream, Error> {
let cli = self.inner.read().await;
let cli = self.read().await;
match cli.query_simple(stmt).await {
Err(Error::DriverDown(msg)) => {
drop(cli);
Expand All @@ -116,7 +125,7 @@ impl SharedClient {
types: &[Type],
) -> Result<StatementGuarded<RwLockReadGuard<'_, Client>>, Error> {
loop {
let cli = self.inner.read().await;
let cli = self.read().await;
match cli._prepare(query, types).await {
Ok(stmt) => return Ok(stmt.into_guarded(cli)),
Err(Error::DriverDown(_)) => {
Expand All @@ -141,34 +150,6 @@ impl SharedClient {
Ok(stmt)
}

#[cfg(not(feature = "quic"))]
pub async fn pipeline<'a, const SYNC_MODE: bool>(
&self,
mut pipe: crate::pipeline::Pipeline<'a, SYNC_MODE>,
) -> Result<crate::pipeline::PipelineStream<'a>, Error> {
let cli = self.inner.read().await;
match cli._pipeline::<SYNC_MODE>(&mut pipe.sync_count, pipe.buf).await {
Ok(res) => Ok(crate::pipeline::PipelineStream {
res,
columns: pipe.columns,
ranges: Vec::new(),
}),
Err(Error::DriverDown(buf)) => {
drop(cli);
Box::pin(async move {
self.reconnect().await;
self.inner
.read()
.await
.pipeline_buf(pipe.sync_count, buf, pipe.columns)
.await
})
.await
}
Err(e) => Err(e),
}
}

#[cold]
#[inline(never)]
async fn reconnect(&self) {
Expand All @@ -187,10 +168,10 @@ impl SharedClient {
}
}

let mut cli = self.inner.write().await;

let _guard = SpawnGuard(&self.persist.spawner);

let mut cli = self.inner.write().await;

let (cli_new, drv) = {
loop {
match connect(&mut self.persist.config.clone()).await {
Expand All @@ -207,6 +188,68 @@ impl SharedClient {
}

*cli = cli_new;

// release rwlock before spawn guard. when waiters are notified it's important that the lock
// is free for read lock.
drop(cli);
drop(_guard);
}
}
}

async fn read(&self) -> RwLockReadGuard<'_, Client> {
loop {
match self.inner.try_read() {
Ok(cli) => return cli,
// failing to acquire read lock means certain task is spawning new connection.
// if there is no notify existing in spawner it means the spawn process has finished(or cancelled).
// in that case just try read lock again.
Err(_) => self.persist.spawner.wait_for_spawn().await,
}
}
}
}

#[cfg(not(feature = "quic"))]
impl SharedClient {
pub async fn pipeline<'a, const SYNC_MODE: bool>(
&self,
mut pipe: crate::pipeline::Pipeline<'a, SYNC_MODE>,
) -> Result<crate::pipeline::PipelineStream<'a>, Error> {
let cli = self.read().await;
match cli._pipeline::<SYNC_MODE>(&mut pipe.sync_count, pipe.buf).await {
Ok(res) => Ok(crate::pipeline::PipelineStream {
res,
columns: pipe.columns,
ranges: Vec::new(),
}),
Err(Error::DriverDown(buf)) => {
drop(cli);
pipe.buf = buf;
Box::pin(self.pipeline_slow::<SYNC_MODE>(pipe)).await
}
Err(e) => Err(e),
}
}

async fn pipeline_slow<'a, const SYNC_MODE: bool>(
&self,
mut pipe: crate::pipeline::Pipeline<'a, SYNC_MODE>,
) -> Result<crate::pipeline::PipelineStream<'a>, Error> {
loop {
self.reconnect().await;
match self.read().await.tx.send_multi(pipe.sync_count, pipe.buf).await {
Ok(res) => {
return Ok(crate::pipeline::PipelineStream {
res,
columns: pipe.columns,
ranges: Vec::new(),
})
}
Err(Error::DriverDown(buf)) => {
pipe.buf = buf;
}
Err(e) => return Err(e),
}
}
}
Expand Down
Loading