优秀的编程知识分享平台

网站首页 > 技术文章 正文

C++ 多线程编程系列 | condition_variable 如何实现的

nanyue 2024-07-25 06:02:43 技术文章 23 ℃

4、条件变量 condition_variable

4.1、std::condition_variable

std::condition_variable 是结合 std::mutex 使用的同步语句,可以阻塞和唤醒一个或者多个修改共享数据的线程。

比如下面的例子,worker 线程一直阻塞,直到 main() 函数填充数据并唤醒 worker 线程。main() 然后等待 worker 线程处理数据。

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread() {
  // Wait until main() sends data
  std::unique_lock lk(m);
  cv.wait(lk, [] { return ready; });

  // after the wait, we own the lock.
  std::cout << "Worker thread is processing data\n";
  data += " after processing";

  // Send data back to main()
  processed = true;
  std::cout << "Worker thread signals data processing completed\n";

  // Manual unlocking is done before notifying, to avoid waking up
  // the waiting thread only to block again (see notify_one for details)
  lk.unlock();
  cv.notify_one();
}

int main() {
  std::thread worker(worker_thread);

  data = "Example data";
  // send data to the worker thread
  {
    std::lock_guard lk(m);
    ready = true;
    std::cout << "main() signals data ready for processing\n";
  }
  cv.notify_one();

  // wait for the worker
  {
    std::unique_lock lk(m);
    cv.wait(lk, [] { return processed; });
  }
  std::cout << "Back in main(), data = " << data << '\n';

  worker.join();
}

std::condition_variable 只有一个 __condvar 类型的成员变量 _M_cond,最后的实现都是在 __condvar 中。

/// condition_variable
  /// cv_status
  enum class cv_status { no_timeout, timeout };

  /// condition_variable
  class condition_variable
  {
    using steady_clock = chrono::steady_clock;
    using system_clock = chrono::system_clock;
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
    using __clock_t = steady_clock;
#else
    using __clock_t = system_clock;
#endif

    __condvar _M_cond;

  public:
    typedef __gthread_cond_t* 		native_handle_type;

    condition_variable() noexcept;
    ~condition_variable() noexcept;

    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;

    void
    notify_one() noexcept;

    void
    notify_all() noexcept;

    void
    wait(unique_lock<mutex>& __lock);

    template<typename _Predicate>
      void
      wait(unique_lock<mutex>& __lock, _Predicate __p)
      {
    while (!__p())
      wait(__lock);
      }

#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
    template<typename _Duration>
      cv_status
      wait_until(unique_lock<mutex>& __lock,
         const chrono::time_point<steady_clock, _Duration>& __atime)
      { return __wait_until_impl(__lock, __atime); }
#endif

    template<typename _Duration>
      cv_status
      wait_until(unique_lock<mutex>& __lock,
         const chrono::time_point<system_clock, _Duration>& __atime)
      { return __wait_until_impl(__lock, __atime); }

    template<typename _Clock, typename _Duration>
      cv_status
      wait_until(unique_lock<mutex>& __lock,
         const chrono::time_point<_Clock, _Duration>& __atime)
      {
#if __cplusplus > 201703L
    static_assert(chrono::is_clock_v<_Clock>);
#endif
    using __s_dur = typename __clock_t::duration;
    const typename _Clock::time_point __c_entry = _Clock::now();
    const __clock_t::time_point __s_entry = __clock_t::now();
    const auto __delta = __atime - __c_entry;
    const auto __s_atime = __s_entry +
      chrono::__detail::ceil<__s_dur>(__delta);

    if (__wait_until_impl(__lock, __s_atime) == cv_status::no_timeout)
      return cv_status::no_timeout;
    // We got a timeout when measured against __clock_t but
    // we need to check against the caller-supplied clock
    // to tell whether we should return a timeout.
    if (_Clock::now() < __atime)
      return cv_status::no_timeout;
    return cv_status::timeout;
      }

    template<typename _Clock, typename _Duration, typename _Predicate>
      bool
      wait_until(unique_lock<mutex>& __lock,
         const chrono::time_point<_Clock, _Duration>& __atime,
         _Predicate __p)
      {
    while (!__p())
      if (wait_until(__lock, __atime) == cv_status::timeout)
        return __p();
    return true;
      }

    template<typename _Rep, typename _Period>
      cv_status
      wait_for(unique_lock<mutex>& __lock,
           const chrono::duration<_Rep, _Period>& __rtime)
      {
    using __dur = typename steady_clock::duration;
    return wait_until(__lock,
              steady_clock::now() +
              chrono::__detail::ceil<__dur>(__rtime));
      }

    template<typename _Rep, typename _Period, typename _Predicate>
      bool
      wait_for(unique_lock<mutex>& __lock,
           const chrono::duration<_Rep, _Period>& __rtime,
           _Predicate __p)
      {
    using __dur = typename steady_clock::duration;
    return wait_until(__lock,
              steady_clock::now() +
              chrono::__detail::ceil<__dur>(__rtime),
              std::move(__p));
      }

    native_handle_type
    native_handle()
    { return _M_cond.native_handle(); }

  private:
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
    template<typename _Dur>
      cv_status
      __wait_until_impl(unique_lock<mutex>& __lock,
            const chrono::time_point<steady_clock, _Dur>& __atime)
      {
    auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
    auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);

    __gthread_time_t __ts =
      {
        static_cast<std::time_t>(__s.time_since_epoch().count()),
        static_cast<long>(__ns.count())
      };
  // 调用 __condvar::wait_until 实现
    _M_cond.wait_until(*__lock.mutex(), CLOCK_MONOTONIC, __ts);

    return (steady_clock::now() < __atime
        ? cv_status::no_timeout : cv_status::timeout);
      }
#endif

    template<typename _Dur>
      cv_status
      __wait_until_impl(unique_lock<mutex>& __lock,
            const chrono::time_point<system_clock, _Dur>& __atime)
      {
    auto __s = chrono::time_point_cast<chrono::seconds>(__atime);
    auto __ns = chrono::duration_cast<chrono::nanoseconds>(__atime - __s);

    __gthread_time_t __ts =
      {
        static_cast<std::time_t>(__s.time_since_epoch().count()),
        static_cast<long>(__ns.count())
      };

    _M_cond.wait_until(*__lock.mutex(), __ts);

    return (system_clock::now() < __atime
        ? cv_status::no_timeout : cv_status::timeout);
      }
  };

wait/notify_one/notify_all() 三个函数也是同样调用 __condvar 的接口。

/// condition_variable.cc
  void
  condition_variable::wait(unique_lock<mutex>& __lock)
  {
    _M_cond.wait(*__lock.mutex());
  }

  void
  condition_variable::notify_one() noexcept
  {
    _M_cond.notify_one();
  }

  void
  condition_variable::notify_all() noexcept
  {
    _M_cond.notify_all();
  }

在 Linux 平台,__condvar 就是封装了 pthread_cond_t 数据结构。

/// std_mutex.h
  class __condvar
  {
    using timespec = __gthread_time_t;

  public:
    __condvar() noexcept
    {
#ifndef __GTHREAD_COND_INIT
      __GTHREAD_COND_INIT_FUNCTION(&_M_cond);
#endif
    }

    ~__condvar()
    {
      int __e __attribute__((__unused__)) = __gthread_cond_destroy(&_M_cond);
      __glibcxx_assert(__e != EBUSY); // threads are still blocked
    }

    __condvar(const __condvar&) = delete;
    __condvar& operator=(const __condvar&) = delete;

    __gthread_cond_t* native_handle() noexcept { return &_M_cond; }

    // Expects: Calling thread has locked __m.
    void
    wait(mutex& __m)
    {
      int __e __attribute__((__unused__))
    = __gthread_cond_wait(&_M_cond, __m.native_handle());
      __glibcxx_assert(__e == 0);
    }

    void
    wait_until(mutex& __m, timespec& __abs_time)
    {
      __gthread_cond_timedwait(&_M_cond, __m.native_handle(), &__abs_time);
    }

#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
    void
    wait_until(mutex& __m, clockid_t __clock, timespec& __abs_time)
    {
      pthread_cond_clockwait(&_M_cond, __m.native_handle(), __clock,
                 &__abs_time);
    }
#endif

    void
    notify_one() noexcept
    {
      int __e __attribute__((__unused__)) = __gthread_cond_signal(&_M_cond);
      __glibcxx_assert(__e == 0);
    }

    void
    notify_all() noexcept
    {
      int __e __attribute__((__unused__)) = __gthread_cond_broadcast(&_M_cond);
      __glibcxx_assert(__e == 0);
    }

  protected:
#ifdef __GTHREAD_COND_INIT
    __gthread_cond_t _M_cond = __GTHREAD_COND_INIT;
#else
    __gthread_cond_t _M_cond;
#endif
  };

4.2、std::condition_variable_any

std::condition_variable_any 是 std::condition_variable 的泛化版本。

std::condition_variable 只能结合 std::unique_lock<std::mutex> 使用,而 std::condition_variable_any 可以和任何互斥锁配合使用,即使它不是标准库提供的互斥锁,只要提供 lock/unlock 结构即可。

比如可以结合 std::recursive_mutex 使用。

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
std::recursive_mutex cv_m;
std::condition_variable_any cv;

int i = 0;

void waits() {
  std::unique_lock<std::recursive_mutex> lk(cv_m);
  std::cerr << "Waiting... \n";
  cv.wait(lk, [] { return i == 1; });
  std::cerr << "...finished waiting. i == 1\n";
}

void signals() {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  {
    std::lock_guard<std::recursive_mutex> lk(cv_m);
    std::cerr << "Notifying...\n";
  }
  cv.notify_all();

  std::this_thread::sleep_for(std::chrono::seconds(1));

  {
    std::lock_guard<std::recursive_mutex> lk(cv_m);
    i = 1;
    std::cerr << "Notifying again...\n";
  }
  cv.notify_all();
}

int main() {
  std::thread t1(waits), t2(waits), t3(waits), t4(signals);
  t1.join();
  t2.join();
  t3.join();
  t4.join();
}

首先介绍 Unlock 类,在构造函数中执行 unlock,在析构函数中执行 lock。我们知道在条件变量 wait 过程中,需要将互斥锁 unlock;从 wait() 调用返回后,再执行 lock 获取互斥锁。

/// condition_variable
    // scoped unlock - unlocks in ctor, re-locks in dtor
    template<typename _Lock>
      struct _Unlock
      {
    explicit _Unlock(_Lock& __lk) : _M_lock(__lk) { __lk.unlock(); }

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    ~_Unlock() noexcept(false)
    {
      if (uncaught_exception())
        {
          __try
          { _M_lock.lock(); }
          __catch(const __cxxabiv1::__forced_unwind&)
          { __throw_exception_again; }
          __catch(...)
          { }
        }
      else
        _M_lock.lock();
    }
#pragma GCC diagnostic pop

    _Unlock(const _Unlock&) = delete;
    _Unlock& operator=(const _Unlock&) = delete;

    _Lock& _M_lock;
      };

std::condition_variable_any 的实现给我们如何使用 mutex 做出了指示。condition_variable_any 拥有一个条件变量 M_cond 和一个互斥锁 M_mutex。需要注意的是,M_mutex 是 std::shared_ptr<std::mutex> 类型的变量,而非普通成员变量。

使用 shared_ptr 是为了确保 wait() 可以正确执行。当通知了所有阻塞在 condition_variable_any 上的线程后,condition_variable_any 对象就可以析构。而此时某些线程 wait() 还没有执行。使用 shared_ptr 可以确保所有线程的 wait() 执行时,都能正确获取互斥锁。

条件变量 wait 的执行分为三步:1)unlock 互斥变量;2)阻塞在条件变量上;3)lock 互斥变量。并且 1)和 2)应该具有原子语义,不能被拆分执行,否则 notify 将会被丢失,wait 一直阻塞。为了保证 wait 原子性,condition_variable_any 使用了互斥锁 M_mutex,将 unlock 和 wait 保护其他不被打断。notify 也需要先获取互斥锁 M_mutex。

下面是 condition_variable_any 的完整实现,后面会详细分析 wait 和 notify 函数的实现。

  class condition_variable_any
  {
#ifdef _GLIBCXX_USE_PTHREAD_COND_CLOCKWAIT
    using __clock_t = chrono::steady_clock;
#else
    using __clock_t = chrono::system_clock;
#endif
    condition_variable			_M_cond;
    shared_ptr<mutex>			_M_mutex;

  public:
    condition_variable_any() : _M_mutex(std::make_shared<mutex>()) { }
    ~condition_variable_any() = default;

    condition_variable_any(const condition_variable_any&) = delete;
    condition_variable_any& operator=(const condition_variable_any&) = delete;

    void
    notify_one() noexcept
    {
      lock_guard<mutex> __lock(*_M_mutex);
      _M_cond.notify_one();
    }

    void
    notify_all() noexcept
    {
      lock_guard<mutex> __lock(*_M_mutex);
      _M_cond.notify_all();
    }

    template<typename _Lock>
      void
      wait(_Lock& __lock)
      {
    shared_ptr<mutex> __mutex = _M_mutex;
    unique_lock<mutex> __my_lock(*__mutex);
    _Unlock<_Lock> __unlock(__lock);
    // *__mutex must be unlocked before re-locking __lock so move
    // ownership of *__mutex lock to an object with shorter lifetime.
    unique_lock<mutex> __my_lock2(std::move(__my_lock));
    _M_cond.wait(__my_lock2);
      }

    template<typename _Lock, typename _Predicate>
      void
      wait(_Lock& __lock, _Predicate __p)
      {
    while (!__p())
      wait(__lock);
      }

    template<typename _Lock, typename _Clock, typename _Duration>
      cv_status
      wait_until(_Lock& __lock,
         const chrono::time_point<_Clock, _Duration>& __atime)
      {
    shared_ptr<mutex> __mutex = _M_mutex;
    unique_lock<mutex> __my_lock(*__mutex);
    _Unlock<_Lock> __unlock(__lock);
    // *__mutex must be unlocked before re-locking __lock so move
    // ownership of *__mutex lock to an object with shorter lifetime.
    unique_lock<mutex> __my_lock2(std::move(__my_lock));
    return _M_cond.wait_until(__my_lock2, __atime);
      }

    template<typename _Lock, typename _Clock,
         typename _Duration, typename _Predicate>
      bool
      wait_until(_Lock& __lock,
         const chrono::time_point<_Clock, _Duration>& __atime,
         _Predicate __p)
      {
    while (!__p())
      if (wait_until(__lock, __atime) == cv_status::timeout)
        return __p();
    return true;
      }

    template<typename _Lock, typename _Rep, typename _Period>
      cv_status
      wait_for(_Lock& __lock, const chrono::duration<_Rep, _Period>& __rtime)
      { return wait_until(__lock, __clock_t::now() + __rtime); }

    template<typename _Lock, typename _Rep,
         typename _Period, typename _Predicate>
      bool
      wait_for(_Lock& __lock,
           const chrono::duration<_Rep, _Period>& __rtime, _Predicate __p)
      { return wait_until(__lock, __clock_t::now() + __rtime, std::move(__p)); }

#ifdef __cpp_lib_jthread
...
#endif
  };

wait() 首先拷贝 M_mutex,使得 M_mutex 在当前线程 wait 执行过程中,不被析构;然后获取互斥锁 M_mutex,保证 unlock 和 wait 执行不被打断;然后执行 unlock 和 wait。需要注意,wait() 调用前,(1) 先于 (2) 执行,wait() 返回后,(1) 对应的互斥锁也应该先于 (2) 对应的互斥锁操作,所以 __my_lock 被移动到 __unlock 之后。栈变量自动析构,__my_lock2 先析构。

/// condition_variable
template<typename _Lock>
  void
  wait(_Lock& __lock)
  {
shared_ptr<mutex> __mutex = _M_mutex;
unique_lock<mutex> __my_lock(*__mutex); // (1)
_Unlock<_Lock> __unlock(__lock); // (2)
// *__mutex must be unlocked before re-locking __lock so move
// ownership of *__mutex lock to an object with shorter lifetime.
unique_lock<mutex> __my_lock2(std::move(__my_lock));
_M_cond.wait(__my_lock2);
  }

Tags:

最近发表
标签列表