Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Dec 23, 2024
1 parent 55c26e0 commit 17dc7ca
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cloud/src/common/simple_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -154,8 +155,10 @@ class SimpleThreadPool {
}
try {
job();
} catch (const std::exception& e) {
std::cerr << "exception happened when execute job. err: " << e.what() << std::endl;
} catch (...) {
// do nothing
std::cerr << "exception happened when execute job." << std::endl;
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,8 @@ int InstanceRecycler::recycle_indexes() {

LOG_INFO("begin to recycle indexes").tag("instance_id", instance_id_);

throw std::exception();

int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);

Expand Down Expand Up @@ -1278,7 +1280,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_);
return -1;
}
sync_executor.reset();
if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0;
// sort the vector using key's order
std::sort(tablet_keys.begin(), tablet_keys.end(),
Expand Down
18 changes: 16 additions & 2 deletions cloud/src/recycler/sync_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class SyncExecutor {
return *this;
}
std::vector<T> when_all(bool* finished) {
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) { _reset(); });
timespec current_time;
auto current_time_second = time(nullptr);
current_time.tv_sec = current_time_second + 300;
Expand All @@ -65,11 +66,21 @@ class SyncExecutor {
*finished = false;
return res;
}
res.emplace_back((*task).get());
auto status = task->wait_for(10);
if (status == std::future_status::ready) {
res.emplace_back(task->get());
} else {
*finished = false;
LOG(WARNING) << _name_tag << " task timed out after 10 seconds";
return res;
}
}
return res;
}
void reset() {

private:
void _reset() {
_count.reset(0);
_res.clear();
_stop_token = false;
}
Expand Down Expand Up @@ -98,6 +109,9 @@ class SyncExecutor {
}
_pro.set_value(std::move(t));
}
std::future_status wait_for(size_t seconds) {
return _fut.wait_for(std::chrono::seconds(seconds));
}
bool valid() { return _valid; }
T get() { return _fut.get(); }

Expand Down
50 changes: 50 additions & 0 deletions cloud/test/util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "recycler/util.h"

#include <chrono>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
Expand Down Expand Up @@ -235,3 +236,52 @@ TEST(UtilTest, normal) {
std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); });
}
}

TEST(UtilTest, test_add_after_when_all) {
auto f = []() {
auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
pool->start();
SyncExecutor<int> sync_executor(pool, "test add after when all: inside",
[](int k) { return k != 0; });
auto f1 = []() { return 0; };
sync_executor.add(f1);
bool finished = true;
std::vector<int> res = sync_executor.when_all(&finished);
sync_executor.add(f1);
res = sync_executor.when_all(&finished);
EXPECT_EQ(1, res.size());
EXPECT_EQ(finished, true);
std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
return 0;
};

auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
s3_producer_pool->start();
SyncExecutor<int> s3_sync_executor(s3_producer_pool, "test add after when all: outside",
[](int k) { return k != 0; });
s3_sync_executor.add(f);
bool finished = true;
std::vector<int> res = s3_sync_executor.when_all(&finished);
EXPECT_EQ(1, res.size());
EXPECT_EQ(finished, true);
std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
}

TEST(UtilTest, exception) {
auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
s3_producer_pool->start();
{
SyncExecutor<int> sync_executor(s3_producer_pool, "exception test",
[](int k) { return k != 0; });
auto f = []() {
throw(std::runtime_error("test exception"));
return 1;
};
sync_executor.add(f);
bool finished = true;
std::vector<int> res = sync_executor.when_all(&finished);
EXPECT_EQ(0, res.size());
EXPECT_EQ(finished, false);
std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(1, n); });
}
}

0 comments on commit 17dc7ca

Please sign in to comment.