Skip to content

RUST-813 Add documentation examples for transactions #349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client/options/resolver_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl ResolverConfig {
/// Creates a default configuration, using 1.1.1.1, 1.0.0.1 and 2606:4700:4700::1111,
/// 2606:4700:4700::1001 (thank you, Cloudflare).
///
/// Please see: https://www.cloudflare.com/dns/
/// Please see: <https://www.cloudflare.com/dns/>
pub fn cloudflare() -> Self {
ResolverConfig {
inner: TrustDnsResolverConfig::cloudflare(),
Expand All @@ -34,7 +34,7 @@ impl ResolverConfig {
/// Creates a configuration, using 9.9.9.9, 149.112.112.112 and 2620:fe::fe, 2620:fe::fe:9, the
/// “secure” variants of the quad9 settings (thank you, Quad9).
///
/// Please see: https://www.quad9.net/faq/
/// Please see: <https://www.quad9.net/faq/>
pub fn quad9() -> Self {
ResolverConfig {
inner: TrustDnsResolverConfig::quad9(),
Expand Down
76 changes: 74 additions & 2 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,67 @@ lazy_static! {
///
/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
/// or process at a time.
///
/// ## Transactions
/// Transactions are used to execute a series of operations across multiple documents and
/// collections atomically. For more information about when and how to use transactions in MongoDB,
/// see the [manual](https://docs.mongodb.com/manual/core/transactions/).
///
/// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a
/// `ClientSession`. To begin a transaction, call [`ClientSession::start_transaction`] on a
/// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the
/// transaction.
///
/// ```rust
/// use mongodb::{
/// bson::doc,
/// error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT},
/// options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern},
/// # Client,
/// ClientSession,
/// Collection,
/// };
///
/// # async fn do_stuff() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection("bar");
/// let mut session = client.start_session(None).await?;
/// let options = TransactionOptions::builder()
/// .read_concern(ReadConcern::majority())
/// .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build())
/// .build();
/// session.start_transaction(options).await?;
/// // A "TransientTransactionError" label indicates that the entire transaction can be retried
/// // with a reasonable expectation that it will succeed.
/// while let Err(error) = execute_transaction(&coll, &mut session).await {
/// if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) {
/// break;
/// }
/// }
/// # Ok(())
/// # }
///
/// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> {
/// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?;
/// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?;
/// // An "UnknownTransactionCommitResult" label indicates that it is unknown whether the
/// // commit has satisfied the write concern associated with the transaction. If an error
/// // with this label is returned, it is safe to retry the commit until the write concern is
/// // satisfied or an error without the label is returned.
/// loop {
/// let result = session.commit_transaction().await;
/// if let Err(ref error) = result {
/// if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
/// continue;
/// }
/// }
/// result?
/// }
/// }
/// ```
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
// transactions are supported on 4.2+
/// Note: the driver does not currently support transactions on sharded clusters.
#[derive(Clone, Debug)]
pub struct ClientSession {
cluster_time: Option<ClusterTime>,
Expand Down Expand Up @@ -193,6 +254,10 @@ impl ClientSession {
/// be passed into each operation within the transaction; otherwise, the operation will be
/// executed outside of the transaction.
///
/// Errors returned from operations executed within a transaction may include a
/// [`crate::error::TRANSIENT_TRANSACTION_ERROR`] label. This label indicates that the entire
/// transaction can be retried with a reasonable expectation that it will succeed.
///
/// Transactions are supported on MongoDB 4.0+. The Rust driver currently only supports
/// transactions on replica sets.
///
Expand Down Expand Up @@ -276,6 +341,13 @@ impl ClientSession {

/// Commits the transaction that is currently active on this session.
///
///
/// This method may return an error with a [`crate::error::UNKNOWN_TRANSACTION_COMMIT_RESULT`]
/// label. This label indicates that it is unknown whether the commit has satisfied the write
/// concern associated with the transaction. If an error with this label is returned, it is
/// safe to retry the commit until the write concern is satisfied or an error without the label
/// is returned.
///
/// ```rust
/// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession};
/// #
Expand Down Expand Up @@ -344,14 +416,14 @@ impl ClientSession {
/// # let coll = client.database("foo").collection::<Document>("bar");
/// # let mut session = client.start_session(None).await?;
/// session.start_transaction(None).await?;
/// match execute_transaction(coll, &mut session).await {
/// match execute_transaction(&coll, &mut session).await {
/// Ok(_) => session.commit_transaction().await?,
/// Err(_) => session.abort_transaction().await?,
/// }
/// # Ok(())
/// # }
///
/// async fn execute_transaction(coll: Collection, session: &mut ClientSession) -> Result<()> {
/// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> {
/// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?;
/// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?;
/// Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl AsyncRuntime {
}

/// Create a new `Interval` that yields with interval of `duration`.
/// See: https://docs.rs/tokio/latest/tokio/time/fn.interval.html
/// See: <https://docs.rs/tokio/latest/tokio/time/fn.interval.html>
pub(crate) fn interval(self, duration: Duration) -> Interval {
match self {
#[cfg(feature = "tokio-runtime")]
Expand Down
2 changes: 1 addition & 1 deletion src/test/spec/initial_dns_seedlist_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn run() {

// "encoded-userinfo-and-db.json" specifies a database name with a question mark which is
// disallowed on Windows. See
// https://docs.mongodb.com/manual/reference/limits/#restrictions-on-db-names
// <https://docs.mongodb.com/manual/reference/limits/#restrictions-on-db-names>
if let Some(ref mut options) = test_file.parsed_options {
if options.db.as_deref() == Some("mydb?") && cfg!(target_os = "windows") {
options.db = Some("mydb".to_string());
Expand Down
2 changes: 1 addition & 1 deletion src/test/util/failpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl FailPoint {
}

/// Create a failCommand failpoint.
/// See https://github.com/mongodb/mongo/wiki/The-%22failCommand%22-fail-point for more info.
/// See <https://github.com/mongodb/mongo/wiki/The-%22failCommand%22-fail-point> for more info.
pub fn fail_command(
fail_commands: &[&str],
mode: FailPointMode,
Expand Down
74 changes: 74 additions & 0 deletions tests/transaction_examples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#![allow(dead_code)]
#![cfg(not(feature = "sync"))]

// START TRANSACTIONS EXAMPLE
use mongodb::{
bson::{doc, Document},
error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT},
options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern},
ClientSession,
};

async fn update_employee_info(session: &mut ClientSession) -> Result<()> {
let transaction_options = TransactionOptions::builder()
.read_concern(ReadConcern::snapshot())
.write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build())
.build();
session.start_transaction(transaction_options).await?;

execute_transaction_with_retry(session).await
}

async fn execute_transaction_with_retry(session: &mut ClientSession) -> Result<()> {
while let Err(err) = execute_employee_info_transaction(session).await {
println!("Transaction aborted. Error returned during transaction.");
if err.contains_label(TRANSIENT_TRANSACTION_ERROR) {
println!("Encountered TransientTransactionError, retrying transaction.");
continue;
} else {
return Err(err);
}
}
Ok(())
}

async fn execute_employee_info_transaction(session: &mut ClientSession) -> Result<()> {
let client = session.client();
let employees = client.database("hr").collection::<Document>("employees");
let events = client
.database("reporting")
.collection::<Document>("events");

employees
.update_one_with_session(
doc! { "employee": 3 },
doc! { "$set": { "status": "Inactive" } },
None,
session,
)
.await?;
events
.insert_one_with_session(
doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } },
None,
session,
)
.await?;

commit_with_retry(session).await
}

async fn commit_with_retry(session: &mut ClientSession) -> Result<()> {
while let Err(err) = session.commit_transaction().await {
if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
println!("Encountered UnknownTransactionCommitResult, retrying commit operation.");
continue;
} else {
println!("Encountered non-retryable error during commit.");
return Err(err);
}
}
println!("Transaction committed.");
Ok(())
}
// END TRANSACTIONS EXAMPLE