-
-
Notifications
You must be signed in to change notification settings - Fork 256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sub-transactions API #912
base: develop
Are you sure you want to change the base?
Sub-transactions API #912
Changes from 9 commits
f7455b1
a35d24e
aa8ae18
8998091
351d157
b38d9d7
df654a1
d04aea8
1cc57bc
3ed2afe
94f7cf8
cb812f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
Portions Copyright 2019-2021 ZomboDB, LLC. | ||
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <[email protected]> | ||
|
||
All rights reserved. | ||
|
||
Use of this source code is governed by the MIT license that can be found in the LICENSE file. | ||
*/ | ||
|
||
#[cfg(any(test, feature = "pg_test"))] | ||
#[pgx::pg_schema] | ||
mod tests { | ||
#[allow(unused_imports)] | ||
use crate as pgx_tests; | ||
|
||
use pgx::prelude::*; | ||
use pgx::SpiClient; | ||
|
||
#[pg_test] | ||
fn test_subxact_smoketest() { | ||
Spi::execute(|c| { | ||
c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
let c = c.sub_transaction(|xact| { | ||
xact.update("INSERT INTO a VALUES (0)", None, None); | ||
assert_eq!( | ||
0, | ||
xact.select("SELECT v FROM a", Some(1), None) | ||
.first() | ||
.get_datum::<i32>(1) | ||
.unwrap() | ||
); | ||
let xact = xact.sub_transaction(|xact| { | ||
xact.update("INSERT INTO a VALUES (1)", None, None); | ||
assert_eq!( | ||
2, | ||
xact.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
.first() | ||
.get_datum::<i32>(1) | ||
.unwrap() | ||
); | ||
xact.rollback() | ||
}); | ||
xact.rollback() | ||
}); | ||
assert_eq!( | ||
0, | ||
c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
.first() | ||
.get_datum::<i32>(1) | ||
.unwrap() | ||
); | ||
}) | ||
} | ||
|
||
#[pg_test] | ||
fn test_commit_on_drop() { | ||
Spi::execute(|c| { | ||
c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
// The type below is explicit to ensure it's commit on drop by default | ||
c.sub_transaction(|xact: SubTransaction<SpiClient, CommitOnDrop>| { | ||
xact.update("INSERT INTO a VALUES (0)", None, None); | ||
// Dropped explicitly for illustration purposes | ||
drop(xact); | ||
}); | ||
// Create a new client to check the state | ||
Spi::execute(|c| { | ||
// The above insert should have been committed | ||
assert_eq!( | ||
1, | ||
c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
.first() | ||
.get_datum::<i32>(1) | ||
.unwrap() | ||
); | ||
}); | ||
}) | ||
} | ||
|
||
#[pg_test] | ||
fn test_rollback_on_drop() { | ||
Spi::execute(|c| { | ||
c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
// The type below is explicit to ensure it's commit on drop by default | ||
c.sub_transaction(|xact: SubTransaction<SpiClient, CommitOnDrop>| { | ||
xact.update("INSERT INTO a VALUES (0)", None, None); | ||
let xact = xact.rollback_on_drop(); | ||
// Dropped explicitly for illustration purposes | ||
drop(xact); | ||
}); | ||
// Create a new client to check the state | ||
Spi::execute(|c| { | ||
// The above insert should NOT have been committed | ||
assert_eq!( | ||
0, | ||
c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
.first() | ||
.get_datum::<i32>(1) | ||
.unwrap() | ||
); | ||
}); | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
use crate::{pg_sys, PgMemoryContexts, SpiClient}; | ||
use std::fmt::Debug; | ||
use std::ops::Deref; | ||
|
||
/// Releases a sub-transaction on Drop | ||
pub trait ReleaseOnDrop {} | ||
|
||
/// Sub-transaction's contextual information | ||
#[derive(Clone, Copy)] | ||
pub struct Context { | ||
memory_context: pg_sys::MemoryContext, | ||
// Resource ownership before the transaction | ||
// | ||
// Based on information from src/backend/utils/resowner/README | ||
// as well as practical use of it in src/pl/plpython/plpy_spi.c | ||
resource_owner: pg_sys::ResourceOwner, | ||
} | ||
|
||
impl Context { | ||
/// Captures the context | ||
fn capture() -> Self { | ||
// Remember the memory context before starting the sub-transaction | ||
let memory_context = PgMemoryContexts::CurrentMemoryContext.value(); | ||
// Remember resource owner before starting the sub-transaction | ||
let resource_owner = unsafe { pg_sys::CurrentResourceOwner }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we get a note mentioning when this variable is set (i.e. "what is the last write we are expecting to be reading")? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please elaborate on this request? I am not 100% sure I understood what you're asking me to do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Jubilee means when might Postgres initially set (or later change) this and how are we certain it even has a valid value at this point in time? |
||
Self { memory_context, resource_owner } | ||
} | ||
} | ||
|
||
impl From<Context> for CommitOnDrop { | ||
fn from(context: Context) -> Self { | ||
CommitOnDrop(context) | ||
} | ||
} | ||
|
||
impl From<Context> for RollbackOnDrop { | ||
fn from(context: Context) -> Self { | ||
RollbackOnDrop(context) | ||
} | ||
} | ||
|
||
/// Commits a sub-transaction on Drop | ||
pub struct CommitOnDrop(Context); | ||
|
||
impl Drop for CommitOnDrop { | ||
fn drop(&mut self) { | ||
unsafe { | ||
pg_sys::ReleaseCurrentSubTransaction(); | ||
pg_sys::CurrentResourceOwner = self.0.resource_owner; | ||
} | ||
PgMemoryContexts::For(self.0.memory_context).set_as_current(); | ||
} | ||
} | ||
|
||
impl ReleaseOnDrop for CommitOnDrop {} | ||
|
||
/// Rolls back a sub-transaction on Drop | ||
pub struct RollbackOnDrop(Context); | ||
|
||
impl Drop for RollbackOnDrop { | ||
fn drop(&mut self) { | ||
unsafe { | ||
pg_sys::RollbackAndReleaseCurrentSubTransaction(); | ||
pg_sys::CurrentResourceOwner = self.0.resource_owner; | ||
} | ||
PgMemoryContexts::For(self.0.memory_context).set_as_current(); | ||
} | ||
} | ||
|
||
impl ReleaseOnDrop for RollbackOnDrop {} | ||
|
||
impl Into<RollbackOnDrop> for CommitOnDrop { | ||
fn into(self) -> RollbackOnDrop { | ||
let result = RollbackOnDrop(self.0); | ||
// IMPORTANT: avoid running Drop (that would commit) | ||
std::mem::forget(self); | ||
result | ||
} | ||
} | ||
|
||
impl Into<CommitOnDrop> for RollbackOnDrop { | ||
fn into(self) -> CommitOnDrop { | ||
let result = CommitOnDrop(self.0); | ||
// IMPORTANT: avoid running Drop (that would roll back) | ||
std::mem::forget(self); | ||
result | ||
} | ||
} | ||
|
||
struct NoOpOnDrop; | ||
|
||
impl ReleaseOnDrop for NoOpOnDrop {} | ||
|
||
/// Sub-transaction | ||
/// | ||
/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction<Parent>::sub_transaction` | ||
/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure. | ||
/// | ||
/// Unless rolled back or committed explicitly, it'll commit if `Release` generic parameter is `CommitOnDrop` | ||
/// (default) or roll back if it is `RollbackOnDrop`. | ||
#[derive(Debug)] | ||
pub struct SubTransaction<Parent: SubTransactionExt, Release: ReleaseOnDrop = CommitOnDrop> { | ||
// Transaction release mechanism (commit, drop) | ||
release: Release, | ||
// Transaction parent | ||
parent: Parent, | ||
} | ||
|
||
impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> SubTransaction<Parent, Release> | ||
where | ||
Release: From<Context>, | ||
{ | ||
/// Create a new sub-transaction. | ||
fn new(parent: Parent) -> Self { | ||
let context = Context::capture(); | ||
let memory_context = context.memory_context; | ||
let release = context.into(); | ||
unsafe { | ||
pg_sys::BeginInternalSubTransaction(std::ptr::null() /* [no] transaction name */); | ||
} | ||
// Switch to the outer memory context so that all allocations remain | ||
// there instead of the sub-transaction's context | ||
PgMemoryContexts::For(memory_context).set_as_current(); | ||
Self { release, parent } | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> { | ||
/// Commit the transaction, returning its parent | ||
pub fn commit(self) -> Parent { | ||
// `Self::do_nothing_on_drop()` will commit as `Release` is `CommitOnDrop` | ||
self.do_nothing_on_drop().parent | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> { | ||
/// Commit the transaction, returning its parent | ||
pub fn commit(self) -> Parent { | ||
// Make sub-transaction commit on drop and then use `commit` | ||
self.commit_on_drop().commit() | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> { | ||
/// Rollback the transaction, returning its parent | ||
pub fn rollback(self) -> Parent { | ||
// `Self::do_nothing_on_drop()` will roll back as `Release` is `RollbackOnDrop` | ||
self.do_nothing_on_drop().parent | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> { | ||
/// Rollback the transaction, returning its parent | ||
pub fn rollback(self) -> Parent { | ||
// Make sub-transaction roll back on drop and then use `rollback` | ||
self.rollback_on_drop().rollback() | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> { | ||
/// Make this sub-transaction roll back on drop | ||
pub fn rollback_on_drop(self) -> SubTransaction<Parent, RollbackOnDrop> { | ||
SubTransaction { parent: self.parent, release: self.release.into() } | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> { | ||
/// Make this sub-transaction commit on drop | ||
pub fn commit_on_drop(self) -> SubTransaction<Parent, CommitOnDrop> { | ||
SubTransaction { parent: self.parent, release: self.release.into() } | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> SubTransaction<Parent, Release> { | ||
/// Make this sub-transaction do nothing on drop | ||
/// | ||
/// Releases the sub-transaction based on `Release` generic parameter. Further | ||
/// dropping of the sub-transaction will not do anything. | ||
fn do_nothing_on_drop(self) -> SubTransaction<Parent, NoOpOnDrop> { | ||
SubTransaction { parent: self.parent, release: NoOpOnDrop } | ||
} | ||
} | ||
|
||
// This allows SubTransaction to be de-referenced to SpiClient | ||
impl<'conn, Release: ReleaseOnDrop> Deref for SubTransaction<SpiClient<'conn>, Release> { | ||
type Target = SpiClient<'conn>; | ||
|
||
fn deref(&self) -> &Self::Target { | ||
&self.parent | ||
} | ||
} | ||
|
||
// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient | ||
impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> Deref | ||
for SubTransaction<SubTransaction<Parent>, Release> | ||
{ | ||
type Target = Parent; | ||
|
||
fn deref(&self) -> &Self::Target { | ||
&self.parent.parent | ||
} | ||
} | ||
|
||
/// Trait that allows creating a sub-transaction off any type | ||
pub trait SubTransactionExt { | ||
/// Parent's type | ||
/// | ||
/// In most common cases, it'll be equal to `Self`. However, in some cases | ||
/// it may be desirable to use a different type to achieve certain goals. | ||
type Parent: SubTransactionExt; | ||
|
||
/// Consume `self` and execute a closure with a sub-transaction | ||
/// | ||
/// If further use of the given sub-transaction is necessary, it must | ||
/// be returned by the closure alongside with its intended result. Otherwise, | ||
/// the sub-transaction be released when dropped. | ||
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
where | ||
Self: Sized; | ||
} | ||
|
||
impl<'a> SubTransactionExt for SpiClient<'a> { | ||
type Parent = Self; | ||
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
where | ||
Self: Sized, | ||
{ | ||
f(SubTransaction::new(self)) | ||
} | ||
} | ||
|
||
impl<Parent: SubTransactionExt> SubTransactionExt for SubTransaction<Parent> { | ||
type Parent = Self; | ||
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
where | ||
Self: Sized, | ||
{ | ||
f(SubTransaction::new(self)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to limit this to just the module:
pub use crate::subxact;
.I think we're trying to be a little better about globbing everything into the current namespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this is the best. At the very least, I think we should consider
pub use crate::subxact::{self, SubTransactionExt}
for the sub-transaction functionality to be easily available.