Learning records of thread pool
2022-06-09 22:02:00 【Gy648】
One. What is a thread pool?
A thread pool is a technique for reusing threads. When work has to be spread across several threads, creating and destroying those threads over and over is expensive. A pool keeps a set of threads alive and hands tasks to them, which avoids that overhead.
The pool also has to keep the number of threads in a workable range: it should not collapse under a burst of tasks at peak load, and it should not leave many threads idle and holding resources when there is little work.
Two. Analysis
1. A thread pool consists of roughly three parts
- The task queue, which stores the tasks waiting to be processed
- Worker threads (worker), each of which processes one task at a time. A worker keeps checking the task queue: if there is a task it handles it, and if there is none it blocks
- The manager thread (manager), which monitors the number of threads and tasks in the pool. If there are more threads than the pending tasks need, it removes some threads to reduce overhead; if there are more tasks than the threads can keep up with, it adds threads
2. Module analysis
1. main()
Create a thread pool
Add tasks to the thread pool; the tasks are processed through callback functions
Destroy the thread pool
2. threadPoolCreate()
Allocate the thread pool structure
Initialize the members of the structure
Create the minimum number of worker threads
Create one manager thread
On failure, free everything allocated so far
3. threadPoolDestroy()
Set the shutdown flag to 1
Join the manager thread
Wake up the blocked consumer threads so that they exit on their own
Free the heap memory and destroy the mutexes and condition variables
Free the thread pool structure
4. threadPoolAdd()
Add a task to the task queue
5. threadExit()
Make a worker thread that is no longer needed exit
The remaining functions may need to be added or split further as required
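The teardown order described for the destroy module (set the flag, wake the blocked threads, then release resources) is easy to get subtly wrong, so here is a minimal, self-contained sketch of it. It is not the pool from this article: DemoPool, demoWorker, demoStart, demoStop and DEMO_WORKERS are hypothetical names invented for the illustration, and the workers do nothing except wait. The point it tries to show is that the flag is set while holding the mutex and that every worker is joined before the mutex and condition variable are destroyed.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_WORKERS 4 /* hypothetical fixed worker count for this sketch */

typedef struct DemoPool
{
    pthread_t workers[DEMO_WORKERS];
    int started;  /* how many workers were actually created */
    int exited;   /* how many workers have left their loop */
    int shutdown; /* only read or written by workers while lock is held */
    pthread_mutex_t lock;
    pthread_cond_t wake;
} DemoPool;

/* Worker: sleeps on the condition variable until shutdown is requested. */
static void *demoWorker(void *arg)
{
    DemoPool *pool = (DemoPool *)arg;
    pthread_mutex_lock(&pool->lock);
    while (!pool->shutdown)
    {
        pthread_cond_wait(&pool->wake, &pool->lock);
    }
    pool->exited++;
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

/* Teardown: set the flag, wake everyone, join, then destroy the primitives.
 * Returns the number of workers that exited. */
static int demoStop(DemoPool *pool)
{
    assert(pool != NULL);
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 1; /* set under the lock so no waiter can miss it */
    pthread_cond_broadcast(&pool->wake);
    pthread_mutex_unlock(&pool->lock);

    for (int i = 0; i < pool->started; ++i)
    {
        pthread_join(pool->workers[i], NULL);
    }
    int exited = pool->exited; /* no lock needed: every worker has been joined */
    pthread_cond_destroy(&pool->wake);
    pthread_mutex_destroy(&pool->lock);
    return exited;
}

/* Returns 0 on success, -1 if a primitive or a thread could not be created. */
static int demoStart(DemoPool *pool)
{
    assert(pool != NULL);
    pool->started = 0;
    pool->exited = 0;
    pool->shutdown = 0;
    if (pthread_mutex_init(&pool->lock, NULL) != 0)
    {
        return -1;
    }
    if (pthread_cond_init(&pool->wake, NULL) != 0)
    {
        pthread_mutex_destroy(&pool->lock);
        return -1;
    }
    for (int i = 0; i < DEMO_WORKERS; ++i)
    {
        if (pthread_create(&pool->workers[i], NULL, demoWorker, pool) != 0)
        {
            demoStop(pool); /* joins the workers created so far */
            return -1;
        }
        pool->started++;
    }
    return 0;
}
```

A caller would run demoStart(&pool) once and later demoStop(&pool). Because the workers are joined before anything is destroyed, no thread can still be touching the mutex or the structure when it goes away.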
Three. Implementation
- Task queue element
typedef struct Task
{
void (*function)(void *arg); // A callback function
void *arg; // Pass parameters to the callback function above
} Task;
Each element of the task queue is a callback function together with its argument. Passing the argument as a void * lets one queue hold tasks of any kind without converting between parameter types; the second member is the argument handed to the callback when it runs.
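To make the idea concrete, here is a minimal sketch of one Task type carrying two unrelated kinds of work. It is only an illustration: the callbacks doubleInt and swapPoint, the Point type and the helper runTask are hypothetical names that do not appear in the pool code.

```c
#include <assert.h>
#include <stddef.h>

/* Same shape as the Task structure used by the pool. */
typedef struct Task
{
    void (*function)(void *arg); /* callback to run */
    void *arg;                   /* argument handed to the callback */
} Task;

/* Hypothetical payload type for the second callback. */
typedef struct Point
{
    int x;
    int y;
} Point;

/* Hypothetical callback: treats arg as an int and doubles it in place. */
static void doubleInt(void *arg)
{
    int *value = (int *)arg;
    *value *= 2;
}

/* Hypothetical callback: treats arg as a Point and swaps its coordinates. */
static void swapPoint(void *arg)
{
    Point *point = (Point *)arg;
    int tmp = point->x;
    point->x = point->y;
    point->y = tmp;
}

/* Runs a task the way a worker would: call the function with its argument. */
static void runTask(Task task)
{
    assert(task.function != NULL);
    task.function(task.arg);
}
```

The same runTask call works for a Task built as { doubleInt, &n } and for one built as { swapPoint, &p }. Each callback casts the void * back to the type it expects, so the queue itself never needs to know what a task operates on.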
- Thread pool structure
struct ThreadPool
{
// Task queue
Task *taskQ; // Array used as a circular queue
int queueCapacity; // Capacity of the queue
int queueSize; // Number of tasks currently queued
int queueFront; // Head index: tasks are taken from here
int queueRear; // Tail index: tasks are put here
// Thread management
pthread_t managerID; // Manager thread ID
pthread_t *threadIDs; // Worker thread IDs, kept in an array
int minNum; // Minimum number of threads; this many workers are created at startup
int maxNum; // Maximum number of threads
int busyNum; // Number of threads currently running a task
int liveNum; // Number of live threads
int exitNum; // Number of threads that should exit
pthread_mutex_t mutexPool; // Protects the whole thread pool
pthread_mutex_t mutexBusy; // Protects busyNum, which is read and updated frequently
pthread_cond_t notFull; // Signaled when the task queue is no longer full
pthread_cond_t notEmpty; // Signaled when the task queue is no longer empty
int shutdown; // 1 if the thread pool is being destroyed, 0 otherwise
};
typedef struct ThreadPool ThreadPool;
- Creating the thread pool
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
// calloc zeroes the structure, so the cleanup code below never sees uninitialized pointers
ThreadPool* pool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
// Allocate and zero the array of worker thread IDs
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // Equal to the minimum number
pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// Task queue
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
if (pool->taskQ == NULL)
{
printf("malloc taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0; // The initial task is 0
pool->queueFront = 0;// The head and tail pointers point to the head
pool->queueRear = 0;
pool->shutdown = 0;
// Create thread
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// Release resources
// If the memory request in any of the above steps is wrong, the previously requested resources will be released
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
min is the minimum number of threads and max is the maximum number of threads.
queueSize is the capacity of the task queue.
- Add tasks to the thread pool
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// Block producer thread
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// Add tasks
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
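The first parameter is the thread pool pointer, the second is the callback function to put in the task queue, and the third is the argument passed to that callback. If the queue is full, the caller blocks on notFull until a worker takes a task out. Because the worker calls free() on the argument after the task finishes, the argument must be heap-allocated.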
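The index arithmetic in threadPoolAdd (advance the rear index and wrap it with the modulo operator) can be tried in isolation. The sketch below is a single-threaded circular queue of ints, written only to show the wrap-around; Ring, ringPush, ringPop and RING_CAPACITY are hypothetical names, and unlike the pool it returns 0 instead of blocking when the queue is full or empty.

```c
#include <assert.h>
#include <stddef.h>

#define RING_CAPACITY 3 /* hypothetical capacity, small enough to see the wrap-around */

typedef struct Ring
{
    int data[RING_CAPACITY];
    int size;  /* number of stored items, like queueSize */
    int front; /* next index to take from, like queueFront */
    int rear;  /* next index to put at, like queueRear */
} Ring;

/* Stores value at the rear. Returns 1 on success, 0 if the ring is full. */
static int ringPush(Ring *ring, int value)
{
    assert(ring != NULL);
    if (ring->size == RING_CAPACITY)
    {
        return 0;
    }
    ring->data[ring->rear] = value;
    ring->rear = (ring->rear + 1) % RING_CAPACITY; /* wrap to 0 after the last slot */
    ring->size++;
    return 1;
}

/* Takes the oldest value from the front. Returns 1 on success, 0 if empty. */
static int ringPop(Ring *ring, int *out)
{
    assert(ring != NULL && out != NULL);
    if (ring->size == 0)
    {
        return 0;
    }
    *out = ring->data[ring->front];
    ring->front = (ring->front + 1) % RING_CAPACITY;
    ring->size--;
    return 1;
}
```

After three pushes into an empty ring the rear index is back at 0, a fourth push is refused, and once one item has been popped the next push reuses slot 0. Items still come out in the order they went in.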
- Worker function that executes tasks
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// Whether the current task queue is empty
while (pool->queueSize == 0 && !pool->shutdown)
{
// Blocking worker threads
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// After waking up, check whether the manager has asked idle threads to exit
// exitNum is the number of threads that still have to exit
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// Judge whether the thread pool is closed
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// Take a task out of the task queue
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// Move the head node
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// Unlock
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
- Manager thread
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// Check every 5 seconds
sleep(5);
// Take out the number of tasks in the thread pool and the number of current threads
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// Get the number of busy threads
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// Add thread
// The number of tasks > Number of threads alive && Number of threads alive < Maximum number of threads
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// Destruction of the thread
// Busy thread *2 < Number of threads alive && Surviving threads > Minimum number of threads
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// Wake idle workers so that they exit on their own
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
The manager thread periodically compares the number of queued tasks with the number of live and busy threads and decides whether to add or remove threads. NUMBER is the number of threads added or removed in one adjustment. The thresholds used for growing and shrinking can be tuned to the workload.
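The two conditions the manager checks can be pulled out into small pure functions, which makes them easy to reason about and to tune. This is only a sketch of the same comparisons used in manager(); shouldGrow and shouldShrink are hypothetical helper names that are not part of the pool code.

```c
#include <assert.h>

/* Grow when more tasks are queued than there are live threads,
 * as long as the pool is still below its maximum size. */
static int shouldGrow(int queueSize, int liveNum, int maxNum)
{
    assert(queueSize >= 0 && liveNum >= 0);
    return queueSize > liveNum && liveNum < maxNum;
}

/* Shrink when fewer than half of the live threads are busy,
 * as long as the pool is still above its minimum size. */
static int shouldShrink(int busyNum, int liveNum, int minNum)
{
    assert(busyNum >= 0 && liveNum >= 0);
    return busyNum * 2 < liveNum && liveNum > minNum;
}
```

For example, with 10 queued tasks, 3 live threads and a maximum of 10, shouldGrow is true. With 1 busy thread out of 4 live threads and a minimum of 3, shouldShrink is true, while 2 busy threads out of 4 is not enough to trigger a shrink because 2 * 2 is not less than 4.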
- Function that makes a worker thread exit
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
This function looks up the calling thread in the thread ID array set up earlier, clears its slot so that the manager can reuse it, and then exits the thread.
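One portability note on the lookup: POSIX treats pthread_t as an opaque type, so comparing IDs with == and using 0 to mean "empty slot" works on Linux, where pthread_t is an integer type, but is not guaranteed elsewhere. A more portable variant, sketched below under the assumption that the pool keeps a separate array of "slot in use" flags, compares IDs with pthread_equal. The function findThreadSlot and the used array are hypothetical and do not exist in the code above.

```c
#include <assert.h>
#include <pthread.h>

/* Returns the index of self among the slots marked as used,
 * or -1 if the thread is not registered. */
static int findThreadSlot(const pthread_t *ids, const int *used, int count, pthread_t self)
{
    assert(ids != NULL && used != NULL);
    for (int i = 0; i < count; ++i)
    {
        /* Only compare slots that hold a valid ID. */
        if (used[i] && pthread_equal(ids[i], self))
        {
            return i;
        }
    }
    return -1;
}
```

A threadExit built on this would call findThreadSlot(ids, used, maxNum, pthread_self()), clear the used flag of the returned slot, and then call pthread_exit as before.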
Complete code
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// Number of threads the manager adds or removes in one adjustment
// (with 0 the manager would never create or remove a thread)
const int NUMBER = 2;
typedef struct ThreadPool ThreadPool;
// Create a thread pool and initialize
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// Destroy thread pool
int threadPoolDestroy(ThreadPool* pool);
// Add tasks to the thread pool
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// Get the number of threads working in the thread pool
int threadPoolBusyNum(ThreadPool* pool);
// Get the number of live threads in the thread pool
int threadPoolAliveNum(ThreadPool* pool);
// Worker (consumer) thread function
void* worker(void* arg);
// Manager thread function
void* manager(void* arg);
// Exit the calling worker thread
void threadExit(ThreadPool* pool);
// Task structure
typedef struct Task
{
void (*function)(void *arg);
void *arg;
} Task;
// Thread pool structure
struct ThreadPool
{
// Task queue
Task *taskQ; // Array used as a circular queue
int queueCapacity; // Capacity of the queue
int queueSize; // Number of tasks currently queued
int queueFront; // Head index: tasks are taken from here
int queueRear; // Tail index: tasks are put here
pthread_t managerID; // Manager thread ID
pthread_t *threadIDs; // Worker thread IDs
int minNum; // Minimum number of threads
int maxNum; // Maximum number of threads
int busyNum; // Number of busy threads
int liveNum; // Number of live threads
int exitNum; // Number of threads that should exit
pthread_mutex_t mutexPool; // Protects the whole thread pool
pthread_mutex_t mutexBusy; // Protects busyNum
pthread_cond_t notFull; // Signaled when the task queue is no longer full
pthread_cond_t notEmpty; // Signaled when the task queue is no longer empty
int shutdown; // 1 if the thread pool is being destroyed, 0 otherwise
};
// Print the error message for a failed call and terminate the process
void perr_exit(const char *S)
{
perror(S);
exit(1);
}
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
// calloc zeroes the structure, so the cleanup code below never sees uninitialized pointers
ThreadPool* pool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // Equal to the minimum number
pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// Task queue
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
if (pool->taskQ == NULL)
{
printf("malloc taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// Create thread
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// Release resources
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
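int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
// Close the thread pool; set the flag under the lock so that a thread about to wait cannot miss it
pthread_mutex_lock(&pool->mutexPool);
pool->shutdown = 1;
pthread_mutex_unlock(&pool->mutexPool);
// Join the manager thread; after this no new workers are created
pthread_join(pool->managerID, NULL);
// Wake up blocked consumer threads and any producer blocked in threadPoolAdd()
pthread_mutex_lock(&pool->mutexPool);
pthread_cond_broadcast(&pool->notEmpty);
pthread_cond_broadcast(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
// Wait for the workers that are still registered before freeing anything they use
// A worker clears its slot in threadExit() and does not touch the pool afterwards
for (int i = 0; i < pool->maxNum; ++i)
{
pthread_t tid = pool->threadIDs[i];
if (tid != 0)
{
pthread_join(tid, NULL);
}
}
// Free heap memory
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
return 0;
}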
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// Block producer thread
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// Add tasks
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// Whether the current task queue is empty
while (pool->queueSize == 0 && !pool->shutdown)
{
// Blocking worker threads
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// Determine whether to destroy the thread
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// Judge whether the thread pool is closed
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// Take a task out of the task queue
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// Move the head node
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// Unlock
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// Check every 3 seconds
sleep(3);
// Take out the number of tasks in the thread pool and the number of current threads
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// Get the number of busy threads
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// Add thread
// The number of tasks > Number of threads alive && Number of threads alive < Maximum number of threads
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// Destruction of the thread
// Busy thread *2 < Number of threads alive && Surviving threads > Minimum number of threads
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// Wake idle workers so that they exit on their own
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
void taskFunc(void *arg)
{
int num = *(int *)arg;
printf("thread %ld is working, number = %d\n",
pthread_self(), num);
sleep(1);
}
int main()
{
// Creating a thread pool
ThreadPool *pool = threadPoolCreate(3, 10, 100);
if (pool == NULL)
{
return 1;
}
for (int i = 0; i < 100; ++i)
{
int *num = (int *)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}