Skip to content

Commit

Permalink
[BugFix] fix tablet writer add chunk when larger than 2GB (#51152)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
(cherry picked from commit 1766f02)

# Conflicts:
#	be/src/util/brpc_stub_cache.h
#	be/src/util/internal_service_recoverable_stub.cpp
#	be/src/util/internal_service_recoverable_stub.h
  • Loading branch information
luohaha authored and mergify[bot] committed Sep 20, 2024
1 parent 0c2c01e commit 94aedf4
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 19 deletions.
50 changes: 41 additions & 9 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,43 @@ Status NodeChannel::_filter_indexes_with_where_expr(Chunk* input, const std::vec
return Status::OK();
}

// Separate the chunk data from the protobuf, so `SerializeToZeroCopyStream` won't fail because the serialized message is >2GB.
//
// IOBuf format:
// | protobuf len |
// | protobuf (without chunk) |
// | chunk (1) |
// | chunk (2) |
// | chunk (...) |
// | chunk (N) |
template <typename T>
void serialize_to_iobuf(const T& proto_obj, butil::IOBuf* iobuf) {
butil::IOBuf tmp_iobuf;
butil::IOBufAsZeroCopyOutputStream wrapper(&tmp_iobuf);
void serialize_to_iobuf(T& proto_obj, butil::IOBuf* iobuf) {
butil::IOBuf proto_iobuf; // used for store protbuf serialize data
butil::IOBuf chunk_iobuf; // used for store chunks
if constexpr (std::is_same<T, PTabletWriterAddChunkRequest>::value) {
auto chunk = proto_obj.mutable_chunk();
chunk->set_data_size(chunk->data().size());
chunk_iobuf.append(chunk->data());
chunk->clear_data(); // clear data, so protobuf serialize won't return >2GB error.
chunk->mutable_data()->shrink_to_fit();
} else if constexpr (std::is_same<T, PTabletWriterAddChunksRequest>::value) {
for (int i = 0; i < proto_obj.requests_size(); i++) {
auto request = proto_obj.mutable_requests(i);
auto chunk = request->mutable_chunk();
chunk->set_data_size(chunk->data().size());
chunk_iobuf.append(chunk->data());
chunk->clear_data(); // clear data, so protobuf serialize won't return >2GB error.
chunk->mutable_data()->shrink_to_fit();
}
}
butil::IOBufAsZeroCopyOutputStream wrapper(&proto_iobuf);
proto_obj.SerializeToZeroCopyStream(&wrapper);
size_t request_size = tmp_iobuf.size();
iobuf->append(&request_size, sizeof(request_size));
iobuf->append(tmp_iobuf);
// append protobuf
size_t proto_iobuf_size = proto_iobuf.size();
iobuf->append(&proto_iobuf_size, sizeof(proto_iobuf_size));
iobuf->append(proto_iobuf);
// append chunk
iobuf->append(chunk_iobuf);
}

Status NodeChannel::_send_request(bool eos, bool finished) {
Expand Down Expand Up @@ -662,7 +691,8 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
auto closure = _add_batch_closures[_current_request_index];
serialize_to_iobuf<PTabletWriterAddChunksRequest>(request, &closure->cntl.request_attachment());
res.value()->tablet_writer_add_chunks_via_http(&closure->cntl, nullptr, &closure->result, closure);
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = " << request.ByteSizeLong();
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = "
<< closure->cntl.request_attachment().size();
} else {
_stub->tablet_writer_add_chunks(&_add_batch_closures[_current_request_index]->cntl, &request,
&_add_batch_closures[_current_request_index]->result,
Expand All @@ -680,9 +710,11 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
return res.status();
}
auto closure = _add_batch_closures[_current_request_index];
serialize_to_iobuf<PTabletWriterAddChunkRequest>(request.requests(0), &closure->cntl.request_attachment());
serialize_to_iobuf<PTabletWriterAddChunkRequest>(*request.mutable_requests(0),
&closure->cntl.request_attachment());
res.value()->tablet_writer_add_chunk_via_http(&closure->cntl, nullptr, &closure->result, closure);
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = " << request.ByteSizeLong();
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = "
<< closure->cntl.request_attachment().size();
} else {
_stub->tablet_writer_add_chunk(
&_add_batch_closures[_current_request_index]->cntl, request.mutable_requests(0),
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink_index_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class OlapTableSink; // forward declaration
class TabletSinkSender; // forward declaration

template <typename T>
void serialize_to_iobuf(const T& proto_obj, butil::IOBuf* iobuf);
void serialize_to_iobuf(T& proto_obj, butil::IOBuf* iobuf);

// The counter of add_batch rpc of a single node
struct AddBatchCounter {
Expand Down
46 changes: 37 additions & 9 deletions be/src/service/service_be/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,50 @@ void BackendInternalServiceImpl<T>::tablet_writer_add_chunks(google::protobuf::R

template <typename T>
static bool parse_from_iobuf(butil::IOBuf& iobuf, T* proto_obj) {
// deserialize
size_t request_size = 0;
if (!iobuf.cutn(&request_size, sizeof(request_size))) {
LOG(ERROR) << "Failed to read request size";
// 1. deserialize protobuf
size_t protobuf_size = 0;
if (!iobuf.cutn(&protobuf_size, sizeof(protobuf_size))) {
LOG(ERROR) << "Failed to read protobuf_size";
return false;
}
butil::IOBuf request_from;
if (!iobuf.cutn(&request_from, request_size)) {
LOG(ERROR) << "Failed to cut the required size from the io buffer";
butil::IOBuf protobuf_buf;
if (!iobuf.cutn(&protobuf_buf, protobuf_size)) {
LOG(ERROR) << "Failed to cut the protobuf_size from the io buffer";
return false;
}
butil::IOBufAsZeroCopyInputStream wrapper(request_from);
butil::IOBufAsZeroCopyInputStream wrapper(protobuf_buf);
if (!proto_obj->ParseFromZeroCopyStream(&wrapper)) {
LOG(ERROR) << "Failed to parse the request";
LOG(ERROR) << "Failed to parse the protobuf";
return false;
}
// 2. deserialize chunks
if constexpr (std::is_same<T, PTabletWriterAddChunkRequest>::value) {
auto chunk = proto_obj->mutable_chunk();
if (iobuf.size() < chunk->data_size()) {
LOG(ERROR) << fmt::format("Not enough data in iobuf. Expected: {}, available: {}.", chunk->data_size(),
iobuf.size());
return false;
}
auto size = iobuf.cutn(chunk->mutable_data(), chunk->data_size());
if (size != chunk->data_size()) {
LOG(ERROR) << fmt::format("iobuf read {} != expected {}.", size, chunk->data_size());
return false;
}
} else if constexpr (std::is_same<T, PTabletWriterAddChunksRequest>::value) {
for (int i = 0; i < proto_obj->requests_size(); i++) {
auto chunk = proto_obj->mutable_requests(i)->mutable_chunk();
if (iobuf.size() < chunk->data_size()) {
LOG(ERROR) << fmt::format("Not enough data in iobuf. Expected: {}, available: {}.", chunk->data_size(),
iobuf.size());
return false;
}
auto size = iobuf.cutn(chunk->mutable_data(), chunk->data_size());
if (size != chunk->data_size()) {
LOG(ERROR) << fmt::format("iobuf read {} != expected {}.", size, chunk->data_size());
return false;
}
}
}
return true;
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class HttpBrpcStubCache {
return *stub_ptr;
}
// create
<<<<<<< HEAD
brpc::ChannelOptions options;
options.connect_timeout_ms = config::rpc_connect_timeout_ms;
options.protocol = "http";
Expand All @@ -184,6 +185,10 @@ class HttpBrpcStubCache {
options.max_retry = 3;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
if (channel->Init(endpoint, &options)) {
=======
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint);
if (!stub->reset_channel("http").ok()) {
>>>>>>> 1766f02e1d ([BugFix] fix tablet writer add chunk when larger than 2GB (#51152))
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));
}
Expand Down
171 changes: 171 additions & 0 deletions be/src/util/internal_service_recoverable_stub.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "util/internal_service_recoverable_stub.h"

#include <utility>

#include "common/config.h"

namespace starrocks {

// Completion callback that wraps the caller's `done` closure for RPCs issued through
// PInternalService_RecoverableStub.
//
// When the RPC finishes, Run() checks the controller: if the call failed with EHOSTDOWN it asks the
// owning stub to rebuild its channel, then it always runs the wrapped `done` and deletes itself.
// The object must therefore be heap-allocated and must not be touched after Run() has been called.
class RecoverableClosure : public ::google::protobuf::Closure {
public:
    // `stub` is held by shared ownership so the stub stays alive until the RPC has completed.
    // `controller` and `done` are borrowed; both are dereferenced unconditionally in Run().
    RecoverableClosure(std::shared_ptr<starrocks::PInternalService_RecoverableStub> stub,
                       ::google::protobuf::RpcController* controller, ::google::protobuf::Closure* done)
            : _stub(std::move(stub)), _controller(controller), _done(done) {}

    void Run() override {
        // The controller is downcast without a check, i.e. it is expected to be a brpc::Controller.
        auto* cntl = static_cast<brpc::Controller*>(_controller);
        if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
            // NOTE(review): reset_channel() is called without a protocol argument, so the default
            // declared in the header is used. Presumably that is not "http" -- confirm that stubs
            // created with reset_channel("http") are rebuilt with the intended protocol.
            auto st = _stub->reset_channel();
            if (!st.ok()) {
                // Best effort: a failed reset is only logged, the caller's callback still runs.
                LOG(WARNING) << "Fail to reset channel: " << st.to_string();
            }
        }
        _done->Run();
        delete this;
    }

private:
    std::shared_ptr<starrocks::PInternalService_RecoverableStub> _stub;
    ::google::protobuf::RpcController* _controller;
    ::google::protobuf::Closure* _done;
};

PInternalService_RecoverableStub::PInternalService_RecoverableStub(const butil::EndPoint& endpoint)
: _endpoint(endpoint) {}

// Defaulted out of line: no explicit cleanup, members are released by their own destructors.
PInternalService_RecoverableStub::~PInternalService_RecoverableStub() = default;

// (Re)creates the brpc channel to `_endpoint` and replaces `_stub` with a stub that owns it.
//
// `protocol`: when equal to "http" the channel uses that protocol; for any other value the channel
// uses the configured connection type and a fresh connection group taken from `_connection_group`.
//
// Returns Status::OK() on success, or Status::InternalError if the channel cannot be initialised,
// in which case `_stub` is left unchanged. The whole operation runs under `_mutex`.
Status PInternalService_RecoverableStub::reset_channel(const std::string& protocol) {
    std::lock_guard<std::mutex> guard(_mutex);

    brpc::ChannelOptions opts;
    opts.connect_timeout_ms = config::rpc_connect_timeout_ms;
    opts.max_retry = 3;
    if (protocol != "http") {
        // http does not support these.
        opts.connection_type = config::brpc_connection_type;
        opts.connection_group = std::to_string(_connection_group++);
    } else {
        opts.protocol = protocol;
    }

    auto new_channel = std::make_unique<brpc::Channel>();
    // A non-zero return code from Init() is treated as failure.
    const int rc = new_channel->Init(_endpoint, &opts);
    if (rc != 0) {
        LOG(WARNING) << "Fail to init channel " << _endpoint;
        return Status::InternalError("Fail to init channel");
    }

    // Ownership of the channel moves to the new stub (STUB_OWNS_CHANNEL).
    _stub = std::make_shared<PInternalService_Stub>(new_channel.release(),
                                                    google::protobuf::Service::STUB_OWNS_CHANNEL);
    return Status::OK();
}

// Forwards tablet_writer_open to the current underlying stub. The caller's `done` is wrapped in a
// RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::tablet_writer_open(::google::protobuf::RpcController* controller,
                                                          const ::starrocks::PTabletWriterOpenRequest* request,
                                                          ::starrocks::PTabletWriterOpenResult* response,
                                                          ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_open(controller, request, response, closure);
}

// Forwards tablet_writer_cancel to the current underlying stub. The caller's `done` is wrapped in a
// RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::tablet_writer_cancel(::google::protobuf::RpcController* controller,
                                                            const ::starrocks::PTabletWriterCancelRequest* request,
                                                            ::starrocks::PTabletWriterCancelResult* response,
                                                            ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_cancel(controller, request, response, closure);
}

// Forwards transmit_chunk to the current underlying stub. The caller's `done` is wrapped in a
// RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::transmit_chunk(::google::protobuf::RpcController* controller,
                                                      const ::starrocks::PTransmitChunkParams* request,
                                                      ::starrocks::PTransmitChunkResult* response,
                                                      ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->transmit_chunk(controller, request, response, closure);
}

// Forwards transmit_chunk_via_http to the current underlying stub. The caller's `done` is wrapped in
// a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::transmit_chunk_via_http(::google::protobuf::RpcController* controller,
                                                               const ::starrocks::PHttpRequest* request,
                                                               ::starrocks::PTransmitChunkResult* response,
                                                               ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->transmit_chunk_via_http(controller, request, response, closure);
}

// Forwards tablet_writer_add_chunk to the current underlying stub. The caller's `done` is wrapped in
// a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::tablet_writer_add_chunk(::google::protobuf::RpcController* controller,
                                                               const ::starrocks::PTabletWriterAddChunkRequest* request,
                                                               ::starrocks::PTabletWriterAddBatchResult* response,
                                                               ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_add_chunk(controller, request, response, closure);
}

// Forwards tablet_writer_add_chunks to the current underlying stub. The caller's `done` is wrapped in
// a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::tablet_writer_add_chunks(
        ::google::protobuf::RpcController* controller, const ::starrocks::PTabletWriterAddChunksRequest* request,
        ::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_add_chunks(controller, request, response, closure);
}

// Forwards tablet_writer_add_chunk_via_http to the current underlying stub. The caller's `done` is
// wrapped in a RecoverableClosure, which runs `done` on completion after optionally resetting the
// channel.
void PInternalService_RecoverableStub::tablet_writer_add_chunk_via_http(
        ::google::protobuf::RpcController* controller, const ::starrocks::PHttpRequest* request,
        ::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_add_chunk_via_http(controller, request, response, closure);
}

// Forwards tablet_writer_add_chunks_via_http to the current underlying stub. The caller's `done` is
// wrapped in a RecoverableClosure, which runs `done` on completion after optionally resetting the
// channel.
void PInternalService_RecoverableStub::tablet_writer_add_chunks_via_http(
        ::google::protobuf::RpcController* controller, const ::starrocks::PHttpRequest* request,
        ::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_add_chunks_via_http(controller, request, response, closure);
}

// Forwards tablet_writer_add_segment to the current underlying stub. The caller's `done` is wrapped
// in a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::tablet_writer_add_segment(
        ::google::protobuf::RpcController* controller, const ::starrocks::PTabletWriterAddSegmentRequest* request,
        ::starrocks::PTabletWriterAddSegmentResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->tablet_writer_add_segment(controller, request, response, closure);
}

// Forwards transmit_runtime_filter to the current underlying stub. The caller's `done` is wrapped in
// a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::transmit_runtime_filter(::google::protobuf::RpcController* controller,
                                                               const ::starrocks::PTransmitRuntimeFilterParams* request,
                                                               ::starrocks::PTransmitRuntimeFilterResult* response,
                                                               ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->transmit_runtime_filter(controller, request, response, closure);
}

// Forwards local_tablet_reader_multi_get to the current underlying stub. The caller's `done` is
// wrapped in a RecoverableClosure, which runs `done` on completion after optionally resetting the
// channel.
void PInternalService_RecoverableStub::local_tablet_reader_multi_get(
        ::google::protobuf::RpcController* controller, const ::starrocks::PTabletReaderMultiGetRequest* request,
        ::starrocks::PTabletReaderMultiGetResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->local_tablet_reader_multi_get(controller, request, response, closure);
}

// Forwards execute_command to the current underlying stub. The caller's `done` is wrapped in a
// RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::execute_command(::google::protobuf::RpcController* controller,
                                                       const ::starrocks::ExecuteCommandRequestPB* request,
                                                       ::starrocks::ExecuteCommandResultPB* response,
                                                       ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->execute_command(controller, request, response, closure);
}

// Forwards process_dictionary_cache to the current underlying stub. The caller's `done` is wrapped
// in a RecoverableClosure, which runs `done` on completion after optionally resetting the channel.
void PInternalService_RecoverableStub::process_dictionary_cache(
        ::google::protobuf::RpcController* controller, const ::starrocks::PProcessDictionaryCacheRequest* request,
        ::starrocks::PProcessDictionaryCacheResult* response, ::google::protobuf::Closure* done) {
    auto closure = new RecoverableClosure(shared_from_this(), controller, done);
    // reset_channel() reassigns `_stub` while holding `_mutex` and is invoked from
    // RecoverableClosure::Run when an RPC completes, so `_stub` must not be read unlocked.
    // Copy it under the same mutex; the copy also keeps the stub alive while the call is issued.
    const auto stub = [this] {
        std::lock_guard<std::mutex> l(_mutex);
        return _stub;
    }();
    stub->process_dictionary_cache(controller, request, response, closure);
}

} // namespace starrocks
Loading

0 comments on commit 94aedf4

Please sign in to comment.