当前位置:网站首页>线程池——C语言
线程池——C语言
2022-06-30 07:17:00 【编程小段】
网上找到一篇对线程池讲解很详细的资料,学习作者文档及视频资料并跟着视频手敲了一下代码,链接如下:
线程池介绍及视频讲解
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
//销毁线程池
int threadPoolDestroy(ThreadPool* pool);
//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//获取线程池工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
//获取线程池活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
//任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
//线程池结构体
struct ThreadPool
{
//任务队列
Task* taskQ;
int queueCapacity; //容量
int queueSize; //当前任务个数
int queueFront; //队头 -> 取数据
int queueRear; //队尾 -> 放数据
pthread_t managerID; //管理者线程ID
pthread_t* threadIDs; //工作的线程ID
int minNum; //最小线程数量
int maxNum; //最大线程数量
int busyNum; //忙的线程的个数
int liveNum; //存活的线程的个数
int exitNum; //要杀死的线程个数
pthread_mutex_t mutexPool; //锁整个的线程池
pthread_mutex_t mutexBusy; //锁busyNum变量
pthread_cond_t notFull; //任务队列是不是满了
pthread_cond_t notEmpty; //任务队列是不是空了
int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; //和最小个数相等
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
//任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
//释放资源
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//关闭线程池
pool->shutdown = 1;
//阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
//唤醒阻塞的消费者线程
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
//释放堆内存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
//生产者
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
//阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
//当前任务是否为空
//为什么是while判断而不是if判断?
//因为,如果有多个线程阻塞,一个解锁后向后执行,其他的必须在循环判断队列是否为空,
//不然队列已经空了还往下执行
while(pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作的线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//判断是不是要销毁
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
//判断线程池是否被关闭
//线程池销毁中也会全部唤醒阻塞的线程,这时候不管任务队列是否为空都要结束
//所以必须要加这个if判断
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//移动头节点
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
//解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown只有管理者线程才会修改,所以不需要加锁
while(!pool->shutdown)
{
//每隔3s检测一次
sleep(3);
//取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加线程
//任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//销毁线程
//忙的线程*2<存活的线程数 && 存活的线程>最小线程数
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
//让工作的线程自杀
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
测试代码
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
//创建出一个线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}

边栏推荐
猜你喜欢

Class template case - encapsulation of array classes

2、 Layout system

vs2019和sql

SwiftUI打造一款美美哒自定义按压反馈按钮

TC397 QSPI(CPU)

Nested if statement in sum function in SQL Server2005

Network security ARP protocol and defense

Basic knowledge of system software development

Qtcreator debug code after configuring CDB debugger view variable value display card

Linux服務器安裝Redis
随机推荐
2022年6月29日--使用C#迈出第一步--使用 C# 中的“if”、“else”和“else if”语句向代码添加决策逻辑
Go语言指针介绍
我今年毕业,但我不知道我要做什么
视频播放器(一):流程
Calculation and parameter quantity of neural network
Win10 step pit - power on 0xc0000225
Raspberry pie trivial configuration
How to determine the size of the platform byte order?
[semidrive source code analysis] [x9 chip startup process] 33 - Analysis of related concepts of display module
对占用多字节和位的报文信号解析详解
网络安全-ARP协议和防御
已解决:initialize specified but the data directory has files in it. Aborting
Jingwei Hengrun won the 10ppm quality award of paccar group again
Connection flood attack principle
What does the real name free domain name mean?
Nested if statement in sum function in SQL Server2005
Traverse map
QT common macro definitions
halcon:读取摄像头并二值化
Goland常用快捷键设置