diff --git a/cloud/src/common/simple_thread_pool.h b/cloud/src/common/simple_thread_pool.h index e18d6787bf7a46..37a4cedbdadd73 100644 --- a/cloud/src/common/simple_thread_pool.h +++ b/cloud/src/common/simple_thread_pool.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -154,8 +155,10 @@ class SimpleThreadPool { } try { job(); + } catch (const std::exception& e) { + std::cerr << "exception happened when execute job. err: " << e.what() << std::endl; } catch (...) { - // do nothing + std::cerr << "exception happened when execute job." << std::endl; } } } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 6877d7e433b253..ca22b28e031c91 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1278,7 +1278,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_); return -1; } - sync_executor.reset(); if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0; // sort the vector using key's order std::sort(tablet_keys.begin(), tablet_keys.end(), diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h index c84e5e22467a9c..909f36a56c4c9a 100644 --- a/cloud/src/recycler/sync_executor.h +++ b/cloud/src/recycler/sync_executor.h @@ -18,10 +18,12 @@ #pragma once #include +#include #include #include #include +#include #include #include @@ -48,10 +50,12 @@ class SyncExecutor { return *this; } std::vector when_all(bool* finished) { + std::unique_ptr> defer((int*)0x01, [&](int*) { _reset(); }); timespec current_time; auto current_time_second = time(nullptr); current_time.tv_sec = current_time_second + 300; current_time.tv_nsec = 0; + // Wait for all tasks to complete while (0 != _count.timed_wait(current_time)) { current_time.tv_sec += 300; LOG(WARNING) << _name_tag << " has already taken 5 min, cost: " @@ -65,11 +69,26 @@ class SyncExecutor { *finished = false; return res; } - res.emplace_back((*task).get()); + size_t max_wait_ms = 10000; + TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time", &max_wait_ms); + // _count.timed_wait has already ensured that all tasks are completed. + // The 10 seconds here is just waiting for the task results to be returned, + // so 10 seconds is more than enough. + auto status = task->wait_for(max_wait_ms); + if (status == std::future_status::ready) { + res.emplace_back(task->get()); + } else { + *finished = false; + LOG(WARNING) << _name_tag << " task timed out after 10 seconds"; + return res; + } } return res; } - void reset() { + +private: + void _reset() { + _count.reset(0); _res.clear(); _stop_token = false; } @@ -98,6 +117,9 @@ class SyncExecutor { } _pro.set_value(std::move(t)); } + std::future_status wait_for(size_t milliseconds) { + return _fut.wait_for(std::chrono::milliseconds(milliseconds)); + } bool valid() { return _valid; } T get() { return _fut.get(); } diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp index c88ef555f82806..e505b2b99a52da 100644 --- a/cloud/test/util_test.cpp +++ b/cloud/test/util_test.cpp @@ -18,6 +18,7 @@ #include "recycler/util.h" #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "common/logging.h" #include "common/simple_thread_pool.h" #include "common/string_util.h" +#include "cpp/sync_point.h" #include "gtest/gtest.h" #include "recycler/recycler.h" #include "recycler/sync_executor.h" @@ -235,3 +237,91 @@ TEST(UtilTest, normal) { std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); }); } } + +TEST(UtilTest, test_add_after_when_all) { + auto f = []() { + auto pool = std::make_shared(config::recycle_pool_parallelism); + pool->start(); + SyncExecutor sync_executor(pool, "test add after when all: inside", + [](int k) { return k != 0; }); + auto f1 = []() { return 0; }; + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + sync_executor.add(f1); + res = sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); + return 0; + }; + + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + SyncExecutor s3_sync_executor(s3_producer_pool, "test add after when all: outside", + [](int k) { return k != 0; }); + s3_sync_executor.add(f); + bool finished = true; + std::vector res = s3_sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); +} + +TEST(UtilTest, exception) { + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + { + SyncExecutor sync_executor(s3_producer_pool, "exception test", + [](int k) { return k != 0; }); + auto f = []() { + throw(std::runtime_error("test exception")); + return 1; + }; + sync_executor.add(f); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + EXPECT_EQ(0, res.size()); + EXPECT_EQ(finished, false); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(1, n); }); + } +} + +TEST(UtilTest, test_sync_executor) { + auto f = []() { + sleep(1); + auto pool = std::make_shared(config::recycle_pool_parallelism); + pool->start(); + SyncExecutor sync_executor(pool, "test sync executor: inside", + [](int k) { return k != 0; }); + auto f1 = []() { return 0; }; + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + sync_executor.add(f1); + res = sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); + return 0; + }; + std::mutex go_mutex; + + auto* sp = doris::SyncPoint::get_instance(); + sp->set_call_back("SyncExecutor::when_all.set_wait_time", [&](auto&& args) { + std::unique_lock _lock(go_mutex); + auto max_wait_time = *doris::try_any_cast(args[0]); + max_wait_time = 100; + }); + + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + SyncExecutor s3_sync_executor(s3_producer_pool, "test sync executor: outside", + [](int k) { return k != 0; }); + s3_sync_executor.add(f); + bool finished = true; + std::vector res = s3_sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); +} \ No newline at end of file