手写最简单的线程池

news/2024/7/8 1:12:24

目录

threadsafe_queue.hpp:

thread_pool_easy.hpp:

测试代码:


代码和个人遇到的一些小问题,详细解释可以参考上一篇博客 

threadsafe_queue.hpp:

//实现线程安全的队列容器(通过封装一个queue)
//成员变量:
//    1.通过mutex互斥对pop、push操作加锁,从而实现“安全”
//    2.内部封装的queue,queue容器内部存储share_ptr,数据通过share_ptr间接存储好处:
//        将每个数据的share_ptr初始化放在push()中,wait_and_pop就算异常安全的
//        对share_ptr实例分配内存的操作可以脱离锁的保护,有利于增强性能
//    3.条件变量等待条件成立(pop之前必须先push)
//成员函数:
//    1.构造函数
//    2.push
//    3.pop 通过函数参数传递要pop的数据:
//        两种pop,try_pop立即返回,即使队列为空(通过返回值表示操作失败)
//        wait_and_pop 一直等到有数据才pop
//    4.通过返回值传递要pop的数据
//    5.empty
//

#ifndef _THREADSAFE_QUEUE_HPP_
#define _THREADSAFE_QUEUE_HPP_

#include<queue>
#include<condition_variable>
#include<mutex>
#include<memory>
template<typename T>
class threadsafe_queue{
	mutable std::mutex mut;
	std::queue<std::shared_ptr<T>> queue_data;
	std::condition_variable data_cond;
public:
	threadsafe_queue(){};
	void push(T data){ //push(T& data)
		//std::share_ptr<T> data_ptr = std::make_share<T>(data);
		std::shared_ptr<T> data_ptr(std::make_shared<T>(std::move(data)));
		std::lock_guard<std::mutex> lk(mut);
		queue_data.push(data_ptr);
		data_cond.notify_one();
	}
	bool try_pop(T& value){
		std::lock_guard<std::mutex> lk(mut);
		if(queue_data.empty())
			return false;
		else{
			value = std::move(*queue_data.front());
			queue_data.pop();
			return true;
		}
	}
	void wait_and_pop(T& value){
		std::lock_guard<std::mutex> lk(mut);
		//data_cond.wait(lk,[=](){return this->empty()}); 1.
		data_cond.wait(lk,[this]{return !queue_data.empty();});
		value = std::move(*queue_data.front());
		queue_data.pop();
	}

	std::shared_ptr<T> try_pop(){
		//std::shared_ptr<T> res 2.
		std::lock_guard<std::mutex> lk(mut);
		if(queue_data.empty())
			return nullptr;//return shared_ptr<T>() 3.
		else{
			//std::shared_ptr<T> res = queue_data.front();
			std::shared_ptr<T> res = std::move(queue_data.front());
			queue_data.pop();
			return res;
		}
	}
	std::shared_ptr<T> wait_and_pop(){
		std::lock_guard<std::mutex> lk(mut);
		data_cond.wait(lk,[this]{return !queue_data.empty();});
		std::shared_ptr<T> res = std::move(queue_data.front());
		queue_data.pop();
		return res;
	}
	bool empty() const{
		std::lock_guard<std::mutex> lk(mut);
		return queue_data.empty();
	}
};
#endif

1.//data_cond.wait(lk,[=](){return this->empty()});
        data_cond.wait(lk,[this]{return !queue_data.empty();});

二者效果一样

个人理解:使用data_cond.wait(lk,[=](){return this->empty()});会按值捕获更多的信息,但是我们并不需要,data_cond.wait(lk,[=](){return this->empty()});按引用捕获。

2.在try_pop()中锁外声明shared_ptr抛出异常的话可能出问题

3.return nullptr;//return shared_ptr<T>()

二者效果一样,nullptr还省了构造。

thread_pool_easy.hpp:

//最简单的线程池::工作线程数目固定,当有任务要处理时就把他放进任务队列
//所以需要一个任务队列,用threadsafe_queue来实现
//各线程从任务队列中领取任务
//工作线程存储在vector容器中,并被引用到join_threads中,进行统一的析构管理
//

#ifndef _THREAD_POOL_EASY_HPP_
#define _THREAD_POOL_EASY_HPP_
#include "ThreadRAII.h" //join_threads
#include "threadsafe_queue.hpp"    //


#include <atomic>
#include <functional>
#include <thread>
#include <vector>
#include <memory>
class thread_pool
{
private:
	std::atomic_bool done;
	threadsafe_queue<std::function<void()>> work_queue;
	std::vector<std::thread> threads;
	join_threads joiner;
	void work_thread(){
		while(!done){
			std::function<void()> task;
			//std::shared_ptr<std::function<void()>> task;
			if(work_queue.try_pop(task)){
				task();
			}
			else{
				std::this_thread::yield();
			}
		}
	};
public:
	thread_pool():done(false),joiner(threads)
	{
		//auto thread_number = std::thread::hardware_concurrency(); 1.
		unsigned const thread_number = std::thread::hardware_concurrency();
		try{
			for(unsigned i =0; i<thread_number; i++){
                        //threads.push_back(std::thread(work_thread)); 2.
                        threads.push_back(std::thread(&thread_pool::work_thread,this));
               		 }

		}
		catch(...){
			done = true;
			throw;
		}
	}
	~thread_pool(){
		done = true;
	}
	template <typename Function>
	void submit(Function F){
		//work_queue.push(F); 3.
		work_queue.push(std::function<void()>(F));
	}
};
#endif 

1.unsigned const thread_number = std::thread::hardware_concurrency();

thread_number类型是unsigned int

2.第一个参数是一个函数指针,指向类的目标成员函数;第二个参数需要给出相应的对象,以在它之上调用成员函数(这个参数可以是指向对象的指针,或对象本身,或有std::ref封装的对象)

3.//work_queue.push(F);
        work_queue.push(std::function<void()>(F));

在我的测试代码中,二者效果一样。

测试代码:

test_easy.cpp:

#include "threadsafe_queue.hpp"
#include "ThreadRAII.h"
#include "thread_pool_easy.hpp"
#include<thread>
#include<iostream>

#include<mutex>
std::mutex mut;
void Print(){
	std::lock_guard<std::mutex> lk(mut);
	std::cout<<std::this_thread::get_id()<<std::endl;
}
int main(){
	thread_pool pool;
	for(int i=0;i<100; i++){
		pool.submit(Print);
		//pool.submit<void(*)()>(Print);
	}
	return 0;
}

Print里不加锁的话,多线程会抢占输出,输出会变杂乱。

接下来会实现一个能进行任务窃取的线程池。


http://lihuaxi.xjx100.cn/news/1595795.html

相关文章

R语言提交后台任务Rstudio\nohup

R语言后台任务用法 在进行大规模数据分析时&#xff0c;R语言提供了后台计算的功能&#xff0c;能将计算任务提交到后台执行&#xff0c;不影响当前窗口的活动&#xff0c;而且不会受到网络波动导致任务中断&#xff0c;提交后就不用盯着一直看&#xff0c;后台运行就可以下班。…

力扣 -- 647. 回文子串

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:int countSubstrings(string s) {int ns.size();vector<vector<bool>> dp(n,vector<bool>(n));//无需初始化int ret0;//一定要从下往上填写每一行for(int in-1;i>0;i--){//每一行的i…

5.绘制多点

愿你出走半生,归来仍是少年&#xff01; 前面几个案例通过TS传入点参数实现绘制&#xff0c;但是每次的绘制都是单独绘制一个点&#xff0c;未实现一次绘制多个点的效果。该案例通过GL的Buffer实现一次绘制多点。 1.知识点 1.1.缓冲区绘制流程 通过缓冲区对象处理数据时&…

【高级rabbitmq】

文章目录 1. 消息丢失问题1.1 发送者消息丢失1.2 MQ消息丢失1.3 消费者消息丢失1.3.1 消费失败重试机制 总结 2. 死信交换机2.1 TTL 3. 惰性队列3.1 总结&#xff1a; 4. MQ集群 消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 1. 消息丢失问题 1.1…

如何优雅构建自定义 Spring Boot 验证器,让你的代码更加丝滑!

作为一名开发人员&#xff0c;你应该知道确保应用程序中流动的数据的准确性和完整性是多么重要。Spring Boot提供了强大的验证功能&#xff0c;但有时我们需要额外的验证&#xff0c;创建适合特定需求的自定义验证器。 接下来&#xff0c;我们来介绍下如何完整的创建一个自定义…

2023年中国产业互联网行业交易规模及发展前景分析:产业互联网价值快速显现,发展潜力将进一步释放[图]

产业互联网是基于互联网技术和生态&#xff0c;对各个垂直产业的产业链和内部的价值链进行重塑和改造&#xff0c;从而形成的互联网生态和形态。产业互联网是一种新的经济形态&#xff0c;利用信息技术与互联网平台&#xff0c;充分发挥互联网在生产要素配置中的优化和集成作用…

和力链携手纷享销客推动CRM业财一体化,引领大健康产业数智化发展

两化深度融合正在加速产业转型升级、重塑产业结构&#xff0c;为传统行业注入发展新活力&#xff0c;江西和力物联实业有限公司&#xff08;以下简称“和力链”&#xff09;正是这样一家推动医药大健康产业数智化发展的高新技术企业。 和力链是国内首家大健康供应链产能数字化…

张量-数据操作相关函数

tf.slice(input,begin,size,name None),该函数对输入的数据input进行切片分割操作。其中,参数begin是一个int32或int64类型的tensor,表示的是每一个维度的起始位置。size也是一个int32或int64类型的tensor,表示的是每个维度要取的元素个数。 示例代码如下: import tensorflo…