4、条件变量 condition_variable
4.1、std::condition_variable
std::condition_variable 是结合 std::mutex 使用的同步语句,可以阻塞和唤醒一个或者多个修改共享数据的线程。
比如下面的例子,worker 线程一直阻塞,直到 main() 函数填充数据并唤醒 worker 线程。main() 然后等待 worker 线程处理数据。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread() {
// Wait until main() sends data
std::unique_lock lk(m);
cv.wait(lk, [] { return ready; });
// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";
// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lk.unlock();
cv.notify_one();
}
int main() {
std::thread worker(worker_thread);
data = "Example data";
// send data to the worker thread
{
std::lock_guard lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();
// wait for the worker
{
std::unique_lock lk(m);
cv.wait(lk, [] { return processed; });
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
std::condition_variable 只有一个 __condvar 类型的成员变量 _M_cond,最后的实现都是在 __condvar 中。
/// condition_variable
/// cv_status
enum class cv_status { no_timeout, timeout };
/// condition_variable
class condition_variable
{
using steady_clock = chrono::steady_clock;
using system_clock = chrono::system_clock;
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
using __clock_t = steady_clock;
#else
using __clock_t = system_clock;
#endif
__condvar _M_cond;
public:
typedef __gthread_cond_t* native_handle_type;
condition_variable() noexcept;
~condition_variable() noexcept;
condition_variable(const condition_variable&) = delete;
condition_variable& operator=(const condition_variable&) = delete;
void
notify_one() noexcept;
void
notify_all() noexcept;
void
wait(unique_lock<mutex>& __lock);
template<typename _Predicate>
void
wait(unique_lock<mutex>& __lock, _Predicate __p)
{
while (!__p())
wait(__lock);
}
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
template<typename _Duration>
cv_status
wait_until(unique_lock<mutex>& __lock,
const chrono::time_point<steady_clock, _Duration>& __atime)
{ return __wait_until_impl(__lock, __atime); }
#endif
template<typename _Duration>
cv_status
wait_until(unique_lock<mutex>& __lock,
const chrono::time_point<system_clock, _Duration>& __atime)
{ return __wait_until_impl(__lock, __atime); }
template<typename _Clock, typename _Duration>
cv_status
wait_until(unique_lock<mutex>& __lock,
const chrono::time_point<_Clock, _Duration>& __atime)
{
#if __cplusplus > 201703L
static_assert(chrono::is_clock_v<_Clock>);
#endif
using __s_dur = typename __clock_t::duration;
const typename _Clock::time_point __c_entry = _Clock::now();
const __clock_t::time_point __s_entry = __clock_t::now();
const auto __delta = __atime - __c_entry;
const auto __s_atime = __s_entry +
chrono::__detail::ceil<__s_dur>(__delta);
if (__wait_until_impl(__lock, __s_atime) == cv_status::no_timeout)
return cv_status::no_timeout;
// We got a timeout when measured against __clock_t but
// we need to check against the caller-supplied clock
// to tell whether we should return a timeout.
if (_Clock::now() < __atime)
return cv_status::no_timeout;
return cv_status::timeout;
}
template<typename _Clock, typename _Duration, typename _Predicate>
bool
wait_until(unique_lock<mutex>& __lock,
const chrono::time_point<_Clock, _Duration>& __atime,
_Predicate __p)
{
while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
return __p();
return true;
}
template<typename _Rep, typename _Period>
cv_status
wait_for(unique_lock<mutex>& __lock,
const chrono::duration<_Rep, _Period>& __rtime)
{
using __dur = typename steady_clock::duration;
return wait_until(__lock,
steady_clock::now() +
chrono::__detail::ceil<__dur>(__rtime));
}
template<typename _Rep, typename _Period, typename _Predicate>
bool
wait_for(unique_lock<mutex>& __lock,
const chrono::duration<_Rep, _Period>& __rtime,
_Predicate __p)
{
using __dur = typename steady_clock::duration;
return wait_until(__lock,
steady_clock::now() +
chrono::__detail::ceil<__dur>(__rtime),
std::move(__p));
}
native_handle_type
native_handle()
{ return _M_cond.native_handle(); }
private:
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
template<typename _Dur>
cv_status
__wait_until_impl(unique_lock<mutex>& __lock,
const chrono::time_point<steady_clock, _Dur>& __atime)
{
auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);
__gthread_time_t __ts =
{
static_cast<std::time_t>(__s.time_since_epoch().count()),
static_cast<long>(__ns.count())
};
// 调用 __condvar::wait_until 实现
_M_cond.wait_until(*__lock.mutex(), CLOCK_MONOTONIC, __ts);
return (steady_clock::now() < __atime
? cv_status::no_timeout : cv_status::timeout);
}
#endif
template<typename _Dur>
cv_status
__wait_until_impl(unique_lock<mutex>& __lock,
const chrono::time_point<system_clock, _Dur>& __atime)
{
auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);
__gthread_time_t __ts =
{
static_cast<std::time_t>(__s.time_since_epoch().count()),
static_cast<long>(__ns.count())
};
_M_cond.wait_until(*__lock.mutex(), __ts);
return (system_clock::now() < __atime
? cv_status::no_timeout : cv_status::timeout);
}
};
wait/notify_one/notify_all() 三个函数也是同样调用 __condvar 的接口。
/// condition_variable.cc
void
condition_variable::wait(unique_lock<mutex>& __lock)
{
_M_cond.wait(*__lock.mutex());
}
void
condition_variable::notify_one() noexcept
{
_M_cond.notify_one();
}
void
condition_variable::notify_all() noexcept
{
_M_cond.notify_all();
}
在 Linux 平台,__condvar 就是封装了 pthread_cond_t 数据结构。
/// std_mutex.h
class __condvar
{
using timespec = __gthread_time_t;
public:
__condvar() noexcept
{
#ifndef __GTHREAD_COND_INIT
__GTHREAD_COND_INIT_FUNCTION(&_M_cond);
#endif
}
~__condvar()
{
int __e __attribute__((__unused__)) = __gthread_cond_destroy(&_M_cond);
__glibcxx_assert(__e != EBUSY); // threads are still blocked
}
__condvar(const __condvar&) = delete;
__condvar& operator=(const __condvar&) = delete;
__gthread_cond_t* native_handle() noexcept { return &_M_cond; }
// Expects: Calling thread has locked __m.
void
wait(mutex& __m)
{
int __e __attribute__((__unused__))
= __gthread_cond_wait(&_M_cond, __m.native_handle());
__glibcxx_assert(__e == 0);
}
void
wait_until(mutex& __m, timespec& __abs_time)
{
__gthread_cond_timedwait(&_M_cond, __m.native_handle(), &__abs_time);
}
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
void
wait_until(mutex& __m, clockid_t __clock, timespec& __abs_time)
{
pthread_cond_clockwait(&_M_cond, __m.native_handle(), __clock,
&__abs_time);
}
#endif
void
notify_one() noexcept
{
int __e __attribute__((__unused__)) = __gthread_cond_signal(&_M_cond);
__glibcxx_assert(__e == 0);
}
void
notify_all() noexcept
{
int __e __attribute__((__unused__)) = __gthread_cond_broadcast(&_M_cond);
__glibcxx_assert(__e == 0);
}
protected:
#ifdef __GTHREAD_COND_INIT
__gthread_cond_t _M_cond = __GTHREAD_COND_INIT;
#else
__gthread_cond_t _M_cond;
#endif
};
4.2、std::condition_variable_any
std::condition_variable_any 是 std::condition_variable 的泛化版本。
std::condition_variable 只能结合 std::unique_lock<std::mutex> 使用,而 std::condition_variable_any 可以和任何互斥锁配合使用,即使它不是标准库提供的互斥锁,只要提供 lock/unlock 结构即可。
比如可以结合 std::recursive_mutex 使用。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
// This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
std::recursive_mutex cv_m;
std::condition_variable_any cv;
int i = 0;
void waits() {
std::unique_lock<std::recursive_mutex> lk(cv_m);
std::cerr << "Waiting... \n";
cv.wait(lk, [] { return i == 1; });
std::cerr << "...finished waiting. i == 1\n";
}
void signals() {
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::recursive_mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::recursive_mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}
int main() {
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
首先介绍 Unlock 类,在构造函数中执行 unlock,在析构函数中执行 lock。我们知道在条件变量 wait 过程中,需要将互斥锁 unlock;从 wait() 调用返回后,再执行 lock 获取互斥锁。
/// condition_variable
// scoped unlock - unlocks in ctor, re-locks in dtor
template<typename _Lock>
struct _Unlock
{
explicit _Unlock(_Lock& __lk) : _M_lock(__lk) { __lk.unlock(); }
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
~_Unlock() noexcept(false)
{
if (uncaught_exception())
{
__try
{ _M_lock.lock(); }
__catch(const __cxxabiv1::__forced_unwind&)
{ __throw_exception_again; }
__catch(...)
{ }
}
else
_M_lock.lock();
}
#pragma GCC diagnostic pop
_Unlock(const _Unlock&) = delete;
_Unlock& operator=(const _Unlock&) = delete;
_Lock& _M_lock;
};
std::condition_variable_any 的实现给我们如何使用 mutex 做出了指示。condition_variable_any 拥有一个条件变量 M_cond 和一个互斥锁 M_mutex。需要注意的是,M_mutex 是 std::shared_ptr<std::mutex> 类型的变量,而非普通成员变量。
使用 shared_ptr 是为了确保 wait() 可以正确执行。当通知了所有阻塞在 condition_variable_any 上的线程后,condition_variable_any 对象就可以析构。而此时某些线程 wait() 还没有执行。使用 shared_ptr 可以确保所有线程的 wait() 执行时,都能正确获取互斥锁。
条件变量 wait 的执行分为三步:1)unlock 互斥变量;2)阻塞在条件变量上;3)lock 互斥变量。并且 1)和 2)应该具有原子语义,不能被拆分执行,否则 notify 将会被丢失,wait 一直阻塞。为了保证 wait 原子性,condition_variable_any 使用了互斥锁 M_mutex,将 unlock 和 wait 保护其他不被打断。notify 也需要先获取互斥锁 M_mutex。
下面是 condition_variable_any 的完整实现,后面会详细分析 wait 和 notify 函数的实现。
class condition_variable_any
{
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
using __clock_t = chrono::steady_clock;
#else
using __clock_t = chrono::system_clock;
#endif
condition_variable _M_cond;
shared_ptr<mutex> _M_mutex;
public:
condition_variable_any() : _M_mutex(std::make_shared<mutex>()) { }
~condition_variable_any() = default;
condition_variable_any(const condition_variable_any&) = delete;
condition_variable_any& operator=(const condition_variable_any&) = delete;
void
notify_one() noexcept
{
lock_guard<mutex> __lock(*_M_mutex);
_M_cond.notify_one();
}
void
notify_all() noexcept
{
lock_guard<mutex> __lock(*_M_mutex);
_M_cond.notify_all();
}
template<typename _Lock>
void
wait(_Lock& __lock)
{
shared_ptr<mutex> __mutex = _M_mutex;
unique_lock<mutex> __my_lock(*__mutex);
_Unlock<_Lock> __unlock(__lock);
// *__mutex must be unlocked before re-locking __lock so move
// ownership of *__mutex lock to an object with shorter lifetime.
unique_lock<mutex> __my_lock2(std::move(__my_lock));
_M_cond.wait(__my_lock2);
}
template<typename _Lock, typename _Predicate>
void
wait(_Lock& __lock, _Predicate __p)
{
while (!__p())
wait(__lock);
}
template<typename _Lock, typename _Clock, typename _Duration>
cv_status
wait_until(_Lock& __lock,
const chrono::time_point<_Clock, _Duration>& __atime)
{
shared_ptr<mutex> __mutex = _M_mutex;
unique_lock<mutex> __my_lock(*__mutex);
_Unlock<_Lock> __unlock(__lock);
// *__mutex must be unlocked before re-locking __lock so move
// ownership of *__mutex lock to an object with shorter lifetime.
unique_lock<mutex> __my_lock2(std::move(__my_lock));
return _M_cond.wait_until(__my_lock2, __atime);
}
template<typename _Lock, typename _Clock,
typename _Duration, typename _Predicate>
bool
wait_until(_Lock& __lock,
const chrono::time_point<_Clock, _Duration>& __atime,
_Predicate __p)
{
while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
return __p();
return true;
}
template<typename _Lock, typename _Rep, typename _Period>
cv_status
wait_for(_Lock& __lock, const chrono::duration<_Rep, _Period>& __rtime)
{ return wait_until(__lock, __clock_t::now() + __rtime); }
template<typename _Lock, typename _Rep,
typename _Period, typename _Predicate>
bool
wait_for(_Lock& __lock,
const chrono::duration<_Rep, _Period>& __rtime, _Predicate __p)
{ return wait_until(__lock, __clock_t::now() + __rtime, std::move(__p)); }
#ifdef __cpp_lib_jthread
...
#endif
};
wait() 首先拷贝 M_mutex,使得 M_mutex 在当前线程 wait 执行过程中,不被析构;然后获取互斥锁 M_mutex,保证 unlock 和 wait 执行不被打断;然后执行 unlock 和 wait。需要注意,wait() 调用前,(1) 先于 (2) 执行,wait() 返回后,(1) 对应的互斥锁也应该先于 (2) 对应的互斥锁操作,所以 __my_lock 被移动到 __unlock 之后。栈变量自动析构,__my_lock2 先析构。
/// condition_variable
template<typename _Lock>
void
wait(_Lock& __lock)
{
shared_ptr<mutex> __mutex = _M_mutex;
unique_lock<mutex> __my_lock(*__mutex); // (1)
_Unlock<_Lock> __unlock(__lock); // (2)
// *__mutex must be unlocked before re-locking __lock so move
// ownership of *__mutex lock to an object with shorter lifetime.
unique_lock<mutex> __my_lock2(std::move(__my_lock));
_M_cond.wait(__my_lock2);
}