关于线程池是很早就想去看的,在github上找了star最多的C++ 11 版本,然而一下子看不懂,就搁置在那了。
因为昨天百度面试问到了,我甚至回答我看过但是没看完,有点尴尬,今天花了点时间学了一下基础,并对源码进行了注解。

github 地址

先说一下我现在对上面这个线程池实现原理的理解。

理解之后可以说是很简单了,这个线程池本质上是互斥和条件变量的妙用。
我先开一个事件队列,再开几个线程,首先这些线程在整体上都是死循环,但会在事件队列为空的时候进入睡眠状态(由条件变量控制)。当事件队列非空,那么我就从事件队列中取出一个事件,再将这个事件搭载在线程上运行。事件运行结束后这个线程进入下一个循环,如果事件队列为空,则又会进入睡眠状态。如此循环往复……

这里再分享一下我的一点学习经历,关于future,胡言乱语的一些理解,有错误后续会更改。
我当前的理解是,future是作为一个异步机制的需要而出现(猜测也是开了个线程异步执行。
future可由packaged_task封装后get_future,promise关联,async生产所得。
可使用get方法获其结果。

其中有值的注意的两点。

  1. 多线程类在销毁也就是析构的时候是比较困难的(这在《Linux多线程服务器编程》这本书的第一章就有讲到)。这个源码采用一个bool类型的flag为标准,来判断是否正在销毁。具体操作见源码。
  2. 另外一个是事件作为packaged_task的智能指针来申请。直觉告诉我,好处并只有方便而已。学到后面这里会补上。

注解版Code

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class ThreadPool {
public:
    ThreadPool(size_t);
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>; //模板可变参   用rusult_of获得返回类型
    ~ThreadPool();
    int size() const { return workers.size(); }; //可以注意到这个size对于小并发不会又任何变化

private:
    std::vector<std::thread> workers;        // 线程池主体
    std::queue<std::function<void()>> tasks; //任务队列

    // 同步
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop; // 多线程类的析构是一个大问题,这里用一个stop的bool作为是否开始析构的标志
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back( //压入线程
            [this] {
                for (;;) {
                    std::function<void()> task; //有个不是很懂的地方就是 为何 void() 会与 return_type() 对应起来……
                    {                           //这里加 {} 是为了让下面定义的 unique_lock 自动析构
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, // 等待析构或者任务
                            [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) // 尽管开始析构,依然允许将任务队列中的任务做完
                            return;
                        task = std::move(this->tasks.front()); // 取出一个任务
                        this->tasks.pop();                     // 弹出
                    }
                    task(); //线程主体
                }
            });
}

template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type; //线程的返回类型 F(Args...)就是线程主体,通过result_of获取类型

    auto task = std::make_shared<std::packaged_task<return_type()>>( // 以智能指针形式存储packaged_task 即封装的异步任务
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)); //通过bind使得function在形式上的参数统一为void

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        if (stop) //若线程池已经开始析构,这是不允许加入新事件的
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); }); //将新任务加入任务队列
    }
    condition.notify_one();
    return res;
}

inline ThreadPool::~ThreadPool()
{
    { //开始析构
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) //等待所有线程结束
        worker.join();
}
#endif

主函数就不发了,点上面的git链接就好了。