This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

rgw/sfs: honor retry_raced_bucket_write mechanism #240

Draft · wants to merge 1 commit into s3gw
89 changes: 65 additions & 24 deletions src/rgw/driver/sfs/bucket.cc
@@ -47,6 +47,11 @@ namespace rgw::sal {

SFSBucket::SFSBucket(SFStore* _store, sfs::BucketRef _bucket)
: StoreBucket(_bucket->get_info()), store(_store), bucket(_bucket) {
update_views();
}

void SFSBucket::update_views() {
get_info() = bucket->get_info();
set_attrs(bucket->get_attrs());

auto it = attrs.find(RGW_ATTR_ACL);
@@ -56,6 +61,47 @@ SFSBucket::SFSBucket(SFStore* _store, sfs::BucketRef _bucket)
}
}

int SFSBucket::try_metadata_update(
const std::function<int(sfs::sqlite::DBOPBucketInfo& current_state)>&
apply_delta
) {
auto current_state = sfs::sqlite::DBOPBucketInfo(get_info(), get_attrs());
auto db_conn = get_store().db_conn;
int res =
db_conn->transact([&](rgw::sal::sfs::sqlite::StorageRef storage) -> int {
auto db_state = sfs::get_meta_buckets(db_conn)->get_bucket(
bucket->get_bucket_id(), storage
);
if (!db_state) {
// this is an error, the operation should not be retried
return -ERR_NO_SUCH_BUCKET;
}
if (current_state != *db_state) {
// the operation will be retried
return -ECANCELED;
}
// current_state == db_state: apply the delta and store the bucket.
int res = apply_delta(current_state);
if (res) {
return res;
}
sfs::get_meta_buckets(db_conn)->store_bucket(current_state, storage);
return 0;
});

if (!res) {
store->_refresh_buckets_safe();
auto bref = store->get_bucket_ref(get_name());
if (!bref) {
// if we get here, the state of this bucket is inconsistent
return -ERR_NO_SUCH_ENTITY;
}
bucket = bref;
update_views();
}
return res;
}

void SFSBucket::write_meta(const DoutPrefixProvider* /*dpp*/) {
// TODO
}
@@ -404,28 +450,12 @@ int SFSBucket::
int SFSBucket::merge_and_store_attrs(
const DoutPrefixProvider* /*dpp*/, Attrs& new_attrs, optional_yield /*y*/
) {
for (auto& it : new_attrs) {
attrs[it.first] = it.second;

if (it.first == RGW_ATTR_ACL) {
auto lval = it.second.cbegin();
acls.decode(lval);
}
}
for (auto& it : attrs) {
auto it_find = new_attrs.find(it.first);
if (it_find == new_attrs.end()) {
// this is an old attr that is not defined in the new_attrs
// delete it
attrs.erase(it.first);
}
}

sfs::get_meta_buckets(get_store().db_conn)
->store_bucket(sfs::sqlite::DBOPBucketInfo(get_info(), get_attrs()));

store->_refresh_buckets_safe();
return 0;
return try_metadata_update(
[&](sfs::sqlite::DBOPBucketInfo& current_state) -> int {
current_state.battrs = new_attrs;
return 0;
}
);
}

// try_resolve_mp_from_oid tries to parse an integer id from oid to
@@ -529,11 +559,22 @@ int SFSBucket::abort_multiparts(
return sfs::SFSMultipartUploadV2::abort_multiparts(dpp, store, this);
}

/**
* @brief Refresh this bucket object with the state obtained from the store.
* It can happen that the state of this bucket is obsolete because concurrent
* threads have updated metadata using their own SFSBucket instances.
*/
int SFSBucket::try_refresh_info(
@irq0 (Member) commented on Oct 31, 2023:

This might be only half the solution. I didn't dig too deep, but hope you know more :)

If I inline and simplify the logic a bit we have:

int retry_raced_bucket_write(const DoutPrefixProvider *dpp, rgw::sal::Bucket* b, const F& f) {
  auto r = f();
  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
    r = b->try_refresh_info(dpp, nullptr);
    if (r >= 0) {
      rgw::sal::Attrs attrs = s->bucket->get_attrs();
      attrs[RGW_ATTR_TAGS] = tags_bl;
      r = s->bucket->merge_and_store_attrs(this, attrs, y);
    }
  }
  return r;
}

What if something goes stale between try_refresh_info and merge_and_store_attrs? Is merge_and_store_attrs responsible for returning -ECANCELED when it detects staleness? Can we detect merge conflicts there?

Author:

To tell the truth, I don't think retry_raced_bucket_write can solve the problem in all cases; it can only mitigate it.
Imagine two physical threads A and B and a bucket with an initial state S0. They race and run try_refresh_info truly in parallel, each on its own CPU core; at this point both A and B see the bucket state S0 in memory, both believe S0 is the latest state, and both happily update the bucket with their new data.
Now, because in any decent implementation an update is done under mutual exclusion, A and B will run the critical section serially, so in the end either A's state or B's state will prevail over the other.
Is this an error? I don't think so; it is part of the game when you allow a bucket to be modified concurrently by more than one physical thread.
At a certain point, a thread has to ask: "Can I safely assume the state in my cached memory is good enough?"; no matter how far you go checking this, at some point you simply must go on.
The retry_raced_bucket_write mechanism works well for logical threads multiplexed over one physical thread: those can always be serialized in some way, so retry_raced_bucket_write solves something it is correct to solve, because the two logical threads run over a single core as if they were a single sequence of statements.
This is the reasoning I went through while trying to figure out the original meaning and purpose of the authors of this code.
Let's keep in mind that, as long as we work downstream and deal with parts developed upstream, the actual truth is hard to fully clarify; we would need the support of the original developers to really understand the aim of something.
The best we can do now is hope that our guess is good enough :).

@irq0 (Member):

Let's assume we are in s->bucket->merge_and_store_attrs(this, new, y);

State we have: the database state db, our current copy in the bucket current, and the update new. The caller based new on a current that was fetched from db.

We must transition from db to new. Right now we transition from current to new. This is a bug, because current may be stale. That is the case if current != db.

If we make the update transaction conditional on current == db, we transition from db to new in turn. If at the time of the update current != db, we fail with -ECANCELED and let the retry logic compute a fresh update.

Author:

OK, now I think I get what you mean, thanks; let me lay out the limit case so we can discuss from there:
Let's assume both threads A and B reach update() concurrently.
Let's assume that update() is the function that flushes a state to the db and cannot be executed by two threads concurrently because it is mutex protected.

thread A

try_update_data()
	-> fetch_from_db() -> db_state_0
	->  current_state = db_state_0

update(current_state + new_A)

now bucket's state on db is db_state_A

thread B

try_update_data()
	-> fetch_from_db() -> db_state_0
	-> current_state = db_state_0

update(current_state + new_B)

now bucket's state on db is db_state_B

final state on db: db_state_B

db_state_A != db_state_B

new_A is lost.

To avoid the scenario above, we should implement a check inside the mutex-protected update() function (and it must be mutex protected for the following check to work).
So update() should be something like:

int update(const State& current, const State& new_delta) {
  std::lock_guard<std::mutex> lock(mutex);  // serialize all flushes to the db

  State db_state = _fetch_db_state();
  if (current != db_state) return -ECANCELED;  // caller's view is stale
  return _update(current + new_delta);
}

A good candidate for the update() function could/should be sfs::get_meta_buckets(get_store().db_conn)->store_bucket(), correct?

@irq0 (Member):

I think this needs to be one layer deeper, in the database update. Our Bucket code still keeps a shared copy of the database representation in a map, so this might be a bit more tricky with the database access and the mutex around the map.

In essence I think we need something like 'UPDATE bucket SET attrs = new_attrs WHERE attrs = cached_attrs'.

  • If that transaction didn't change anything, bail out and let the SAL layer update and retry.

Unfortunately I don't think we can say 'attrs = cached_attrs', because both are Ceph serialized blobs. A transaction that fetches, deserializes, compares and conditionally rolls back / commits may work.
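
One possible shape of that fetch/deserialize/compare/commit step, sketched as a hypothetical SQLiteBuckets::store_bucket_if_unchanged() built on the transact() helper this PR adds. The method name is illustrative; the block is serialized by transact()'s mutex rather than by an SQL transaction, and it leans on the DBOPBucketInfo equality operator and the get_rgw_bucket()/get_db_bucket() conversion helpers used elsewhere in this file:

int SQLiteBuckets::store_bucket_if_unchanged(
    const DBOPBucketInfo& cached, const DBOPBucketInfo& desired
) const {
  return conn->transact([&](StorageRef storage) -> int {
    // Fetch and deserialize the current row for this bucket.
    auto db_bucket =
        storage->get_pointer<DBBucket>(desired.binfo.bucket.bucket_id);
    if (!db_bucket) {
      return -ERR_NO_SUCH_BUCKET;  // concurrently deleted: do not retry
    }
    // Compare against the caller's cached copy (DBOPBucketInfo::operator==
    // encodes binfo on both sides and compares the blobs).
    if (!(get_rgw_bucket(*db_bucket) == cached)) {
      return -ECANCELED;  // cached copy is stale: bail out and let SAL retry
    }
    storage->replace(get_db_bucket(desired));  // transition db -> desired
    return 0;
  });
}

Returning -ECANCELED from the block keeps the staleness contract visible to retry_raced_bucket_write at the SAL layer.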

Author:

I've done a patch in this direction that seemed promising, but in the end I stumbled on an issue that IMO requires comments from upstream, so I've opened: https://tracker.ceph.com/issues/63560

const DoutPrefixProvider* dpp, ceph::real_time* /*pmtime*/
) {
lsfs_warn(dpp) << __func__ << ": TODO" << dendl;
return -ENOTSUP;
auto bref = store->get_bucket_ref(get_name());
if (!bref) {
lsfs_dout(dpp, 0) << fmt::format("no such bucket! {}", get_name()) << dendl;
@irq0 (Member):

There might have been a concurrent bucket delete + GC, right? That would mean this isn't 'excessively safe', just the normal way of doing business. -ERR_NO_SUCH_BUCKET sounds sensible to me; logging an error does not (because this is normal behavior). I suggest removing the message, or making it debug and improving it. There isn't much to debug on with 'no such bucket! NAME' in hand.

Author:

I agree that there could be a concurrent delete; I will remove the comment.
I think the "error" log message deserves to exist, though, maybe lowered to warning level; it could help spot a bug in a client's application logic (a client issuing a concurrent delete + metadata update on the same bucket would smell a bit).
How would you improve the message anyway (which fields would you dump)?

@irq0 (Member):

From our perspective it is hard to tell whether this is a client issue or normal behavior. There might be concurrent clients doing this operation. It is not our job to log warnings for legal but possibly weird client behavior. If someone wants to debug this, there is the operation log feature, and we already log that the bucket doesn't exist via the standard request log, which includes the error return. A "no such bucket" warning is redundant at best.

Author:

OK, will remove that.

return -ERR_NO_SUCH_BUCKET;
}
bucket = bref;
update_views();
return 0;
}
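
For context, this is the upstream retry loop these pieces plug into, condensed from the snippet quoted earlier in the review thread (a sketch, not part of this diff):

template <typename F>
int retry_raced_bucket_write(
    const DoutPrefixProvider* dpp, rgw::sal::Bucket* b, const F& f
) {
  auto r = f();  // e.g. merge_and_store_attrs -> try_metadata_update
  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
    // Staleness reported: re-fetch the bucket; with this PR,
    // try_refresh_info also re-syncs the in-memory views via update_views().
    r = b->try_refresh_info(dpp, nullptr);
    if (r >= 0) {
      r = f();  // recompute the delta against the fresh state and try again
    }
  }
  return r;
}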

int SFSBucket::read_usage(
19 changes: 19 additions & 0 deletions src/rgw/driver/sfs/bucket.h
@@ -62,6 +62,25 @@ class SFSBucket : public StoreBucket {
SFSBucket(SFStore* _store, sfs::BucketRef _bucket);
SFSBucket& operator=(const SFSBucket&) = delete;

/**
* This method updates the in-memory views of this object, fetching them
* from this->bucket.
* It should be called every time this->bucket is updated from the backing
* storage.
*
* Views updated:
*
* - get_info()
* - get_attrs()
* - acls
*/
void update_views();

/**
* Compare this object's cached state against the database state and, only
* if the two match, apply apply_delta and persist the result. Returns
* -ECANCELED when the cached state is stale, so callers driven by
* retry_raced_bucket_write can refresh and retry.
*/
int try_metadata_update(
const std::function<int(sfs::sqlite::DBOPBucketInfo& current_state)>&
apply_delta
);

virtual std::unique_ptr<Bucket> clone() override {
return std::unique_ptr<Bucket>(new SFSBucket{*this});
}
10 changes: 10 additions & 0 deletions src/rgw/driver/sfs/sqlite/buckets/bucket_definitions.h
@@ -72,6 +72,16 @@ struct DBOPBucketInfo {

DBOPBucketInfo(const DBOPBucketInfo& other) = default;
DBOPBucketInfo& operator=(const DBOPBucketInfo& other) = default;

bool operator==(const DBOPBucketInfo& other) const {
if (this->deleted != other.deleted) return false;
if (this->battrs != other.battrs) return false;
// Compare binfo via its encoded form: serialize both sides and compare
// the resulting blobs.
ceph::bufferlist this_binfo_bl;
this->binfo.encode(this_binfo_bl);
ceph::bufferlist other_binfo_bl;
other.binfo.encode(other_binfo_bl);
return this_binfo_bl == other_binfo_bl;
}
};

using DBDeletedObjectItem =
7 changes: 7 additions & 0 deletions src/rgw/driver/sfs/sqlite/dbconn.h
@@ -264,6 +264,7 @@ class DBConn {
std::vector<sqlite3*> sqlite_conns;
const std::thread::id main_thread;
mutable std::shared_mutex storage_pool_mutex;
mutable std::mutex transactional_block_mutex;

public:
CephContext* const cct;
@@ -289,6 +290,12 @@
return dbapi::sqlite::database(get_storage()->filename());
}

// Runs `block` while holding the pool-wide transactional mutex, so
// concurrent read-compare-write sequences are serialized; the block's
// return value is passed through to the caller.
int transact(const std::function<int(StorageRef)>& block) {
auto storage = get_storage();
std::unique_lock<std::mutex> lock(transactional_block_mutex);
return block(storage);
}
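
For illustration, a usage sketch of the new helper: transact() serializes the block against other transact() callers through the pool-wide mutex (it does not open an SQL transaction itself) and passes the block's return code through, so conventions like -ECANCELED are defined by the callers (see bucket.cc above). The db_conn and bucket_id variables below are assumed:

// Hypothetical read-compare-write sequence under the transactional mutex.
int res = db_conn->transact([&](StorageRef storage) -> int {
  auto row = storage->get_pointer<DBBucket>(bucket_id);  // read
  if (!row) {
    return -ERR_NO_SUCH_BUCKET;
  }
  // ... deserialize, compare against the cached copy, then write with
  // storage->replace(updated_row);
  return 0;
});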

static std::string getDBPath(CephContext* cct) {
auto rgw_sfs_path = cct->_conf.get_val<std::string>("rgw_sfs_data_path");
auto db_path =
91 changes: 65 additions & 26 deletions src/rgw/driver/sfs/sqlite/sqlite_buckets.cc
@@ -38,9 +38,11 @@ std::vector<DBOPBucketInfo> get_rgw_buckets(
}

std::optional<DBOPBucketInfo> SQLiteBuckets::get_bucket(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
auto bucket = storage->get_pointer<DBBucket>(bucket_id);
std::optional<DBOPBucketInfo> ret_value;
if (bucket) {
@@ -50,9 +52,11 @@ std::optional<DBOPBucketInfo> SQLiteBuckets::get_bucket(
}

std::optional<std::pair<std::string, std::string>> SQLiteBuckets::get_owner(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
const auto rows = storage->select(
columns(&DBUser::user_id, &DBUser::display_name),
inner_join<DBUser>(on(is_equal(&DBBucket::owner_id, &DBUser::user_id))),
@@ -66,62 +70,92 @@ std::optional<std::pair<std::string, std::string>> SQLiteBuckets::get_owner(
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_bucket_by_name(
const std::string& bucket_name
const std::string& bucket_name, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(
storage->get_all<DBBucket>(where(c(&DBBucket::bucket_name) = bucket_name))
);
}

void SQLiteBuckets::store_bucket(const DBOPBucketInfo& bucket) const {
auto storage = conn->get_storage();
void SQLiteBuckets::store_bucket(
const DBOPBucketInfo& bucket, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
auto db_bucket = get_db_bucket(bucket);
storage->replace(db_bucket);
}

void SQLiteBuckets::remove_bucket(const std::string& bucket_name) const {
auto storage = conn->get_storage();
void SQLiteBuckets::remove_bucket(
const std::string& bucket_name, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
storage->remove<DBBucket>(bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids() const {
auto storage = conn->get_storage();
std::vector<std::string> SQLiteBuckets::get_bucket_ids(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return storage->select(&DBBucket::bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids(
const std::string& user_id
const std::string& user_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return storage->select(
&DBBucket::bucket_name, where(c(&DBBucket::owner_id) = user_id)
);
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets() const {
auto storage = conn->get_storage();
std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(storage->get_all<DBBucket>());
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets(
const std::string& user_id
const std::string& user_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(
storage->get_all<DBBucket>(where(c(&DBBucket::owner_id) = user_id))
);
}

std::vector<std::string> SQLiteBuckets::get_deleted_buckets_ids() const {
auto storage = conn->get_storage();
std::vector<std::string> SQLiteBuckets::get_deleted_buckets_ids(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return storage->select(
&DBBucket::bucket_id, where(c(&DBBucket::deleted) = true)
);
}

bool SQLiteBuckets::bucket_empty(const std::string& bucket_id) const {
auto storage = conn->get_storage();
bool SQLiteBuckets::bucket_empty(
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
auto num_ids = storage->count<DBVersionedObject>(
inner_join<DBObject>(
on(is_equal(&DBObject::uuid, &DBVersionedObject::object_id))
Expand All @@ -136,9 +170,12 @@ bool SQLiteBuckets::bucket_empty(const std::string& bucket_id) const {
}

std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
const std::string& bucket_id, uint max_objects, bool& bucket_deleted
const std::string& bucket_id, uint max_objects, bool& bucket_deleted,
rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
RetrySQLiteBusy<DBDeletedObjectItems> retry([&]() {
bucket_deleted = false;
DBDeletedObjectItems ret_values;
@@ -186,9 +223,11 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
}

const std::optional<SQLiteBuckets::Stats> SQLiteBuckets::get_stats(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
std::optional<SQLiteBuckets::Stats> stats;

auto res = storage->select(