1、介绍
该项目是一个实现多线程池的简单项目,我将它作为简单的C++项目入门,代码并不多但是精读该代码对于学习C++的新特性以及线程实现有很好的作用。本文会详细讲解代码及使用的库特性,作为初学者如有错误敬请指正。
2、头文件
2.1、头文件包含的库
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept>
memory库:智能指针库,操作用std::shared_ptr或std::make_shared创建指针,通过该库创建的指针可以在不再需要指针后自动释放,避免在线程未结束前指针被释放引起崩溃。
thread库:线程库,通过std::thread创建线程,并通过.join()保护线程防止在其他线程未结束之前主线程结束,.joinabel()判断是否可以保护线程。
mutex库:互斥锁库,配合线程使用防止不同线程对同一个变量进行操作,整个文件中只能有一个互斥锁定义std::mutex mtx。最简单的操作方式是在写操作之前mtx.lock()锁进程,用mtx.unlock()解锁进程。最最常用的方式是std::unique_lock<std:mutex> lg(mtx),当构造函数调用时,互斥量自动锁定,当析构函数调用时,互斥量自动释放。并且它只能在局部作用域中使用。相比于std::lock_guard<std:mutex> lg(mtx),std::unique_lock<std:mutex> lg(mtx)有能够限时等待的操作方式。
condition_variable库:条件变量库,std::condition_variable cv配合互斥锁使用,cv.wait(lock,false):需要等待。cv.notify_one():通知消费者来取任务。
future库:异步编程库,在该项目中很重要,因为future可以在不调用拷贝构造函数的情况下获取线程,不是为了节省内存!!!!!是因为线程不能调用拷贝构造函数。
2.2、类
class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: std::vector< std::thread > workers; std::queue< std::function<void()> > tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; };
文件中只有一个类,有一个构造函数、一个析构函数以及一个成员函数enqueue,其中成员函数enqueue是为了添加线程。成员有5个,workers线程数组,tasks任务队列(函数作为元素),queue_mutex互斥锁,condition环境变量,stop线程池终止变量。
在学习过程中最难了解的是enqueue的定义,其用到了万能引用&&。在C++中左值引用为&,右值引用为&&,在函数模板中才有万能模板&&可以根据数据进行左值引用和右值引用的调整。其中f是函数,args是函数参数,...表示函数参数数量不受限制。 -> std::future<typename std::result_of<F(Args...)>::type>表示auto的返回值固定为std::future。
2.3、构造函数
inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for(size_t i = 0;i<threads;++i) workers.emplace_back( [this] { for(;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); }
首先为stop进行赋值,之后对线程进行任务分配,workers通过使用emplace_back进行push_back,同样因为push_back会调用拷贝构造函数,而thread不能使用拷贝构造函数。
在emplace_back中使用了lambda表达式并捕获类内所有局部变量(即this指针所能获取的所有成员)。由于使用了内联函数的原因,不能使用while只能使用for(;;)。其中一些空的{}是为了作为局部作用域,解除互斥锁。
如果在for(;;)中没有任务可分配了那么就等待任务到达分配,如果stop被置true那么就直接结束构造函数。有任务立即执行!!!!!!
非常重要的一点std::move为从一个对象转移到另一个对象而不经过拷贝构造函数的操作,最后task执行。
2.4、任务添加成员函数
template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; }
该成员函数使用auto作为返回值类型,并通过->指定返回类型,其中std::result_of<F(Args...)>::type用以获取函数返回结果,typename确定类型,std::future来保存结果。
为了方便之后指定类型,使用using return_type = typename std::result_of<F(Args...)>::type保存输出结果的类型。
这里对task使用了std::make_shared进行智能指针的使用,std::packaged_task封装异步操作,使其能不调用拷贝构造函数的情况下获取结果。
使用std::bind进行函数参数绑定,std::forward完美转发使其能够在左值引用和右值引用通用。
利用test指针获取.get_future()函数获取future并通过.get()获取值。这里有个重要小技巧,可以通过promise对象跨线程获取值。
最后通过*task获取最终结果,在加入任务成功后利用condition.notify_one()通知线程过来取任务。来活了兄弟们!!!
2.5、析构函数
inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); }
互斥锁全局只有一个,在任务结束之前不允许其他线程调用,因此这里的操作必须在所有线程都执行结束后才能进行,首先互斥锁上锁,将stop置true。
condition.notify_all()通知各线程将任务队列中的所有任务做完,之后检查每个线程都是否结束,最后结束主线程。
3、结语
找工作学习的第一个小项目,如有错误敬请指正讨论,一起提高。