当前位置:网站首页>Pool de Threads - langage C
Pool de Threads - langage C
2022-06-30 07:18:00 【Segment de programmation】
J'a i trouvé un article en ligne qui explique en détail le pool de Threads,Apprenez la documentation de l'auteur et le matériel vidéo et tapez le Code avec la main vidéo,Liens ci - dessous:
Introduction au pool de fils et explication vidéo
Catalogue des articles
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//Créer un pool de Threads et initialiser
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
//Détruire le pool de threads
int threadPoolDestroy(ThreadPool* pool);
//Ajouter une tâche au pool de Threads
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//Obtient le nombre de Threads pour lesquels le pool de Threads fonctionne
int threadPoolBusyNum(ThreadPool* pool);
// Obtient le nombre de Threads vivants dans le pool de threads
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
//Structure de la tâche
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
//Structure du pool de Threads
struct ThreadPool
{
//File d'attente des tâches
Task* taskQ;
int queueCapacity; //Capacité
int queueSize; //Nombre actuel de tâches
int queueFront; //Chef d'équipe -> Accès aux données
int queueRear; //Fin de l'équipe -> Mettre les données
pthread_t managerID; //Manager threadID
pthread_t* threadIDs; //Les fils qui travaillentID
int minNum; //Nombre minimum de fils
int maxNum; //Nombre maximum de fils
int busyNum; //Nombre de Threads occupés
int liveNum; //Nombre de fils survivants
int exitNum; // Nombre de Threads à tuer
pthread_mutex_t mutexPool; //Verrouiller tout le pool de Threads
pthread_mutex_t mutexBusy; //VerrouillagebusyNumVariables
pthread_cond_t notFull; //La file d'attente est pleine
pthread_cond_t notEmpty; //La file d'attente est vide
int shutdown; //Est - ce que je vais détruire le pool de Threads,Détruit comme suit:1,Ne pas détruire pour0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; //égal au nombre minimum
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
//File d'attente des tâches
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//Créer un thread
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
//Libérer des ressources
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//Fermer le pool de Threads
pool->shutdown = 1;
//Blocage du thread Recycling Manager
pthread_join(pool->managerID, NULL);
//Wake up Blocked Consumer thread
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
//Libérer la mémoire tas
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
//Producteurs
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
//Blocage des fils du producteur
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//Ajouter une tâche
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
// La tâche actuelle est - elle vide?
//PourquoiwhileJuger au lieu deifJugement?
//Parce que, Si plus d'un thread est bloqué , Un déverrouillage arrière , Les autres doivent déterminer si la file d'attente est vide dans la boucle ,
// Sinon, la file d'attente est vide.
while(pool->queueSize == 0 && !pool->shutdown)
{
// Blocage des fils de travail
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// Déterminer si elle doit être détruite
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// Déterminer si le pool de fils est fermé
// Les Threads bloqués sont également réveillés dans la destruction du pool de threads , À ce stade, la file d'attente des tâches se termine, que la file d'attente soit vide ou non.
// Il faut donc ajouter ceci. ifJugement
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//Retirer une tâche de la file d'attente des tâches
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//Déplacer le noeud de tête
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
//Déverrouiller
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown Seuls les Threads Manager sont modifiés ,Il n'est donc pas nécessaire d'ajouter une serrure
while(!pool->shutdown)
{
//Tous les3sTestez une fois
sleep(3);
//Extraire le nombre de tâches dans le pool de Threads et le nombre de Threads actuels
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//Nombre de Threads occupés récupérés
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//Ajouter un fil
//Nombre de tâches>Nombre de fils survivants && Nombre de fils survivants<Nombre maximum de fils
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//Détruire le thread
//Fils occupés*2<Nombre de fils survivants && Fils vivants>Nombre minimum de fils
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
//Laissez les fils de travail se suicider
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
Code d'essai
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
// Créer un pool de threads
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}
边栏推荐
- Network security - layer 3 switching technology and internal network planning
- Lt268 the most convenient TFT-LCD serial port screen chip in the whole network
- 2022年6月29日--使用C#迈出第一步--使用 C# 中的“if”、“else”和“else if”语句向代码添加决策逻辑
- Detailed analysis of message signals occupying multiple bytes and bits
- 【已解决】ERROR 1290 (HY000): Unknown error 1290
- Merge: extension click the El table table data to expand
- SwiftUI打造一款美美哒自定义按压反馈按钮
- Determine whether the picture is in JPG picture format
- 对占用多字节和位的报文信号解析详解
- 网络安全-抓包和IP包头分析
猜你喜欢
Deploying web projects using idea
[datawhale team learning] task02: mathematical operation, string and text, list
SwiftUI打造一款美美哒自定义按压反馈按钮
1285_ Expand macros defined by AUTOSAR functions and variables with scripts to improve readability
Xshell transfer file
Vs2019 and SQL
Egret P2 physical engine (1) small ball falling demo
Stm32g0 porting FreeRTOS
Class templates and friends
记录开发过程中无法使用管理员身份修改系统文件问题
随机推荐
Goland常用快捷键设置
Essence of signal slot macros signal and slot
Network security ARP protocol and defense
TC397 QSPI(CPU)
Four great happenings on earth
Merge: extension click the El table table data to expand
【SemiDrive源码分析】【X9芯片启动流程】34 - RTOS侧 Display模块 sdm_display_init 显示初始化源码分析
Utilisation de la commande grep
Daemon and user threads
[implemented] server jar package startup script and shell script
vs2019和sql
nRF52832 GPIO LED
Base64 encoding method implemented by native JS
安装Go语言开发工具
Linux服務器安裝Redis
网络安全-三层交换技术和内部网络规划
What if I don't know what to do after graduating from university?
网络安全-路由原理
2021-07-02
Deploying web projects using idea