当前位置:网站首页>線程池——C語言
線程池——C語言
2022-06-30 07:18:00 【編程小段】
網上找到一篇對線程池講解很詳細的資料,學習作者文檔及視頻資料並跟著視頻手敲了一下代碼,鏈接如下:
線程池介紹及視頻講解
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//創建線程池並初始化
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
//銷毀線程池
int threadPoolDestroy(ThreadPool* pool);
//給線程池添加任務
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//獲取線程池工作的線程的個數
int threadPoolBusyNum(ThreadPool* pool);
//獲取線程池活著的線程的個數
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
//任務結構體
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
//線程池結構體
struct ThreadPool
{
//任務隊列
Task* taskQ;
int queueCapacity; //容量
int queueSize; //當前任務個數
int queueFront; //隊頭 -> 取數據
int queueRear; //隊尾 -> 放數據
pthread_t managerID; //管理者線程ID
pthread_t* threadIDs; //工作的線程ID
int minNum; //最小線程數量
int maxNum; //最大線程數量
int busyNum; //忙的線程的個數
int liveNum; //存活的線程的個數
int exitNum; //要殺死的線程個數
pthread_mutex_t mutexPool; //鎖整個的線程池
pthread_mutex_t mutexBusy; //鎖busyNum變量
pthread_cond_t notFull; //任務隊列是不是滿了
pthread_cond_t notEmpty; //任務隊列是不是空了
int shutdown; //是不是要銷毀線程池,銷毀為1,不銷毀為0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; //和最小個數相等
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
//任務隊列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//創建線程
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
//釋放資源
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//關閉線程池
pool->shutdown = 1;
//阻塞回收管理者線程
pthread_join(pool->managerID, NULL);
//喚醒阻塞的消費者線程
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
//釋放堆內存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
//生產者
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
//阻塞生產者線程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//添加任務
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
//當前任務是否為空
//為什麼是while判斷而不是if判斷?
//因為,如果有多個線程阻塞,一個解鎖後向後執行,其他的必須在循環判斷隊列是否為空,
//不然隊列已經空了還往下執行
while(pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作的線程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//判斷是不是要銷毀
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
//判斷線程池是否被關閉
//線程池銷毀中也會全部喚醒阻塞的線程,這時候不管任務隊列是否為空都要結束
//所以必須要加這個if判斷
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//從任務隊列中取出一個任務
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//移動頭節點
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
//解鎖
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown只有管理者線程才會修改,所以不需要加鎖
while(!pool->shutdown)
{
//每隔3s檢測一次
sleep(3);
//取出線程池中任務的數量和當前線程的數量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//取出忙的線程的數量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加線程
//任務的個數>存活的線程個數 && 存活的線程數<最大線程數
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//銷毀線程
//忙的線程*2<存活的線程數 && 存活的線程>最小線程數
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
//讓工作的線程自殺
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
測試代碼
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
//創建出一個線程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}

边栏推荐
- [resolved] error 1290 (HY000): unknown error 1290
- 4diac getting started example
- QT signal slot alarm QObject:: connect:cannot connect (null)
- 视频播放器(二):视频解码
- El input can only input numbers and has a decimal point. At most two digits can be reserved
- Goland常用快捷键设置
- 2022年6月29日--使用C#迈出第一步--使用 C# 中的“if”、“else”和“else if”语句向代码添加决策逻辑
- [docsify basic use]
- Cubemx completes STM32F103 dual serial port 485 transceiver transmission
- Network security - detailed explanation of VLAN and tunk methods
猜你喜欢

Install go language development tools

28 rounds of interviews with 10 companies in two and a half years

Egret P2 pit encountered by physical engine (1)

Go项目目录结构介绍

解决:div获取不到键盘事件

【最全】linux服务器上安装Mysql

QT generate random number qrandomgenerator

Nested if statement in sum function in SQL Server2005

How to use string branches for switch case

Local unloading traffic of 5g application
随机推荐
Essence of signal slot macros signal and slot
QT signal slot alarm QObject:: connect:cannot connect (null)
Resolution: div failed to get keyboard event
grep命令用法
编写并运行第一个Go语言程序
[introduction to Expo application] v Expert recommendation letter template
[implemented] server jar package startup script and shell script
Idea running run and services
Linu基础-分区规划与使用
Network security ARP protocol and defense
Detailed analysis of message signals occupying multiple bytes and bits
Linu foundation - zoning planning and use
Vs2019 and SQL
How to determine the size of the platform byte order?
All errors reported by NPM
Cubemx completes STM32F103 dual serial port 485 transceiver transmission
Keil plug-in Usage Summary
大学刚毕业不知道做什么工作怎么办?
【SemiDrive源码分析】【X9芯片启动流程】33 - Display模块 相关概念解析
Network security - routing principle