From 17dc7ca6bac49a4eb8c4a97744b30165bbd39700 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 23 Dec 2024 20:59:44 +0800 Subject: [PATCH] 1 --- cloud/src/common/simple_thread_pool.h | 5 ++- cloud/src/recycler/recycler.cpp | 3 +- cloud/src/recycler/sync_executor.h | 18 ++++++++-- cloud/test/util_test.cpp | 50 +++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/cloud/src/common/simple_thread_pool.h b/cloud/src/common/simple_thread_pool.h index e18d6787bf7a46..37a4cedbdadd73 100644 --- a/cloud/src/common/simple_thread_pool.h +++ b/cloud/src/common/simple_thread_pool.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -154,8 +155,10 @@ class SimpleThreadPool { } try { job(); + } catch (const std::exception& e) { + std::cerr << "exception happened when execute job. err: " << e.what() << std::endl; } catch (...) { - // do nothing + std::cerr << "exception happened when execute job." << std::endl; } } } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 6877d7e433b253..23b853e34fbc82 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -734,6 +734,8 @@ int InstanceRecycler::recycle_indexes() { LOG_INFO("begin to recycle indexes").tag("instance_id", instance_id_); + throw std::exception(); + int64_t start_time = duration_cast(steady_clock::now().time_since_epoch()).count(); register_recycle_task(task_name, start_time); @@ -1278,7 +1280,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_); return -1; } - sync_executor.reset(); if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0; // sort the vector using key's order std::sort(tablet_keys.begin(), tablet_keys.end(), diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h index c84e5e22467a9c..b58da063cd6840 100644 --- a/cloud/src/recycler/sync_executor.h +++ b/cloud/src/recycler/sync_executor.h @@ -48,6 +48,7 @@ class SyncExecutor { return *this; } std::vector when_all(bool* finished) { + std::unique_ptr> defer((int*)0x01, [&](int*) { _reset(); }); timespec current_time; auto current_time_second = time(nullptr); current_time.tv_sec = current_time_second + 300; @@ -65,11 +66,21 @@ class SyncExecutor { *finished = false; return res; } - res.emplace_back((*task).get()); + auto status = task->wait_for(10); + if (status == std::future_status::ready) { + res.emplace_back(task->get()); + } else { + *finished = false; + LOG(WARNING) << _name_tag << " task timed out after 10 seconds"; + return res; + } } return res; } - void reset() { + +private: + void _reset() { + _count.reset(0); _res.clear(); _stop_token = false; } @@ -98,6 +109,9 @@ class SyncExecutor { } _pro.set_value(std::move(t)); } + std::future_status wait_for(size_t seconds) { + return _fut.wait_for(std::chrono::seconds(seconds)); + } bool valid() { return _valid; } T get() { return _fut.get(); } diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp index c88ef555f82806..331b4e77ca1b89 100644 --- a/cloud/test/util_test.cpp +++ b/cloud/test/util_test.cpp @@ -18,6 +18,7 @@ #include "recycler/util.h" #include +#include #include #include #include @@ -235,3 +236,52 @@ TEST(UtilTest, normal) { std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); }); } } + +TEST(UtilTest, test_add_after_when_all) { + auto f = []() { + auto pool = std::make_shared(config::recycle_pool_parallelism); + pool->start(); + SyncExecutor sync_executor(pool, "test add after when all: inside", + [](int k) { return k != 0; }); + auto f1 = []() { return 0; }; + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + sync_executor.add(f1); + res = sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); + return 0; + }; + + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + SyncExecutor s3_sync_executor(s3_producer_pool, "test add after when all: outside", + [](int k) { return k != 0; }); + s3_sync_executor.add(f); + bool finished = true; + std::vector res = s3_sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); +} + +TEST(UtilTest, exception) { + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + { + SyncExecutor sync_executor(s3_producer_pool, "exception test", + [](int k) { return k != 0; }); + auto f = []() { + throw(std::runtime_error("test exception")); + return 1; + }; + sync_executor.add(f); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + EXPECT_EQ(0, res.size()); + EXPECT_EQ(finished, false); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(1, n); }); + } +} \ No newline at end of file