Skip to content

Commit

Permalink
[Enhancement] limit tablet write request size (#50302)
Browse files Browse the repository at this point in the history
Signed-off-by: silverbullet233 <[email protected]>
(cherry picked from commit ce25287)
  • Loading branch information
silverbullet233 authored and mergify[bot] committed Sep 19, 2024
1 parent f4c3bcd commit 739852a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ CONF_Int32(metric_late_materialization_ratio, "1000");

// Maximum number of batched bytes per transmit RPC request. Default: 256 KB.
CONF_Int64(max_transmit_batched_bytes, "262144");
// Maximum in-memory size of the chunk accumulated for a single tablet write
// request. Default: 512 MB. See: https://github.com/StarRocks/starrocks/pull/50302
// NOTE: When a load involves a large number of columns, setting
// max_tablet_write_chunk_bytes too small causes chunks to fill up quickly,
// producing more frequent RPCs and potentially hurting load performance.
// In that case, increase this value.
CONF_mInt64(max_tablet_write_chunk_bytes, "536870912");

CONF_Int16(bitmap_max_filter_items, "30");

Expand Down
38 changes: 29 additions & 9 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@ bool NodeChannel::is_full() {
return false;
}

// Replace _cur_chunk with a fresh empty chunk cloned from `input`'s layout and
// restart the per-chunk memory accounting.
// The new chunk's own footprint is measured via the delta of the thread-local
// allocator counter around clone_empty_with_slot().
void NodeChannel::_reset_cur_chunk(Chunk* input) {
    int64_t before_consumed_bytes = CurrentThread::current().get_consumed_bytes();
    _cur_chunk = input->clone_empty_with_slot();
    int64_t after_consumed_bytes = CurrentThread::current().get_consumed_bytes();
    // BUG FIX: must assign, not accumulate. The previous (full) chunk was moved
    // into _request_queue, so its memory is not freed here and the allocator
    // counter never drops; with `+=` the usage of every flushed chunk would be
    // carried over, keeping _cur_chunk_mem_usage above
    // config::max_tablet_write_chunk_bytes and forcing a flush on every batch.
    // This also matches the explicit reset `_cur_chunk_mem_usage = 0` in
    // _send_request().
    _cur_chunk_mem_usage = after_consumed_bytes - before_consumed_bytes;
}

// Append the selected rows of `src` (indexes[from .. from+size)) to the
// pending chunk, charging whatever memory the append allocates to the
// per-chunk usage counter.
// The cost is observed as the growth of the thread-local allocator counter
// across the append_selective() call.
void NodeChannel::_append_data_to_cur_chunk(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
    const int64_t bytes_before = CurrentThread::current().get_consumed_bytes();
    _cur_chunk->append_selective(src, indexes, from, size);
    const int64_t bytes_after = CurrentThread::current().get_consumed_bytes();
    _cur_chunk_mem_usage += bytes_after - bytes_before;
}

Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
const std::vector<uint32_t>& indexes, uint32_t from, uint32_t size) {
if (_cancelled || _closed) {
Expand All @@ -420,7 +434,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_i

DCHECK(_rpc_request.requests_size() == 1);
if (UNLIKELY(_cur_chunk == nullptr)) {
_cur_chunk = input->clone_empty_with_slot();
_reset_cur_chunk(input);
}

if (is_full()) {
Expand All @@ -433,7 +447,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_i
SCOPED_TIMER(_ts_profile->pack_chunk_timer);
// 1. append data
if (_where_clause == nullptr) {
_cur_chunk->append_selective(*input, indexes.data(), from, size);
_append_data_to_cur_chunk(*input, indexes.data(), from, size);
auto req = _rpc_request.mutable_requests(0);
for (size_t i = 0; i < size; ++i) {
req->add_tablet_ids(tablet_ids[indexes[from + i]]);
Expand All @@ -442,14 +456,16 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_i
std::vector<uint32_t> filtered_indexes;
RETURN_IF_ERROR(_filter_indexes_with_where_expr(input, indexes, filtered_indexes));
size_t filter_size = filtered_indexes.size();
_cur_chunk->append_selective(*input, filtered_indexes.data(), from, filter_size);
_append_data_to_cur_chunk(*input, filtered_indexes.data(), from, filter_size);

auto req = _rpc_request.mutable_requests(0);
for (size_t i = 0; i < filter_size; ++i) {
req->add_tablet_ids(tablet_ids[filtered_indexes[from + i]]);
}
}

if (_cur_chunk->num_rows() < _runtime_state->chunk_size()) {
if (_cur_chunk->num_rows() <= 0 || (_cur_chunk->num_rows() < _runtime_state->chunk_size() &&
_cur_chunk_mem_usage < config::max_tablet_write_chunk_bytes)) {
// 2. chunk not full
if (_request_queue.empty()) {
return Status::OK();
Expand All @@ -459,7 +475,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_i
// 3. chunk full push back to queue
_mem_tracker->consume(_cur_chunk->memory_usage());
_request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
_cur_chunk = input->clone_empty_with_slot();
_reset_cur_chunk(input);
_rpc_request.mutable_requests(0)->clear_tablet_ids();
}

Expand All @@ -480,7 +496,7 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64

DCHECK(index_tablet_ids.size() == _rpc_request.requests_size());
if (UNLIKELY(_cur_chunk == nullptr)) {
_cur_chunk = input->clone_empty_with_slot();
_reset_cur_chunk(input);
}

if (is_full()) {
Expand All @@ -490,16 +506,19 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64
}

SCOPED_TIMER(_ts_profile->pack_chunk_timer);

// 1. append data
_cur_chunk->append_selective(*input, indexes.data(), from, size);
_append_data_to_cur_chunk(*input, indexes.data(), from, size);

for (size_t index_i = 0; index_i < index_tablet_ids.size(); ++index_i) {
auto req = _rpc_request.mutable_requests(index_i);
for (size_t i = from; i < size; ++i) {
req->add_tablet_ids(index_tablet_ids[index_i][indexes[from + i]]);
}
}

if (_cur_chunk->num_rows() < _runtime_state->chunk_size()) {
if (_cur_chunk->num_rows() <= 0 || (_cur_chunk->num_rows() < _runtime_state->chunk_size() &&
_cur_chunk_mem_usage < config::max_tablet_write_chunk_bytes)) {
// 2. chunk not full
if (_request_queue.empty()) {
return Status::OK();
Expand All @@ -509,7 +528,7 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64
// 3. chunk full push back to queue
_mem_tracker->consume(_cur_chunk->memory_usage());
_request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
_cur_chunk = input->clone_empty_with_slot();
_reset_cur_chunk(input);
for (size_t index_i = 0; index_i < index_tablet_ids.size(); ++index_i) {
_rpc_request.mutable_requests(index_i)->clear_tablet_ids();
}
Expand Down Expand Up @@ -564,6 +583,7 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
_mem_tracker->consume(_cur_chunk->memory_usage());
_request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
_cur_chunk = nullptr;
_cur_chunk_mem_usage = 0;
}

// try to send chunk in queue first
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/tablet_sink_index_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class NodeChannel {
Status _filter_indexes_with_where_expr(Chunk* input, const std::vector<uint32_t>& indexes,
std::vector<uint32_t>& filtered_indexes);

void _reset_cur_chunk(Chunk* input);
void _append_data_to_cur_chunk(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size);

std::unique_ptr<MemTracker> _mem_tracker = nullptr;

OlapTableSink* _parent = nullptr;
Expand Down Expand Up @@ -232,6 +235,7 @@ class NodeChannel {
size_t _max_parallel_request_size = 1;
std::vector<ReusableClosure<PTabletWriterAddBatchResult>*> _add_batch_closures;
std::unique_ptr<Chunk> _cur_chunk;
int64_t _cur_chunk_mem_usage = 0;

PTabletWriterAddChunksRequest _rpc_request;
using AddMultiChunkReq = std::pair<std::unique_ptr<Chunk>, PTabletWriterAddChunksRequest>;
Expand Down

0 comments on commit 739852a

Please sign in to comment.