当前位置:网站首页>Thread pool - C language
Thread pool - C language
2022-06-30 07:19:00 【Programming segment】
Find a thread pool on the Internet to explain very detailed information , Learn the author's documents and video materials and follow the video hand to tap the code , Links are as follows :
Thread pool introduction and video explanation
List of articles
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
// Create a thread pool and initialize
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity);
// Destroy thread pool
int threadPoolDestroy(ThreadPool* pool);
// Add tasks to the thread pool
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// Get the number of threads working in the thread pool
int threadPoolBusyNum(ThreadPool* pool);
// Get the number of threads alive in the thread pool
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
const int NUMBER = 2;
// Task structure
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
// Thread pool structure
struct ThreadPool
{
// Task queue
Task* taskQ;
int queueCapacity; // Capacity
int queueSize; // Number of current tasks
int queueFront; // Team head -> Take the data
int queueRear; // A party -> Put the data
pthread_t managerID; // Manager thread ID
pthread_t* threadIDs; // Threads of work ID
int minNum; // Minimum number of threads
int maxNum; // Maximum number of threads
int busyNum; // Number of busy threads
int liveNum; // Number of surviving threads
int exitNum; // Number of threads to kill
pthread_mutex_t mutexPool; // Lock the entire thread pool
pthread_mutex_t mutexBusy; // lock busyNum Variable
pthread_cond_t notFull; // Is the task queue full
pthread_cond_t notEmpty; // Is the task queue empty
int shutdown; // Do you want to destroy the thread pool , Destroy as 1, Not destroyed as 0
};
ThreadPool* threadPoolCreate(int min, int max, int queueCapacity)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if(pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t));
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // Equal to the minimum number
pool->exitNum = 0;
if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 )
{
printf("mutex or condition init fail...\n");
break;
}
// Task queue
pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity);
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// Create thread
pthread_create(&pool->managerID, NULL, manager, pool);
for(int i=0; i<min; i++)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
}while(0);
// Release resources
if(pool && pool->threadIDs) free(pool->threadIDs);
if(pool && pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
// Close thread pool
pool->shutdown = 1;
// Blocking the recycle manager thread
pthread_join(pool->managerID, NULL);
// Wake up blocked consumer threads
for(int i=0; i<pool->liveNum; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
// Free heap memory
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
// producer
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// Block producer thread
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// Add tasks
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear+1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexPool);
// Whether the current task is empty
// Why while Judge, not if Judge ?
// because , If there are multiple threads blocking , A backward execution after unlocking , Others must judge whether the queue is empty in the loop ,
// Otherwise, the queue is empty, and the execution will continue
while(pool->queueSize == 0 && !pool->shutdown)
{
// Blocked working threads
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// Judge whether to destroy
if(pool->exitNum>0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// Judge whether the thread pool is closed
// The thread pool destruction will also wake up all blocked threads , At this time, the task queue will end regardless of whether it is empty
// So we have to add this if Judge
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// Take a task out of the task queue
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// Mobile head node
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;
// Unlock
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread start working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread end working...\n");
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
//shutdown Only the manager thread can modify , So there's no need to lock
while(!pool->shutdown)
{
// every other 3s Test once
sleep(3);
// Take out the number of tasks in the thread pool and the number of current threads
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// Get the number of busy threads
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// Add thread
// The number of tasks > Number of threads alive && Number of threads alive < Maximum number of threads
if(queueSize>liveNum && liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0; i<pool->maxNum && counter<NUMBER && pool->liveNum<pool->maxNum; i++)
{
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// Destruction of the thread
// Busy thread *2< Number of threads alive && Surviving threads > Minimum number of threads
if(busyNum*2<liveNum && liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// Let the working thread commit suicide
for(int i=0; i<NUMBER; i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for(int i=0; i<pool->maxNum; i++)
{
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("%s called, %ld exiting...\n", __FUNCTION__, tid);
break;
}
}
pthread_exit(NULL);
}
Test code
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include"threadpool.h"
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
// Create a thread pool
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i=0; i<100; i++)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}

边栏推荐
- The class imported by idea import clearly exists, but it is red?
- failed to create symbolic link ‘/usr/bin/mysql’: File exists
- [resolved] MySQL exception: error 1045 (28000): unknown error 1045, forgetting the initial password
- nRF52832 GPIO LED
- vs2019和sql
- 社招两年半10个公司28轮面试面经
- Linux服務器安裝Redis
- [introduction to Expo application] v Expert recommendation letter template
- Skillfully use 5 keys to improve office efficiency
- 【已解决】MySQL异常:ERROR 1045 (28000): Unknown error 1045,忘记初始密码
猜你喜欢
随机推荐
Qstring to const char*
MySQL encounters the problem of expression 1 of select list is not in group by claim and contains nonaggre
已解决:initialize specified but the data directory has files in it. Aborting
Resolved: initialize specified but the data directory has files in it Aborting
动态内存管理
JS widget wave JS implementation of wave progress bar animation style
单测调用对象的私有方法
Install go language development tools
[implemented] server jar package startup script and shell script
Detailed methods for copying local computer files to virtual machine system
The class imported by idea import clearly exists, but it is red?
grep命令用法
[solved] failed! Error: Unknown error 1130
2021-07-02
[most complete] install MySQL on a Linux server
Golan common shortcut key settings
All errors reported by NPM
Use of ecostruxure (3) creating composite function blocks
June 29, 2022 -- take the first step with C # -- add decision logic to the code using the "if", "else" and "else if" statements in C #
Merge: extension click the El table table data to expand








