Skip to content

Commit

Permalink
[Enhancement](MS) Add fix tablet data size api for meta service (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 7, 2024
1 parent 85f839f commit ffa6fef
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 0 deletions.
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_compaction_data_size", {
compaction_job->set_size_input_rowsets(1);
compaction_job->set_size_output_rowsets(10000001);
})
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
50 changes: 50 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2188,4 +2188,54 @@ std::pair<MetaServiceCode, std::string> MetaServiceImpl::get_instance_info(
return {code, std::move(msg)};
}

std::pair<std::string, std::string> init_key_pair(std::string instance_id, int64_t table_id) {
std::string begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0});
std::string end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0});
return std::make_pair(begin_key, end_key);
}

MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string cloud_unique_id_str,
std::string table_id_str) {
// parse params
int64_t table_id;
std::string instance_id;
MetaServiceResponseStatus st = parse_fix_tablet_stats_param(
resource_mgr_, table_id_str, cloud_unique_id_str, table_id, instance_id);
if (st.code() != MetaServiceCode::OK) {
return st;
}

std::pair<std::string, std::string> key_pair = init_key_pair(instance_id, table_id);
std::string old_begin_key;
while (old_begin_key < key_pair.first) {
// get tablet stats
std::vector<std::shared_ptr<TabletStatsPB>> tablet_stat_shared_ptr_vec_batch;
old_begin_key = key_pair.first;

// fix tablet stats
size_t retry = 0;
do {
st = fix_tablet_stats_internal(txn_kv_, key_pair, tablet_stat_shared_ptr_vec_batch,
instance_id);
if (st.code() != MetaServiceCode::OK) {
LOG_WARNING("failed to fix tablet stats")
.tag("err", st.msg())
.tag("table id", table_id)
.tag("retry time", retry);
}
retry++;
} while (st.code() != MetaServiceCode::OK && retry < 3);
if (st.code() != MetaServiceCode::OK) {
return st;
}

// Check tablet stats
st = check_new_tablet_stats(txn_kv_, instance_id, tablet_stat_shared_ptr_vec_batch);
if (st.code() != MetaServiceCode::OK) {
return st;
}
}
return st;
}

} // namespace doris::cloud
7 changes: 7 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class Transaction;

constexpr std::string_view BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";

void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code,
std::string& msg, GetRowsetResponse* response);

class MetaServiceImpl : public cloud::MetaService {
public:
MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, std::shared_ptr<ResourceManager> resource_mgr,
Expand Down Expand Up @@ -298,6 +302,9 @@ class MetaServiceImpl : public cloud::MetaService {
const std::string& cloud_unique_id,
InstanceInfoPB* instance);

MetaServiceResponseStatus fix_tablet_stats(std::string cloud_unique_id_str,
std::string table_id_str);

private:
std::pair<MetaServiceCode, std::string> alter_instance(
const AlterInstanceRequest* request,
Expand Down
14 changes: 14 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ static HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Con
return http_text_reply(resp.status(), body);
}

static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id");
std::string_view table_id = http_query(uri, "table_id");

MetaServiceResponseStatus st =
service->fix_tablet_stats(std::string(cloud_unique_id), std::string(table_id));
return http_text_reply(st, st.DebugString());
}

static HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) {
GetStageRequest req;
PARSE_MESSAGE_OR_RETURN(ctrl, req);
Expand Down Expand Up @@ -575,13 +585,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"get_value", process_get_value},
{"show_meta_ranges", process_show_meta_ranges},
{"txn_lazy_commit", process_txn_lazy_commit},
{"injection_point", process_injection_point},
{"fix_tablet_stats", process_fix_tablet_stats},
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/txn_lazy_commit", process_txn_lazy_commit},
// for get
{"get_instance", process_get_instance_info},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
{"get_cluster", process_get_cluster},
{"get_tablet_stats", process_get_tablet_stats},
Expand Down
245 changes: 245 additions & 0 deletions cloud/src/meta-service/meta_service_tablet_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

#include "meta-service/meta_service_tablet_stats.h"

#include <fmt/core.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

#include "common/logging.h"
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"

namespace doris::cloud {

Expand Down Expand Up @@ -156,4 +165,240 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
merge_tablet_stats(stats, detached_stats);
}

MetaServiceResponseStatus parse_fix_tablet_stats_param(
std::shared_ptr<ResourceManager> resource_mgr, const std::string& table_id_str,
const std::string& cloud_unique_id_str, int64_t& table_id, std::string& instance_id) {
MetaServiceCode code = MetaServiceCode::OK;
std::string msg;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);

// parse params
try {
table_id = std::stoll(table_id_str);
} catch (...) {
st.set_code(MetaServiceCode::INVALID_ARGUMENT);
st.set_msg("Invalid table_id, table_id: " + table_id_str);
return st;
}

instance_id = get_instance_id(resource_mgr, cloud_unique_id_str);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str;
st.set_code(code);
st.set_msg(msg);
return st;
}
return st;
}

MetaServiceResponseStatus fix_tablet_stats_internal(
std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>& key_pair,
std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch,
const std::string& instance_id, size_t batch_size) {
std::unique_ptr<Transaction> txn;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);
MetaServiceCode code = MetaServiceCode::OK;
std::unique_ptr<RangeGetIterator> it;
std::vector<std::shared_ptr<TabletStatsPB>> tmp_tablet_stat_vec;

TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::CREATE>(err));
st.set_msg("failed to create txn");
return st;
}

// read tablet stats
err = txn->get(key_pair.first, key_pair.second, &it, true);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::READ>(err));
st.set_msg(fmt::format("failed to get tablet stats, err={} ", err));
return st;
}

size_t tablet_cnt = 0;
while (it->has_next() && tablet_cnt < batch_size) {
auto [k, v] = it->next();
key_pair.first = k;
auto k1 = k;
k1.remove_prefix(1);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&k1, &out);

// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} -> TabletStatsPB
if (out.size() == 7) {
tablet_cnt++;
TabletStatsPB tablet_stat;
tablet_stat.ParseFromArray(v.data(), v.size());
tmp_tablet_stat_vec.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
}
}
if (it->has_next()) {
key_pair.first = it->next().first;
}

for (const auto& tablet_stat_ptr : tmp_tablet_stat_vec) {
GetRowsetResponse resp;
std::string msg;
// get rowsets in tablet and accumulate disk size
internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id,
tablet_stat_ptr->idx().tablet_id(), code, msg, &resp);
if (code != MetaServiceCode::OK) {
st.set_code(code);
st.set_msg(msg);
return st;
}
int64_t total_disk_size = 0;
for (const auto& rs_meta : resp.rowset_meta()) {
total_disk_size += rs_meta.total_disk_size();
}

// set new disk size to tabletPB and write it back
TabletStatsPB tablet_stat;
tablet_stat.CopyFrom(*tablet_stat_ptr);
tablet_stat.set_data_size(total_disk_size);
// record tablet stats batch
tablet_stat_shared_ptr_vec_batch.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(),
tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()});
if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
st.set_msg("failed to serialize tablet stat");
return st;
}
txn->put(tablet_stat_key, tablet_stat_value);

// read num segs
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_segs" -> int64
std::string tablet_stat_num_segs_key;
stats_tablet_num_segs_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()},
&tablet_stat_num_segs_key);
int64_t tablet_stat_num_segs = 0;
std::string tablet_stat_num_segs_value(sizeof(tablet_stat_num_segs), '\0');
err = txn->get(tablet_stat_num_segs_key, &tablet_stat_num_segs_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
}
if (tablet_stat_num_segs_value.size() != sizeof(tablet_stat_num_segs)) [[unlikely]] {
LOG(WARNING) << " malformed tablet stats value v.size="
<< tablet_stat_num_segs_value.size()
<< " value=" << hex(tablet_stat_num_segs_value);
}
std::memcpy(&tablet_stat_num_segs, tablet_stat_num_segs_value.data(),
sizeof(tablet_stat_num_segs));
if constexpr (std::endian::native == std::endian::big) {
tablet_stat_num_segs = bswap_64(tablet_stat_num_segs);
}

if (tablet_stat_num_segs > 0) {
// set tablet stats data size = 0
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "data_size" -> int64
std::string tablet_stat_data_size_key;
stats_tablet_data_size_key(
{instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(),
tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()},
&tablet_stat_data_size_key);
int64_t tablet_stat_data_size = 0;
std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
memcpy(tablet_stat_data_size_value.data(), &tablet_stat_data_size,
sizeof(tablet_stat_data_size));
txn->put(tablet_stat_data_size_key, tablet_stat_data_size_value);
}
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::COMMIT>(err));
st.set_msg("failed to commit txn");
return st;
}
return st;
}

MetaServiceResponseStatus check_new_tablet_stats(
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
const std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch) {
std::unique_ptr<Transaction> txn;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);

TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::CREATE>(err));
st.set_msg("failed to create txn");
return st;
}

for (const auto& tablet_stat_ptr : tablet_stat_shared_ptr_vec_batch) {
// check tablet stats
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()});
err = txn->get(tablet_stat_key, &tablet_stat_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
return st;
}
TabletStatsPB tablet_stat_check;
tablet_stat_check.ParseFromArray(tablet_stat_value.data(), tablet_stat_value.size());
if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString() &&
// If anyone data size of tablet_stat_check and tablet_stat_ptr is twice bigger than another,
// we need to rewrite it this tablet_stat.
(tablet_stat_check.data_size() > 2 * tablet_stat_ptr->data_size() ||
tablet_stat_ptr->data_size() > 2 * tablet_stat_check.data_size())) {
LOG_WARNING("[fix tablet stats]:tablet stats check failed")
.tag("tablet stat", tablet_stat_ptr->DebugString())
.tag("check tabelt stat", tablet_stat_check.DebugString());
}

// check data size
std::string tablet_stat_data_size_key;
stats_tablet_data_size_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()},
&tablet_stat_data_size_key);
int64_t tablet_stat_data_size = 0;
std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
err = txn->get(tablet_stat_data_size_key, &tablet_stat_data_size_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
return st;
}
int64_t tablet_stat_data_size_check;

if (tablet_stat_data_size_value.size() != sizeof(tablet_stat_data_size_check))
[[unlikely]] {
LOG(WARNING) << " malformed tablet stats value v.size="
<< tablet_stat_data_size_value.size()
<< " value=" << hex(tablet_stat_data_size_value);
}
std::memcpy(&tablet_stat_data_size_check, tablet_stat_data_size_value.data(),
sizeof(tablet_stat_data_size_check));
if constexpr (std::endian::native == std::endian::big) {
tablet_stat_data_size_check = bswap_64(tablet_stat_data_size_check);
}
if (tablet_stat_data_size_check != tablet_stat_data_size &&
// ditto
(tablet_stat_data_size_check > 2 * tablet_stat_data_size ||
tablet_stat_data_size > 2 * tablet_stat_data_size_check)) {
LOG_WARNING("[fix tablet stats]:data size check failed")
.tag("data size", tablet_stat_data_size)
.tag("check data size", tablet_stat_data_size_check);
}
}

return st;
}

} // namespace doris::cloud
Loading

0 comments on commit ffa6fef

Please sign in to comment.