Skip to content

Commit

Permalink
fix calculation of pg used_bytes (eBay#173)
Browse files Browse the repository at this point in the history
* fix calculation of pg used_bytes

* add comments

* enable update total_occupied_blk_count when creating shard

* upgrade homestore version

* upgrade sisl version

* adopt homestore 6.4
  • Loading branch information
JacksonYao287 authored May 15, 2024
1 parent 7dda682 commit acb04e8
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 26 deletions.
6 changes: 3 additions & 3 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.0.1"
version = "2.0.2"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down Expand Up @@ -40,8 +40,8 @@ def build_requirements(self):
self.build_requires("gtest/1.14.0")

def requirements(self):
self.requires("homestore/[~6.2, include_prerelease=True]@oss/master")
self.requires("sisl/[~12.1, include_prerelease=True]@oss/master")
self.requires("homestore/[~6.4, include_prerelease=True]@oss/master")
self.requires("sisl/[~12.2, include_prerelease=True]@oss/master")
self.requires("lz4/1.9.4", override=True)

def validate(self):
Expand Down
1 change: 1 addition & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct PG {
std::atomic< blob_id_t > blob_sequence_num{0ull};
std::atomic< uint64_t > active_blob_count{0ull};
std::atomic< uint64_t > tombstone_blob_count{0ull};
std::atomic< uint64_t > total_occupied_blk_count{0ull}; // this will only decrease after GC
};

PGInfo pg_info_;
Expand Down
9 changes: 5 additions & 4 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
homestore::MultiBlkId const& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -216,11 +216,12 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_id](auto& de) {
hs_pg->durable_entities_update([&blob_id, &pbas](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
while ((blob_id > existing_blob_id) &&
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(pbas.blk_count(), std::memory_order_relaxed);
});
}
}
Expand Down Expand Up @@ -313,7 +314,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
homestore::ReplResult< homestore::blk_alloc_hints >
HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -377,7 +378,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/hs_cp_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ folly::Future< bool > HSHomeObject::MyCPCallbacks::cp_flush(CP* cp) {
hs_pg->pg_sb_->blob_sequence_num = hs_pg->durable_entities().blob_sequence_num.load();
hs_pg->pg_sb_->active_blob_count = hs_pg->durable_entities().active_blob_count.load();
hs_pg->pg_sb_->tombstone_blob_count = hs_pg->durable_entities().tombstone_blob_count.load();
hs_pg->pg_sb_->total_occupied_blk_count = hs_pg->durable_entities().total_occupied_blk_count.load();
dirty_pg_list.push_back(hs_pg);
}
}
Expand Down
20 changes: 12 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ class HSHomeObject : public HomeObjectImpl {
peer_id_t replica_set_uuid;
homestore::uuid_t index_table_uuid;
blob_id_t blob_sequence_num;
uint64_t active_blob_count; // Total number of active blobs
uint64_t tombstone_blob_count; // Total number of tombstones
pg_members members[1]; // ISO C++ forbids zero-size array
uint64_t active_blob_count; // Total number of active blobs
uint64_t tombstone_blob_count; // Total number of tombstones
uint64_t total_occupied_blk_count; // Total number of occupied blocks
pg_members members[1]; // ISO C++ forbids zero-size array

uint32_t size() const { return sizeof(pg_info_superblk) + ((num_members - 1) * sizeof(pg_members)); }
static std::string name() { return _pg_meta_name; }
Expand Down Expand Up @@ -141,26 +142,25 @@ class HSHomeObject : public HomeObjectImpl {
struct HS_PG : public PG {
struct PGMetrics : public sisl::MetricsGroup {
public:
PGMetrics(HS_PG const& pg) :
sisl::MetricsGroup{"PG", std::to_string(pg.pg_info_.id)}, pg_(pg) {
PGMetrics(HS_PG const& pg) : sisl::MetricsGroup{"PG", std::to_string(pg.pg_info_.id)}, pg_(pg) {
// We use replica_set_uuid instead of pg_id for metrics to make it globally unique to allow aggregating
// across multiple nodes
REGISTER_GAUGE(shard_count, "Number of shards");
REGISTER_GAUGE(open_shard_count, "Number of open shards");
REGISTER_GAUGE(active_blob_count, "Number of valid blobs present");
REGISTER_GAUGE(tombstone_blob_count, "Number of tombstone blobs which can be garbage collected");
REGISTER_GAUGE(total_occupied_space,
"Total Size occupied (including padding, user_key, blob) rounded to block size");
REGISTER_COUNTER(total_user_key_size, "Total user key size provided",
sisl::_publish_as::publish_as_gauge);
REGISTER_COUNTER(total_occupied_space,
"Total Size occupied (including padding, user_key, blob) rounded to block size",
sisl::_publish_as::publish_as_gauge);

REGISTER_HISTOGRAM(blobs_per_shard,
"Distribution of blobs per shard"); // TODO: Add a bucket for blob sizes
REGISTER_HISTOGRAM(actual_blob_size, "Distribution of actual blob sizes");

register_me_to_farm();
attach_gather_cb(std::bind(&PGMetrics::on_gather, this));
blk_size = pg_.repl_dev_->get_blk_size();
}
~PGMetrics() { deregister_me_from_farm(); }
PGMetrics(const PGMetrics&) = delete;
Expand All @@ -175,10 +175,14 @@ class HSHomeObject : public HomeObjectImpl {
pg_.durable_entities().active_blob_count.load(std::memory_order_relaxed));
GAUGE_UPDATE(*this, tombstone_blob_count,
pg_.durable_entities().tombstone_blob_count.load(std::memory_order_relaxed));
GAUGE_UPDATE(*this, total_occupied_space,
pg_.durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed) *
blk_size);
}

private:
HS_PG const& pg_;
uint32_t blk_size;
};

public:
Expand Down
7 changes: 5 additions & 2 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< PGManager::NullResult >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get();
}

Expand Down Expand Up @@ -237,6 +237,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_->index_table_uuid = index_table_->uuid();
pg_sb_->active_blob_count = 0;
pg_sb_->tombstone_blob_count = 0;
pg_sb_->total_occupied_blk_count = 0;

uint32_t i{0};
for (auto const& m : pg_info_.members) {
Expand All @@ -254,6 +255,7 @@ HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk >
durable_entities_.blob_sequence_num = pg_sb_->blob_sequence_num;
durable_entities_.active_blob_count = pg_sb_->active_blob_count;
durable_entities_.tombstone_blob_count = pg_sb_->tombstone_blob_count;
durable_entities_.total_occupied_blk_count = pg_sb_->total_occupied_blk_count;
}

uint32_t HSHomeObject::HS_PG::total_shards() const { return shards_.size(); }
Expand Down Expand Up @@ -305,7 +307,8 @@ bool HSHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
if (pdev_id_hint.has_value()) {
stats.avail_open_shards = chunk_selector()->avail_num_chunks(pdev_id_hint.value());
stats.avail_bytes = chunk_selector()->avail_blks(pdev_id_hint) * blk_size;
stats.used_bytes = chunk_selector()->total_blks(pdev_id_hint.value()) * blk_size - stats.avail_bytes;
stats.used_bytes =
hs_pg->durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed) * blk_size;
} else {
// if no shard has been created on this PG yet, it means this PG could arrive on any drive that has the most
// available open shards;
Expand Down
19 changes: 16 additions & 3 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
Expand Down Expand Up @@ -238,7 +238,7 @@ bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& he
void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
Expand Down Expand Up @@ -277,7 +277,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
if (hs_ctx && hs_ctx->is_proposer()) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -328,6 +328,19 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
chunk_selector_->select_specific_chunk(blkids.chunk_num());
}
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});

break;
}

Expand Down
17 changes: 13 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) {
RELEASE_ASSERT(ctx, "ctx should not be nullptr in on_error");
RELEASE_ASSERT(ctx->is_proposer, "on_error should only be called from proposer");
RELEASE_ASSERT(ctx->is_proposer(), "on_error should only be called from proposer");
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
LOGE("on_error, message type {} with lsn {}, error {}", msg_header->msg_type, ctx->lsn, error);
LOGE("on_error, message type {} with lsn {}, error {}", msg_header->msg_type, ctx->lsn(), error);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_PG_MSG: {
auto result_ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(ctx).get();
Expand All @@ -103,7 +103,7 @@ void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob&
break;
}
default: {
LOGE("Unknown message type, error unhandled , error :{}, lsn {}", error, ctx->lsn);
LOGE("Unknown message type, error unhandled , error :{}, lsn {}", error, ctx->lsn());
break;
}
}
Expand Down Expand Up @@ -146,6 +146,15 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
return homestore::blk_alloc_hints();
}

// Intentionally a no-op: nothing to tear down when the replica set stops.
// NOTE(review): this hook appears to be superseded by on_destroy() in newer
// homestore/nuraft versions — confirm against the ReplDevListener interface.
void ReplicationStateMachine::on_replica_stop() {}
// Called when the replica (repl dev) is being destroyed by nuraft.
// Currently a stub: it only logs the event; no resources are released here yet.
void ReplicationStateMachine::on_destroy() {
    // TODO:: add the logic to handle destroy
    LOGI("replica destroyed");
}

// Called when nuraft asks this replica to create a snapshot.
// Currently a stub: logs the snapshot's last log index/term and immediately
// completes the returned future with success — no state is actually captured.
// @param s  snapshot descriptor supplied by nuraft (last_log_idx_ / last_log_term_).
// @return   an already-fulfilled AsyncReplResult signalling success.
homestore::AsyncReplResult<> ReplicationStateMachine::create_snapshot(homestore::repl_snapshot& s) {
    // TODO::add create snapshot logic
    LOGI("create snapshot, last_log_idx_: {} , last_log_term_: {}", s.last_log_idx_, s.last_log_term_);
    return folly::makeSemiFuture< homestore::ReplResult< folly::Unit > >(folly::Unit{});
}

} // namespace homeobject
7 changes: 5 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
homestore::ReplResult< homestore::blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header,
uint32_t data_size) override;

/// @brief Called when the replica set is being stopped
void on_replica_stop() override;
/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy() override;

/// @brief Called when the snapshot is being created by nuraft;
homestore::AsyncReplResult<> create_snapshot(homestore::repl_snapshot& s) override;

private:
HSHomeObject* home_object_{nullptr};
Expand Down

0 comments on commit acb04e8

Please sign in to comment.