在线程间通信的一种明显途径是使用条件变量。若我们把检测条件的任务称为检测任务,把对条件做出反应的任务称为反应任务,则策略表述起来很简单:反应任务等待着条件变量,而检测任务则在事件发生时通知条件变量。给定:
std::condition_variable cv;
std::mutex m;
检测任务的代码:
cv.notify_one();
如果有多个反应任务需要通知到,那么使用 notify_all
替换 notify_one
才合适。但不妨假设只有一个反应任务。
反应任务的代码稍显复杂,因为在条件变量之上调用 wait
之前,必须通过 std::unique_lock
锁定互斥量:
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk);
...
}
然而,这段代码中 std::mutex
的使用不是很恰当,并且还有两个可能存在的问题:
- 如果检测任务在反应任务调用
wait
前就通知了条件变量,则反应任务将永远无法唤醒。 - 反应任务的
wait
语句无法应对虚假唤醒。 此处反应任务不能够确认它所等待的条件是否成立。
一种办法是使用共享的布尔标志:
std::atomic<bool> flag(false);
...
flag = true; // 检测事件 通知反应任务
反应线程轮询标志,一旦标志被设置,则它正在等待的事件就发生了:
while (!flag);
...
但是这种方法会导致等待调用的任务被一直阻塞,并且会产生语境切换成本。
常用手法是结合条件变量和标志位的设计,检测任务长这样:
std::condition_variable cv;
std::Mutex m;
bool flag{false};
...
{
std::lock_guard<std::mutex> g(m);
flag = true;
}
cv.notify_one();
以下是反应任务的实现:
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return flag; }); // 防止虚假唤醒
}
另一种办法是摆脱条件变量,互斥量和标志位,让反应任务去等待检测任务设置的期值。检测任务有一个 std::promise
对象,反应任务有对应的期值。当检测任务发现它正在查找的事件已经发生时,它会设置 std::promise
类型对象,即向信道写入。与此同时,反应任务调用 wait
以等待它的期值。该 wait
调用会阻塞反应任务直至 std::promise
类型对象被设置为止。
在本例中,没有数据需要发送,我们所需要的 std::promise
和期值的模板类型可以设置为 void
:
std::promise<void> p; // 信道的约值
... // 检测事件
p.set_value(); // 通知反应任务
... // 准备反应
p.get_future().wait(); // 等待p对应的期值
... // 针对事件做出反应
这种设计只针对一次性的通信,不能重复使用。
假定你只想暂停线程一次(在它创建之后,但在它运行其线程函数之前),使用 void
期值的设计就是合理的选择:
std::promise<void> p;
void react(); // 反应任务
void detect() // 检测任务
{
std::thread t([]
{
p.get_future().wait(); // 暂停t
react(); // 直至其期值被设置
});
...
p.set_value(); // 取消暂停t
...
t.join();
}
当然,如果使用RAII:
std::promise<void> p;
void react(); // 反应任务
void detect() // 检测任务
{
ThreadRAII tr(
std::thread([]
{
p.get_future().wait(); // 暂停t
react(); // 直至其期值被设置
}),
ThreadRAII::DtorAction::join
);
...
p.set_value(); // 取消暂停t
...
}
如果第一个省略号中抛出异常,那么线程中的 wait
将永远无法返回,反应函数无法被执行,这是个需要考虑的问题。
可以将原始代码加以扩充,实现多个反应任务实施先暂停再取消暂停的功能:
std::promise<void> p;
void detect() // 检测任务
{
auto sf = p.get_future().share(); // std::shared_future<void>
std::vector<std::thread> vt;
for (int i = 0; i < threadsToRun; ++i) {
vt.emplace_back([sf]{ sf.wait(); // sf局部副本上wait
react(); });
}
...
p.set_value(); // 取消暂停所有线程
...
for (auto& t : vt) {
t.join();
}
}