diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9d7f6a561bd96..27357eeca95aa 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -703,6 +703,12 @@ CONF_Int32(metric_late_materialization_ratio, "1000");
 // Max batched bytes for each transmit request. (256KB)
 CONF_Int64(max_transmit_batched_bytes, "262144");
 
+// max chunk size for each tablet write request. (512MB)
+// see: https://github.com/StarRocks/starrocks/pull/50302
+// NOTE: If there are a large number of columns when loading,
+// a too small max_tablet_write_chunk_bytes may cause more frequent RPCs, which may affect performance.
+// In this case, we can try to increase the value to avoid the problem.
+CONF_mInt64(max_tablet_write_chunk_bytes, "536870912");
 
 CONF_Int16(bitmap_max_filter_items, "30");
diff --git a/be/src/exec/tablet_sink_index_channel.cpp b/be/src/exec/tablet_sink_index_channel.cpp
index c3b1f852d9798..ffd39948d5408 100644
--- a/be/src/exec/tablet_sink_index_channel.cpp
+++ b/be/src/exec/tablet_sink_index_channel.cpp
@@ -412,6 +412,20 @@ bool NodeChannel::is_full() {
     return false;
 }
 
+void NodeChannel::_reset_cur_chunk(Chunk* input) {
+    int64_t before_consumed_bytes = CurrentThread::current().get_consumed_bytes();
+    _cur_chunk = input->clone_empty_with_slot();
+    int64_t after_consumed_bytes = CurrentThread::current().get_consumed_bytes();
+    _cur_chunk_mem_usage += after_consumed_bytes - before_consumed_bytes;
+}
+
+void NodeChannel::_append_data_to_cur_chunk(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
+    int64_t before_consumed_bytes = CurrentThread::current().get_consumed_bytes();
+    _cur_chunk->append_selective(src, indexes, from, size);
+    int64_t after_consumed_bytes = CurrentThread::current().get_consumed_bytes();
+    _cur_chunk_mem_usage += after_consumed_bytes - before_consumed_bytes;
+}
+
 Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
                               const std::vector<uint32_t>& indexes, uint32_t from, uint32_t size) {
     if (_cancelled || _closed) {
@@ -420,7 +434,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
     DCHECK(_rpc_request.requests_size() == 1);
 
     if (UNLIKELY(_cur_chunk == nullptr)) {
-        _cur_chunk = input->clone_empty_with_slot();
+        _reset_cur_chunk(input);
     }
 
     if (is_full()) {
@@ -433,7 +447,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
     SCOPED_TIMER(_ts_profile->pack_chunk_timer);
     // 1. append data
     if (_where_clause == nullptr) {
-        _cur_chunk->append_selective(*input, indexes.data(), from, size);
+        _append_data_to_cur_chunk(*input, indexes.data(), from, size);
         auto req = _rpc_request.mutable_requests(0);
         for (size_t i = 0; i < size; ++i) {
             req->add_tablet_ids(tablet_ids[indexes[from + i]]);
@@ -442,14 +456,16 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
         std::vector<uint32_t> filtered_indexes;
         RETURN_IF_ERROR(_filter_indexes_with_where_expr(input, indexes, filtered_indexes));
         size_t filter_size = filtered_indexes.size();
-        _cur_chunk->append_selective(*input, filtered_indexes.data(), from, filter_size);
+        _append_data_to_cur_chunk(*input, filtered_indexes.data(), from, filter_size);
+
         auto req = _rpc_request.mutable_requests(0);
         for (size_t i = 0; i < filter_size; ++i) {
             req->add_tablet_ids(tablet_ids[filtered_indexes[from + i]]);
         }
     }
 
-    if (_cur_chunk->num_rows() < _runtime_state->chunk_size()) {
+    if (_cur_chunk->num_rows() <= 0 || (_cur_chunk->num_rows() < _runtime_state->chunk_size() &&
+                                        _cur_chunk_mem_usage < config::max_tablet_write_chunk_bytes)) {
         // 2. chunk not full
         if (_request_queue.empty()) {
             return Status::OK();
@@ -459,7 +475,7 @@ Status NodeChannel::add_chunk(Chunk* input, const std::vector<int64_t>& tablet_ids,
     // 3. chunk full push back to queue
     _mem_tracker->consume(_cur_chunk->memory_usage());
     _request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
-    _cur_chunk = input->clone_empty_with_slot();
+    _reset_cur_chunk(input);
     _rpc_request.mutable_requests(0)->clear_tablet_ids();
 }
 
@@ -480,7 +496,7 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64_t>>& index_tablet_ids,
     DCHECK(_rpc_request.requests_size() == index_tablet_ids.size());
 
     if (UNLIKELY(_cur_chunk == nullptr)) {
-        _cur_chunk = input->clone_empty_with_slot();
+        _reset_cur_chunk(input);
     }
 
     if (is_full()) {
@@ -490,8 +506,10 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64_t>>& index_tablet_ids,
     SCOPED_TIMER(_ts_profile->pack_chunk_timer);
+
     // 1. append data
-    _cur_chunk->append_selective(*input, indexes.data(), from, size);
+    _append_data_to_cur_chunk(*input, indexes.data(), from, size);
+
     for (size_t index_i = 0; index_i < index_tablet_ids.size(); ++index_i) {
         auto req = _rpc_request.mutable_requests(index_i);
         for (size_t i = from; i < size; ++i) {
@@ -499,7 +517,8 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64_t>>& index_tablet_ids,
         }
     }
 
-    if (_cur_chunk->num_rows() < _runtime_state->chunk_size()) {
+    if (_cur_chunk->num_rows() <= 0 || (_cur_chunk->num_rows() < _runtime_state->chunk_size() &&
+                                        _cur_chunk_mem_usage < config::max_tablet_write_chunk_bytes)) {
         // 2. chunk not full
         if (_request_queue.empty()) {
             return Status::OK();
@@ -509,7 +528,7 @@ Status NodeChannel::add_chunks(Chunk* input, const std::vector<std::vector<int64_t>>& index_tablet_ids,
     // 3. chunk full push back to queue
     _mem_tracker->consume(_cur_chunk->memory_usage());
     _request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
-    _cur_chunk = input->clone_empty_with_slot();
+    _reset_cur_chunk(input);
     for (size_t index_i = 0; index_i < index_tablet_ids.size(); ++index_i) {
         _rpc_request.mutable_requests(index_i)->clear_tablet_ids();
     }
@@ -564,6 +583,7 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
         _mem_tracker->consume(_cur_chunk->memory_usage());
         _request_queue.emplace_back(std::move(_cur_chunk), _rpc_request);
         _cur_chunk = nullptr;
+        _cur_chunk_mem_usage = 0;
     }
 
     // try to send chunk in queue first
diff --git a/be/src/exec/tablet_sink_index_channel.h b/be/src/exec/tablet_sink_index_channel.h
index cdada1cdcdb59..4eeb8582d82fa 100644
--- a/be/src/exec/tablet_sink_index_channel.h
+++ b/be/src/exec/tablet_sink_index_channel.h
@@ -183,6 +183,9 @@ class NodeChannel {
     Status _filter_indexes_with_where_expr(Chunk* input, const std::vector<uint32_t>& indexes,
                                            std::vector<uint32_t>& filtered_indexes);
 
+    void _reset_cur_chunk(Chunk* input);
+    void _append_data_to_cur_chunk(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size);
+
     std::unique_ptr<MemTracker> _mem_tracker = nullptr;
 
     OlapTableSink* _parent = nullptr;
@@ -232,6 +235,7 @@ class NodeChannel {
     size_t _max_parallel_request_size = 1;
     std::vector<ReusableClosure<PTabletWriterAddBatchResult>*> _add_batch_closures;
     std::unique_ptr<Chunk> _cur_chunk;
+    int64_t _cur_chunk_mem_usage = 0;
     PTabletWriterAddChunksRequest _rpc_request;
 
     using AddMultiChunkReq = std::pair<std::unique_ptr<Chunk>, PTabletWriterAddChunksRequest>;