当前位置:网站首页>線程池——C語言
線程池——C語言
2022-06-30 07:18:00 【編程小段】
網上找到一篇對線程池講解很詳細的資料,學習作者文檔及視頻資料並跟著視頻手敲了一下代碼,鏈接如下:
線程池介紹及視頻講解
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//創建線程池並初始化
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
//銷毀線程池
int threadPoolDestroy(ThreadPool* pool);
//給線程池添加任務
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//獲取線程池工作的線程的個數
int threadPoolBusyNum(ThreadPool* pool);
//獲取線程池活著的線程的個數
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
//任務結構體
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
//線程池結構體
struct ThreadPool
{
//任務隊列
Task* taskQ;
int queueCapacity; //容量
int queueSize; //當前任務個數
int queueFront; //隊頭 -> 取數據
int queueRear; //隊尾 -> 放數據
pthread_t managerID; //管理者線程ID
pthread_t* threadIDs; //工作的線程ID
int minNum; //最小線程數量
int maxNum; //最大線程數量
int busyNum; //忙的線程的個數
int liveNum; //存活的線程的個數
int exitNum; //要殺死的線程個數
pthread_mutex_t mutexPool; //鎖整個的線程池
pthread_mutex_t mutexBusy; //鎖busyNum變量
pthread_cond_t notFull; //任務隊列是不是滿了
pthread_cond_t notEmpty; //任務隊列是不是空了
int shutdown; //是不是要銷毀線程池,銷毀為1,不銷毀為0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; //和最小個數相等
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
//任務隊列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//創建線程
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
//釋放資源
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//關閉線程池
pool->shutdown = 1;
//阻塞回收管理者線程
pthread_join(pool->managerID, NULL);
//喚醒阻塞的消費者線程
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
//釋放堆內存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
//生產者
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
//阻塞生產者線程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//添加任務
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
//當前任務是否為空
//為什麼是while判斷而不是if判斷?
//因為,如果有多個線程阻塞,一個解鎖後向後執行,其他的必須在循環判斷隊列是否為空,
//不然隊列已經空了還往下執行
while(pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作的線程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//判斷是不是要銷毀
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
//判斷線程池是否被關閉
//線程池銷毀中也會全部喚醒阻塞的線程,這時候不管任務隊列是否為空都要結束
//所以必須要加這個if判斷
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//從任務隊列中取出一個任務
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//移動頭節點
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
//解鎖
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown只有管理者線程才會修改,所以不需要加鎖
while(!pool->shutdown)
{
//每隔3s檢測一次
sleep(3);
//取出線程池中任務的數量和當前線程的數量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//取出忙的線程的數量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加線程
//任務的個數>存活的線程個數 && 存活的線程數<最大線程數
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//銷毀線程
//忙的線程*2<存活的線程數 && 存活的線程>最小線程數
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
//讓工作的線程自殺
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
測試代碼
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
//創建出一個線程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}

边栏推荐
- Network security - detailed explanation of VLAN and tunk methods
- 【SemiDrive源码分析】【X9芯片启动流程】34 - RTOS侧 Display模块 sdm_display_init 显示初始化源码分析
- Go common commands
- Win10踩坑-开机0xc0000225
- 4diac getting started example
- [datawhale team learning] task02: mathematical operation, string and text, list
- 套接字socket编程——UDP
- Develop common dependency Libraries
- Grep command usage
- Go语言指针介绍
猜你喜欢

2、 Layout system

MySQL encounters the problem of expression 1 of select list is not in group by claim and contains nonaggre

The maximum expression in Oracle database message list is 1000 error

next InitializeSecurityContext failed: Unknown error (0x80092012) - 吊销功能无法检查证书是否吊销。

Basic knowledge of system software development

QT generate random number qrandomgenerator

Out of class implementation of member function of class template

Matter protocol

Vs2019 and SQL

年轻人搞副业有多疯狂:月薪3000,副业收入3W
随机推荐
Record common problems: spaces in encodeuricomponent decoding and the use of Schema in third-party apps to invoke apps
What if I don't know what to do after graduating from university?
系统软件开发基础知识
2、 Layout system
Embedded test process
failed to create symbolic link ‘/usr/bin/mysql’: File exists
已解决:initialize specified but the data directory has files in it. Aborting
Qdebug small details
【申博攻略】五.专家推荐信模板
Network security - routing principle
JS null judgment operators | and? Usage of
The first up Master of station B paid to watch the video still came! Price "Persuading" netizens
对占用多字节和位的报文信号解析详解
Network security - layer 3 switching technology and internal network planning
Egret P2 physical engine (1) small ball falling demo
踩坑记录:supervisor 日志返回信息:redis扩展未安装
年轻人搞副业有多疯狂:月薪3000,副业收入3W
Cypress nor flash driver - s29glxxxs
【最全】linux服务器上安装Mysql
QT common macro definitions