Skip to content

Commit

Permalink
[fix](recycler) Fix CountdownEvent error and hang (#45760)
Browse files Browse the repository at this point in the history
Fix CountdownEvent error "Invoking add_count() after wait() was invoked"

---------

Co-authored-by: Gavin Chou <[email protected]>
  • Loading branch information
2 people authored and Your Name committed Dec 24, 2024
1 parent 38e8cc4 commit 7ad4af6
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cloud/src/common/simple_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -154,8 +155,10 @@ class SimpleThreadPool {
}
try {
job();
} catch (const std::exception& e) {
std::cerr << "exception happened when execute job. err: " << e.what() << std::endl;
} catch (...) {
// do nothing
std::cerr << "exception happened when execute job." << std::endl;
}
}
}
Expand Down
1 change: 0 additions & 1 deletion cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_);
return -1;
}
sync_executor.reset();
if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0;
// sort the vector using key's order
std::sort(tablet_keys.begin(), tablet_keys.end(),
Expand Down
26 changes: 24 additions & 2 deletions cloud/src/recycler/sync_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <bthread/countdown_event.h>
#include <cpp/sync_point.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <glog/logging.h>

#include <chrono>
#include <future>
#include <string>

Expand All @@ -48,10 +50,12 @@ class SyncExecutor {
return *this;
}
std::vector<T> when_all(bool* finished) {
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) { _reset(); });
timespec current_time;
auto current_time_second = time(nullptr);
current_time.tv_sec = current_time_second + 300;
current_time.tv_nsec = 0;
// Wait for all tasks to complete
while (0 != _count.timed_wait(current_time)) {
current_time.tv_sec += 300;
LOG(WARNING) << _name_tag << " has already taken 5 min, cost: "
Expand All @@ -65,11 +69,26 @@ class SyncExecutor {
*finished = false;
return res;
}
res.emplace_back((*task).get());
size_t max_wait_ms = 10000;
TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time", &max_wait_ms);
// _count.timed_wait has already ensured that all tasks are completed.
// The 10 seconds here is just waiting for the task results to be returned,
// so 10 seconds is more than enough.
auto status = task->wait_for(max_wait_ms);
if (status == std::future_status::ready) {
res.emplace_back(task->get());
} else {
*finished = false;
LOG(WARNING) << _name_tag << " task timed out after 10 seconds";
return res;
}
}
return res;
}
void reset() {

private:
// Returns the executor to an empty state: zeroes the countdown event, drops
// the stored task entries and clears the stop flag.
// when_all() installs a scope guard that invokes this on every exit path.
// NOTE(review): presumably this is what lets add() be called again after
// when_all() without tripping the CountdownEvent "add_count() after wait()"
// error named in the commit message — confirm against bthread::CountdownEvent.
void _reset() {
// Keep this first. NOTE(review): statement order is preserved from the
// original; whether later steps depend on the counter being zero is not
// visible here.
_count.reset(0);
// Discards whatever when_all() iterated over (task handles, presumably
// owning their futures — TODO confirm element type of _res).
_res.clear();
// Clear any stop request so the next batch starts un-cancelled.
_stop_token = false;
}
Expand Down Expand Up @@ -98,6 +117,9 @@ class SyncExecutor {
}
_pro.set_value(std::move(t));
}
// Waits up to `milliseconds` ms on this task's `_fut` and returns the
// resulting status, so callers can tell a ready result from a timeout
// instead of blocking indefinitely in get().
std::future_status wait_for(size_t milliseconds) {
    const std::chrono::milliseconds timeout(milliseconds);
    return _fut.wait_for(timeout);
}
// Returns the task's `_valid` flag.
// NOTE(review): presumably false when the task was cancelled or skipped, so
// that when_all() can report `*finished = false` — confirm where _valid is set.
bool valid() { return _valid; }
// Returns the task's result by delegating to `_fut.get()`.
// NOTE(review): assuming _fut is a std::future<T>, this blocks until the
// value is set and may be called only once; when_all() checks wait_for()
// first to avoid blocking forever — confirm _fut's type.
T get() { return _fut.get(); }

Expand Down
90 changes: 90 additions & 0 deletions cloud/test/util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "recycler/util.h"

#include <chrono>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
Expand All @@ -28,6 +29,7 @@
#include "common/logging.h"
#include "common/simple_thread_pool.h"
#include "common/string_util.h"
#include "cpp/sync_point.h"
#include "gtest/gtest.h"
#include "recycler/recycler.h"
#include "recycler/sync_executor.h"
Expand Down Expand Up @@ -235,3 +237,91 @@ TEST(UtilTest, normal) {
std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); });
}
}

// Checks that a SyncExecutor can be reused after when_all(): a second
// add()/when_all() round must yield exactly the newly added task's result.
// The scenario runs both on a nested executor (inside a pool task) and on
// the outer executor that waits for that task.
TEST(UtilTest, test_add_after_when_all) {
    // Job submitted to the outer executor; it drives its own pool and
    // executor through two consecutive add()/when_all() rounds.
    auto nested_job = []() {
        auto inner_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
        inner_pool->start();
        SyncExecutor<int> inner_executor(inner_pool, "test add after when all: inside",
                                         [](int k) { return k != 0; });
        auto return_zero = []() { return 0; };

        // Round one: its results are overwritten by round two below.
        inner_executor.add(return_zero);
        bool inner_finished = true;
        std::vector<int> inner_results = inner_executor.when_all(&inner_finished);

        // Round two: adding after when_all() must work and give one result.
        inner_executor.add(return_zero);
        inner_results = inner_executor.when_all(&inner_finished);
        EXPECT_EQ(1, inner_results.size());
        EXPECT_EQ(inner_finished, true);
        for (auto&& value : inner_results) {
            EXPECT_EQ(0, value);
        }
        return 0;
    };

    auto outer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
    outer_pool->start();
    SyncExecutor<int> outer_executor(outer_pool, "test add after when all: outside",
                                     [](int k) { return k != 0; });
    outer_executor.add(nested_job);

    bool outer_finished = true;
    std::vector<int> outer_results = outer_executor.when_all(&outer_finished);
    EXPECT_EQ(1, outer_results.size());
    EXPECT_EQ(outer_finished, true);
    for (auto&& value : outer_results) {
        EXPECT_EQ(0, value);
    }
}

// Checks that a task which throws does not hang when_all(): the call must
// return with no results and with `finished` set to false.
TEST(UtilTest, exception) {
    auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
    pool->start();

    SyncExecutor<int> executor(pool, "exception test", [](int k) { return k != 0; });
    // Always throws; the explicit return type keeps the task int-valued.
    auto throwing_job = []() -> int { throw(std::runtime_error("test exception")); };
    executor.add(throwing_job);

    bool finished = true;
    std::vector<int> results = executor.when_all(&finished);
    EXPECT_EQ(0, results.size());
    EXPECT_EQ(finished, false);
    // No elements are expected, so this loop only fires if the size check
    // above already failed.
    for (auto&& value : results) {
        EXPECT_EQ(1, value);
    }
}

// Runs a nested SyncExecutor scenario with the per-task result wait time of
// when_all() overridden through the "SyncExecutor::when_all.set_wait_time"
// sync point, and checks that both the inner and outer executors still
// finish and return the expected single result.
TEST(UtilTest, test_sync_executor) {
    // Job submitted to the outer executor: delays for a second, then drives
    // its own executor through two add()/when_all() rounds.
    auto nested_job = []() {
        sleep(1);
        auto inner_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
        inner_pool->start();
        SyncExecutor<int> inner_executor(inner_pool, "test sync executor: inside",
                                         [](int k) { return k != 0; });
        auto return_zero = []() { return 0; };

        inner_executor.add(return_zero);
        bool inner_finished = true;
        std::vector<int> inner_results = inner_executor.when_all(&inner_finished);

        inner_executor.add(return_zero);
        inner_results = inner_executor.when_all(&inner_finished);
        EXPECT_EQ(1, inner_results.size());
        EXPECT_EQ(inner_finished, true);
        for (auto&& value : inner_results) {
            EXPECT_EQ(0, value);
        }
        return 0;
    };

    auto* sp = doris::SyncPoint::get_instance();
    // The sync point passes a pointer to when_all()'s local wait time. Write
    // THROUGH that pointer: the previous code dereferenced it into a local
    // copy and assigned to the copy, so the override never took effect.
    // The callback captures nothing on purpose: it is registered on a
    // process-wide singleton and is not removed here, so it must not refer
    // to any stack object of this test (the former by-reference mutex
    // capture would dangle once the test returned).
    // NOTE(review): the callback presumably only fires when sync-point
    // processing is enabled, which this test does not do — confirm whether
    // processing should be enabled here and the callback cleared afterwards.
    sp->set_call_back("SyncExecutor::when_all.set_wait_time", [](auto&& args) {
        auto* max_wait_ms = doris::try_any_cast<size_t*>(args[0]);
        *max_wait_ms = 100;
    });

    auto outer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
    outer_pool->start();
    SyncExecutor<int> outer_executor(outer_pool, "test sync executor: outside",
                                     [](int k) { return k != 0; });
    outer_executor.add(nested_job);

    bool outer_finished = true;
    std::vector<int> outer_results = outer_executor.when_all(&outer_finished);
    EXPECT_EQ(1, outer_results.size());
    EXPECT_EQ(outer_finished, true);
    for (auto&& value : outer_results) {
        EXPECT_EQ(0, value);
    }
}

0 comments on commit 7ad4af6

Please sign in to comment.