Skip to content

Commit

Permalink
Add application defined sequential consistency for certification
Browse files Browse the repository at this point in the history
Client state methods before_prepare() and before_commit() accept
a callback which is called by the provider after it can guarantee
sequential consistency. This allows application threads which wish to
maintain sequential consistency to enter before_prepare() and
before_commit() calls concurrently without waiting the prior call
to finish.
  • Loading branch information
temeo committed Nov 28, 2024
1 parent 1c61b80 commit 3de594b
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 31 deletions.
42 changes: 40 additions & 2 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,49 @@ namespace wsrep

/** @name Commit ordering interface */
/** @{ */
int before_prepare();

/**
* This method should be called before the transaction
* is prepared. This call certifies the transaction and
* assigns write set meta data.
*
* @param seq_cb Callback which is passed to underlying
* certify() call. See wsrep::provider::certify().
*
* @return Zero on success, non-zero on failure.
*/
int before_prepare(const wsrep::provider::seq_cb_t* seq_cb);

/** Same as before_prepare() above, but nullptr is passed
* to seq_cb. */
int before_prepare()
{
return before_prepare(nullptr);
}

int after_prepare();

int before_commit();
/**
* This method should be called before transaction is committed.
* This call makes the transaction to enter commit time
* critical section. The critical section is left by calling
* ordered_commit().
*
* If before_prepare() is not called before this call, the
* before_prepare() is called internally.
*
* @param seq_cb Callback which is passed to underlying
* before_prepare() call.
*
* @return Zero on success, non-zero on failure.
*/
int before_commit(const wsrep::provider::seq_cb_t* seq_cb);

/** Same as before_commit(), but nullptr is passed to seq_cb. */
int before_commit()
{
return before_commit(nullptr);
}

int ordered_commit();

Expand Down
33 changes: 30 additions & 3 deletions include/wsrep/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,37 @@ namespace wsrep
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
virtual enum status append_data(
wsrep::ws_handle&, const wsrep::const_buffer&) = 0;

/**
* Callback for application defined sequential consistency.
* The provider will call
* the callback once it can guarantee sequential consistency. */
typedef struct seq_cb {
/** Opaque caller context */
void *ctx;
/** Function to be called by the provider when sequential
* consistency is guaranteed. */
void (*fn)(void *ctx);
} seq_cb_t;

/**
* Certify the write set.
*
* @param client_id[in] Id of the client session.
* @param ws_handle[in,out] Write set handle associated to the current
* transaction.
* @param flags[in] Flags associated to the write set (see struct flag).
* @param ws_meta[out] Write set meta data associated to the
* replicated write set.
* @param seq_cb[in] Optional callback for application defined
* sequential consistency.
*
* @return Status code defined in struct status.
*/
virtual enum status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) = 0;
certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle,
int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb)
= 0;
/**
* BF abort a transaction inside provider.
*
Expand Down
8 changes: 5 additions & 3 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ namespace wsrep

int after_row();

int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
int before_prepare(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);

int after_prepare(wsrep::unique_lock<wsrep::mutex>&);

int before_commit();
int before_commit(const wsrep::provider::seq_cb_t*);

int ordered_commit();

Expand Down Expand Up @@ -248,7 +249,8 @@ namespace wsrep
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);
int append_sr_keys_for_commit();
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void remove_fragments_in_storage_service_scope(
Expand Down
8 changes: 4 additions & 4 deletions src/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
return transaction_.next_fragment(meta);
}

int wsrep::client_state::before_prepare()
int wsrep::client_state::before_prepare(const wsrep::provider::seq_cb_t* seq_cb)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.before_prepare(lock);
return transaction_.before_prepare(lock, seq_cb);
}

int wsrep::client_state::after_prepare()
Expand All @@ -381,11 +381,11 @@ int wsrep::client_state::after_prepare()
return transaction_.after_prepare(lock);
}

int wsrep::client_state::before_commit()
int wsrep::client_state::before_commit(const wsrep::provider::seq_cb_t* seq_cb)
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.before_commit();
return transaction_.before_commit(seq_cb);
}

int wsrep::client_state::ordered_commit()
Expand Down
22 changes: 11 additions & 11 deletions src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ int wsrep::transaction::after_row()
return ret;
}

int wsrep::transaction::before_prepare(
wsrep::unique_lock<wsrep::mutex>& lock)
int wsrep::transaction::before_prepare(wsrep::unique_lock<wsrep::mutex>& lock,
const wsrep::provider::seq_cb_t* seq_cb)
{
assert(lock.owns_lock());
int ret(0);
Expand Down Expand Up @@ -349,7 +349,7 @@ int wsrep::transaction::before_prepare(
}
else
{
ret = certify_commit(lock);
ret = certify_commit(lock, seq_cb);
}

assert((ret == 0 && state() == s_preparing) ||
Expand Down Expand Up @@ -445,7 +445,7 @@ int wsrep::transaction::after_prepare(
return ret;
}

int wsrep::transaction::before_commit()
int wsrep::transaction::before_commit(const wsrep::provider::seq_cb* seq_cb)
{
int ret(1);

Expand All @@ -465,7 +465,7 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_local:
if (state() == s_executing)
{
ret = before_prepare(lock) || after_prepare(lock);
ret = before_prepare(lock, seq_cb) || after_prepare(lock);
assert((ret == 0 &&
(state() == s_committing || state() == s_prepared))
||
Expand Down Expand Up @@ -495,7 +495,7 @@ int wsrep::transaction::before_commit()

if (ret == 0 && state() == s_prepared)
{
ret = certify_commit(lock);
ret = certify_commit(lock, nullptr);
assert((ret == 0 && state() == s_committing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
Expand Down Expand Up @@ -543,7 +543,7 @@ int wsrep::transaction::before_commit()
}
else if (state() == s_executing || state() == s_replaying)
{
ret = before_prepare(lock) || after_prepare(lock);
ret = before_prepare(lock, nullptr) || after_prepare(lock);
}
else
{
Expand Down Expand Up @@ -1195,7 +1195,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
provider().certify(client_state_.id(),
ws_handle_,
flags(),
meta));
meta, nullptr));

int ret;
if (cert_ret == wsrep::provider::success)
Expand Down Expand Up @@ -1622,7 +1622,7 @@ int wsrep::transaction::certify_fragment(
cert_ret = provider().certify(client_state_.id(),
ws_handle_,
flags(),
sr_ws_meta);
sr_ws_meta, nullptr);
client_service_.debug_crash(
"crash_replicate_fragment_after_certify");

Expand Down Expand Up @@ -1744,7 +1744,7 @@ int wsrep::transaction::certify_fragment(
}

int wsrep::transaction::certify_commit(
wsrep::unique_lock<wsrep::mutex>& lock)
wsrep::unique_lock<wsrep::mutex>& lock, const provider::seq_cb_t* seq_cb)
{
assert(lock.owns_lock());
assert(active());
Expand Down Expand Up @@ -1828,7 +1828,7 @@ int wsrep::transaction::certify_commit(
cert_ret(provider().certify(client_state_.id(),
ws_handle_,
flags(),
ws_meta_));
ws_meta_, seq_cb));
client_service_.debug_sync("wsrep_after_certification");

lock.lock();
Expand Down
24 changes: 19 additions & 5 deletions src/wsrep_provider_v26.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ namespace
}

wsrep_node_isolation_mode_set_fn_v1 node_isolation_mode_set;
wsrep_certify_fn_v1 certify_v1;
}


Expand Down Expand Up @@ -721,6 +722,9 @@ void wsrep::wsrep_provider_v26::init_services(
node_isolation_mode_set
= wsrep_impl::resolve_function<wsrep_node_isolation_mode_set_fn_v1>(
wsrep_->dlh, WSREP_NODE_ISOLATION_MODE_SET_V1);

certify_v1 = wsrep_impl::resolve_function<wsrep_certify_fn_v1>(
wsrep_->dlh, WSREP_CERTIFY_V1);
}

void wsrep::wsrep_provider_v26::deinit_services()
Expand Down Expand Up @@ -922,14 +926,24 @@ enum wsrep::provider::status
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle,
int flags,
wsrep::ws_meta& ws_meta)
wsrep::ws_meta& ws_meta,
const seq_cb_t* seq_cb)
{
mutable_ws_handle mwsh(ws_handle);
mutable_ws_meta mmeta(ws_meta, flags);
return map_return_value(
wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
mmeta.native_flags(),
mmeta.native()));
if (seq_cb && certify_v1)
{
wsrep_seq_cb_t wseq_cb{seq_cb->ctx, seq_cb->fn};
return map_return_value(certify_v1(wsrep_, client_id.get(),
mwsh.native(), mmeta.native_flags(),
mmeta.native(), &wseq_cb));
}
else
{
return map_return_value(
wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
mmeta.native_flags(), mmeta.native()));
}
}

enum wsrep::provider::status
Expand Down
2 changes: 1 addition & 1 deletion src/wsrep_provider_v26.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace wsrep
enum wsrep::provider::status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) WSREP_OVERRIDE;
wsrep::ws_meta&, const seq_cb_t*) WSREP_OVERRIDE;
enum wsrep::provider::status
bf_abort(wsrep::seqno,
wsrep::transaction_id,
Expand Down
3 changes: 2 additions & 1 deletion test/mock_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ namespace wsrep
certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle,
int flags,
wsrep::ws_meta& ws_meta)
wsrep::ws_meta& ws_meta,
const seq_cb* /* Ignored in unit tests. */)
WSREP_OVERRIDE
{
ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1);
Expand Down
2 changes: 1 addition & 1 deletion wsrep-API/v26
Submodule v26 updated 1 files
+64 −2 wsrep_api.h

0 comments on commit 3de594b

Please sign in to comment.