Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix tablet writer add chunk when larger than 2GB #51152

Merged
merged 7 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,43 @@ Status NodeChannel::_filter_indexes_with_where_expr(Chunk* input, const std::vec
return Status::OK();
}

// Seperate chunk from protobuf, so `SerializeToZeroCopyStream` won't fail because of >2GB serialize.
//
// IOBuf format:
// | protobuf len |
// | protobuf (without chunk) |
// | chunk (1) |
// | chunk (2) |
// | chunk (...) |
// | chunk (N) |
template <typename T>
void serialize_to_iobuf(const T& proto_obj, butil::IOBuf* iobuf) {
butil::IOBuf tmp_iobuf;
butil::IOBufAsZeroCopyOutputStream wrapper(&tmp_iobuf);
void serialize_to_iobuf(T& proto_obj, butil::IOBuf* iobuf) {
wyb marked this conversation as resolved.
Show resolved Hide resolved
butil::IOBuf proto_iobuf; // used for store protbuf serialize data
butil::IOBuf chunk_iobuf; // used for store chunks
if constexpr (std::is_same<T, PTabletWriterAddChunkRequest>::value) {
auto chunk = proto_obj.mutable_chunk();
chunk->set_data_size(chunk->data().size());
chunk_iobuf.append(chunk->data());
chunk->clear_data(); // clear data, so protobuf serialize won't return >2GB error.
chunk->mutable_data()->shrink_to_fit();
} else if constexpr (std::is_same<T, PTabletWriterAddChunksRequest>::value) {
for (int i = 0; i < proto_obj.requests_size(); i++) {
auto request = proto_obj.mutable_requests(i);
auto chunk = request->mutable_chunk();
chunk->set_data_size(chunk->data().size());
chunk_iobuf.append(chunk->data());
chunk->clear_data(); // clear data, so protobuf serialize won't return >2GB error.
chunk->mutable_data()->shrink_to_fit();
}
}
butil::IOBufAsZeroCopyOutputStream wrapper(&proto_iobuf);
proto_obj.SerializeToZeroCopyStream(&wrapper);
size_t request_size = tmp_iobuf.size();
iobuf->append(&request_size, sizeof(request_size));
iobuf->append(tmp_iobuf);
// append protobuf
size_t proto_iobuf_size = proto_iobuf.size();
iobuf->append(&proto_iobuf_size, sizeof(proto_iobuf_size));
iobuf->append(proto_iobuf);
// append chunk
iobuf->append(chunk_iobuf);
}

Status NodeChannel::_send_request(bool eos, bool finished) {
Expand Down Expand Up @@ -645,7 +674,8 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
auto closure = _add_batch_closures[_current_request_index];
serialize_to_iobuf<PTabletWriterAddChunksRequest>(request, &closure->cntl.request_attachment());
res.value()->tablet_writer_add_chunks_via_http(&closure->cntl, nullptr, &closure->result, closure);
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = " << request.ByteSizeLong();
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = "
<< closure->cntl.request_attachment().size();
} else {
_stub->tablet_writer_add_chunks(&_add_batch_closures[_current_request_index]->cntl, &request,
&_add_batch_closures[_current_request_index]->result,
Expand All @@ -663,9 +693,11 @@ Status NodeChannel::_send_request(bool eos, bool finished) {
return res.status();
}
auto closure = _add_batch_closures[_current_request_index];
serialize_to_iobuf<PTabletWriterAddChunkRequest>(request.requests(0), &closure->cntl.request_attachment());
serialize_to_iobuf<PTabletWriterAddChunkRequest>(*request.mutable_requests(0),
&closure->cntl.request_attachment());
res.value()->tablet_writer_add_chunk_via_http(&closure->cntl, nullptr, &closure->result, closure);
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = " << request.ByteSizeLong();
VLOG(2) << "NodeChannel::_send_request() issue a http rpc, request size = "
<< closure->cntl.request_attachment().size();
} else {
_stub->tablet_writer_add_chunk(
luohaha marked this conversation as resolved.
Show resolved Hide resolved
&_add_batch_closures[_current_request_index]->cntl, request.mutable_requests(0),
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink_index_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class OlapTableSink; // forward declaration
class TabletSinkSender; // forward declaration

template <typename T>
void serialize_to_iobuf(const T& proto_obj, butil::IOBuf* iobuf);
void serialize_to_iobuf(T& proto_obj, butil::IOBuf* iobuf);

// The counter of add_batch rpc of a single node
struct AddBatchCounter {
Expand Down
46 changes: 37 additions & 9 deletions be/src/service/service_be/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,50 @@ void BackendInternalServiceImpl<T>::tablet_writer_add_chunks(google::protobuf::R

template <typename T>
static bool parse_from_iobuf(butil::IOBuf& iobuf, T* proto_obj) {
// deserialize
size_t request_size = 0;
if (!iobuf.cutn(&request_size, sizeof(request_size))) {
LOG(ERROR) << "Failed to read request size";
// 1. deserialize protobuf
size_t protobuf_size = 0;
if (!iobuf.cutn(&protobuf_size, sizeof(protobuf_size))) {
LOG(ERROR) << "Failed to read protobuf_size";
return false;
}
butil::IOBuf request_from;
if (!iobuf.cutn(&request_from, request_size)) {
LOG(ERROR) << "Failed to cut the required size from the io buffer";
butil::IOBuf protobuf_buf;
if (!iobuf.cutn(&protobuf_buf, protobuf_size)) {
LOG(ERROR) << "Failed to cut the protobuf_size from the io buffer";
return false;
}
butil::IOBufAsZeroCopyInputStream wrapper(request_from);
butil::IOBufAsZeroCopyInputStream wrapper(protobuf_buf);
if (!proto_obj->ParseFromZeroCopyStream(&wrapper)) {
LOG(ERROR) << "Failed to parse the request";
LOG(ERROR) << "Failed to parse the protobuf";
return false;
}
// 2. deserialize chunks
if constexpr (std::is_same<T, PTabletWriterAddChunkRequest>::value) {
auto chunk = proto_obj->mutable_chunk();
if (iobuf.size() < chunk->data_size()) {
LOG(ERROR) << fmt::format("Not enough data in iobuf. Expected: {}, available: {}.", chunk->data_size(),
iobuf.size());
return false;
}
auto size = iobuf.cutn(chunk->mutable_data(), chunk->data_size());
if (size != chunk->data_size()) {
LOG(ERROR) << fmt::format("iobuf read {} != expected {}.", size, chunk->data_size());
return false;
}
} else if constexpr (std::is_same<T, PTabletWriterAddChunksRequest>::value) {
for (int i = 0; i < proto_obj->requests_size(); i++) {
auto chunk = proto_obj->mutable_requests(i)->mutable_chunk();
if (iobuf.size() < chunk->data_size()) {
wyb marked this conversation as resolved.
Show resolved Hide resolved
LOG(ERROR) << fmt::format("Not enough data in iobuf. Expected: {}, available: {}.", chunk->data_size(),
iobuf.size());
return false;
}
auto size = iobuf.cutn(chunk->mutable_data(), chunk->data_size());
if (size != chunk->data_size()) {
LOG(ERROR) << fmt::format("iobuf read {} != expected {}.", size, chunk->data_size());
return false;
}
}
}
return true;
}

luohaha marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class HttpBrpcStubCache {
}
// create
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint);
if (!stub->reset_channel().ok()) {
if (!stub->reset_channel("http").ok()) {
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));
}
Expand Down
11 changes: 8 additions & 3 deletions be/src/util/internal_service_recoverable_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ PInternalService_RecoverableStub::PInternalService_RecoverableStub(const butil::

PInternalService_RecoverableStub::~PInternalService_RecoverableStub() = default;

Status PInternalService_RecoverableStub::reset_channel() {
Status PInternalService_RecoverableStub::reset_channel(const std::string& protocol) {
std::lock_guard<std::mutex> l(_mutex);
brpc::ChannelOptions options;
options.connect_timeout_ms = config::rpc_connect_timeout_ms;
if (protocol == "http") {
options.protocol = protocol;
} else {
// http does not support these.
options.connection_type = config::brpc_connection_type;
options.connection_group = std::to_string(_connection_group++);
}
options.max_retry = 3;
options.connection_type = config::brpc_connection_type;
options.connection_group = std::to_string(_connection_group++);
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
if (channel->Init(_endpoint, &options)) {
LOG(WARNING) << "Fail to init channel " << _endpoint;
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/internal_service_recoverable_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PInternalService_RecoverableStub : public PInternalService,
PInternalService_RecoverableStub(const butil::EndPoint& endpoint);
~PInternalService_RecoverableStub();

Status reset_channel();
Status reset_channel(const std::string& protocol = "");

#ifdef BE_TEST
PInternalService_Stub* stub() { return _stub.get(); }
Expand Down
Loading