当前位置:网站首页>基于可变参模板实现的线程池
基于可变参模板实现的线程池
2022-06-29 03:45:00 【~怎么回事啊~】
并发和并行
并发
单核上,多个线程占用不同的CPU时间片,物理上还是串行执行的,但是由于每个线程占用的CPU时间片非常短(比如10ms),看起来就像是多个线程都在共同执行一样,这样的场景称作并发(concurrent)。

并行
在多核或者多CPU上,多个线程是在真正的同时执行,这样的场景称作并行(parallel)。

多线程的优势
多线程程序一定就好吗?不一定,要看具体的应用场景:
IO密集型
涉及I/O操作:如设置操作,文件操作,网络操作(等待客户端连接),如果IO没有准备好,IO操作会把程序堵塞
无论是CPU单核、CPU多核、多CPU,都是比较适合多线程程序的
I/O密集型更适合设计成多线程程序。
CPU密集型
大量计算,深度学习,运算
在:CPU单核的情况下 ,多线程存在上下文切换,(CPU寄存器信息保存到线程的内核栈上,上下文切换,然后恢复到CPU寄存器上),是额外的花销,线程越多上下文切换所花费的额外时间也越多,倒不如一个线程一直进行计算。
CPU多核、多CPU 情况下
多个线程可以并行执行,对CPU利用率好。
线程的消耗
为了完成任务,创建很多的线程可以吗?线程真的是越多越好?
1.线程的创建和销毁都是非常"重"的操作(性能消耗,耗时严重)

2.线程栈本身占用大量内存

栈空间默认:8M

一个进程大概可以创建380个线程。(扣除代码段,数据段,共享库区域…)
线程栈:线程创建的时候,线程函数占用的栈空间。
3.线程的上下文切换要占用大量时间
4.大量线程同时唤醒会使系统经常出现锯齿状负载或者瞬间负载量很大导致宕机
创建多少个线程比较好?
有几个核,创建几个线程。如果I/O操作多,可以多创建几个线程。
线程池的优势
操作系统上创建线程和销毁线程都是很"重"的操作,耗时耗性能都比较多,那么在服务执行的过程中,如果业务量比较大,实时的去创建线程、执行业务、业务完成后销毁线程,那么会导致系统的实时性能降低,业务的处理能力也会降低。
线程池的优势就是(每个池都有自己的优势),在服务进程启动之初,就事先创建好线程池里面的线程,当业务流量到来时需要分配线程,直接从线程池中获取一个空闲线程执行task任务即可,task执行完成后,也不用释放线程,而是把线程归还到线程池中继续给后续的task提供服务。
fixed模式线程池
线程池里面的线程个数是固定不变的,一般是ThreadPool创建时根据当前机器的CPU核心数量进行指定。
cached模式线程池
线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量,但是会设置一个线程数量的阈值(线程过多的坏处上面已经讲过了),任务处理完成,如果动态增长的线程空闲了60s还没有处理其它任务,那么关闭线程,保持池中最初数量的线程即可。
线程同步之线程互斥-mutex互斥锁和原子类型


添加互斥锁(比较重):锁的获取是由内核来保证的,在多线程环境下,一次只有一个线程获取到锁,其他线程阻塞在这把锁(悲观锁)上,就绪态变成阻塞态,在阻塞队列上,但是不一定非得阻塞:还有trylock(活锁,乐观锁,不会阻塞线程,原地转圈,用在获取时间非常短,频率大)
C++11提供的CAS原子类型:基于操作系统的CAS
“无锁机制”,这种锁非常轻量,效率和性能高。
适用场景:对共享变量++,- -
线程同步之线程通信-条件变量cond
线程的调度完全没有顺序可言,受内核的调度算法控制。
但是如果有这样的需求场景:

线程执行,如果发现条件不成立,进入等待wait状态,等待条件成立,相当于处于不受调度的状态,不在就绪队列中,如果被通知notify,从等待状态转向阻塞状态,获取到锁,进入就绪队列,收CPU调度了,就可以继续向下执行。

一个线程的代码块依赖于另一个线程的代码块的结果,需要线程间的通知机制。
条件变量:对于线程的通信的控制更加精确

线程同步之线程通信-信号量
比条件变量 控制的程度相对来说不精细。
项目设计

如何排查死锁?
linux:gdb 打印死锁进程的每个线程栈,查看每个线程当前执行的位置,是不是处于条件变量的wait
package_task 和future的使用
C++11之packaged_task使用介绍_Jimmy1224的博客-CSDN博客_packaged_task
C++11中std::future的使用_fengbingchun的博客-CSDN博客_c++ future
代码:编译时需要启用c++17
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <unordered_map>
#include <thread>
#include <future>
// 线程池支持的模式
enum class PoolMode
{
MODE_FIXED, // 固定数量的线程
MODE_CACHED, // 线程数量可动态增长
};
// 线程类型
class Thread
{
public:
// 线程函数对象类型
using ThreadFunc = std::function<void(int)>;
// 线程构造
Thread(ThreadFunc func)
: func_(func)
, threadId_(generateId_++)
{}
// 线程析构
~Thread() = default;
// 启动线程
void start()
{
// 创建一个线程来执行一个线程函数 pthread_create
std::thread t(func_, threadId_); // C++11来说 线程对象t 和线程函数func_
t.detach(); // 设置分离线程 pthread_detach pthread_t设置成分离线程
}
// 获取线程id
int getId()const
{
return threadId_;
}
private:
ThreadFunc func_;
static int generateId_;
int threadId_; // 保存线程id
};
// 线程池类型
class ThreadPool
{
public:
// 线程池构造
ThreadPool();
// 线程池析构
~ThreadPool();
// 设置线程池的工作模式
void setMode(PoolMode mode);
// 设置task任务队列上线阈值
void setTaskQueMaxThreshHold(int threshhold);
// 设置线程池cached模式下线程阈值
void setThreadSizeThreshHold(int threshhold);
// 给线程池提交任务
// 使用可变参模板编程,让submitTask可以接收任意任务函数和任意数量的参数
// pool.submitTask(sum1, 10, 20); csdn 大秦坑王 右值引用+引用折叠原理
// 返回值future<>
template<typename Func, typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
// 打包任务,放入任务队列里面
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));//std::forward 完美转发;Args&& 引用折叠有可能是左值或右值
std::future<RType> result = task->get_future();
// 获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
// 用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
{
// 表示notFull_等待1s种,条件依然没有满足
std::cerr << "task queue is full, submit task fail." << std::endl;
auto task = std::make_shared<std::packaged_task<RType()>>(
[]()->RType { return RType(); });
(*task)();
return task->get_future();//返回空
}
// 如果有空余,把任务放入任务队列中
// using Task = std::function<void()>; 任务队列中的task是没有返回值的,需要处理
taskQue_.emplace([task]() {(*task)(); });//重新封装一个任务,返回值是空,参数是空
taskSize_++;
// 因为新放了任务,任务队列肯定不空了,在notEmpty_上进行通知,赶快分配线程执行任务
notEmpty_.notify_all();
// cached模式 任务处理比较紧急 场景:小而快的任务 需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << ">>> create new thread..." << std::endl;
// 创建新的线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// 启动线程
threads_[threadId]->start();
// 修改线程个数相关的变量
curThreadSize_++;
idleThreadSize_++;
}
// 返回任务的Result对象
return result;
}
// 开启线程池
void start(int initThreadSize = std::thread::hardware_concurrency());
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
private:
// 定义线程函数
void threadFunc(int threadid);
// 检查pool的运行状态
bool checkRunningState() const
{
return isPoolRunning_;
}
private:
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表
int initThreadSize_; // 初始的线程数量
int threadSizeThreshHold_; // 线程数量上限阈值
std::atomic_int curThreadSize_; // 记录当前线程池里面线程的总数量
std::atomic_int idleThreadSize_; // 记录空闲线程的数量
// Task任务 =》 函数对象
using Task = std::function<void()>;
std::queue<Task> taskQue_; // 任务队列
std::atomic_int taskSize_; // 任务的数量
int taskQueMaxThreshHold_; // 任务队列数量上限阈值
std::mutex taskQueMtx_; // 保证任务队列的线程安全
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable notEmpty_; // 表示任务队列不空
std::condition_variable exitCond_; // 等到线程资源全部回收
PoolMode poolMode_; // 当前线程池的工作模式
std::atomic_bool isPoolRunning_; // 表示当前线程池的启动状态
};
#endif
#include <iostream>
#include "threadpool.h"
const int TASK_MAX_THRESHHOLD = INT32_MAX; // INT32_MAX;
const int THREAD_MAX_THRESHHOLD = 1024;
const int THREAD_MAX_IDLE_TIME = 60; // 单位:秒
int Thread::generateId_ = 0;
// 线程池构造
ThreadPool::ThreadPool()
: initThreadSize_(0)
, taskSize_(0)
, idleThreadSize_(0)
, curThreadSize_(0)
, taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD)
, threadSizeThreshHold_(THREAD_MAX_THRESHHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRunning_(false)
{}
// 线程池析构
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
// 等待线程池里面所有的线程返回 有两种状态:阻塞 & 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all();
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
// 设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}
// 设置task任务队列上线阈值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{
if (checkRunningState())
return;
taskQueMaxThreshHold_ = threshhold;
}
// 设置线程池cached模式下线程阈值
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThreshHold_ = threshhold;
}
}
// 开启线程池
void ThreadPool::start(int initThreadSize)
{
// 设置线程池的运行状态
isPoolRunning_ = true;
// 记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;
// 创建线程对象
for (int i = 0; i < initThreadSize_; i++)
{
// 创建thread线程对象的时候,把线程函数给到thread线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// threads_.emplace_back(std::move(ptr));
}
// 启动所有线程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start(); // 需要去执行一个线程函数
idleThreadSize_++; // 记录初始空闲线程的数量
}
}
// 定义线程函数
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
// 所有任务必须执行完成,线程池才可以回收所有线程资源
for (;;)
{
Task task;
{
// 先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid:" << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;
// cached模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程
// 结束回收掉(超过initThreadSize_数量的线程要进行回收)
// 当前时间 - 上一次线程执行的时间 > 60s
// 每一秒中返回一次 怎么区分:超时返回?还是有任务待执行返回
// 锁 + 双重判断
while (taskQue_.size() == 0)
{
// 线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid); // std::this_thread::getid()
std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
<< std::endl;
exitCond_.notify_all();
return; // 线程函数结束,线程结束
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
// 条件变量,超时返回了
if (std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
// 开始回收当前线程
// 记录线程数量的相关变量的值修改
// 把线程对象从线程列表容器中删除 没有办法 threadFunc《=》thread对象
// threadid => thread对象 => 删除
threads_.erase(threadid); // std::this_thread::getid()
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
<< std::endl;
return;
}
}
}
else
{
// 等待notEmpty条件
notEmpty_.wait(lock);
}
}
idleThreadSize_--;
std::cout << "tid:" << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
// 从任务队列种取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
// 如果依然有剩余任务,继续通知其它得线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
// 取出一个任务,进行通知,通知可以继续提交生产任务
notFull_.notify_all();
} // 就应该把锁释放掉
// 当前线程负责执行这个任务
if (task != nullptr)
{
task(); // 执行function<void()>
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now(); // 更新线程执行完任务的时间
}
}
// 线程池项目-最终版.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <iostream>
#include <functional>
#include <thread>
#include <future>
#include <chrono>
using namespace std;
#include "threadpool.h"
int sum1(int a, int b)
{
this_thread::sleep_for(chrono::seconds(2));
// 比较耗时
return a + b;
}
int sum2(int a, int b, int c)
{
this_thread::sleep_for(chrono::seconds(2));
return a + b + c;
}
// io线程
void io_thread(int listenfd)
{
}
// worker线程
void worker_thread(int clientfd)
{
}
/*
packaged_task 和future的使用:
packaged_task<int(int, int)> task(sum1);
future<int> res = task.get_future();
thread t(std::move(task), 10, 20);
t.detach();
cout << res.get() << endl;
*/
int main()
{
ThreadPool pool;
pool.start();
future<int> r1 = pool.submitTask(sum1, 1, 2);
future<int> r2 = pool.submitTask(sum2, 1, 2, 3);
future<int> r3 = pool.submitTask([](int b, int e)->int {
int sum = 0;
for (int i = b; i <= e; i++)
sum += i;
return sum;
}, 1, 100);
future<int> r4 = pool.submitTask([](int b, int e)->int {
int sum = 0;
for (int i = b; i <= e; i++)
sum += i;
return sum;
}, 1, 100);
future<int> r5 = pool.submitTask([](int b, int e)->int {
int sum = 0;
for (int i = b; i <= e; i++)
sum += i;
return sum;
}, 1, 100);
cout << r1.get() << endl;
cout << r2.get() << endl;
cout << r3.get() << endl;
cout << r4.get() << endl;
cout << r5.get() << endl;
return 0;
}边栏推荐
- [tcapulusdb knowledge base] Introduction to tcapulusdb table data caching
- 做 SQL 性能优化真是让人干瞪眼
- 你为什么做测试/开发程序员?还能回想出来吗......
- 欧拉开源社区第二届理事会第二次会议召开,新华三、超聚变和龙芯中科成为理事会成员单位
- 二叉树序列化与反序列化(leetcode(困难))
- 百度智能云服务网格产品CSM发布 | 火热公测中
- 【TcaplusDB知识库】查看tcapdir目录服务器
- An annotation elegant implementation of interface data desensitization
- [World Ocean Day] tcapulusdb calls on you to protect marine biodiversity together
- 为什么信息化 ≠ 数字化?终于有人讲明白了
猜你喜欢

leetcode:560. Subarray with and K

【C语言】 详解线程退出函数 pthread_exit

Input input box click with border

leetcode - 295. Median data flow

leetcode:304. 二维区域和检索 - 矩阵不可变

【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(一)
![[tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (III)](/img/7b/8c4f1549054ee8c0184495d9e8e378.png)
[tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (III)

Seekbar custom pictures are not displayed completely up, down, left, right / bitmaptodrawable / bitmaptodrawable inter rotation / paddingstart/paddingend /thumboffset

Tech Cloud Report: Mixed Office B side: How Safety and Efficiency can be combined?

分布式id解决方案
随机推荐
Data collection and management [4]
[ruoyi] ztree initialization
Tech Cloud Report: Mixed Office B side: How Safety and Efficiency can be combined?
【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(三)
MobileOne: 移动端仅需1ms的高性能骨干
高性能限流器 Guava RateLimiter
[tcapulusdb knowledge base] Introduction to tcapulusdb data import
go实现分布式锁
Qtableview gets all currently selected cells
[MCU framework][dfu] DFU upgrade example with CRC verification + timeout mechanism +led indicator + chip locking + chip self erasure
【TcaplusDB知识库】修改业务修改集群cluster
[tcapulusdb knowledge base] tcapulusdb technical support introduction
Open source demo| you draw and I guess -- make your life more interesting
87.(cesium篇)cesium热力图(贴地形)
mysql varcahr 转 int
【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(一)
Error: schema validation failed with the following error: the data path '' should not have other properties (projects)
logstash启动过慢甚至卡死
基于redis实现的扣减库存
技术:如何设计zkVM电路
