当前位置:网站首页>线程池——C语言
线程池——C语言
2022-06-30 07:17:00 【编程小段】
网上找到一篇对线程池讲解很详细的资料,学习作者文档及视频资料并跟着视频手敲了一下代码,链接如下:
线程池介绍及视频讲解
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
//销毁线程池
int threadPoolDestroy(ThreadPool* pool);
//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//获取线程池工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
//获取线程池活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
//任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
//线程池结构体
struct ThreadPool
{
//任务队列
Task* taskQ;
int queueCapacity; //容量
int queueSize; //当前任务个数
int queueFront; //队头 -> 取数据
int queueRear; //队尾 -> 放数据
pthread_t managerID; //管理者线程ID
pthread_t* threadIDs; //工作的线程ID
int minNum; //最小线程数量
int maxNum; //最大线程数量
int busyNum; //忙的线程的个数
int liveNum; //存活的线程的个数
int exitNum; //要杀死的线程个数
pthread_mutex_t mutexPool; //锁整个的线程池
pthread_mutex_t mutexBusy; //锁busyNum变量
pthread_cond_t notFull; //任务队列是不是满了
pthread_cond_t notEmpty; //任务队列是不是空了
int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; //和最小个数相等
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
//任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
//释放资源
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//关闭线程池
pool->shutdown = 1;
//阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
//唤醒阻塞的消费者线程
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
//释放堆内存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
//生产者
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
//阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
//当前任务是否为空
//为什么是while判断而不是if判断?
//因为,如果有多个线程阻塞,一个解锁后向后执行,其他的必须在循环判断队列是否为空,
//不然队列已经空了还往下执行
while(pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作的线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//判断是不是要销毁
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
//判断线程池是否被关闭
//线程池销毁中也会全部唤醒阻塞的线程,这时候不管任务队列是否为空都要结束
//所以必须要加这个if判断
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//移动头节点
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
//解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown只有管理者线程才会修改,所以不需要加锁
while(!pool->shutdown)
{
//每隔3s检测一次
sleep(3);
//取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加线程
//任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//销毁线程
//忙的线程*2<存活的线程数 && 存活的线程>最小线程数
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
//让工作的线程自杀
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
测试代码
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
//创建出一个线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}

边栏推荐
- June 29, 2022 -- take the first step with C # -- add decision logic to the code using the "if", "else" and "else if" statements in C #
- 【Hot100】15. Sum of three numbers
- grep命令用法
- QT generate random number qrandomgenerator
- next InitializeSecurityContext failed: Unknown error (0x80092012) - 吊销功能无法检查证书是否吊销。
- Raspberry pie trivial configuration
- Basic knowledge of system software development
- js创建pdf文件
- 网络安全-VLAN和Tunk方法详解
- Vs2019 and SQL
猜你喜欢

网络安全-抓包和IP包头分析

Xshell transfer file

I graduated this year, but I don't know what I want to do

The maximum expression in Oracle database message list is 1000 error

Network security - packet capture and IP packet header analysis

解决:div获取不到键盘事件
![[daily question] 535 Encryption and decryption of tinyurl](/img/73/35674cd5df3d937af0762645041421.png)
[daily question] 535 Encryption and decryption of tinyurl
![[docsify basic use]](/img/9d/db689f5f13708f3e241474afeca1d0.png)
[docsify basic use]
![[datawhale team learning] task02: mathematical operation, string and text, list](/img/3e/c9c984d7576a2e410da7f81a8c0aa6.jpg)
[datawhale team learning] task02: mathematical operation, string and text, list

神经网络计算量及参数量
随机推荐
The maximum expression in Oracle database message list is 1000 error
记录开发过程中无法使用管理员身份修改系统文件问题
next InitializeSecurityContext failed: Unknown error (0x80092012) - 吊销功能无法检查证书是否吊销。
[introduction to Expo application] v Expert recommendation letter template
Out of class implementation of member function of class template
Linux server installation redis
Essence of signal slot macros signal and slot
Grep command usage
QT signal slot alarm QObject:: connect:cannot connect (null)
网络安全-VLAN和Tunk方法详解
28 rounds of interviews with 10 companies in two and a half years
Promise async/await
SwiftUI打造一款美美哒自定义按压反馈按钮
Develop common dependency Libraries
Skillfully use 5 keys to improve office efficiency
网络安全-抓包和IP包头分析
2021-07-02
MySQL encounters the problem of expression 1 of select list is not in group by claim and contains nonaggre
Network security - layer 3 switching technology and internal network planning
Determine whether the picture is in JPG picture format