目录
threadsafe_queue.hpp:
thread_pool_easy.hpp:
测试代码:
代码和个人遇到的一些小问题,详细解释可以参考上一篇博客
threadsafe_queue.hpp:
//实现线程安全的队列容器(通过封装一个queue)
//成员变量:
// 1.通过mutex互斥对pop、push操作加锁,从而实现“安全”
// 2.内部封装的queue,queue容器内部存储share_ptr,数据通过share_ptr间接存储好处:
// 将每个数据的share_ptr初始化放在push()中,wait_and_pop就算异常安全的
// 对share_ptr实例分配内存的操作可以脱离锁的保护,有利于增强性能
// 3.条件变量等待条件成立(pop之前必须先push)
//成员函数:
// 1.构造函数
// 2.push
// 3.pop 通过函数参数传递要pop的数据:
// 两种pop,try_pop立即返回,即使队列为空(通过返回值表示操作失败)
// wait_and_pop 一直等到有数据才pop
// 4.通过返回值传递要pop的数据
// 5.empty
//
#ifndef _THREADSAFE_QUEUE_HPP_
#define _THREADSAFE_QUEUE_HPP_
#include<queue>
#include<condition_variable>
#include<mutex>
#include<memory>
template<typename T>
class threadsafe_queue{
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> queue_data;
std::condition_variable data_cond;
public:
threadsafe_queue(){};
void push(T data){ //push(T& data)
//std::share_ptr<T> data_ptr = std::make_share<T>(data);
std::shared_ptr<T> data_ptr(std::make_shared<T>(std::move(data)));
std::lock_guard<std::mutex> lk(mut);
queue_data.push(data_ptr);
data_cond.notify_one();
}
bool try_pop(T& value){
std::lock_guard<std::mutex> lk(mut);
if(queue_data.empty())
return false;
else{
value = std::move(*queue_data.front());
queue_data.pop();
return true;
}
}
void wait_and_pop(T& value){
std::lock_guard<std::mutex> lk(mut);
//data_cond.wait(lk,[=](){return this->empty()}); 1.
data_cond.wait(lk,[this]{return !queue_data.empty();});
value = std::move(*queue_data.front());
queue_data.pop();
}
std::shared_ptr<T> try_pop(){
//std::shared_ptr<T> res 2.
std::lock_guard<std::mutex> lk(mut);
if(queue_data.empty())
return nullptr;//return shared_ptr<T>() 3.
else{
//std::shared_ptr<T> res = queue_data.front();
std::shared_ptr<T> res = std::move(queue_data.front());
queue_data.pop();
return res;
}
}
std::shared_ptr<T> wait_and_pop(){
std::lock_guard<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !queue_data.empty();});
std::shared_ptr<T> res = std::move(queue_data.front());
queue_data.pop();
return res;
}
bool empty() const{
std::lock_guard<std::mutex> lk(mut);
return queue_data.empty();
}
};
#endif
1.//data_cond.wait(lk,[=](){return this->empty()});
data_cond.wait(lk,[this]{return !queue_data.empty();});
二者效果一样
个人理解:使用data_cond.wait(lk,[=](){return this->empty()});会按值捕获更多的信息,但是我们并不需要,data_cond.wait(lk,[=](){return this->empty()});按引用捕获。
2.在try_pop()中锁外声明shared_ptr抛出异常的话可能出问题
3.return nullptr;//return shared_ptr<T>()
二者效果一样,nullptr还省了构造。
thread_pool_easy.hpp:
//最简单的线程池::工作线程数目固定,当有任务要处理时就把他放进任务队列
//所以需要一个任务队列,用threadsafe_queue来实现
//各线程从任务队列中领取任务
//工作线程存储在vector容器中,并被引用到join_threads中,进行统一的析构管理
//
#ifndef _THREAD_POOL_EASY_HPP_
#define _THREAD_POOL_EASY_HPP_
#include "ThreadRAII.h" //join_threads
#include "threadsafe_queue.hpp" //
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
#include <memory>
class thread_pool
{
private:
std::atomic_bool done;
threadsafe_queue<std::function<void()>> work_queue;
std::vector<std::thread> threads;
join_threads joiner;
void work_thread(){
while(!done){
std::function<void()> task;
//std::shared_ptr<std::function<void()>> task;
if(work_queue.try_pop(task)){
task();
}
else{
std::this_thread::yield();
}
}
};
public:
thread_pool():done(false),joiner(threads)
{
//auto thread_number = std::thread::hardware_concurrency(); 1.
unsigned const thread_number = std::thread::hardware_concurrency();
try{
for(unsigned i =0; i<thread_number; i++){
//threads.push_back(std::thread(work_thread)); 2.
threads.push_back(std::thread(&thread_pool::work_thread,this));
}
}
catch(...){
done = true;
throw;
}
}
~thread_pool(){
done = true;
}
template <typename Function>
void submit(Function F){
//work_queue.push(F); 3.
work_queue.push(std::function<void()>(F));
}
};
#endif
1.unsigned const thread_number = std::thread::hardware_concurrency();
thread_number类型是unsigned int
2.第一个参数是一个函数指针,指向类的目标成员函数;第二个参数需要给出相应的对象,以在它之上调用成员函数(这个参数可以是指向对象的指针,或对象本身,或有std::ref封装的对象)
3.//work_queue.push(F);
work_queue.push(std::function<void()>(F));
在我的测试代码中,二者效果一样。
测试代码:
test_easy.cpp:
#include "threadsafe_queue.hpp"
#include "ThreadRAII.h"
#include "thread_pool_easy.hpp"
#include<thread>
#include<iostream>
#include<mutex>
std::mutex mut;
void Print(){
std::lock_guard<std::mutex> lk(mut);
std::cout<<std::this_thread::get_id()<<std::endl;
}
int main(){
thread_pool pool;
for(int i=0;i<100; i++){
pool.submit(Print);
//pool.submit<void(*)()>(Print);
}
return 0;
}
Print里不加锁的话,多线程会抢占输出,输出会变杂乱。
接下来会实现一个能进行任务窃取的线程池。