From ad0bedd148e4263cf232f9b89371132c7524e847 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Fri, 29 Nov 2024 17:04:50 +0800 Subject: [PATCH 1/5] [fix](meta-service) Avoid rowset meta exceeds 2G result in protobuf fatal --- cloud/src/meta-service/meta_service.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 4f374832925dd7..148ee0d9d862ed 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1310,15 +1310,24 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, while (it->has_next()) { auto [k, v] = it->next(); - auto rs = response->add_rowset_meta(); + auto* rs = response->add_rowset_meta(); + auto byte_size = rs->ByteSizeLong(); + if (byte_size + v.size() > std::numeric_limits::max()) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "rowset meta exceeded 2G, unable to serialize"; + LOG(WARNING) << msg << " key=" << hex(k); + return; + } if (!rs->ParseFromArray(v.data(), v.size())) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; msg = "malformed rowset meta, unable to deserialize"; LOG(WARNING) << msg << " key=" << hex(k); return; } ++num_rowsets; - if (!it->has_next()) key0 = k; + if (!it->has_next()) { + key0 = k; + } } key0.push_back('\x00'); // Update to next smallest key for iteration } while (it->more()); From e29ec137b912792a4e8513bd23ef5bd503ce0ac3 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 5 Dec 2024 20:36:44 +0800 Subject: [PATCH 2/5] log details and add ut --- cloud/src/meta-service/meta_service.cpp | 15 +++-- cloud/test/txn_lazy_commit_test.cpp | 90 ++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 148ee0d9d862ed..2562d15339ac7b 100644 --- 
a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -87,7 +87,9 @@ std::string get_instance_id(const std::shared_ptr& rc_mgr, std::vector nodes; std::string err = rc_mgr->get_node(cloud_unique_id, &nodes); - { TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); } + { + TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); + } std::string instance_id; if (!err.empty()) { // cache can't find cloud_unique_id, so degraded by parse cloud_unique_id @@ -284,7 +286,9 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, response->set_version(version_pb.version()); response->add_version_update_time_ms(version_pb.update_time_ms()); } - { TEST_SYNC_POINT_CALLBACK("get_version_code", &code); } + { + TEST_SYNC_POINT_CALLBACK("get_version_code", &code); + } return; } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { msg = "not found"; @@ -1312,10 +1316,13 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, auto [k, v] = it->next(); auto* rs = response->add_rowset_meta(); auto byte_size = rs->ByteSizeLong(); + TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size); if (byte_size + v.size() > std::numeric_limits::max()) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "rowset meta exceeded 2G, unable to serialize"; - LOG(WARNING) << msg << " key=" << hex(k); + msg = std::format( + "rowset meta exceeded 2G, unable to serialize, key={}. 
byte_size={}", + hex(k), byte_size); + LOG(WARNING) << msg; return; } if (!rs->ParseFromArray(v.data(), v.size())) { diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 9a7679f3dd9e23..93abc184cee273 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -25,7 +25,9 @@ #include #include +#include #include +#include #include #include #include @@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { ASSERT_TRUE(abort_timeout_txn_hit); ASSERT_EQ(txn_id, txn_info_pb.txn_id()); } -} // namespace doris::cloud \ No newline at end of file + +TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { + auto txn_kv = get_mem_txn_kv(); + + int64_t db_id = 5252025; + int64_t table_id = 35201043384; + int64_t index_id = 256439; + int64_t partition_id = 732536259; + + auto meta_service = get_meta_service(txn_kv, true); + int64_t tablet_id = 25910248; + + { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id); + } + { + int tmp_txn_id = 0; + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label_32ae213dasg3"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + tmp_txn_id = res.txn_id(); + ASSERT_GT(res.txn_id(), 0); + } + { + auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + brpc::Controller cntl; + CommitTxnRequest req; + 
req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(tmp_txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + } + + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) { + auto* byte_size = try_any_cast(args[0]); + *byte_size = std::numeric_limits::max(); + ++(*byte_size); + }); + + sp->enable_processing(); + { + brpc::Controller cntl; + GetRowsetRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + auto* tablet_idx = req.mutable_idx(); + tablet_idx->set_table_id(table_id); + tablet_idx->set_index_id(index_id); + tablet_idx->set_partition_id(partition_id); + tablet_idx->set_tablet_id(tablet_id); + req.set_start_version(0); + req.set_end_version(-1); + req.set_cumulative_compaction_cnt(0); + req.set_base_compaction_cnt(0); + req.set_cumulative_point(2); + + GetRowsetResponse res; + meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_SERIALIZE_ERR); + } +} + +} // namespace doris::cloud From 7ba20a7ea1a730d8c9d7a079fd2f06c6f2e505ab Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 5 Dec 2024 20:42:07 +0800 Subject: [PATCH 3/5] Update meta_service.cpp --- cloud/src/meta-service/meta_service.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 2562d15339ac7b..72110660fe42b5 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -87,9 +87,7 @@ std::string get_instance_id(const std::shared_ptr& rc_mgr, std::vector nodes; std::string err = rc_mgr->get_node(cloud_unique_id, 
&nodes); - { - TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); - } + { TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); } std::string instance_id; if (!err.empty()) { // cache can't find cloud_unique_id, so degraded by parse cloud_unique_id @@ -286,9 +284,7 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, response->set_version(version_pb.version()); response->add_version_update_time_ms(version_pb.update_time_ms()); } - { - TEST_SYNC_POINT_CALLBACK("get_version_code", &code); - } + { TEST_SYNC_POINT_CALLBACK("get_version_code", &code); } return; } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { msg = "not found"; From 7b7804e441cce0f3ef4846d6a438911a48a45441 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 5 Dec 2024 23:10:42 +0800 Subject: [PATCH 4/5] Update meta_service.cpp --- cloud/src/meta-service/meta_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 72110660fe42b5..a9e457dbecd6ce 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1315,7 +1315,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size); if (byte_size + v.size() > std::numeric_limits::max()) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = std::format( + msg = fmt::format( "rowset meta exceeded 2G, unable to serialize, key={}. 
byte_size={}", hex(k), byte_size); LOG(WARNING) << msg; From 70adf940645caf63d1db655d5cd9168fd961fc48 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 5 Dec 2024 23:35:18 +0800 Subject: [PATCH 5/5] Update txn_lazy_commit_test.cpp --- cloud/test/txn_lazy_commit_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 93abc184cee273..0f284508a3f34e 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -1896,7 +1896,7 @@ TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { GetRowsetResponse res; meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); - ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_SERIALIZE_ERR); + ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR); } }